一个 OTLP 端点,三个团队,零路由规则:Elasticsearch Streams AI 分区
作者:来自 Elastic Aleksandar Panov
停止提前编写日志路由规则。看看 Streams AI Partitioning 如何读取你的数据,提出子 streams,并让你在几分钟内为每个团队设置保留策略。
将三个团队的日志发送到同一个 Elastic OTLP 端点,Streams AI 分区 会将其路由到按团队划分的子 stream,无需提前编写任何路由规则。
在这篇文章中,你生成 115 条多团队日志记录,让 AI 分析接收到的数据并提出分区建议,用自然语言进行优化,然后为每个团队分别设置独立保留策略:支付 90 天,结账 30 天,通知 7 天。整个流程在 Elastic 可观测性(Observability)内部运行,无需接触 index templates 或 ILM policies。
为什么多团队日志路由需要结构化设计
在多团队的 Elasticsearch 部署中,系统通常会逐渐收敛到一个共享索引,这在初期是可行的,但当各团队开始需要不同的保留周期、分片配置或处理 pipeline 时,这种模式就会开始出现问题。
在 Streams 出现之前,你必须在数据写入前就配置好 ingestion 脚本,把数据发送到不同的索引或 data streams,或者使用 reroute processor,通过某个字段来定义数据的目标去向。
而在 AI Partitioning 模式下,你让数据先进入系统。然后 AI 会分析已到达的数据,提出分区建议,你可以用自然语言对这些建议进行调整并应用它们。最终会生成一组 wired child streams,它们继承父 stream 的保留策略、processors 和 schema,同时仍然允许在子 stream 层级进行独立覆盖。
使用 Streams AI Partitioning 之前的准备条件
在运行示例之前,需要满足以下三个条件:
启用 Wired Streams
在 Elastic Cloud Serverless 以及 Elastic Cloud Hosted 9.4+ 中,wired streams 默认已开启。如果你是从更早版本升级而来,请打开 Streams 应用,并在 Settings 中确认该开关已启用。Elastic Managed LLM 连接器
前往Stack Management > Connectors > Create connector > Elastic Managed LLM。需要注意以下几点:该连接器已预先配置,无需外部账号或 API key。
任意 生成式 AI 连接器 都可用于该功能。
注意 Elastic Managed LLM 按输入与输出的 token 量计费(每百万 token 收费),详情见 定价说明。
使用该功能的账号需要具备
manage_inference集群权限(内置inference_admin角色已包含该权限)。
Managed OTLP 端点 URL 与 API key
打开Cloud Console > Manage > Application endpoints > Ingest。复制 Managed OTLP endpoint URL,并在同一页面生成 API key。
准备好这些之后,打开Observability > Streams,确认列表中存在一个logs.otelwired stream。这个 stream 就是我们接下来要进行分区的父 stream。
如果侧边栏中没有显示Streams,你的 Kibana space 可能使用的不是 Observability 的 solution view。你可以在 Stack Management > Spaces 中修改:编辑你的 space,将 Solution view 设置为Observability。
生成多团队日志数据
本示例使用来自同一家虚构公司的三个应用,这些应用分别由不同团队产生:
payments-api:结构化 JSON,包含transaction_id和amount_cents。数据敏感,保留周期较长。checkout-web:JSON 格式,包含cart_id和customer_id。主要为 INFO 和 ERROR 日志。notifications-worker:结构较松散,包含recipient和channel。日志量较大。
我们使用 Python 脚本,通过 OpenTelemetry Python SDK 将三个团队的日志通过 OTLP 直接发送到 Managed OTLP 端点。完整代码(包括配置与执行)在 配套 notebook 中提供。
每个团队都通过 service name 定义,并配有一组消息模板,以及一个用于生成团队特定属性的函数:
TEAMS = { "payments": { "service": "payments-api", "messages": [ ("INFO", "charge captured tx={tx} amount_cents={amt}"), ("ERROR", "charge declined tx={tx} reason=insufficient_funds"), ("INFO", "refund issued tx={tx} amount_cents={amt}"), ], "extra": lambda: { "transaction_id": f"tx_{random.randint(10000, 99999)}", "amount_cents": random.randint(100, 50000), }, }, "checkout": { "service": "checkout-web", "messages": [ ("INFO", "cart updated cart={cart} customer={cust}"), ("INFO", "checkout started cart={cart} customer={cust}"), ("ERROR", "checkout failed cart={cart} stage=address_validation"), ], "extra": lambda: { "cart_id": f"c_{random.randint(1000, 9999)}", "customer_id": f"u_{random.randint(100, 999)}", }, }, "notifications": { "service": "notifications-worker", "messages": [ ("INFO", "email queued recipient={rcp} channel=email"), ("INFO", "sms queued recipient={rcp} channel=sms"), ("ERROR", "webhook failed recipient={rcp} channel=webhook status=503"), ], "extra": lambda: { "recipient": f"+1555{random.randint(1000000, 9999999)}", "channel": random.choice(["email", "sms", "webhook"]), }, }, }将elasticsearch.index设置为logs.otel作为资源属性,会将数据路由到 wired streams 根 stream,而不是默认的 OTLP 数据 stream。
def setup_provider(): resource = Resource.create({"elasticsearch.index": "logs.otel"}) provider = LoggerProvider(resource=resource) provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter())) set_logger_provider(provider) handler = LoggingHandler(level=logging.INFO, logger_provider=provider) root = logging.getLogger() root.setLevel(logging.INFO) root.addHandler(handler) return provider运行 notebook,发送 115 条记录,并在三个团队之间分布不均。
打开Observability > Streams >logs.otel,切换到Partitioning(分区)标签页。你应该能在预览面板中看到已接收的数据,其中包含team、service.name等属性,以及各团队特定字段在列视图中可见。
Streams AI 分区如何提出子 stream
Streams AI Partitioning 会分析来自父 stream 的最多 1,000 条文档,识别属性的聚类情况与基数分布,然后基于能够最好区分数据的字段提出子 stream(用于此分析的机器学习方法详见 Streams 中的自动日志解析)。
对于我们刚刚发送的数据,AI 基于attributes.service.name提出了三个子 stream:
每个建议都会展示一个 Streamlang 条件,以及在采样文档中匹配该条件的百分比。AI 选择service.name是因为它是一个标准的 OpenTelemetry 属性,并且是任何单一工作负载的自然标识符。
这个初始建议是合理的,但值得思考当部署规模增长时会发生什么。目前只有三个服务,因为只有三个团队。明天,Payments 可能会新增refunds-api和fraud-detector。每新增一个 service,都会机械地创建一个新的子 stream,随着时间推移,你会为实际上只有三个组织边界的系统生成数十个分区。
Elastic 的 分区建议 更倾向于按 team 或技术类型进行逻辑分组,并目标控制在几十个分区,而不是数百个。基于team的分区更稳定,因为即使 Payments 团队运营再多服务,它仍然只会对应一个子 stream。
用自然语言优化 Streams AI 分区建议
在 Streams AI Partitioning 中审查完初始 AI 建议后,点击Modify suggestions打开自由文本输入框。
提交后,AI 会重新生成建议。现在这三个卡片的分组键从service.name改为attributes.team:
勾选全部三个,然后点击Accept selected。会弹出一个确认对话框,显示将要创建的 streams,每一个都带有WHERE attributes.team equals <team>条件。
点击Create all streams。现在 Partitioning 标签页会显示三个子 streams,它们都作为logs.otel父 stream 的一部分:
每一条通过 OTLP 端点进入的新文档都会根据这些条件被路由到对应的子 stream。你可以打开任意子 stream 来验证其数据。例如,
logs.otel.checkout只会显示 checkout 日志:
如何在 Elasticsearch Streams 中设置按团队的日志保留策略?
在 Streams AI Partitioning 创建子 stream 之后,每个子 stream 都可以拥有独立的生命周期配置,而不需要依赖父 stream。由于 wired streams 采用父子层级结构,每个子 stream 默认会继承父级的保留策略、processors 和 schema。你只需要覆盖那些需要调整的分区即可。
打开子 streamlogs.otel.payments,进入 Retention 标签页。点击Edit retention method,选择Custom period,并将其设置为 90 天。
对其他团队做同样操作,并设置符合其需求的保留策略:
| StreamRetentionRationale | ||
|---|---|---|
logs.otel.payments | 90 天 | 敏感金融数据,满足合规要求 |
logs.otel.checkout | 30 天 | 用于排查问题,无需长期保存 |
logs.otel.notifications | 7 天 | 高吞吐量,送达确认后价值较低 |
结论:从共享索引到按团队 streams,无需路由规则
一个由多个团队向 Elastic 部署发送日志的共享系统是最常见的起点。过去要对其进行组织,通常需要提前编写路由规则,或者手动维护不同的 index templates 和 ILM policies。
有了 Streams AI Partitioning,流程变得不同:你让数据先进入系统,让 AI 读取实际到达的数据,在需要时用自然语言优化建议,然后直接接受结果。
最终得到的是一组子 streams,它们继承父 stream 的全部配置,同时为每个团队提供独立的保留策略和处理能力,而无需任何手动模板管理。
下一步
运行这个 配套 notebook,生成你自己的多团队数据。
阅读 Elastic 可观测性 Streams 如何简化保留管理,深入理解保留模型。
阅读 Streams Processing:停止与 Grok 作斗争,探索当不同团队需要不同解析逻辑时 Streams 的处理能力。
阅读 为可观测性引入 Streams,了解 Streams 所在的整体调查能力体系。
原文:Log routing without rules: Elastic Streams AI Partitioning — Elastic Observability Labs