Event Processing Pipeline
Relevant source files
The following files were used as context for generating this wiki page:
- .gitignore
- cli/cmd/root.go
- pkg/event_processor/base_event.go
- pkg/event_processor/http2_request.go
- pkg/event_processor/http2_request_test.go
- pkg/event_processor/http2_response.go
- pkg/event_processor/http2_response_test.go
- pkg/event_processor/http_request.go
- pkg/event_processor/http_response.go
- pkg/event_processor/http_response_test.go
- pkg/event_processor/iparser.go
- pkg/event_processor/iworker.go
- pkg/event_processor/processor.go
- pkg/event_processor/processor_test.go
- pkg/event_processor/testdata/all.json
- pkg/util/ebpf/bpf.go
- pkg/util/ebpf/bpf_linux.go
- pkg/util/ebpf/bpf_test.go
The Event Processing Pipeline in eCapture is responsible for reassembling, identifying the protocol of, and formatting the raw data events captured by eBPF programs in kernel space. It turns fragmented bytes into structured, human-readable or machine-parsable logs while preserving the order of events within each connection.
Pipeline Overview
The pipeline operates as a multi-stage scheduling system. Events are ingested from the eBPF perf/ring buffers, dispatched to dedicated workers based on connection affinity, parsed using protocol-specific state machines, and finally emitted through configured output writers.
Data Flow Diagram
The following diagram illustrates the transition of data from the kernel through the userspace processing entities.
[Diagram: Event Processing Flow]
Sources: pkg/event_processor/processor.go:64-87, pkg/event_processor/iworker.go:261-280
EventProcessor Scheduling Loop
The EventProcessor is the central hub for event management. It runs a continuous loop in its Serve() method to handle incoming events and manage the lifecycle of workers.
Key Responsibilities
- Ingestion: Receives IEventStruct objects from the incoming channel (pkg/event_processor/processor.go:68).
- Dispatching: Uses dispatch() to route each event to the correct IWorker based on its UUID (pkg/event_processor/processor.go:89-107).
- Connection Management: Monitors destroyConn to clean up workers when a socket is closed (pkg/event_processor/processor.go:78-79).
- Output Aggregation: Collects processed byte slices from outComing and writes them to the final logger (pkg/event_processor/processor.go:80-81).
Sources: pkg/event_processor/processor.go:28-61
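The scheduling loop described above can be sketched as a single select over the incoming, destroyConn, and outComing channels. This is a minimal sketch, not eCapture's code: the Event type, the string payloads, the buffer sizes, and the collect helper are hypothetical, and each worker simply echoes its input instead of parsing it.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a hypothetical stand-in for IEventStruct: a stream key and a payload.
type Event struct {
	UUID    string
	Payload string
}

// processor is a simplified, hypothetical EventProcessor; buffer sizes are arbitrary.
type processor struct {
	incoming    chan Event
	destroyConn chan string
	outComing   chan string
	workerQueue map[string]chan Event // only touched by the Serve goroutine
}

func newProcessor() *processor {
	return &processor{
		incoming:    make(chan Event, 64),
		destroyConn: make(chan string, 64),
		outComing:   make(chan string, 64),
		workerQueue: make(map[string]chan Event),
	}
}

// dispatch routes an event to the worker owning its UUID, starting a
// worker goroutine the first time that UUID is seen.
func (p *processor) dispatch(e Event) {
	ch, ok := p.workerQueue[e.UUID]
	if !ok {
		ch = make(chan Event, 64)
		p.workerQueue[e.UUID] = ch
		go p.runWorker(ch)
	}
	ch <- e
}

// runWorker handles one stream sequentially, which preserves per-UUID order.
func (p *processor) runWorker(ch <-chan Event) {
	for e := range ch {
		p.outComing <- e.UUID + ":" + e.Payload
	}
}

// Serve is the scheduling loop: ingest, dispatch, tear down, and emit.
func (p *processor) Serve(ctx context.Context, logger func(string)) {
	for {
		select {
		case e := <-p.incoming:
			p.dispatch(e)
		case uuid := <-p.destroyConn:
			if ch, ok := p.workerQueue[uuid]; ok {
				close(ch) // the worker drains what is queued, then exits
				delete(p.workerQueue, uuid)
			}
		case out := <-p.outComing:
			logger(out)
		case <-ctx.Done():
			for uuid, ch := range p.workerQueue {
				close(ch)
				delete(p.workerQueue, uuid)
			}
			return
		}
	}
}

// collect pushes events through a processor and returns the emitted lines.
func collect(events []Event) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := newProcessor()
	results := make(chan string, len(events))
	go p.Serve(ctx, func(s string) { results <- s })
	for _, e := range events {
		p.incoming <- e
	}
	out := make([]string, 0, len(events))
	for range events {
		out = append(out, <-results)
	}
	return out
}

func main() {
	fmt.Println(collect([]Event{{"a", "1"}, {"b", "x"}, {"a", "2"}}))
}
```

Because only the Serve goroutine touches workerQueue, the map needs no lock. Ordering is guaranteed per UUID, not across UUIDs, so different streams may interleave differently from run to run.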
eventWorker and UUID Affinity
To ensure in-order processing of traffic for a specific connection (e.g., a single TLS session), eCapture uses UUID Affinity.
UUID Construction
The UUID is generated from the event metadata to uniquely identify a stream, using the format PID_TID_COMM_FD_DATATYPE (pkg/event_processor/base_event.go:122-124).
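A minimal sketch of building that key, assuming a metadata struct with PID, TID, a NUL-padded 16-byte comm field (the kernel's task name length), the file descriptor, and a data-type code. The baseEvent struct, its field names, and the UUID method are hypothetical; only the PID_TID_COMM_FD_DATATYPE layout comes from the description above.

```go
package main

import "fmt"

// baseEvent is a hypothetical subset of event metadata; field names are illustrative.
type baseEvent struct {
	Pid      uint32
	Tid      uint32
	Comm     [16]byte // NUL-padded task name
	Fd       uint32
	DataType int
}

// cString returns the bytes before the first NUL as a Go string.
func cString(b []byte) string {
	for i, c := range b {
		if c == 0 {
			return string(b[:i])
		}
	}
	return string(b)
}

// UUID builds the PID_TID_COMM_FD_DATATYPE key used for worker affinity.
func (e baseEvent) UUID() string {
	return fmt.Sprintf("%d_%d_%s_%d_%d", e.Pid, e.Tid, cString(e.Comm[:]), e.Fd, e.DataType)
}

func main() {
	var comm [16]byte
	copy(comm[:], "curl")
	e := baseEvent{Pid: 1234, Tid: 1234, Comm: comm, Fd: 5, DataType: 0}
	fmt.Println(e.UUID()) // 1234_1234_curl_5_0
}
```

Because TID and DATATYPE are part of the key, events that differ only in thread or data type are routed to different workers.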
Worker Lifecycle
Each eventWorker represents a single stream of events.
- Affinity: The EventProcessor maintains a workerQueue map keyed by UUID (pkg/event_processor/processor.go:60).
- In-Order Execution: Each worker runs its own Run() goroutine, so events for the same connection are processed sequentially (pkg/event_processor/iworker.go:93-95).
- Timeout Reclamation: A ticker (default 100ms) periodically triggers the worker to check whether buffered data should be flushed or whether the worker should be destroyed due to inactivity (pkg/event_processor/iworker.go:125).
Sources: pkg/event_processor/iworker.go:69-88, pkg/event_processor/processor.go:128-140
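The lifecycle above can be approximated by a worker that buffers payloads, resets an idle counter on every event, and retires after a run of quiet ticks. The 100ms interval matches the default mentioned above; maxIdleTicks, the worker struct, and the choice to flush only on retirement or stream close are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// maxIdleTicks is a hypothetical threshold of consecutive quiet ticks.
const maxIdleTicks = 10

// worker is a simplified, hypothetical eventWorker for a single UUID.
type worker struct {
	uuid      string
	buf       []byte
	idleTicks int
	emit      func(uuid string, data []byte)
}

// onData buffers a payload and resets the inactivity counter.
func (w *worker) onData(p []byte) {
	w.buf = append(w.buf, p...)
	w.idleTicks = 0
}

// flush hands any buffered bytes to emit and clears the buffer.
func (w *worker) flush() {
	if len(w.buf) > 0 {
		w.emit(w.uuid, w.buf)
		w.buf = nil
	}
}

// onTick runs on every ticker fire and reports whether the worker should exit.
func (w *worker) onTick() bool {
	w.idleTicks++
	if w.idleTicks <= maxIdleTicks {
		return false
	}
	w.flush()
	return true
}

// Run consumes events one at a time (preserving order) and reacts to a 100ms ticker.
func (w *worker) Run(events <-chan []byte) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case p, ok := <-events:
			if !ok { // stream closed: flush whatever is left
				w.flush()
				return
			}
			w.onData(p)
		case <-ticker.C:
			if w.onTick() {
				return
			}
		}
	}
}

func main() {
	events := make(chan []byte)
	done := make(chan struct{})
	w := &worker{uuid: "1234_1234_curl_5_0", emit: func(u string, d []byte) {
		fmt.Printf("%s: %q\n", u, d)
	}}
	go func() { w.Run(events); close(done) }()
	events <- []byte("GET / ")
	events <- []byte("HTTP/1.1\r\n")
	close(events)
	<-done
}
```

Keeping the tick decision in a separate onTick method makes the timeout policy testable without waiting on real timers.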
Protocol Identification (IParser)
The IParser interface defines how raw payloads are identified and parsed. When a worker receives the first data for a stream, it uses NewParser() to identify the protocol.
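As a rough illustration of first-payload detection, the sketch below defines a trimmed IParser, one HTTP/1.x request parser built on net/http's http.ReadRequest, and a catch-all fallback. The method set, the candidates slice, and its priority order are hypothetical; eCapture's actual interface and registration mechanism may differ.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"net/http"
)

// IParser is a trimmed-down, hypothetical version of the interface.
type IParser interface {
	Name() string
	Detect(payload []byte) bool
	Write(b []byte) (int, error)
	Display() []byte
}

// httpRequestParser recognises HTTP/1.x requests via http.ReadRequest.
type httpRequestParser struct{ buf bytes.Buffer }

func (p *httpRequestParser) Name() string { return "HTTPRequest" }

func (p *httpRequestParser) Detect(payload []byte) bool {
	_, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(payload)))
	return err == nil
}

func (p *httpRequestParser) Write(b []byte) (int, error) { return p.buf.Write(b) }
func (p *httpRequestParser) Display() []byte            { return p.buf.Bytes() }

// defaultParser accepts anything and is used when nothing else matches.
type defaultParser struct{ buf bytes.Buffer }

func (p *defaultParser) Name() string                { return "DefaultParser" }
func (p *defaultParser) Detect([]byte) bool          { return true }
func (p *defaultParser) Write(b []byte) (int, error) { return p.buf.Write(b) }
func (p *defaultParser) Display() []byte             { return p.buf.Bytes() }

// candidates lists parser constructors in an assumed priority order;
// each call yields a fresh parser so streams never share state.
var candidates = []func() IParser{
	func() IParser { return &httpRequestParser{} },
}

// NewParser picks the first candidate that recognises the payload and
// falls back to the default parser otherwise.
func NewParser(payload []byte) IParser {
	for _, newParser := range candidates {
		if p := newParser(); p.Detect(payload) {
			return p
		}
	}
	return &defaultParser{}
}

func main() {
	req := []byte("GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n")
	p := NewParser(req)
	p.Write(req)
	fmt.Println(p.Name())                                   // HTTPRequest
	fmt.Println(NewParser([]byte{0x16, 0x03, 0x01}).Name()) // DefaultParser
}
```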
Supported Parsers
| Parser Name | Protocol | Detection Logic |
|---|---|---|
| HTTPRequest | HTTP/1.1 Request | Uses http.ReadRequest (pkg/event_processor/http_request.go:83-92) |
| HTTPResponse | HTTP/1.1 Response | Uses http.ReadResponse (pkg/event_processor/http_response.go:94-102) |
| HTTP2Request | HTTP/2 Request | Checks for the connection preface PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n (pkg/event_processor/http2_request.go:42-59) |
| HTTP2Response | HTTP/2 Response | Validates the 9-octet HTTP/2 frame header (pkg/event_processor/http2_response.go:56-88) |
| DefaultParser | Unknown/Plaintext | Fallback that produces a hex dump or a C-string conversion (pkg/event_processor/iparser.go:117-161) |
ProcessStatus State Machine
The parsing state is tracked via ProcessStatus:
- ProcessStateInit: Initial state, or the state after a reset (pkg/event_processor/iparser.go:28).
- ProcessStateProcessing: Actively writing bytes into the parser's buffer (pkg/event_processor/iparser.go:29).
- ProcessStateDone: Protocol identification and data extraction are complete (pkg/event_processor/iparser.go:30).
Sources: pkg/event_processor/iparser.go:49-60, pkg/event_processor/iparser.go:85-115
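The three states can be modeled as a Go enum plus a small parser that moves from Init to Processing to Done, and back to Init on Reset. The constant values (via iota), the headerParser type, and its completion rule (seeing the blank line that ends an HTTP header block) are hypothetical; a real parser would also have to account for message bodies.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// ProcessStatus mirrors the three states listed above.
type ProcessStatus uint8

const (
	ProcessStateInit ProcessStatus = iota
	ProcessStateProcessing
	ProcessStateDone
)

func (s ProcessStatus) String() string {
	switch s {
	case ProcessStateInit:
		return "Init"
	case ProcessStateProcessing:
		return "Processing"
	case ProcessStateDone:
		return "Done"
	}
	return fmt.Sprintf("ProcessStatus(%d)", uint8(s))
}

var errDone = errors.New("parser is done; call Reset before writing more")

// headerParser is a hypothetical parser that treats a message as complete
// once a header block terminated by "\r\n\r\n" has been buffered.
type headerParser struct {
	status ProcessStatus
	buf    []byte
}

// Write buffers bytes and advances Init -> Processing -> Done.
func (p *headerParser) Write(b []byte) error {
	if p.status == ProcessStateDone {
		return errDone
	}
	p.status = ProcessStateProcessing
	p.buf = append(p.buf, b...)
	if bytes.Contains(p.buf, []byte("\r\n\r\n")) {
		p.status = ProcessStateDone
	}
	return nil
}

// Reset returns the parser to ProcessStateInit for the next message.
func (p *headerParser) Reset() {
	p.status = ProcessStateInit
	p.buf = p.buf[:0]
}

func main() {
	p := &headerParser{}
	fmt.Println(p.status) // Init
	_ = p.Write([]byte("GET / HTTP/1.1\r\n"))
	fmt.Println(p.status) // Processing
	_ = p.Write([]byte("Host: a\r\n\r\n"))
	fmt.Println(p.status) // Done
	p.Reset()
	fmt.Println(p.status) // Init
}
```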
Code Entity Association
This diagram maps the logical processing steps to the specific Go structs and interfaces in the pkg/event_processor package.
[Diagram: Pipeline Entity Mapping]
Sources: pkg/event_processor/processor.go:28-48, pkg/event_processor/iworker.go:34-48, pkg/event_processor/iparser.go:49-60
Final Output and Truncation
Before data is sent to the output writers, the pipeline applies final transformations:
- Truncation: If --tsize is set, the eventWorker truncates the payload buffer once it reaches the specified limit (pkg/event_processor/iworker.go:235-241).
- Formatting: Depending on the configuration, data is either formatted as a text string (with PID, Comm, and IP metadata) or serialized into a Protobuf LogEntry (pkg/event_processor/iworker.go:197-226).
- Hex Encoding: If --hex is enabled, the final byte slice is converted to a hex dump before being sent to the outComing channel (pkg/event_processor/iworker.go:191-193).
Sources: pkg/event_processor/iworker.go:174-227, cli/cmd/root.go:162-175