From dec0d0ae49d2aeb03ebed2754f06bcc8e26b6282 Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Tue, 30 Apr 2024 10:44:54 -0400 Subject: [PATCH] output/log: Add flushing infrastructure Issue: 3449 Add flushing functions and infrastructure. This includes: - Flushing functions for packet loggers - Log file flushing support --- src/output-eve-stream.c | 2 +- src/output-json-alert.c | 10 +++++- src/output-json-anomaly.c | 10 +++++- src/output-json-common.c | 9 ++++++ src/output-json-drop.c | 2 +- src/output-json-frame.c | 2 +- src/output-json-metadata.c | 2 +- src/output-json.c | 6 ++++ src/output-json.h | 2 ++ src/util-logopenfile.c | 66 ++++++++++++++++++++------------------ src/util-logopenfile.h | 5 +++ 11 files changed, 78 insertions(+), 38 deletions(-) diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index 12f8b8c62387..5ac97c90fa60 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -453,7 +453,7 @@ void EveStreamLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = EveStreamLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = EveStreamLogCondition, .ThreadInitFunc = EveStreamLogThreadInit, .ThreadDeinitFunc = EveStreamLogThreadDeinit, diff --git a/src/output-json-alert.c b/src/output-json-alert.c index 419a1d2b42bd..ed6066be082b 100644 --- a/src/output-json-alert.c +++ b/src/output-json-alert.c @@ -823,6 +823,14 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const return TM_ECODE_OK; } +static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + JsonAlertLogThread *aft = thread_data; + SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); + OutputJsonFlush(aft->ctx); + return 0; +} + static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAlertLogThread *aft = thread_data; @@ -1067,7 +1075,7 @@ void JsonAlertLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAlertLogger, - .FlushFunc = NULL, + .FlushFunc = JsonAlertFlush, .ConditionFunc = JsonAlertLogCondition, .ThreadInitFunc = JsonAlertLogThreadInit, .ThreadDeinitFunc = JsonAlertLogThreadDeinit, diff --git a/src/output-json-anomaly.c b/src/output-json-anomaly.c index cd9e5dc068cc..00c4cbd57085 100644 --- a/src/output-json-anomaly.c +++ b/src/output-json-anomaly.c @@ -272,6 +272,14 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet * return rc; } +static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + JsonAnomalyLogThread *aft = thread_data; + SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); + OutputJsonFlush(aft->ctx); + return 0; +} + static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAnomalyLogThread *aft = thread_data; @@ -451,7 +459,7 @@ void JsonAnomalyLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAnomalyLogger, - .FlushFunc = NULL, + .FlushFunc = JsonAnomalyFlush, .ConditionFunc = JsonAnomalyLogCondition, .ThreadInitFunc = JsonAnomalyLogThreadInit, .ThreadDeinitFunc = JsonAnomalyLogThreadDeinit, diff --git a/src/output-json-common.c b/src/output-json-common.c index 5723e05a386a..1ec08b69050e 100644 --- a/src/output-json-common.c +++ b/src/output-json-common.c @@ -70,6 +70,15 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx) SCFree(output_ctx); } +int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + OutputJsonThreadCtx *aft = thread_data; + LogFileCtx *file_ctx = aft->ctx->file_ctx; + SCLogDebug("%s flushing %s", tv->name, file_ctx->filename); + LogFileFlush(file_ctx); + return 0; +} + OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx) { OutputInitResult result = { NULL, false }; diff --git a/src/output-json-drop.c b/src/output-json-drop.c index 29ead13e0748..79e663c43771 100644 --- a/src/output-json-drop.c +++ b/src/output-json-drop.c @@ -392,7 +392,7 @@ void JsonDropLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonDropLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonDropLogCondition, .ThreadInitFunc = JsonDropLogThreadInit, .ThreadDeinitFunc = JsonDropLogThreadDeinit, diff --git a/src/output-json-frame.c b/src/output-json-frame.c index dfd895b8ab6c..3ae80b820f66 100644 --- a/src/output-json-frame.c +++ b/src/output-json-frame.c @@ -562,7 +562,7 @@ void JsonFrameLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonFrameLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonFrameLogCondition, .ThreadInitFunc = JsonFrameLogThreadInit, .ThreadDeinitFunc = JsonFrameLogThreadDeinit, diff --git a/src/output-json-metadata.c b/src/output-json-metadata.c index a87d735839d4..c930eaf177b8 100644 --- a/src/output-json-metadata.c +++ b/src/output-json-metadata.c @@ -96,7 +96,7 @@ void JsonMetadataLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonMetadataLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonMetadataLogCondition, .ThreadInitFunc = JsonLogThreadInit, .ThreadDeinitFunc = JsonLogThreadDeinit, diff --git a/src/output-json.c b/src/output-json.c index 2880a25d87f9..6e9de8613e89 100644 --- a/src/output-json.c +++ b/src/output-json.c @@ -955,6 +955,12 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer) return 0; } +void OutputJsonFlush(OutputJsonThreadCtx *ctx) +{ + LogFileCtx *file_ctx = ctx->file_ctx; + LogFileFlush(file_ctx); +} + void OutputJsonBuilderBuffer( ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx) { diff --git a/src/output-json.h b/src/output-json.h index b8c11778ed27..82989a115639 100644 --- a/src/output-json.h +++ b/src/output-json.h @@ -114,6 +114,7 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data); void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f, JsonBuilder *js, enum OutputJsonLogDirection dir); +int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p); void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js); int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); @@ -121,5 +122,6 @@ int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx); void FreeEveThreadCtx(OutputJsonThreadCtx *ctx); void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array); +void OutputJsonFlush(OutputJsonThreadCtx *ctx); #endif /* SURICATA_OUTPUT_JSON_H */ diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 800492fc49fe..e09280718165 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -126,7 +126,7 @@ static int SCLogUnixSocketReconnect(LogFileCtx *log_ctx) log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0); if (log_ctx->fp) { /* Connected at last (or reconnected) */ - SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename); + SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename); } else if (disconnected) { SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno)); } @@ -189,6 +189,22 @@ static inline void OutputWriteLock(pthread_mutex_t *m) } +/** + * \brief Flush a log file. + */ +static void SCLogFileFlushNoLock(LogFileCtx *log_ctx) +{ + log_ctx->bytes_since_last_flush = 0; + SCFflushUnlocked(log_ctx->fp); +} + +static void SCLogFileFlush(LogFileCtx *log_ctx) +{ + OutputWriteLock(&log_ctx->fp_mutex); + SCLogFileFlushNoLock(log_ctx); + SCMutexUnlock(&log_ctx->fp_mutex); +} + /** * \brief Write buffer to log file. * \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of @@ -224,8 +240,15 @@ static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx * log_ctx->filename); } log_ctx->output_errors++; - } else if (log_ctx->buffer_size) { - SCFflushUnlocked(log_ctx->fp); + return ret; + } + + log_ctx->bytes_since_last_flush += buffer_len; + + if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) { + SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename, + log_ctx->bytes_since_last_flush); + SCLogFileFlushNoLock(log_ctx); } } @@ -248,35 +271,7 @@ static int SCLogFileWrite(const char *buffer, int buffer_len, LogFileCtx *log_ct } else #endif { - - /* Check for rotation. */ - if (log_ctx->rotation_flag) { - log_ctx->rotation_flag = 0; - SCConfLogReopen(log_ctx); - } - - if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) { - time_t now = time(NULL); - if (now >= log_ctx->rotate_time) { - SCConfLogReopen(log_ctx); - log_ctx->rotate_time = now + log_ctx->rotate_interval; - } - } - - if (log_ctx->fp) { - clearerr(log_ctx->fp); - if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) { - /* Only the first error is logged */ - if (!log_ctx->output_errors) { - SCLogError("%s error while writing to %s", - ferror(log_ctx->fp) ? strerror(errno) : "unknown error", - log_ctx->filename); - } - log_ctx->output_errors++; - } else { - fflush(log_ctx->fp); - } - } + ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx); } SCMutexUnlock(&log_ctx->fp_mutex); @@ -706,6 +701,7 @@ LogFileCtx *LogFileNewCtx(void) lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; + lf_ctx->Flush = SCLogFileFlush; return lf_ctx; } @@ -970,6 +966,12 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCReturnInt(1); } +void LogFileFlush(LogFileCtx *file_ctx) +{ + SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush); + file_ctx->Flush(file_ctx); +} + int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) { if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM || diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index efa8159686ad..19c9e5e1c717 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -86,6 +86,7 @@ typedef struct LogFileCtx_ { int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp); void (*Close)(struct LogFileCtx_ *fp); + void (*Flush)(struct LogFileCtx_ *fp); LogFileTypeCtx filetype; @@ -159,6 +160,9 @@ typedef struct LogFileCtx_ { uint64_t dropped; uint64_t output_errors; + + /* Track buffered content */ + uint64_t bytes_since_last_flush; } LogFileCtx; /* Min time (msecs) before trying to reconnect a Unix domain socket */ @@ -173,6 +177,7 @@ typedef struct LogFileCtx_ { LogFileCtx *LogFileNewCtx(void); int LogFileFreeCtx(LogFileCtx *); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); +void LogFileFlush(LogFileCtx *file_ctx); LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);