Skip to content

Commit

Permalink
Cleanup nfanon
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 30, 2024
1 parent e7cac3e commit ea6322e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 76 deletions.
131 changes: 56 additions & 75 deletions src/nfanon/nfanon.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static inline void AnonRecord(recordHeaderV3_t *v3Record);

static inline dataBlock_t *WriteAnonRecord(nffile_t *wfile, dataBlock_t *dataBlock, recordHeaderV3_t *v3Record);

static void process_data(void *wfile, int verbose);
static void process_data(char *wfile, int verbose);

/* Functions */

Expand Down Expand Up @@ -219,89 +219,65 @@ static inline void AnonRecord(recordHeaderV3_t *v3Record) {

} // End of AnonRecord

static inline dataBlock_t *WriteAnonRecord(nffile_t *wfile, dataBlock_t *dataBlock, recordHeaderV3_t *v3Record) {
// output buffer size check for all expected records
if (!IsAvailable(dataBlock, v3Record->size)) {
// flush block - get an empty one
dataBlock = WriteBlock(wfile, dataBlock);
}

void *buffPtr = GetCurrentCursor(dataBlock);
memcpy(buffPtr, (void *)v3Record, v3Record->size);

dataBlock->NumRecords++;
dataBlock->size += v3Record->size;

return dataBlock;
} // End of WriteAnonRecord

static void process_data(void *wfile, int verbose) {
static void process_data(char *wfile, int verbose) {
const char spinner[4] = {'|', '/', '-', '\\'};
char outfile[MAXPATHLEN], *cfile;
char pathBuff[MAXPATHLEN];

// Get the first file handle
nffile_t *nffile_r = GetNextFile(NULL);
if (!nffile_r) {
LogError("GetNextFile() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno));
return;
}
if (nffile_r == NULL) {
LogError("Empty file list. No files to process\n");
return;
}

int cnt = 1;
cfile = nffile_r->fileName;
char *cfile = nffile_r->fileName;
if (!cfile) {
LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__);
return;
} else {
// prepare output file
snprintf(outfile, MAXPATHLEN - 1, "%s-tmp", cfile);
outfile[MAXPATHLEN - 1] = '\0';
if (verbose) printf(" %i Processing %s\r", cnt++, cfile);
}
if (verbose) printf(" %i Processing %s\r", cnt++, cfile);

nffile_t *nffile_w = NULL;
if (wfile)
nffile_w = OpenNewFile(wfile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
else
nffile_w = OpenNewFile(outfile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
char *outFile = NULL;
if (wfile == NULL) {
// prepare output file
snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile);
pathBuff[MAXPATHLEN - 1] = '\0';
outFile = pathBuff;
} else {
outFile = wfile;
}

nffile_t *nffile_w = OpenNewFile(outFile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
if (!nffile_w) {
if (nffile_r) {
CloseFile(nffile_r);
DisposeFile(nffile_r);
}
// can not create output file
CloseFile(nffile_r);
DisposeFile(nffile_r);
return;
}
dataBlock_t *dataBlock_w = WriteBlock(nffile_w, NULL);

SetIdent(nffile_w, FILE_IDENT(nffile_r));
memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t));

dataBlock_t *dataBlock_r = NULL;
dataBlock_t *dataBlock = NULL;
int blk_count = 0;
int done = 0;
while (!done) {
// get next data block from file
dataBlock_r = ReadBlock(nffile_r, dataBlock_r);
dataBlock = ReadBlock(nffile_r, dataBlock);
if (verbose) {
printf("\r%c", spinner[blk_count & 0x3]);
blk_count++;
}

if (dataBlock_r == NULL) {
if (wfile == NULL) {
CloseUpdateFile(nffile_w);
if (rename(outfile, cfile) < 0) {
LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno));
return;
}
if (dataBlock == NULL) {
CloseUpdateFile(nffile_w);
if (wfile == NULL && rename(outFile, cfile) < 0) {
LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno));
return;
}

nffile_t *next = GetNextFile(nffile_r);
if (next == NULL) {
if (GetNextFile(nffile_r) == NULL) {
done = 1;
printf("\nDone\n");
continue;
Expand All @@ -310,39 +286,46 @@ static void process_data(void *wfile, int verbose) {
cfile = nffile_r->fileName;
if (!cfile) {
LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__);
CloseFile(nffile_r);
DisposeFile(nffile_r);
return;
}
if (verbose) printf(" %i Processing %s\r", cnt++, cfile);

if (wfile == NULL) {
snprintf(outfile, MAXPATHLEN - 1, "%s-tmp", cfile);
outfile[MAXPATHLEN - 1] = '\0';

nffile_w = OpenNewFile(outfile, nffile_w, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
if (!nffile_w) {
if (nffile_r) {
DisposeFile(nffile_r);
}
return;
}
memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t));
// prepare output file
snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile);
pathBuff[MAXPATHLEN - 1] = '\0';
outFile = pathBuff;
} else {
SumStatRecords(nffile_w->stat_record, nffile_r->stat_record);
outFile = wfile;
}

nffile_w = OpenNewFile(outFile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
if (!nffile_w) {
// can not create output file
CloseFile(nffile_r);
DisposeFile(nffile_r);
return;
}

SetIdent(nffile_w, FILE_IDENT(nffile_r));
memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t));

// continue with next file
continue;
}

if (dataBlock_r->type != DATA_BLOCK_TYPE_2 && dataBlock_r->type != DATA_BLOCK_TYPE_3) {
LogError("Can't process block type %u. Skip block", dataBlock_r->type);
if (dataBlock->type != DATA_BLOCK_TYPE_2 && dataBlock->type != DATA_BLOCK_TYPE_3) {
LogError("Can't process block type %u. Write block unmodified", dataBlock->type);
dataBlock = WriteBlock(nffile_w, dataBlock);
continue;
}

record_header_t *record_ptr = GetCursor(dataBlock_r);
record_header_t *record_ptr = GetCursor(dataBlock);
uint32_t sumSize = 0;
for (int i = 0; i < dataBlock_r->NumRecords; i++) {
if ((sumSize + record_ptr->size) > dataBlock_r->size || (record_ptr->size < sizeof(record_header_t))) {
for (int i = 0; i < dataBlock->NumRecords; i++) {
if ((sumSize + record_ptr->size) > dataBlock->size || (record_ptr->size < sizeof(record_header_t))) {
LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
exit(255);
}
Expand All @@ -351,7 +334,6 @@ static void process_data(void *wfile, int verbose) {
switch (record_ptr->type) {
case V3Record:
AnonRecord((recordHeaderV3_t *)record_ptr);
dataBlock_w = WriteAnonRecord(nffile_w, dataBlock_w, (recordHeaderV3_t *)record_ptr);
break;
case ExporterInfoRecordType:
case ExporterStatRecordType:
Expand All @@ -364,21 +346,20 @@ static void process_data(void *wfile, int verbose) {
LogError("Skip unknown record type %i", record_ptr->type);
}
}

// Advance pointer by number of bytes for netflow record
record_ptr = (record_header_t *)((void *)record_ptr + record_ptr->size);

} // for all records

// write modified block
dataBlock = WriteBlock(nffile_w, dataBlock);

} // while

if (wfile != NULL) CloseUpdateFile(nffile_w);
FreeDataBlock(dataBlock);
DisposeFile(nffile_w);

if (nffile_r) {
FreeDataBlock(dataBlock_r);
CloseFile(nffile_r);
DisposeFile(nffile_r);
}
DisposeFile(nffile_r);

if (verbose) LogError("Processed %i files", --cnt);

Expand Down
4 changes: 3 additions & 1 deletion src/nfcapd/nfcapd.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
periodic_trigger = 0;
ssize_t cnt = 0;
uint32_t ignored_packets = 0;
uint64_t packets = 0;

// wake up at least at next time slot (twin) + 1s
alarm(t_start + twin + 1 - time(NULL));
Expand Down Expand Up @@ -335,6 +336,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
LogError("recvfrom() error in '%s', line '%d', cnt: %d:, %s", __FILE__, __LINE__, cnt, strerror(errno));
continue;
}
packets++;
}

/* Periodic file renaming, if time limit reached or if we are done. */
Expand All @@ -355,7 +357,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
pfd = 0;
}

LogInfo("Total ignored packets: %u", ignored_packets);
LogInfo("Total packets received: %llu avg: %3.2f ignored packets: %u", packets, (double)packets / (double)twin, ignored_packets);
ignored_packets = 0;
periodic_trigger = 0;

Expand Down

0 comments on commit ea6322e

Please sign in to comment.