-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathConversationReconstructor.cpp
239 lines (201 loc) · 6.83 KB
/
ConversationReconstructor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#include "ConversationReconstructor.h"
#include "types.h"
#include "TcpConnection.h"
#include "UdpConversation.h"
#include "IcmpConversation.h"
#include <assert.h>
#include <algorithm>
namespace FeatureExtractor {
using namespace std;
/**
 * Default constructor.
 *
 * Value-initializes the `timeouts` member and initializes `timeout_interval`
 * with the conversation check interval (in ms) read from that member.
 *
 * NOTE(review): the second initializer reads the `timeouts` member, so it
 * relies on `timeouts` being declared before `timeout_interval` in the class
 * definition (members are initialized in declaration order) - confirm in the
 * header.
 */
ConversationReconstructor::ConversationReconstructor()
: timeouts()
, timeout_interval(timeouts.get_conversation_check_interval_ms())
{
}
/**
 * Constructs the reconstructor with caller-supplied timeout configuration.
 *
 * @param timeouts Configuration used to initialize the `timeouts` member and
 *                 to obtain the conversation check interval (in ms) for
 *                 `timeout_interval`.
 *
 * Inside the initializer expressions the name `timeouts` refers to the
 * constructor parameter (it hides the member of the same name), so the
 * interval is read from the argument rather than from the member.
 */
ConversationReconstructor::ConversationReconstructor(Config &timeouts)
: timeouts(timeouts)
, timeout_interval(timeouts.get_conversation_check_interval_ms())
{
}
/**
 * Destructor.
 *
 * Frees every conversation this object still holds: first those waiting in
 * the output queue, then those still present in the conversation map.
 */
ConversationReconstructor::~ConversationReconstructor()
{
    // Drain the output queue, deleting each conversation as it is popped
    for (; !output_queue.empty(); output_queue.pop()) {
        delete output_queue.front();
    }

    // Delete the conversations that are still stored in the map
    for (ConversationMap::value_type &entry : conv_map) {
        delete entry.second;
    }
}
/**
 * Adds one packet to the conversation it belongs to.
 *
 * Steps:
 *  1. Runs the timeout check using the packet's end timestamp.
 *  2. Looks up the conversation by the packet's five-tuple; for TCP and UDP
 *     the reversed five-tuple (opposite direction) is tried as well.
 *  3. If no conversation exists, creates one matching the IP protocol
 *     (TCP, UDP or ICMP) and inserts it into the map.
 *  4. Passes the packet to the conversation; if the conversation reports
 *     that it is finished, it is removed from the map and pushed to the
 *     output queue.
 *
 * A packet with any other IP protocol trips the assertion in debug builds.
 * When assertions are compiled out (NDEBUG) the packet is ignored instead of
 * inserting and dereferencing a null conversation.
 *
 * @param packet Packet to process; must not be null (it is dereferenced
 *               unconditionally).
 */
void ConversationReconstructor::add_packet(const Packet *packet)
{
    // Flush conversations that have timed out as of this packet
    Timestamp now = packet->get_end_ts();
    check_timeouts(now);

    FiveTuple key = packet->get_five_tuple();
    Conversation *conversation = nullptr;
    ip_field_protocol_t ip_proto = key.get_ip_proto();

    // Find or insert with a single lookup:
    // http://stackoverflow.com/a/101980/3503528
    // - lower_bound() yields either the matching entry or the position that
    //   serves as insertion hint for a new entry
    // - the iterator is also used to erase a finished conversation below
    ConversationMap::iterator it = conv_map.lower_bound(key);
    if (it != conv_map.end() && !(conv_map.key_comp()(key, it->first))) {
        // Key (conversation) already exists
        conversation = it->second;
    }
    else if (ip_proto == TCP || ip_proto == UDP) {
        // TCP & UDP are bidirectional: retry with swapped src & dst
        FiveTuple rev_key = key.get_reversed();
        ConversationMap::iterator rev_it = conv_map.lower_bound(rev_key);
        if (rev_it != conv_map.end() && !(conv_map.key_comp()(rev_key, rev_it->first))) {
            // Key for the opposite direction already exists
            conversation = rev_it->second;
            it = rev_it; // Remember iterator in case the entry is erased below
        }
    }

    // The key (conversation) does not exist in the map yet
    if (!conversation) {
        switch (ip_proto)
        {
        case TCP:
            conversation = new TcpConnection(packet);
            break;

        case UDP:
            conversation = new UdpConversation(packet);
            break;

        case ICMP:
            conversation = new IcmpConversation(packet);
            break;

        default:
            break;
        }

        assert(conversation != nullptr && "Attempt to add NULL "
            "conversation to conversation map. Possible unhadnled IP protocol value");

        // With NDEBUG the assert above vanishes; never store or dereference
        // a null conversation - drop the unsupported packet instead.
        if (!conversation) {
            return;
        }

        it = conv_map.insert(it, ConversationMap::value_type(key, conversation));
    }

    // Pass the new packet to the conversation
    bool is_finished = conversation->add_packet(packet);

    // If the conversation is in a final state, move it from the map to the
    // output queue
    if (is_finished) {
        conv_map.erase(it);
        output_queue.push(conversation);
    }
}
/**
 * Reports the current time without adding a packet.
 *
 * Simply forwards to check_timeouts(), so timed out conversations can be
 * moved to the output queue even when no packet is being added.
 *
 * @param now Current timestamp.
 */
void ConversationReconstructor::report_time(const Timestamp &now)
{
check_timeouts(now);
}
/**
 * Removes and returns the oldest entry of the output queue.
 *
 * @return Pointer to the conversation at the front of the output queue, or
 *         nullptr when the queue is empty. The pointer is no longer tracked
 *         by this object after the call, so the destructor will not delete
 *         it.
 */
Conversation *ConversationReconstructor::get_next_conversation()
{
    Conversation *next = nullptr;

    if (!output_queue.empty()) {
        next = output_queue.front();
        output_queue.pop();
    }

    return next;
}
void ConversationReconstructor::check_timeouts(const Timestamp &now)
{
// find, sort, add to queue
// Run no more often than once per timeout check interval
if (!timeout_interval.is_timedout(now)) {
timeout_interval.update_time(now);
return;
}
timeout_interval.update_time(now);
// Maximal timestamp that timedout connection can have
Timestamp max_tcp_syn = now - (timeouts.get_tcp_syn_timeout() * 1000000);
Timestamp max_tcp_estab = now - (timeouts.get_tcp_estab_timeout() * 1000000);
Timestamp max_tcp_rst = now - (timeouts.get_tcp_rst_timeout() * 1000000);
Timestamp max_tcp_fin = now - (timeouts.get_tcp_fin_timeout() * 1000000);
Timestamp max_tcp_last_ack = now - (timeouts.get_tcp_last_ack_timeout() * 1000000);
Timestamp max_udp = now - (timeouts.get_udp_timeout() * 1000000);
Timestamp max_icmp = now - (timeouts.get_icmp_timeout() * 1000000);
// Temporary list of timed out conversations
vector<Conversation *> timedout_convs;
// Erasing during iteration available since C++11
// http://stackoverflow.com/a/263958/3503528
ConversationMap::iterator it = conv_map.begin();
while (it != conv_map.end()) {
bool is_timedout = false;
Conversation *conv = it->second;
ip_field_protocol_t ip_proto = conv->get_five_tuple_ptr()->get_ip_proto();
// Check if conversation is timedout
if (ip_proto == UDP) {
is_timedout = (conv->get_last_ts() <= max_udp);
}
else if (ip_proto == ICMP) {
is_timedout = (conv->get_last_ts() <= max_icmp);
}
else if (ip_proto == TCP) {
switch (conv->get_internal_state()) {
case S0:
case S1:
is_timedout = (conv->get_last_ts() <= max_tcp_syn);
break;
case ESTAB:
is_timedout = (conv->get_last_ts() <= max_tcp_estab);
break;
case REJ:
case RSTO:
case RSTOS0:
case RSTR:
is_timedout = (conv->get_last_ts() <= max_tcp_rst);
break;
case S2:
case S3:
is_timedout = (conv->get_last_ts() <= max_tcp_fin);
break;
case S2F:
case S3F:
is_timedout = (conv->get_last_ts() <= max_tcp_last_ack);
break;
default:
break;
}
}
// If buffer is timed out, remove conversation from active conversations
// and to temporary list of timed out conversations
if (is_timedout) {
timedout_convs.push_back(conv);
conv_map.erase(it++);
}
else {
++it;
}
} // end of while(it..
// Sort timed out conversations by timestamp of last fragmet seen
// Overriden operator '<' of class Conversation is used
sort(timedout_convs.begin(), timedout_convs.end());
// Add timedout conversation to output queue in order of their last timestamp
for (vector<Conversation *>::iterator it = timedout_convs.begin(); it != timedout_convs.end(); ++it) {
output_queue.push(*it);
}
}
void ConversationReconstructor::finish_all_conversations()
{
// Temporary list of timed out conversations
vector<Conversation *> timedout_convs;
// Erasing during iteration available since C++11
// http://stackoverflow.com/a/263958/3503528
ConversationMap::iterator it = conv_map.begin();
while (it != conv_map.end()) {
Conversation *conv = it->second;
timedout_convs.push_back(conv);
conv_map.erase(it++);
}
// Sort timed out conversations by timestamp of last fragmet seen
// Overriden operator '<' of class Conversation is used
sort(timedout_convs.begin(), timedout_convs.end());
// Add timeout conversation to output queue in order of their last timestamp
for (vector<Conversation *>::iterator it = timedout_convs.begin(); it != timedout_convs.end(); ++it) {
output_queue.push(*it);
}
}
}