Skip to content

事件处理与解析器

本文档为使用 eCapture 事件处理系统和解析器的开发人员提供全面指南。内容涵盖事件处理架构、解析器系统、协议检测机制,以及添加新事件类型和实现自定义解析器的分步说明。

有关事件结构和类型本身的信息,请参见事件结构与类型。有关更广泛的事件处理流程架构,请参见事件处理流程。有关特定协议的解析实现,请参见协议解析


事件处理架构概述

事件处理系统将原始 eBPF 事件转换为格式化的、协议感知的输出。该系统由三个主要层次组成:

来源: pkg/event_processor/processor.go:1-216, pkg/event_processor/iworker.go:1-366, pkg/event_processor/iparser.go:1-167


事件结构 (IEventStruct)

所有进入处理系统的事件都实现了 IEventStruct 接口,该接口为事件处理提供了标准契约。

IEventStruct 接口

该接口定义在 user/event/ievent.go:41-52

方法返回类型用途
Decode(payload []byte)error将原始 eBPF 数据反序列化为结构化事件
Payload()[]byte返回事件有效载荷数据
PayloadLen()int返回有效载荷长度
String()string返回人类可读的表示形式
StringHex()string返回十六进制转储表示形式
Clone()IEventStruct创建事件的深拷贝
EventType()Type返回事件分类(Output、ModuleData、EventProcessor)
GetUUID()string返回唯一连接标识符
Base()Base返回基础元数据(PID、IP、端口等)
ToProtobufEvent()*pb.Event转换为 protobuf 格式以进行序列化

事件类型

事件按其预期处理路径进行分类 user/event/ievent.go:26-37

来源: user/event/ievent.go:15-71


EventProcessor 组件

EventProcessor 是接收来自模块的事件并将其路由到适当 worker 的中央协调器。它维护一个 worker 池,每个 worker 负责处理来自特定连接的事件。

EventProcessor 结构

来自 pkg/event_processor/processor.go:30-50 的关键字段:

字段类型用途
incomingchan event.IEventStruct从模块接收新事件(缓冲区:1024)
outComingchan []byte向日志记录器发送格式化输出(缓冲区:1024)
destroyConnchan uint64发送套接字销毁信号以进行清理(缓冲区:1024)
workerQueuemap[string]IWorker将 UUID 映射到 worker 实例
loggerio.Writer输出目标(控制台、文件、WebSocket)
isHexbool是否以十六进制格式输出
truncateSizeuint64截断前的最大有效载荷大小
closeChanchan bool关闭信号
errChanchan error错误报告通道(缓冲区:16)

事件处理流程

EventProcessor 方法

核心方法:

Worker 管理:

来源: pkg/event_processor/processor.go:1-216


Worker 系统 (IWorker)

每个 eventWorker 处理单个连接(由 UUID 标识)的事件,累积有效载荷片段并在适当时触发解析。

IWorker 接口

定义在 pkg/event_processor/iworker.go:35-49

go
type IWorker interface {
    Write(event.IEventStruct) error      // 向处理队列添加事件
    GetUUID() string                      // 返回 worker 的 UUID
    GetDestroyUUID() uint64               // 返回用于生命周期跟踪的套接字 ID
    IfUsed() bool                         // 检查 worker 当前是否被引用
    Get()                                 // 增加引用计数
    Put()                                 // 减少引用计数
    CloseEventWorker()                    // 发信号通知 worker 关闭
}

Worker 生命周期模型

Worker 支持两种不同的生命周期模型 pkg/event_processor/iworker.go:57-63

1. LifeCycleStateDefault(自我管理)

  • 在第一个事件到达时创建
  • MaxTickerCount(10 × 100ms = 1 秒)不活动后自我销毁
  • closeChannil
  • 用于短期连接

2. LifeCycleStateSock(套接字绑定)

  • 为 UUID 前缀为 sock: 的事件创建 user/event/ievent.go:39
  • 持续存在直到显式调用 CloseEventWorker()
  • closeChan 为非 nil
  • Worker 从 UUID 提取套接字 ID:sock:Pid_Tid_Comm_Fd_DataType_Tuple_Sock
  • 用于长期套接字连接

Worker 事件处理循环

Run() 方法 pkg/event_processor/iworker.go:262-306 实现主事件处理循环:

引用计数和线程安全

Worker 使用原子引用计数来防止销毁期间的竞态条件 pkg/event_processor/iworker.go:346-360

  • Get() - 原子地将 used 标志设置为 true,由 getWorkerByUUID() 在返回 worker 之前调用
  • Put() - 原子地将 used 标志设置为 false,在写入事件后调用
  • IfUsed() - 检查 worker 当前是否被引用

drainAndClose() 方法 pkg/event_processor/iworker.go:308-337 通过以下方式确保安全清理:

  1. 排空 incoming 通道中的剩余事件
  2. 等待直到 IfUsed() 返回 false(没有其他 goroutine 持有引用)
  3. 调用 Close() 输出最终结果

来源: pkg/event_processor/iworker.go:1-366


解析器系统 (IParser)

解析器将累积的有效载荷字节转换为人类可读的、协议特定的输出。该系统支持基于有效载荷内容的动态解析器选择。

IParser 接口

定义在 pkg/event_processor/iparser.go:49-60

go
type IParser interface {
    detect(b []byte) error        // 尝试从有效载荷识别协议
    Write(b []byte) (int, error)  // 累积有效载荷数据
    ParserType() ParserType       // 返回解析器类型标识符
    PacketType() PacketType       // 返回数据包编码(gzip 等)
    Name() string                 // 返回人类可读的解析器名称
    IsDone() bool                 // 检查解析是否完成
    Init()                        // 初始化解析器状态
    Display() []byte              // 生成格式化输出
    Reset()                       // 重置解析器以供重用
}

解析器类型

系统定义了几种解析器类型 pkg/event_processor/iparser.go:40-47

ParserType用途
ParserTypeNull0默认/未知协议(十六进制转储)
ParserTypeHttpRequest1HTTP/1.x 请求解析
ParserTypeHttp2Request2HTTP/2 请求解析
ParserTypeHttpResponse3HTTP/1.x 响应解析
ParserTypeHttp2Response4HTTP/2 响应解析
ParserTypeWebSocket5WebSocket 帧解析

数据包类型

解析器可以识别数据包编码 pkg/event_processor/iparser.go:33-38

PacketType用途
PacketTypeNull未压缩数据
PacketTypeGzipGzip 压缩数据
PacketTypeWebSocketWebSocket 帧数据

来源: pkg/event_processor/iparser.go:1-167


协议检测机制

解析器选择过程使用尝试每个解析器的方法,并自动回退到十六进制转储。

解析器注册

解析器在包初始化期间使用 Register() 函数注册自己 pkg/event_processor/iparser.go:64-73

go
func init() {
    hr := &HTTPRequest{}
    hr.Init()
    Register(hr)  // 添加到全局 parsers 映射
}

所有注册的解析器都存储在全局 parsers 映射中 pkg/event_processor/iparser.go:62

检测流程

NewParser() 函数 pkg/event_processor/iparser.go:85-115 实现检测逻辑:

示例:HTTP 请求检测

HTTPRequest 解析器 pkg/event_processor/http_request.go:83-92 使用 Go 的 http.ReadRequest() 来检测有效的 HTTP 请求:

go
func (hr *HTTPRequest) detect(payload []byte) error {
    rd := bytes.NewReader(payload)
    buf := bufio.NewReader(rd)
    _, err := http.ReadRequest(buf)
    if err != nil {
        return err  // 不是有效的 HTTP 请求
    }
    return nil  // 检测到有效的 HTTP 请求
}

HTTP 响应的类似方法 pkg/event_processor/http_response.go:94-102

go
func (hr *HTTPResponse) detect(payload []byte) error {
    rd := bytes.NewReader(payload)
    buf := bufio.NewReader(rd)
    _, err := http.ReadResponse(buf, nil)
    if err != nil {
        return err  // 不是有效的 HTTP 响应
    }
    return nil  // 检测到有效的 HTTP 响应
}

来源: pkg/event_processor/iparser.go:85-115, pkg/event_processor/http_request.go:83-92, pkg/event_processor/http_response.go:94-102


添加新事件类型

要添加流经处理系统的新事件类型:

步骤 1:定义事件结构

在适当的 user/event/ 子目录中创建实现 IEventStruct 的新结构:

go
// user/event/mymodule/my_event.go
package mymodule

import (
    "encoding/binary"
    "github.com/gojue/ecapture/user/event"
    pb "github.com/gojue/ecapture/protobuf/gen/v1"
)

type MyCustomEvent struct {
    event.Base  // 嵌入基础事件元数据
    CustomField1 uint32
    CustomField2 [64]byte
}

func (e *MyCustomEvent) Decode(payload []byte) error {
    // 从 eBPF C 结构布局反序列化
    if len(payload) < 104 {  // sizeof(Base) + custom fields
        return errors.New("payload too short")
    }
    
    // 解码基础字段
    e.Base.Decode(payload)
    
    // 解码自定义字段
    offset := 40  // Base 结构的大小
    e.CustomField1 = binary.LittleEndian.Uint32(payload[offset:])
    copy(e.CustomField2[:], payload[offset+4:offset+68])
    
    return nil
}

func (e *MyCustomEvent) Clone() event.IEventStruct {
    newEvent := *e
    return &newEvent
}

func (e *MyCustomEvent) EventType() event.Type {
    return event.TypeEventProcessor  // 路由到 EventProcessor
}

func (e *MyCustomEvent) GetUUID() string {
    // 生成用于连接分组的唯一标识符
    return fmt.Sprintf("%d_%d_%s_%d", 
        e.PID, e.TID, e.PName, e.CustomField1)
}

// 实现其余的 IEventStruct 方法...

步骤 2:模块集成

在您的模块的 Decode() 方法中,实例化并填充您的事件:

go
func (m *MyModule) Decode(em *ebpf.Map, b []byte) (event.IEventStruct, error) {
    e := &mymodule.MyCustomEvent{}
    err := e.Decode(b)
    if err != nil {
        return nil, err
    }
    return e, nil
}

步骤 3:发送到 EventProcessor

在您的模块的事件读取循环中,将事件写入 EventProcessor:

go
// 在模块的 Run() 方法中
for {
    select {
    case event := <-eventChannel:
        m.processor.Write(event)  // 路由到 EventProcessor
    }
}

来源: user/event/ievent.go:41-52, pkg/event_processor/processor.go:165-175


实现自定义解析器

要添加对新协议或输出格式的支持:

步骤 1:创建解析器结构

go
// pkg/event_processor/my_protocol.go
package event_processor

import (
    "bytes"
    "bufio"
)

type MyProtocolParser struct {
    reader    *bytes.Buffer
    bufReader *bufio.Reader
    isDone    bool
    isInit    bool
    // 协议特定字段
    header    MyProtocolHeader
}

步骤 2:实现 IParser 接口

初始化解析器

go
func (p *MyProtocolParser) Init() {
    p.reader = bytes.NewBuffer(nil)
    p.bufReader = bufio.NewReader(p.reader)
    p.isDone = false
    p.isInit = false
}

func (p *MyProtocolParser) Name() string {
    return "MyProtocolParser"
}

实现检测逻辑

go
func (p *MyProtocolParser) detect(payload []byte) error {
    // 检查协议魔术字节或结构
    if len(payload) < 4 {
        return errors.New("payload too short")
    }
    
    // 示例:检查魔术数字
    magic := binary.BigEndian.Uint32(payload[0:4])
    if magic != MY_PROTOCOL_MAGIC {
        return errors.New("not my protocol")
    }
    
    return nil
}

累积数据

go
func (p *MyProtocolParser) Write(b []byte) (int, error) {
    // 第一次写入:解析头部
    if !p.isInit {
        n, err := p.reader.Write(b)
        if err != nil {
            return n, err
        }
        
        // 尝试解析头部
        err = p.parseHeader()
        if err != nil {
            return 0, err
        }
        
        p.isInit = true
        return n, nil
    }
    
    // 后续写入:累积主体
    n, err := p.reader.Write(b)
    if err != nil {
        return n, err
    }
    
    // 根据协议长度字段检查是否完整
    if p.reader.Len() >= p.header.TotalLength {
        p.isDone = true
    }
    
    return n, nil
}

格式化输出

go
func (p *MyProtocolParser) Display() []byte {
    var output bytes.Buffer
    
    // 格式化头部
    fmt.Fprintf(&output, "My Protocol v%d\n", p.header.Version)
    fmt.Fprintf(&output, "Type: %d\n", p.header.MessageType)
    fmt.Fprintf(&output, "Length: %d\n", p.header.TotalLength)
    fmt.Fprintf(&output, "\n")
    
    // 格式化主体
    body := p.reader.Bytes()[p.header.HeaderSize:]
    output.Write(body)
    
    return output.Bytes()
}

实现其余方法

go
func (p *MyProtocolParser) ParserType() ParserType {
    return ParserTypeNull  // 或定义新的 ParserType 常量
}

func (p *MyProtocolParser) PacketType() PacketType {
    return PacketTypeNull
}

func (p *MyProtocolParser) IsDone() bool {
    return p.isDone
}

func (p *MyProtocolParser) Reset() {
    p.isDone = false
    p.isInit = false
    p.reader.Reset()
    p.bufReader.Reset(p.reader)
}

步骤 3:注册解析器

添加一个 init() 函数以全局注册您的解析器:

go
func init() {
    p := &MyProtocolParser{}
    p.Init()
    Register(p)  // 使解析器可用于检测
}

步骤 4:处理特殊情况

压缩支持

如果您的协议支持压缩(如 HTTP gzip),在 Display() 中解压缩:

go
func (p *MyProtocolParser) Display() []byte {
    rawData := p.reader.Bytes()
    
    if p.isCompressed {
        reader, err := gzip.NewReader(bytes.NewReader(rawData))
        if err != nil {
            return rawData  // 错误时返回原始数据
        }
        defer reader.Close()
        
        decompressed, err := io.ReadAll(reader)
        if err != nil {
            return rawData
        }
        
        p.packerType = PacketTypeGzip
        return decompressed
    }
    
    return rawData
}

分块传输

如果您的协议使用分块传输编码,累积块:

go
func (p *MyProtocolParser) Write(b []byte) (int, error) {
    n, err := p.reader.Write(b)
    if err != nil {
        return n, err
    }
    
    // 检查块终止符
    if bytes.HasSuffix(p.reader.Bytes(), []byte("0\r\n\r\n")) {
        p.isDone = true
    }
    
    return n, nil
}

来源: pkg/event_processor/iparser.go:49-167, pkg/event_processor/http_request.go:1-164, pkg/event_processor/http_response.go:1-182


示例:HTTP 请求解析器实现

现有的 HTTP 请求解析器提供了完整的实现示例:

结构

pkg/event_processor/http_request.go:28-35

go
type HTTPRequest struct {
    request    *http.Request     // 已解析的 HTTP 请求
    packerType PacketType        // 压缩类型
    isDone     bool              // 解析完成标志
    isInit     bool              // 头部已解析标志
    reader     *bytes.Buffer     // 原始数据累加器
    bufReader  *bufio.Reader     // 用于解析的缓冲读取器
}

关键方法

检测 pkg/event_processor/http_request.go:83-92

  • 使用 Go 的标准 http.ReadRequest() 来验证 HTTP 语法
  • 如果有效载荷不是有效的 HTTP 请求则返回错误

写入 pkg/event_processor/http_request.go:54-81

  • 第一次写入时:解析 HTTP 请求行和头部
  • 后续写入时:累积主体数据
  • 当主体完整时设置 isDone(尽管目前始终为 false)

显示 pkg/event_processor/http_request.go:105-157

  • 使用 io.ReadAll() 读取请求主体
  • 检测并解压缩 gzip 编码的主体
  • 使用 httputil.DumpRequest() 格式化头部
  • 附加主体内容

HTTP 响应解析器

pkg/event_processor/http_response.go:28-37 中的类似结构,主要区别:

来源: pkg/event_processor/http_request.go:1-164, pkg/event_processor/http_response.go:1-182


解析器注册系统

全局解析器注册表

解析器系统维护一个全局注册表 pkg/event_processor/iparser.go:62

go
var parsers = make(map[string]IParser)

注册函数

pkg/event_processor/iparser.go:64-73

go
func Register(p IParser) {
    if p == nil {
        panic("Register Parser is nil")
    }
    name := p.Name()
    if _, dup := parsers[name]; dup {
        panic(fmt.Sprintf("Register called twice for Parser %s", name))
    }
    parsers[name] = p
}

访问解析器

两个实用函数提供对已注册解析器的访问:

初始化时注册模式

每个解析器文件包含一个 init() 函数,在包加载时注册解析器:

go
func init() {
    hr := &HTTPRequest{}
    hr.Init()
    Register(hr)
}

这种模式确保在事件处理期间调用 NewParser() 之前所有解析器都已自动注册。

来源: pkg/event_processor/iparser.go:62-83, pkg/event_processor/http_request.go:159-163, pkg/event_processor/http_response.go:177-181


完整事件处理示例

以下是完整事件如何流经系统的示例:

来源: pkg/event_processor/processor.go:66-109, pkg/event_processor/iworker.go:91-137, pkg/event_processor/iworker.go:262-306, pkg/event_processor/iparser.go:85-115


测试自定义解析器

单元测试模式

为您的解析器创建一个测试文件:

go
// pkg/event_processor/my_protocol_test.go
package event_processor

import (
    "testing"
)

func TestMyProtocolParser_Detect(t *testing.T) {
    p := &MyProtocolParser{}
    p.Init()
    
    // 有效载荷
    validPayload := []byte{0x12, 0x34, 0x56, 0x78, ...}
    err := p.detect(validPayload)
    if err != nil {
        t.Errorf("detect() 在有效载荷上失败:%v", err)
    }
    
    // 无效载荷
    invalidPayload := []byte{0xFF, 0xFF, 0xFF, 0xFF}
    err = p.detect(invalidPayload)
    if err == nil {
        t.Error("detect() 应该在无效载荷上失败")
    }
}

func TestMyProtocolParser_WriteAndDisplay(t *testing.T) {
    p := &MyProtocolParser{}
    p.Init()
    
    // 写入载荷块
    chunk1 := []byte{...}
    chunk2 := []byte{...}
    
    n, err := p.Write(chunk1)
    if err != nil {
        t.Fatalf("Write() 失败:%v", err)
    }
    
    n, err = p.Write(chunk2)
    if err != nil {
        t.Fatalf("Write() 失败:%v", err)
    }
    
    // 检查是否完成
    if !p.IsDone() {
        t.Error("解析器在完整载荷后应该完成")
    }
    
    // 获取格式化输出
    output := p.Display()
    if len(output) == 0 {
        t.Error("Display() 返回空输出")
    }
}

集成测试

使用完整的 EventProcessor 测试您的解析器:

go
func TestMyProtocolParser_Integration(t *testing.T) {
    // 使用测试日志记录器创建 EventProcessor
    var buf bytes.Buffer
    logger := &buf
    ep := NewEventProcessor(logger, false, 0)
    
    // 启动处理器
    go ep.Serve()
    
    // 创建测试事件
    event := &MyTestEvent{
        Base: event.Base{
            PID: 1234,
            // ... 其他字段
        },
        // ... 自定义字段
    }
    
    // 写入事件
    ep.Write(event)
    
    // 等待处理
    time.Sleep(2 * time.Second)
    
    // 检查输出
    output := buf.String()
    if !strings.Contains(output, "My Protocol") {
        t.Errorf("输出不包含预期内容:%s", output)
    }
    
    // 清理
    ep.Close()
}

来源: pkg/event_processor/iparser.go:49-167, pkg/event_processor/processor.go:206-215


总结

eCapture 事件处理和解析器系统提供:

  1. EventProcessor - 中央协调器,按 UUID 将事件路由到 worker
  2. eventWorker - 按连接的有效载荷累积,具有两种生命周期模型(默认和套接字绑定)
  3. IParser - 具有自动检测和回退的协议感知解析
  4. 解析器注册表 - 用于可扩展性的全局注册系统

要扩展系统:

  • 定义实现 IEventStruct 的新事件类型
  • 创建实现 IParser 的解析器,包含 detect()Write()Display() 方法
  • init() 函数中注册解析器以实现自动发现
  • 单独测试解析器并与完整的 EventProcessor 集成测试

该架构通过引用计数确保线程安全操作,支持短期和长期连接,并提供具有载荷排空的优雅关闭。

事件处理与解析器 has loaded