FlowWorker线程模块的处理函数为FlowWorker,主要包括五部分功能,分别是Flow处理、应用层协议识别、特征检测、输出、tcp segment清理。这里简单记录flow的处理。

Flow结构体

对流的处理围绕Flow这个数据结构展开,下面贴出结构体声明并介绍各个成员。

Flow结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**
* \brief Flow data structure.
*
* The flow is a global data structure that is created for new packets of a
* flow and then looked up for the following packets of a flow.
*
* Locking
*
* The flow is updated/used by multiple packets at the same time. This is why
* there is a flow-mutex. It's a mutex and not a spinlock because some
* operations on the flow can be quite expensive, thus spinning would be
* too expensive.
*
* The flow "header" (addresses, ports, proto, recursion level) are static
* after the initialization and remain read-only throughout the entire live
* of a flow. This is why we can access those without protection of the lock.
*/

typedef struct Flow_
{
/* flow "header", used for hashing and flow lookup. Static after init,
* so safe to look at without lock */
FlowAddress src, dst;
union {
Port sp; /**< tcp/udp source port */
uint8_t type; /**< icmp type */
};
union {
Port dp; /**< tcp/udp destination port */
uint8_t code; /**< icmp code */
};
uint8_t proto;
uint8_t recursion_level;
uint16_t vlan_id[2];

/** flow hash - the flow hash before hash table size mod. */
uint32_t flow_hash;

/* time stamp of last update (last packet). Set/updated under the
* flow and flow hash row locks, safe to read under either the
* flow lock or flow hash row lock. */
struct timeval lastts;

/* end of flow "header" */

SC_ATOMIC_DECLARE(FlowStateType, flow_state);

/** how many pkts and stream msgs are using the flow *right now*. This
* variable is atomic so not protected by the Flow mutex "m".
*
* On receiving a packet the counter is incremented while the flow
* bucked is locked, which is also the case on timeout pruning.
*/
SC_ATOMIC_DECLARE(FlowRefCount, use_cnt);

/** flow tenant id, used to setup flow timeout and stream pseudo
* packets with the correct tenant id set */
uint32_t tenant_id;

uint32_t probing_parser_toserver_alproto_masks;
uint32_t probing_parser_toclient_alproto_masks;

uint32_t flags; /**< generic flags */

uint16_t file_flags; /**< file tracking/extraction flags */
/* coccinelle: Flow:file_flags:FLOWFILE_ */

/** destination port to be used in protocol detection. This is meant
* for use with STARTTLS and HTTP CONNECT detection */
uint16_t protodetect_dp; /**< 0 if not used */

#ifdef FLOWLOCK_RWLOCK
SCRWLock r;
#elif defined FLOWLOCK_MUTEX
SCMutex m;
#else
#error Enable FLOWLOCK_RWLOCK or FLOWLOCK_MUTEX
#endif

/** protocol specific data pointer, e.g. for TcpSession */
void *protoctx;

/** mapping to Flow's protocol specific protocols for timeouts
and state and free functions. */
uint8_t protomap;

uint8_t flow_end_flags;
/* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */

AppProto alproto; /**< \brief application level protocol */
AppProto alproto_ts;
AppProto alproto_tc;

/** original application level protocol. Used to indicate the previous
protocol when changing to another protocol , e.g. with STARTTLS. */
AppProto alproto_orig;
/** expected app protocol: used in protocol change/upgrade like in
* STARTTLS. */
AppProto alproto_expect;

/** detection engine ctx version used to inspect this flow. Set at initial
* inspection. If it doesn't match the currently in use de_ctx, the
* stored sgh ptrs are reset. */
uint32_t de_ctx_version;

/** Thread ID for the stream/detect portion of this flow */
FlowThreadId thread_id;

/** application level storage ptrs.
*
*/
AppLayerParserState *alparser; /**< parser internal state */
void *alstate; /**< application layer state */

/** toclient sgh for this flow. Only use when FLOW_SGH_TOCLIENT flow flag
* has been set. */
const struct SigGroupHead_ *sgh_toclient;
/** toserver sgh for this flow. Only use when FLOW_SGH_TOSERVER flow flag
* has been set. */
const struct SigGroupHead_ *sgh_toserver;

/* pointer to the var list */
GenericVar *flowvar;

/** hash list pointers, protected by fb->s */
struct Flow_ *hnext; /* hash list */
struct Flow_ *hprev;
struct FlowBucket_ *fb;

/** queue list pointers, protected by queue mutex */
struct Flow_ *lnext; /* list */
struct Flow_ *lprev;
struct timeval startts;

uint32_t todstpktcnt;
uint32_t tosrcpktcnt;
uint64_t todstbytecnt;
uint64_t tosrcbytecnt;
} Flow;
  • src
    四层源地址。
  • dst
    四层目的地址。
  • sp/type
    四层源端口号。
  • dp/code
    四层目的端口号。
  • proto
    四层协议类型,从packet的proto赋值而来。
  • recursion_level
    指示经历了几次隧道封装。普通数据包这个值是0。从packet成员recursion_level赋值而来。
  • vlan_id
    成员长度为2的数组,存储了vlan的id,允许vlan嵌套,数组中的两项分别代表该嵌套层的vlan id。从packet成员vlan_id赋值而来。
  • flow_hash
    从packet的flow_hash赋值过来,未经过flow_config.hash_size取余的哈希值。
  • lastts
    当前flow最后的数据包更新时间。
  • flow_state
    flow的当前状态
    • FLOW_STATE_NEW,
    • FLOW_STATE_ESTABLISHED,
    • FLOW_STATE_CLOSED,
    • FLOW_STATE_LOCAL_BYPASSED,
    • FLOW_STATE_CAPTURE_BYPASSED,
  • use_cnt
  • tenant_id
  • probing_parser_toserver_alproto_masks
    应用层协议检测probing parser方法时,标记该方向已经验证过不符合的应用层协议,避免重复验证。
  • probing_parser_toclient_alproto_masks
    应用层协议检测probing parser方法时,标记该方向已经验证过不符合的应用层协议,避免重复验证。
  • flags
    标明flow的一些标识。见flow.h 43行。
  • file_flags
  • protodetect_dp
  • r/m
  • protoctx
    特定协议使用的数据指针,比如TCP使用的会话信息TcpSession。
  • protomap
    根据packet协议映射得到的枚举型,比如IPPROTO_TCP映射为FLOW_PROTO_TCP。应用层协议检测和协议分析时会使用,因为只对TCP、UDP、ICMP、SCTP四种类型做应用层检测与分析,因此重新映射为新的枚举值,便于用作数组索引项。
  • flow_end_flags
  • alproto
  • alproto_ts
  • alproto_tc
  • alproto_orig
  • de_ctx_version
  • thread_id
  • alparser
  • alstate
  • sgh_toclient
  • sgh_toserver
  • flowvar
  • hnext
    挂载到FlowBucket时的链表指针。
  • hprev
    挂载到FlowBucket时的链表指针。
  • fb
    挂载到FlowBucket时表明flow所属的bucket
  • lnext
    挂载到FlowQueue时的链表指针。比如flow_spare_q、flow_recycle_q。
  • lprev
    挂载到FlowQueue时的链表指针。比如flow_spare_q、flow_recycle_q。
  • startts
    flow初始化时间戳,从首个packet成员ts赋值而来。
  • todstpktcnt
  • tosrcpktcnt
  • todstbytecnt
  • tosrcbytecnt

Flow处理配置初始化

main -> PostConfLoadedSetup -> PreRunInit -> FlowInitConfig,这是流处理配置初始化运行位置。相关变量如下。

  • flow_config
    流处理中配置项集中存储与此。
    • hash_rand
      初始化生成的随机数,用于计算hash值。
    • hash_size
      hash值的范围,也就是bucket的数目。配置文件条目flow.hash-size。默认值65536
    • memcap
      内存使用的上限。配置文件条目flow.memcap。默认值32MB。
    • prealloc
      预先分配的Flow数目。配置文件条目flow.prealloc。默认值10000。
    • emergency_recovery
      flow超时清除在emergency模式时需要达到一定条件才能切换回normal模式,这个条件就是空闲队列与预分配数目比例达到此值。配置文件条目flow.emergency-recovery,合法范围[1, 100]。默认值30。
  • flow_flags
    用于标记当前是否处于FLOW_EMERGENCY模式。未发现其他用处。
  • flow_memuse
    flow和其hash table所使用的内存总量。
  • flow_prune_idx
    从使用中的flow中选出一个清空并再次使用时,使用该值选择遍历开始的hash table行号。
  • flow_spare_q
    空闲flow队列。在初始化时会创建flow_config.prealloc个数的flow放入这个队列中,内存使用计入flow_memuse。队列有锁保护。
  • flow_recycle_q
    等待回收的flow队列。超时的flow会放入此队列,等待后续移入空闲队列。
  • flow_hash
    flow的全局hash table,所有在使用的flow都会存在这里,由一组FlowBucket组成,分配的内存计入flow_memuse中。hash冲突使用链表解决,每个链表就是一个FlowBucket结构。这里内存分配时使用64字节对齐,考虑x86 cpu cache line。这里初始化了每个FlowBucket中的锁和next_ts项,next_tx是一个时间戳,用于提示flow manager检查该bucket。
  • flow_timeouts
    用于flow的超时清除,宏展开后为flow_timeouts_sc_atomit_,这是一个指针,有两个可能的指向,分别是flow_timeouts_normal与flow_timeouts_emerg,这两个变量结构相同,都有4个成员,new_timeout、est_timeout、closed_timeout、bypassed_timeout。当flow的空闲队列为空且内存达到限额无法继续创建flow时会切换为flow_timeouts_emerg,当flow manager发现flow空闲队列长度达到预分配数量的一定百分比时会切换为flow_timeouts_normal,这个百分比由flow_config.emergency_recovery决定。

Flow处理逻辑流程

流处理部分代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

FlowHandlePacket(tv, fw->dtv, p);
if (likely(p->flow != NULL)) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
if (FlowUpdate(p) == TM_ECODE_DONE) {
FLOWLOCK_UNLOCK(p->flow);
return TM_ECODE_OK;
}
}
/* Flow is now LOCKED */

FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

/* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
* pseudo packet created by the flow manager. */
} else if (p->flags & PKT_HAS_FLOW) {
FLOWLOCK_WRLOCK(p->flow);
}

FlowHandlePacket

这个函数主要功能:

  • 获取一个匹配的flow并与packet关联,可能是从hash table中匹配到的,可能是新创建的,也可能是从hash table中清理出来。
  • 需要的情况下将flow初始化,并适当配置flow状态。
  • 需要的情况下对flow做reuse处理。

具体逻辑如下:

  • 取得一个与packet匹配的flow。这里首先根据packet的flow_hash与flow_config.hash_size取余,定位FlowBucket并上锁。
    • 如果bucket为空。
      • 调用FlowGetNew函数取得一个空的且上锁的flow。如果flow为NULL,则解锁bucket,直接返回NULL,匹配flow失败。
      • 将flow插入bucket。
      • 调用FlowInit根据packet内容填充flow相关内容。并设置flow的flow_hash(packet成员flow_hash相等)与fb(指向bucket的指针)成员。
      • 将flow的flow_state设置为FLOW_STATE_NEW。
      • 将flow成员use_cnt加一。
      • 将packet成员flow指向刚刚获取的flow。
      • 解锁bucket,返回flow。
    • 如果bucket不空。
      • 调用FlowCompare函数循环比较bucket内链接的所有flow,直到遇到一个匹配成功的flow。如果到链表末尾仍未匹配成功则调用FlowGetNew函数获取新flow,如果获取失败则解锁bucket并返回NULL。
      • 匹配到flow后,将flow移动到bucket链表首部(对于新获取的flow,代码中实际将其置于链表尾部,这里并不合理)。
      • 对于bucket中匹配到的flow,由于没有上锁,这里需要先对flow上锁。
      • 这里对于tcp协议检查了是否需要对tcp会话做reuse处理。如果需要resue,则调用TcpReuseReplace函数进行reuse处理。
        • 旧的flow的flags成员增加标记FLOW_TCP_REUSED。
        • 解锁旧flow。
        • 调用FlowGetNew获取新flow,插入bucket头,调用FlowInit初始化,设置flow_hash、fb、thread_id。返回新flow。
      • 将flow成员use_cnt加一。
      • 将packet成员flow指向刚刚获取的flow。
      • 解锁bucket,返回flow。
  • 如果之前匹配流成功,将packet成员flags增加标记PKT_HAS_FLOW。

FlowGetNew用于获取一个空的且上锁的flow,逻辑如下:

  • 从空闲队列取得flow,如果非NULL,锁定该flow,更新ThreadVars中协议相关计数器,返回该flow。
  • 空闲队列空的情况下,检查内存限额是否足够创建新的flow。
    • 如果内存限额不足。
      • 配置全局变量flow_flags增加标记FLOW_EMERGENCY。
      • 超时配置项切换为emergency。
      • 通过信号量唤醒flow manager线程。
      • 尝试从使用中的flow中清理出一个。
    • 如果内存限额足够,则尝试分配内存创建一个新的flow。

FlowInit用于初始化一个flow,根据首个packet数据包填充,逻辑如下:

  • 配置flow成员proto、recursion_level、vlan_id,与packet中成员相同。
  • 如果packet是ipv4或ipv6,配置flow成员src、dst,分别是源地址、目的地址,并配置flow成员flags增加标记FLOW_IPV4或FLOW_IPV6。
  • 针对tcp、udp、icmp、sctp协议,配置flow成员源端口目的端口或type、code。
  • 配置flow成员startts为packet成员ts。
  • 配置flow成员protomap,根据packet协议映射得到的枚举型,比如IPPROTO_TCP映射为FLOW_PROTO_TCP。TODO这里的具体作用,应该有free函数相关。

FlowCompare用于检查flow是否匹配packet,逻辑如下:

  • icmp。
    不关心。
  • tcp。
    源目的地址、源目的端口分别相等或反向相等,proto、recursion_level、vlan_id分别相等,同时这个flow的flags不能包含标记FLOW_TCP_REUSED。
  • udp
    源目的地址、源目的端口分别相等或反向相等,proto、recursion_level、vlan_id分别相等。

FlowUpdate

这个函数主要作用:

  • 判断packet的方向以及是否是该方向的首个数据包。
  • 判断flow是否已经分别处理两个方向的数据包。同时将协议检测的方向标记由flow移动到packet上。如果flow标记了不再检查数据包或载荷,那么对packet做同样的标记。
  • 判断flow状态是否需要bypass,如果需要那么FlowWorker的后续操作都将跳过。

具体逻辑如下:

  • 检查flow成员flow_state状态。
    • 如果状态为FLOW_STATE_CAPTURE_BYPASSED,检查packet的时间戳与flow成员lastts差值大于FLOW_BYPASSED_TIMEOUT的一半,如果大于则更新flow成员lastts为packet的时间戳,同时更新flow状态为FLOW_STATE_LOCAL_BYPASSED。
    • 如果状态不为FLOW_STATE_CAPTURE_BYPASSED,更新flow成员lastts为packet时间戳ts。
  • 判断packet方向。这里的方向判断方式为,flow的发起方为client,被动接收方位server。
    • 如果packet方向为TOSERVER。
      • 更新flow计数器成员todstpktcnt、todstbytecnt。
      • packet成员flowflags设置标记为FLOW_PKT_TOSERVER。
      • 检查flow成员flags,如果没有标记FLOW_TO_DST_SEEN,则增加该标记并对packet成员flowflags增加标记FLOW_PKT_TOSERVER_FIRST。
      • TODO检查flow成员flags,如果包含标记FLOW_PROTO_DETECT_TS_DONE,则移除该标记并对packet成员flags增加标记PKT_PROTO_DETECT_TS_DONE。
    • 如果packet方向为TOCLIENT。
      • 更新flow计数器成员tosrcpktcnt、tosrcbytecnt。
      • packet成员flowflags设置标记为FLOW_PKT_TOCLIENT。
      • 检查flow成员flags,如果没有标记FLOW_TO_SRC_SEEN,则增加该标记并对packet成员flowflags增加标记FLOW_PKT_TOCLIENT_FIRST。
      • TODO检查flow成员flags,如果包含标记FLOW_PROTO_DETECT_TC_DONE,则移除该标记并对packet成员flags增加标记PKT_PROTO_DETECT_TC_DONE。
  • 如果flow成员flags同时包含标记FLOW_TO_DST_SEEN与FLOW_TO_SRC_SEEN。
    • 将packet成员flowflags增加标记FLOW_PKT_ESTABLISHED。
    • 如果flow成员proto不是IPPROTO_TCP,将flow状态修改为FLOW_STATE_ESTABLISHED。我理解这里TCP需要三次握手。
  • 如果flow成员flags包含标记FLOW_NOPACKET_INSPECTION,将packet成员flags增加标记PKT_NOPACKET_INSPECTION。
  • 如果flow成员flags包含标记FLOW_NOPAYLOAD_INSPECTION,将packet成员flags增加标记PKT_NOPAYLOAD_INSPECTION。
  • 检查flow状态如果是FLOW_STATE_CAPTURE_BYPASSED或FLOW_STATE_LOCAL_BYPASSED,将返回TM_ECODE_DONE,这意味着后续将直接解锁flow并由FlowWorker返回,不再对packet进行后续操作。