← 返回模块
3.6.6.3beta 可读 · 未来付费校验通过内容版本 2026-05-29

分布式追踪、OpenTelemetry 与日志聚合

3.6.6 · 可观测性与系统设计 · 编程

国内私募的 50ETF 期权做市,盘中 13:47 SSE 一条 IF 主力 tick 的入库延迟从平均 80 毫秒跳到 1.8 秒。L1 的结构化日志能告诉你这一笔 tick 落在 feed-handler-consumer-7c4f9d8b6-x2k4l 这个 pod 的某条 offset;L2 的 dashboard 能告诉你 p99 延迟在过去 5 分钟内从 0.1 秒涨到 1.2 秒;但 L1 与 L2 都答不出『这 1.8 秒里时间究竟花在哪一段——是 Kafka 消费、还是 Postgres INSERT、还是中间的 Redis 缓存查询?』这是 trace 这一支柱要回答的问题。本课把 OpenTelemetry SDK 与 Collector 接到 3.6.5 的 capstone 上,再把 Loki 接入做日志聚合,最后用一个 trace_id 把三支柱串起来。

为什么是 OpenTelemetry

OpenTelemetry(简称 OTel)是 CNCF 的可观测性数据采集标准,2021 年成为继 Kubernetes 之后的第二个 CNCF 毕业项目。它把『链路追踪 + 指标 + 日志』三种 telemetry 的 SDK + 协议 + Collector 一体化。国内量化私募的自建路线:OpenTelemetry SDK + OTel Collector + Grafana Tempo / Jaeger(链路追踪后端)+ Loki / Elasticsearch(日志后端)+ Prometheus(指标后端)。Apache SkyWalking(中国工程师主导的开源项目)是一体化替代,集链路 + 指标 + 日志 + 告警于一体,在国内私募的渗透率高于 Jaeger,但更紧耦合。公有云 SaaS:阿里云 ARMS APM(含全链路追踪 + 应用监控 + 浏览器 RUM,原生兼容 OpenTelemetry 协议),腾讯云 APM,华为云 APM。本课按 OSS 路径展开,因为契约是 OpenTelemetry 协议,更换后端只需改 Collector 的 exporter 配置。

OTel 的关键概念三句话:『一个 trace 由若干 span 组成,每个 span 有 trace_id + span_id + parent_span_id;context propagation 通过 W3C TraceContext 的 traceparent 头跨进程;SDK 负责生成 + 通过 Collector 转发到后端』。

OpenTelemetry SDK 的规范初始化

Fenced Python 代码块,落在 feed_handler/_tracing.py 模块里:

import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.psycopg import Psycopg3Instrumentor
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


def configure_tracing(service: str, version: str, otlp_endpoint: str = 'http://otel-collector:4317') -> None:
    resource = Resource.create({
        'service.name': service,
        'service.version': version,
        'deployment.environment': os.environ.get('ENV', 'dev'),
    })
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True))
    )
    trace.set_tracer_provider(provider)
    RequestsInstrumentor().instrument()
    Psycopg3Instrumentor().instrument()
    ConfluentKafkaInstrumentor().instrument()
    # FastAPIInstrumentor().instrument_app(app)  # 仅当存在 FastAPI app 时打开

三条 Resource 属性是行业惯例:service.nameservice.versiondeployment.environmentBatchSpanProcessor 把 span 攒批后异步推到 Collector,避免在业务路径上同步阻塞。四个 auto-instrumentation 调用一次性把 requestspsycopgconfluent-kafkafastapi 四类常见库的进出函数包装为 span,无需改业务代码。OTLP 默认 endpoint http://otel-collector:4317 指向同集群的 Collector DaemonSet 服务(在 K8s 里通过 NODE_IP 的 hostIP 反查到本地 pod)。

在业务函数上手动加 span

Fenced Python 代码块,针对 auto-instrumentation 抓不到的业务操作:

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)


def compute_vwap_with_trace(symbol: str, window_seconds: int):
    with tracer.start_as_current_span(
        'compute_vwap',
        attributes={'symbol': symbol, 'window_seconds': window_seconds},
    ) as span:
        try:
            result = compute_vwap(symbol, window_seconds)
            span.set_attribute('rows_processed', len(result))
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            raise

span 名 compute_vwap、两个入参 attribute symbol / window_seconds、出参 attribute rows_processed、错误路径的 record_exception(exc) + set_status(Status(StatusCode.ERROR, str(exc))) + raise 都是行业惯例。错误一定要 raise 而不是吞掉,否则上层服务的调用方看不到失败。

structlog 注入 trace context 的处理器

Fenced Python 代码块,给 L1 的 structlog 处理器链加一个 add_trace_context 处理器,让每条 JSON 日志都带 trace_idspan_id

def add_trace_context(logger, method_name, event_dict):
    from opentelemetry import trace
    span = trace.get_current_span()
    ctx = span.get_span_context() if span and span.is_recording() else None
    if ctx and ctx.trace_id != 0:
        event_dict['trace_id'] = format(ctx.trace_id, '032x')
        event_dict['span_id'] = format(ctx.span_id, '016x')
    return event_dict

处理器名 add_trace_context,函数签名 (logger, method_name, event_dict)(structlog 处理器约定),输出键 trace_id(32 位十六进制)与 span_id(16 位十六进制)。把它插入 L1 处理器链的 JSONRenderer 之前,每行日志就自动带 trace 上下文。

OpenTelemetry Collector 的 DaemonSet 配置

Fenced YAML 代码块,otel-collector-config.yaml

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: '0.0.0.0:4317'
      http:
        endpoint: '0.0.0.0:4318'
processors:
  batch:
    timeout: 5s
    send_batch_size: 1024
  tail_sampling:
    decision_wait: 10s
    policies:
      - name: keep-errors
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: sample-healthy
        type: probabilistic
        probabilistic: {sampling_percentage: 1.0}
exporters:
  otlp/tempo:
    endpoint: 'tempo:4317'
    tls: {insecure: true}
  loki:
    endpoint: 'http://loki:3100/loki/api/v1/push'
  prometheus:
    endpoint: '0.0.0.0:8889'
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, tail_sampling]
      exporters: [otlp/tempo]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [loki]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]

0.0.0.0:4317(gRPC)+ 0.0.0.0:4318(HTTP)是 OTLP 的两个标准入口;tail_sampling 的两条策略——错误全留、健康按 1.0% 采样——是国内私募常见的成本控制设置;三个 exporter 分别对应 Tempo、Loki、Prometheus;三条 pipeline 按 traces / logs / metrics 各自串联接收 / 处理 / 导出阶段。

LogQL 的三条惯例

Fenced LogQL 代码块,给 Loki 查询日志:

# Per-pod error log rate
sum by (pod) (rate({namespace="feed-dev", app="feed-handler"} | json | level="error" [5m]))
# Single-trace cross-pod log search
{namespace="feed-dev"} | json | trace_id="$trace_id"
# Per-symbol error count
sum by (symbol) (count_over_time({app="feed-handler"} | json | level="error" [5m]))

{namespace=..., app=...} 是 Loki 的标签选择器(Loki 按标签建索引,不全文索引);| json 把每行 JSON 解析成可访问字段;trace_id="$trace_id" 用 Grafana dashboard 的模板变量做交互式跳转——值班从 dashboard 一个面板点 trace_id,Grafana 在另一面板自动渲染对应 trace 的所有日志。

W3C TraceContext header 的形状

Inline-code 列出:header 名 traceparent,格式 <version>-<trace-id>-<span-id>-<flags>,示例值 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01(version 00,32 位十六进制 trace-id,16 位十六进制 span-id,flags 01 表示 sampled);同名伴侣 header tracestate 用于厂商私有传播。OTel SDK 与 auto-instrumentation 默认按此契约读写,跨语言 / 跨厂商兼容。

三支柱关联机制

Inline-code 列举(顺序固定):

  1. log -> trace:L1 的 add_trace_context 处理器在每行 JSON 日志注入 trace_id + span_id,所以 LogQL {app="feed-handler"} | json | trace_id="<id>" 找到一条 trace 的所有日志行。
  2. metric -> trace:Prometheus exemplars 把采样的 trace_id 挂在 histogram 桶样本上,写法 histogram.labels(...).observe(value, exemplar={'trace_id': trace_id}),于是在 Grafana 的延迟尖峰点上点开就直接跳到对应 Tempo trace。
  3. trace -> log + metric:trace_id 是 join 键——给一条慢 trace,日志面板按相同 trace_id 过滤,指标面板按 trace 的时间窗口缩放。

SLI / SLO / 错误预算

Inline-code 列举三条 SLO(顺序固定):

  1. Availability SLO: sum(rate(requests_total{status!~"5.."}[30d])) / sum(rate(requests_total[30d])) >= 0.999(30 天成功率 99.9%)。
  2. Latency SLO: histogram_quantile(0.99, sum by (le) (rate(order_latency_seconds_bucket[30d]))) < 0.5(p99 < 500ms)。
  3. Freshness SLO: max(kafka_consumer_lag_messages) < 10000 99% of the time over 30d(lag 受控)。

规则:『错误预算 = 1 - SLO;预算大部分尚在就积极上线;预算急速消耗就冻结发布』。

练习

Exercise

拿 L1 + L2 已仪表化的 feed-handler,加入分布式追踪 + 日志聚合 + 跨支柱关联。(a) 在 pyproject.toml 加入 opentelemetry-sdk==1.25.0opentelemetry-exporter-otlp==1.25.0opentelemetry-instrumentation-requests==0.46b0opentelemetry-instrumentation-psycopg==0.46b0opentelemetry-instrumentation-confluent-kafka-python==0.46b0 并重新锁定。(b) 撰写 feed_handler/_tracing.py,包含 configure_tracing(service: str, version: str, otlp_endpoint: str = "http://otel-collector:4317") -> None 函数,构造 TracerProvider,加 Resource.create({"service.name": service, "service.version": version, "deployment.environment": os.environ.get("ENV", "dev")}),挂 BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)),调用四个 auto-instrumentor 的 .instrument()。(c) 在 feed_handler/__main__.pyconfigure_loggingstart_metrics_server 之后调用一次 configure_tracing("feed-handler", __version__)。(d) 在 monitor 服务的 compute_vwap 周围加 with tracer.start_as_current_span("compute_vwap", attributes={"symbol": symbol, "window_seconds": 60}) as span: ...。(e) 在 L1 处理器链的 JSONRenderer 之前插入 add_trace_context 处理器,使每行日志带 trace_id + span_id。(f) Helm 部署 OTel Collector + Tempo + Loki:helm upgrade --install tempo grafana/tempo -n observabilityhelm upgrade --install loki grafana/loki -n observability,并应用 otel-collector-config.yaml。(g) 在 feed-handler-consumer Deployment 加 OTEL_EXPORTER_OTLP_ENDPOINT=http://$(NODE_IP):4317(用 valueFrom: fieldRef: fieldPath: status.hostIP 注入)使应用发送到节点本地 Collector。(h) Promtail / Grafana-Agent DaemonSet 读取 /var/log/containers/*.log 并把 JSON 转发到 Loki,标签 {namespace, app, pod}。(i) 在 Grafana 打开一条 tick 的日志行,复制其 trace_id,粘进 Tempo trace 搜索;观察 trace 在 producer / consumer / monitor 三服务的跨度树;点任一 span,旁边的 Loki 日志面板自动渲染该 span 的日志上下文。(j) 选本课一条 SLO(如 p99 latency < 500ms over 30d),表达成 PromQL recording rule,在 Grafana 加一个面板显示滚动 30 天的烧速率 (1 - feed_handler:availability:30d) / (1 - 0.999);确认它就坐在 L2 dashboard 旁。

提示
auto-instrumentor 没有抓到 Kafka span?多半 ConfluentKafkaInstrumentor().instrument()Consumer 实例化之前没调到;把 configure_tracing 放在 configure_logging 之后、Consumer(...) 之前。
提示
structlog 的 add_trace_context 处理器看不到 trace?检查它在处理器链里的位置——必须在 merge_contextvars 之后、JSONRenderer 之前,且 OTel SDK 已经在 __main__ 初始化。

跨区域阅读清单

OpenTelemetry 官方文档(opentelemetry.io/docs/,部分章节中文化,国内镜像 cncf.vercel.app)是 SDK + Collector + 语义约定的权威参考;Apache SkyWalking 中文文档(skywalking.apache.org/zh/)是国内出镜率最高的链路追踪项目;Grafana Tempo 文档覆盖 trace 后端;阿里云 ARMS APM 文档(help.aliyun.com/product/34364.html)覆盖托管路径;腾讯云 APM 文档(cloud.tencent.com/document/product/1463)给出腾讯云对应路径;极客时间《分布式追踪实战》、《SRE 实战》(节译 Google SRE Book);CNCF 中国分会公众号关于 OTel 与 SkyWalking 的系列文章;Google SRE Book 中文译本《SRE: Google 运维解密》(电子工业出版社)。一条额外注释:国内量化 firm 的 OTel + Tempo + Loki 通常由平台 / SRE 团队部署;quant developer 自行加 OTel SDK 与 auto-instrumentation 包,自行在 log 里注入 trace_id,自行在 dashboard 中配跨面板跳转;SLI / SLO 由团队共同定义,错误预算由平台团队维护跟踪表。

头部采样与尾部采样

链路追踪的采样策略决定了你能看到的与必须存储的之间的权衡,决定了 Tempo / Jaeger 的存储成本。两种主要做法是头部采样(head-based sampling)与尾部采样(tail-based sampling)。头部采样在 trace 的第一个 span 上做决定(按 trace_id hash 取百分比),下游 span 按 traceparent 中的 flags 服从;优点是简单、低开销;缺点是采样发生在还不知道这条 trace 会不会出错之前,结果就是错误 trace 可能被丢弃。尾部采样在 Collector 端等 trace 的所有 span 都到齐后再决定(OTel Collector 的 tail_sampling processor),可以基于状态码(错误必留)、延迟(>500ms 必留)、属性(特定 service 必留)等条件做策略采样;优点是不丢关键 trace,缺点是 Collector 需要在内存中缓冲一段时间(典型 decision_wait: 10s)。本课的 Collector 配置就是尾部采样的常见组合——错误全留 + 健康按 1% 抽样——既覆盖故障诊断,又把存储成本压在可接受区间。

国内私募对 OTel 的常见取舍

国内私募在选择 OTel 路线时,最常见的三条取舍:(1) Tempo vs Jaeger:新项目偏 Tempo(与 Grafana 栈集成更顺、按对象存储计费更便宜),老项目仍多 Jaeger(搜索能力更强、UI 老练);(2) OTel SDK vs SkyWalking agent:纯 Python 服务多用 OTel(标准化、跨语言一致),与 Java / .NET 混编的团队常用 SkyWalking(一体化、agent 自动加载更省事);(3) 自建 vs 公有云 SaaS:合规与等保三级以上的环境多自建(数据出域受限),中小私募则按业务量直接上阿里云 ARMS / 腾讯云 APM(运维代价更低)。OTel 的好处是这三类后端都能通过更换 Collector exporter 直接切换,不动业务代码。值班同学只需要熟悉 OTLP 协议与三支柱的关联原则;后端迁移属于平台团队的工作,与应用层完全解耦。这也是为什么本课强烈推荐先建立 OTel SDK 与 Collector 的标准管道,而非直接对接某个具体后端 SDK——后者绑定厂商,前者保留迁移空间。

衔接到 L4

下一课是模块的 capstone:把 L1 日志 + L2 指标 + L3 追踪三支柱已仪表化的 feed-handler,配上完整的 runbook、合成事故注入、以及五阶段事故响应工作流(detect / triage / mitigate / verify / post-mortem),并在沪深300、CFFEX 主力、上证 50ETF 三类标的的真实场景下走一遍。本课的两条 L2 告警(KafkaConsumerLagHighDeploymentRolloutStuck)就是 L4 runbook 的触发面。