事件处理流水线
相关源文件
以下文件被用作生成此维基页面的上下文:
- .gitignore
- cli/cmd/root.go
- pkg/event_processor/base_event.go
- pkg/event_processor/http2_request.go
- pkg/event_processor/http2_request_test.go
- pkg/event_processor/http2_response.go
- pkg/event_processor/http2_response_test.go
- pkg/event_processor/http_request.go
- pkg/event_processor/http_response.go
- pkg/event_processor/http_response_test.go
- pkg/event_processor/iparser.go
- pkg/event_processor/iworker.go
- pkg/event_processor/processor.go
- pkg/event_processor/processor_test.go
- pkg/event_processor/testdata/all.json
- pkg/util/ebpf/bpf.go
- pkg/util/ebpf/bpf_linux.go
- pkg/util/ebpf/bpf_test.go
eCapture 中的事件处理流水线负责对从 eBPF 内核空间捕获的原始数据事件进行重组、协议识别和格式化。它将碎片化的字节转换为结构化的、人类可读或机器可解析的日志,同时保持每个连接的事件顺序。
流水线概览
该流水线作为一个多级调度系统运行。事件从 eBPF perf/ring buffer 中摄取,根据连接亲和性分发给专用工作线程(worker),使用特定协议的状态机进行解析,最后通过配置的输出写入器(writer)发送。
数据流图
下图展示了数据从内核经过用户态处理实体的转换过程。
"事件处理流程"
Sources: pkg/event_processor/processor.go:64-87, pkg/event_processor/iworker.go:261-280
EventProcessor 调度循环
EventProcessor 是事件管理的中心枢纽。它在 Serve() 方法中运行一个连续循环,以处理传入事件并管理 worker 的生命周期。
核心职责
- 摄取 (Ingestion):从
incoming通道接收IEventStruct对象 pkg/event_processor/processor.go:68。 - 分发 (Dispatching):使用
dispatch()根据唯一的 UUID 将事件路由到正确的IWorkerpkg/event_processor/processor.go:89-107。 - 连接管理 (Connection Management):监控
destroyConn以在套接字关闭时清理 worker pkg/event_processor/processor.go:78-79。 - 输出聚合 (Output Aggregation):从
outComing收集处理后的字节切片,并将其写入最终的loggerpkg/event_processor/processor.go:80-81。
Sources: pkg/event_processor/processor.go:28-61
eventWorker 与 UUID 亲和性
为了确保特定连接(例如单个 TLS 会话)流量的顺序处理,eCapture 使用了 UUID 亲和性 (UUID Affinity)。
UUID 构建
UUID 根据事件元数据生成,用于唯一标识一个流: PID_TID_COMM_FD_DATATYPEpkg/event_processor/base_event.go:122-124
Worker 生命周期
每个 eventWorker 代表一个单一的事件流。
- 亲和性:
EventProcessor维护一个workerQueue映射,其中键为 UUID pkg/event_processor/processor.go:60。 - 顺序执行:每个 worker 运行自己的
Run()协程,确保同一连接的数据包按顺序处理 pkg/event_processor/iworker.go:93-95。 - 超时回收:一个
ticker(默认 100ms)触发 worker 检查是否应刷新数据,或者是否因空闲而销毁 worker pkg/event_processor/iworker.go:125。
Sources: pkg/event_processor/iworker.go:69-88, pkg/event_processor/processor.go:128-140
协议识别 (IParser)
IParser 接口定义了如何识别和解析原始载荷。当 worker 收到流的首个数据时,它使用 NewParser() 来识别协议。
支持的解析器
| 解析器名称 | 协议 | 检测逻辑 |
|---|---|---|
HTTPRequest | HTTP/1.1 请求 | 使用 http.ReadRequest pkg/event_processor/http_request.go:83-92 |
HTTPResponse | HTTP/1.1 响应 | 使用 http.ReadResponse pkg/event_processor/http_response.go:94-102 |
HTTP2Request | HTTP/2 请求 | 检查是否存在 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n pkg/event_processor/http2_request.go:42-59 |
HTTP2Response | HTTP/2 响应 | 验证 HTTP/2 帧头部 (9 字节) pkg/event_processor/http2_response.go:56-88 |
DefaultParser | 未知/明文 | 备用方案,执行十六进制转储或 C 字符串转换 pkg/event_processor/iparser.go:117-161 |
ProcessStatus 状态机
解析状态通过 ProcessStatus 进行跟踪:
ProcessStateInit:初始状态或重置后 pkg/event_processor/iparser.go:28。ProcessStateProcessing:正在向解析器缓冲区写入字节 pkg/event_processor/iparser.go:29。ProcessStateDone:协议识别和数据提取已完成 pkg/event_processor/iparser.go:30。
Sources: pkg/event_processor/iparser.go:49-60, pkg/event_processor/iparser.go:85-115
代码实体关联
此图将逻辑处理步骤映射到 pkg/event_processor 包中具体的 Go 结构体和接口。
"流水线实体映射"
Sources: pkg/event_processor/processor.go:28-48, pkg/event_processor/iworker.go:34-48, pkg/event_processor/iparser.go:49-60
最终输出与截断
在数据发送到输出写入器之前,流水线会应用最终的转换:
- 截断 (Truncation):如果设置了
--tsize,当载荷缓冲区达到指定限制时,eventWorker将对其进行截断 pkg/event_processor/iworker.go:235-241。 - 格式化 (Formatting):根据配置,数据会被格式化为文本字符串(包含 PID、Comm 和 IP 元数据)或序列化为 Protobuf
LogEntrypkg/event_processor/iworker.go:197-226。 - 十六进制编码 (Hex Encoding):如果启用了
--hex,最终的字节切片在发送到outComing通道之前会被转换为十六进制转储 pkg/event_processor/iworker.go:191-193。
Sources: pkg/event_processor/iworker.go:174-227, cli/cmd/root.go:162-175