技术5/25/2026·8 min

把一个 River 队列拆成九个,并将 OTel metric 与 trace 解耦

共享单队列让每天 100 条的 apprank 批量任务挤掉了亚秒级 cron。我们按 kind 把任务拆进九个独立队列、各带并发预算,再给每个 worker 加 OTel instrument——前提是先把 metric 上报从 trace 导出里解耦,让 dashboard 不再等 collector 部署。

共享队列的队头阻塞

Mainline 的后台任务跑在 River 上——一个基于 Postgres 的 Go 任务队列。所有任务 kind——cron 健康检查、App Store 榜单扫描、签名 profile 下发、WidgetCraft 发布流程——此前全部进入 River 的单一默认队列,共用一个 MaxWorkers 池。

这种安排有一个只在特定负载形状下才暴露的故障模式。每日榜单扫描 apprank_scan_country 在 02:00 扇出,一次入队约 100 条,每条耗时约 136 秒。在单一共享池下,这些长任务会占满全部 worker 槽位约 20 分钟。排在它们后面的任务——包括 SLO 是延迟而非吞吐的亚秒级 cron——只能停在 available 状态等待。这就是队头阻塞:一个扁平池里的任务队列没有跨 kind 的优先级概念。

按 kind 路由,而非共用一个池

修法是让每个任务族拥有自己的队列和并发预算,使长批量 kind 无法吞掉延迟敏感 kind 需要的槽位。拆成九个队列,各带独立的 MaxWorkers 上限:

var QueueMaxWorkers = map[QueueName]int{
    QueueAppRankScan:    10, // 每日 100 条的批量突发,单独隔离
    QueueAppRankAgg:     3,  // 每日聚合尾部(trend / thinning / partition)
    QueueCronShort:      6,  // 亚秒级 cron,SLO 是延迟
    QueueSigning:        4,  // provision / health-check 族
    QueueSigningRefresh: 2,  // profile_auto_refresh 单独隔离
    QueuePublish:        3,  // WidgetCraft 发布流程
    QueueRelease:        3,  // App Store 发布链路
    QueueMedia:          3,  // 资源密集 / 依赖上游的 kind
    QueueImport:         2,  // 产品 / 文件导入
}

合计 36 个 worker,按 worker VM(2 CPU、4G RAM)跑 IO-bound handler 的规格定。两处隔离决策值得点出。apprank_scan 独占 10 槽,把它的突发限制在自己队列内。profile_auto_refresh——签名族里历史上最吵的任务——被隔离进 signing_refresh 只给 2 槽,这样 refresh 即使陷入 panic 循环,影响半径也封顶在两个 worker,而不会抽干 provision 与 health-check 共用的槽位。

静态路由,不做静默兜底

每个 TaskKind 通过一张静态表映射到唯一队列:

var kindToQueue = map[TaskKind]QueueName{
    TypeStaleTaskCheck:     QueueCronShort,
    TypeTrunkImport:        QueueCronShort,
    TypeReleaseStatusPoll:  QueueCronShort,
    TypeAppRankScanCountry: QueueAppRankScan,
    TypeAppRankDailyTrend:  QueueAppRankAgg,
    TypeProfileAutoRefresh: QueueSigningRefresh,
    // ... 每个 kind 一条
}

func QueueFor(kind TaskKind) (QueueName, error) {
    q, ok := kindToQueue[kind]
    if !ok {
        return "", fmt.Errorf("task: kind %q has no queue mapping", kind)
    }
    return q, nil
}

QueueFor 对未映射的 kind 返回 error,而不是回退到默认队列。这个选择是刻意的。静默兜底会把新加的 kind 路由到现在已经空了的默认队列,它的任务会永远卡在 available——没有任何 worker 消费——而这正是这张表要防的回归类型。入队路径强制执行这一点:调用方没有显式指定队列时,按 kind 查表,查不到就拒绝入队。

三层守护「每个 kind 恰好属于一个队列」这条不变量:

  1. 启动自检。 queue.AddWorker 记录每个注册的 kind(River 的 worker registry 是不透明的,没有枚举 API),启动时 bootstrap 遍历 RegisteredKinds(),发现任一 kind 缺映射就 panic。漂移在启动期 fail-fast,而不是运行时静默。
  2. 子包注册。 WidgetCraft 拥有自己的 publish kind,从 init()RegisterKindQueue 注册,让 kind 常量与 payload 保持同地而不引入 import cycle;重复注册会 panic,把歧义暴露在启动期。
  3. 单元测试。 六个守护测试钉住契约——每个已知 kind 解析到非空队列、没有死路由项、总预算保持 36(一道刻意的 review 关卡,而非随手改 map 的产物)、没有队列的 worker 数为非正、每个队列至少有一个 kind 路由进来。

这一切背后的可观测性缺口

拆队列只有在能看到每队列行为时才有回报。Mainline 经进程内 otelprom bridge 把 OTel metric 导进共享 Prometheus registry,由 /metrics 暴露给 Grafana。问题出在那道开关:单一的 OTelReady()——仅当配置了 OTLP endpoint 时为真——控制着整条 OTel 流水线,trace 和 metric 一起。

trace 确实需要远程 collector(Tempo)才有意义,所以把 trace 导出 gate 在 endpoint 上是对的。metric 不需要。otelprom bridge 是进程内的、零外部依赖;除了 no-op 之外的唯一成本就是一个 in-memory aggregator。把两者绑在一起,意味着业务 instrument 在 trace collector 部署之前在 /metrics 上什么都产不出——metric 可见性被 trace 上线进度阻塞,没有任何技术理由。

解耦把 metric 初始化整个移出这道 gate:

func (c *ObservabilityConfig) OTelReady() bool {
    return c.OTelEndpoint != "" // 只 gate trace 导出 + task 传播
}

InitOTelMetrics 不再在 endpoint 未设时提前返回,metric bridge 始终启用。同一天还需要一个跟进修复——buildOTelResource 吞掉了一个 schema-URL 冲突,导致 metric 暴露被压制——所以解耦是两个 commit,而非一个。

给 worker 加 instrument

metric 无条件上线后,每个 River worker 套一层 OTel middleware,上报三个指标:

river.job.duration  // histogram (s)        attrs: queue, kind, status
river.job.count     // counter              attrs: queue, kind, status, attempt
river.job.active    // UpDownCounter         attrs: queue

两处实现细节关键。其一是 middleware 顺序。链路是 Trace → Recovery → Metrics → Log → Controlled,最外层在前。Metrics 放在 Recovery 内层,这样当 Recovery 把 panic 转成 error 时,metric 这层仍能记到 status=error,而不是整条 job 漏记。river.job.active 在进入时 +1、在 defer 里 -1,因此即使 job panic 也能正确减回——否则 Recovery 在上层接住 panic 后,active 计数会永远停在 +1。

其二是基数。attempt 属性被 bucket 成 1 / 2 / >=3,而不是带原始计数。River 默认 max_attempts 是 25;把每个不同的 attempt 数当作独立属性值上报,会让时间序列膨胀 25 倍而毫无分析收益。同样的纪律用在一并加入的 HTTP middleware 上——otelgin 只产 trace span 不产 metric,所以一个 HTTP metric middleware 上报 http.server.request.duration,按 c.FullPath()(路由模板 /api/v1/products/:id,而非解析后的 URL——后者会因路径参数撑爆基数)打标签。

可迁移的结论

扁平任务池对「单个 kind 的负载形状独占整池」没有任何防御;隔离必须是结构性的——每队列 worker 预算——而不是寄望于公平分配。建路由表时,把未映射的情形做成 error 而非默认值:静默兜底会把缺失的映射变成卡死的任务,没有测试能抓到,但每道守护都能。把可观测性的开关限定在真正需要那个依赖的组件上:进程内运行的 metric 不该等 trace collector;一个布尔同时控制两条流水线,是一种恰好让你失去运维其余部分所需可见性的耦合。