Skip to content

Commit

Permalink
Fix #516
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 29, 2024
1 parent 32770d5 commit c4a2575
Show file tree
Hide file tree
Showing 32 changed files with 791 additions and 947 deletions.
141 changes: 136 additions & 5 deletions src/collector/collector.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009-2023, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* Copyright (c) 2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
* All rights reserved.
*
Expand Down Expand Up @@ -49,6 +49,8 @@

#include "bookkeeper.h"
#include "conf/nfconf.h"
#include "flist.h"
#include "launch.h"
#include "nfdump.h"
#include "nffile.h"
#include "nfxV3.h"
Expand Down Expand Up @@ -354,10 +356,139 @@ FlowSource_t *AddDynamicSource(FlowSource_t **FlowSource, struct sockaddr_storag

} // End of AddDynamicSource

void RotateFlowFiles(time_t t_start, char *time_extension, FlowSource_t *fs, int done) {
// periodic file rotation
struct tm *now = localtime(&t_start);
char fmt[32];
strftime(fmt, sizeof(fmt), time_extension, now);

// prepare sub dir hierarchy
char *subdir = NULL;
if (fs->subdir) {
subdir = GetSubDir(now);
if (!subdir) {
// failed to generate subdir path - put flows into base directory
LogError("Failed to create subdir path!");
// failed to generate subdir path - put flows into base directory
}
}

// for each flow source update the stats, close the file and re-initialize the new file
while (fs) {
char nfcapd_filename[MAXPATHLEN];
char error[255];
nffile_t *nffile = fs->nffile;

// prepare filename
if (subdir) {
if (SetupSubDir(fs->datadir, subdir, error, 255)) {
snprintf(nfcapd_filename, MAXPATHLEN - 1, "%s/%s/nfcapd.%s", fs->datadir, subdir, fmt);
} else {
LogError("Ident: %s, Failed to create sub hier directories: %s", fs->Ident, error);
// skip subdir - put flows directly into current directory
snprintf(nfcapd_filename, MAXPATHLEN - 1, "%s/nfcapd.%s", fs->datadir, fmt);
}
} else {
snprintf(nfcapd_filename, MAXPATHLEN - 1, "%s/nfcapd.%s", fs->datadir, fmt);
}
nfcapd_filename[MAXPATHLEN - 1] = '\0';

// update stat record
// if no flows were collected, fs->msecLast is still 0
// set msecFirst and msecLast and to start of this time slot
if (fs->msecLast == 0) {
fs->msecFirst = 1000LL * (uint64_t)t_start;
fs->msecLast = fs->msecFirst;
}
nffile->stat_record->firstseen = fs->msecFirst;
nffile->stat_record->lastseen = fs->msecLast;

// Flush Exporter Stat to file
FlushExporterStats(fs);
// Flush open datablock
fs->dataBlock = WriteBlock(fs->nffile, fs->dataBlock);
// Close file
CloseUpdateFile(nffile);

// if rename fails, we are in big trouble, as we need to get rid of the old .current
// file otherwise, we will loose flows and can not continue collecting new flows
if (RenameAppend(fs->current, nfcapd_filename) < 0) {
LogError("Ident: %s, Can't rename dump file: %s", fs->Ident, strerror(errno));

// we do not update the books here, as the file failed to rename properly
// otherwise the books may be wrong
} else {
struct stat fstat;

// Update books
stat(nfcapd_filename, &fstat);
UpdateBooks(fs->bookkeeper, t_start, 512 * fstat.st_blocks);
}

// log stats
LogInfo("Ident: '%s' Flows: %llu, Packets: %llu, Bytes: %llu, Sequence Errors: %u, Bad Packets: %u, Blocks: %u", fs->Ident,
(unsigned long long)nffile->stat_record->numflows, (unsigned long long)nffile->stat_record->numpackets,
(unsigned long long)nffile->stat_record->numbytes, nffile->stat_record->sequence_failure, fs->bad_packets, ReportBlocks());

// reset stats
fs->bad_packets = 0;
fs->msecFirst = 0xffffffffffffLL;
fs->msecLast = 0;

if (!done) {
fs->nffile = OpenNewFile(fs->current, fs->nffile, CREATOR_NFCAPD, INHERIT, INHERIT);
if (!fs->nffile) {
LogError("killed due to fatal error: ident: %s", fs->Ident);
break;
}
SetIdent(fs->nffile, fs->Ident);

// Dump all exporters/samplers to the buffer
FlushStdRecords(fs);
}

// next flow source
fs = fs->next;

} // end of while (fs)

} // End of RotateFlowFiles

void TriggerLauncher(time_t t_start, char *time_extension, int pfd, FlowSource_t *fs) {
struct tm *now = localtime(&t_start);
char fmt[32];
strftime(fmt, sizeof(fmt), time_extension, now);

// prepare sub dir hierarchy
char *subdir = NULL;
if (fs->subdir) {
subdir = GetSubDir(now);
if (!subdir) {
// failed to generate subdir path - put flows into base directory
LogError("Failed to create subdir path!");
// failed to generate subdir path - put flows into base directory
}
}

// for each flow source update the stats, close the file and re-initialize the new file
while (fs) {
// trigger launcher if required
// Send launcher message
if (SendLauncherMessage(pfd, t_start, subdir, fmt, fs->datadir, fs->Ident) < 0) {
LogError("Failed to send launcher message");
} else {
LogVerbose("Send launcher message");
}

// next flow source
fs = fs->next;
}
} // End of TriggerLauncher

int FlushInfoExporter(FlowSource_t *fs, exporter_info_record_t *exporter) {
exporter->sysid = AssignExporterID();
fs->exporter_count++;
AppendToBuffer(fs->nffile, (void *)exporter, exporter->header.size);
fs->dataBlock = AppendToBuffer(fs->nffile, fs->dataBlock, (void *)exporter, exporter->header.size);

#ifdef DEVEL
{
Expand Down Expand Up @@ -390,9 +521,9 @@ void FlushStdRecords(FlowSource_t *fs) {

while (e) {
sampler_t *sampler = e->sampler;
AppendToBuffer(fs->nffile, (void *)&(e->info), e->info.header.size);
fs->dataBlock = AppendToBuffer(fs->nffile, fs->dataBlock, (void *)&(e->info), e->info.header.size);
while (sampler) {
AppendToBuffer(fs->nffile, (void *)&(sampler->record), sampler->record.size);
fs->dataBlock = AppendToBuffer(fs->nffile, fs->dataBlock, (void *)&(sampler->record), sampler->record.size);
sampler = sampler->next;
}
e = e->next;
Expand Down Expand Up @@ -447,7 +578,7 @@ void FlushExporterStats(FlowSource_t *fs) {
i++;
e = e->next;
}
AppendToBuffer(fs->nffile, (void *)exporter_stats, size);
fs->dataBlock = AppendToBuffer(fs->nffile, fs->dataBlock, (void *)exporter_stats, size);
free(exporter_stats);

if (i != fs->exporter_count) {
Expand Down
14 changes: 10 additions & 4 deletions src/collector/collector.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009-2023, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* Copyright (c) 2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
* All rights reserved.
*
Expand Down Expand Up @@ -80,9 +80,11 @@ typedef struct FlowSource_s {
bookkeeper_t *bookkeeper;

// all about data storage
char *datadir; // where to store data for this source
char *current; // current file name - typically nfcad.current.pid
nffile_t *nffile; // the writing file handle
char *datadir; // where to store data for this source
char *current; // current file name - typically nfcad.current.pid
int subdir; // sub dir structur
nffile_t *nffile; // the writing file handle
dataBlock_t *dataBlock; // writing buffer

// statistical data per source
uint32_t bad_packets;
Expand Down Expand Up @@ -118,6 +120,10 @@ int SetDynamicSourcesDir(FlowSource_t **FlowSource, char *dir);

FlowSource_t *AddDynamicSource(FlowSource_t **FlowSource, struct sockaddr_storage *ss);

void RotateFlowFiles(time_t t_start, char *time_extension, FlowSource_t *fs, int done);

void TriggerLauncher(time_t t_start, char *time_extension, int pfd, FlowSource_t *fs);

void FlushStdRecords(FlowSource_t *fs);

void FlushExporterStats(FlowSource_t *fs);
Expand Down
30 changes: 11 additions & 19 deletions src/ft2nfdump/ft2nfdump.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* All rights reserved.
* Copyright (c) 2009-2023, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
* Copyright (c) 2001 Mark Fullmer and The Ohio State University
* All rights reserved.
Expand Down Expand Up @@ -33,14 +33,11 @@
*
*/

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -51,10 +48,6 @@
#include <time.h>
#include <unistd.h>

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

#include "ftlib.h"
#include "nfdump.h"
#include "nffile.h"
Expand Down Expand Up @@ -174,6 +167,7 @@ static int flows2nfdump(struct ftio *ftio, char *wfile, int compress, int extend
LogError("OpenNewFile() failed.");
return 1;
}
dataBlock_t *dataBlock = WriteBlock(nffile, NULL);

ftio_get_ver(ftio, &ftv);
memset((void *)&fo, 0xFF, sizeof(fo));
Expand All @@ -193,14 +187,13 @@ static int flows2nfdump(struct ftio *ftio, char *wfile, int compress, int extend
while ((rec = ftio_read(ftio))) {
int i, exID;
dbg_printf("FT record %u\n", cnt);
if (!CheckBufferSpace(nffile, recordSize)) {
// fishy! - should never happen. maybe disk full?
LogError("ft2nfdump: output buffer size error. Abort record processing");
CloseFile(nffile);
return 1;
if (!IsAvailable(dataBlock, recordSize)) {
// flush block - get an empty one
dataBlock = WriteBlock(nffile, dataBlock);
}

AddV3Header(nffile->buff_ptr, recordHeader);
void *buffPtr = GetCurrentCursor(dataBlock);
AddV3Header(buffPtr, recordHeader);

// header data
recordHeader->engineType = *((uint8_t *)(rec + fo.engine_type));
Expand Down Expand Up @@ -266,13 +259,11 @@ static int flows2nfdump(struct ftio *ftio, char *wfile, int compress, int extend
}

// update file record size ( -> output buffer size )
nffile->block_header->NumRecords++;
nffile->block_header->size += recordSize;
dataBlock->NumRecords++;
dataBlock->size += recordSize;

dbg_assert(recordHeader->size == recordSize);

nffile->buff_ptr += recordSize;

if (extended) {
flow_record_short(stdout, recordHeader);
}
Expand All @@ -283,6 +274,7 @@ static int flows2nfdump(struct ftio *ftio, char *wfile, int compress, int extend
} /* while */

SetIdent(nffile, ident);
FlushBlock(nffile, dataBlock);
CloseUpdateFile(nffile);
return 0;

Expand Down
4 changes: 2 additions & 2 deletions src/include/exporter.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2023, Peter Haag
* Copyright (c) 2012-2024, Peter Haag
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -188,7 +188,7 @@ int AddSamplerLegacyRecord(samplerV0_record_t *sampler_record);

int AddExporterStat(exporter_stats_record_t *stat_record);

void ExportExporterList(nffile_t *nffile);
dataBlock_t *ExportExporterList(nffile_t *nffile, dataBlock_t *dataBlock);

exporter_t *GetExporterInfo(int exporterID);

Expand Down
52 changes: 11 additions & 41 deletions src/inline/nffile_inline.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,9 @@
*
*/

static inline size_t CheckBufferSpace(nffile_t *nffile, size_t required);

static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint32_t flowCount);

static inline void AppendToBuffer(nffile_t *nffile, void *record, size_t required);

static inline size_t CheckBufferSpace(nffile_t *nffile, size_t required) {
// if actual output size is unknown, make sure at least
// MAXRECORDSIZE is available
if (required == 0) {
required = MAXRECORDSIZE;
}
dbg_printf("Buffer Size %u, check for %zu\n", nffile->block_header->size, required);

// flush current buffer to disc
if ((nffile->block_header->size + required) > WRITE_BUFFSIZE) {
if (required > WRITE_BUFFSIZE) {
// this should never happen, but catch it anyway
LogError("Required buffer size %zu too big for output buffer!", required);
return 0;
}

if (WriteBlock(nffile) <= 0) {
LogError("Failed to write output buffer to disk: '%s'", strerror(errno));
return 0;
}
}

dbg_printf("CheckBuffer returns %u\n", WRITE_BUFFSIZE - nffile->block_header->size);
return WRITE_BUFFSIZE - nffile->block_header->size;

} // End of CheckBufferSpace
static inline dataBlock_t *AppendToBuffer(nffile_t *nffile, dataBlock_t *dataBlock, void *record, size_t required);

static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint32_t flowCount) {
if (handle->extensionList[SSLindex]) free(handle->extensionList[SSLindex]);
Expand Down Expand Up @@ -99,20 +70,19 @@ static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *reco
return 1;
}

static inline void AppendToBuffer(nffile_t *nffile, void *record, size_t required) {
// flush current buffer to disc
if (!CheckBufferSpace(nffile, required)) {
return;
static inline dataBlock_t *AppendToBuffer(nffile_t *nffile, dataBlock_t *dataBlock, void *record, size_t required) {
if (!IsAvailable(dataBlock, required)) {
// flush block - get an empty one
dataBlock = WriteBlock(nffile, dataBlock);
// map output memory buffer
}

void *cur = GetCurrentCursor(dataBlock);
// enough buffer space available at this point
memcpy(nffile->buff_ptr, record, required);
memcpy(cur, record, required);

// update stat
nffile->block_header->NumRecords++;
nffile->block_header->size += required;

// advance write pointer
nffile->buff_ptr = (void *)((pointer_addr_t)nffile->buff_ptr + required);
dataBlock->NumRecords++;
dataBlock->size += required;

return dataBlock;
} // End of AppendToBuffer
Loading

0 comments on commit c4a2575

Please sign in to comment.