Skip to content

Commit

Permalink
Cleanup nfdump main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Apr 6, 2024
1 parent e7ef540 commit 0c9293d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 69 deletions.
1 change: 1 addition & 0 deletions src/libnffile/nffile.c
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ nffile_t *NewFile(nffile_t *nffile) {
if (!nffile->processQueue) {
return NULL;
}
queue_close(nffile->processQueue);
}

memset((void *)nffile->file_header, 0, sizeof(fileHeaderV2_t));
Expand Down
125 changes: 56 additions & 69 deletions src/nfdump/nfdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include <time.h>
#include <unistd.h>

#include "barrier.h"
#include "conf/nfconf.h"
#include "config.h"
#include "exporter.h"
Expand Down Expand Up @@ -83,7 +84,7 @@ static uint64_t t_first_flow, t_last_flow;

extension_map_list_t *extension_map_list;

enum processType { FLOWSTAT = 1, ELEMENTSTAT, SORTFLOWS, WRITEFILE, PRINTRECORD };
enum processType { FLOWSTAT = 1, ELEMENTSTAT, ELEMENTFLOWSTAT, SORTRECORDS, WRITEFILE, PRINTRECORD };

extern exporter_t **exporter_list;

Expand All @@ -94,8 +95,8 @@ static int SetStat(char *str, int *element_stat, int *flow_stat);

static void PrintSummary(stat_record_t *stat_record, outputParams_t *outputParams);

static stat_record_t process_data(void *engine, char *wfile, int element_stat, int flow_stat, int sort_flows, RecordPrinter_t print_record,
timeWindow_t *timeWindow, uint64_t limitRecords, outputParams_t *outputParams, int compress);
static stat_record_t process_data(void *engine, int processMode, char *wfile, RecordPrinter_t print_record, timeWindow_t *timeWindow,
uint64_t limitRecords, outputParams_t *outputParams, int compress);

/* Functions */

Expand Down Expand Up @@ -236,17 +237,14 @@ static int SetStat(char *str, int *element_stat, int *flow_stat) {

} // End of SetStat

static stat_record_t process_data(void *engine, char *wfile, int element_stat, int flow_stat, int sort_flows, RecordPrinter_t print_record,
timeWindow_t *timeWindow, uint64_t limitRecords, outputParams_t *outputParams, int compress) {
nffile_t *nffile_w, *nffile_r;
stat_record_t stat_record;
uint64_t twin_msecFirst, twin_msecLast;
twin_msecFirst = twin_msecLast = 0;

// time window of all matched flows
memset((void *)&stat_record, 0, sizeof(stat_record_t));
static stat_record_t process_data(void *engine, int processwMode, char *wfile, RecordPrinter_t print_record, timeWindow_t *timeWindow,
uint64_t limitRecords, outputParams_t *outputParams, int compress) {
stat_record_t stat_record = {0};
stat_record.firstseen = 0x7fffffffffffffffLL;

// time window of all matched flows
uint64_t twin_msecFirst, twin_msecLast;
twin_msecFirst = twin_msecLast = 0;
if (timeWindow) {
twin_msecFirst = timeWindow->first * 1000LL;
if (timeWindow->last)
Expand All @@ -255,48 +253,25 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i
twin_msecLast = 0x7FFFFFFFFFFFFFFFLL;
}

// do not print flows when doing any stats are sorting
if (sort_flows || flow_stat || element_stat) {
print_record = NULL;
}

// do not write flows to file, when doing any stats
// -w may apply for flow_stats later
int write_file = !(sort_flows || flow_stat || element_stat) && wfile;
nffile_r = NULL;
nffile_w = NULL;
nffile_t *nffile_r = NewFile(NULL);
nffile_t *nffile_w = NULL;
dataBlock_t *dataBlock_r = NULL;
dataBlock_t *dataBlock_w = NULL;

// Get the first file handle
nffile_r = GetNextFile(NULL);
if (!nffile_r) {
LogError("GetNextFile() error in %s line %d", __FILE__, __LINE__);
return stat_record;
}
if (nffile_r == NULL) {
LogError("Empty file list. No files to process\n");
return stat_record;
}

// preset time window of all processed flows to the stat record in first flow file
t_first_flow = nffile_r->stat_record->firstseen;
t_last_flow = nffile_r->stat_record->lastseen;

// prepare output file if requested
if (write_file) {
if (wfile) {
nffile_w = OpenNewFile(wfile, NULL, CREATOR_NFDUMP, compress, NOT_ENCRYPTED);
if (!nffile_w) {
if (nffile_r) {
CloseFile(nffile_r);
DisposeFile(nffile_r);
}
stat_record.firstseen = 0;
DisposeFile(nffile_r);
return stat_record;
}
dataBlock_w = WriteBlock(nffile_w, NULL);
SetIdent(nffile_w, nffile_r->ident);
}
FilterSetParam(engine, nffile_r->ident, outputParams->hasGeoDB);

recordHandle_t *recordHandle = (recordHandle_t *)calloc(1, sizeof(recordHandle_t));
if (!recordHandle) {
Expand All @@ -309,14 +284,13 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i
// get next data block from file
dataBlock_r = ReadBlock(nffile_r, dataBlock_r);
if (dataBlock_r == NULL) {
nffile_t *next = GetNextFile(nffile_r);
if (next == NULL) {
// continue with next file
if (GetNextFile(nffile_r) == NULL) {
done = 1;
} else {
// Update global time span window
if (next->stat_record->firstseen < t_first_flow) t_first_flow = next->stat_record->firstseen;
if (next->stat_record->lastseen > t_last_flow) t_last_flow = next->stat_record->lastseen;
// continue with next file
if (nffile_r->stat_record->firstseen < t_first_flow) t_first_flow = nffile_r->stat_record->firstseen;
if (nffile_r->stat_record->lastseen > t_last_flow) t_last_flow = nffile_r->stat_record->lastseen;
}
FilterSetParam(engine, nffile_r->ident, outputParams->hasGeoDB);
continue;
Expand Down Expand Up @@ -395,26 +369,28 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i

UpdateStatRecord(&stat_record, recordHandle);

if (flow_stat) {
AddFlowCache(recordHandle);
if (element_stat) {
switch (processwMode) {
case FLOWSTAT:
AddFlowCache(recordHandle);
break;
case ELEMENTSTAT:
AddElementStat(recordHandle);
}
} else if (element_stat) {
AddElementStat(recordHandle);
} else if (sort_flows) {
InsertFlow(recordHandle);
} else if (write_file) {
dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)process_ptr, process_ptr->size);
} else if (print_record) {
// if we need to print out this record
print_record(stdout, recordHandle, outputParams->doTag);
} else {
// mutually exclusive conditions should prevent executing this code
// this is buggy!
printf("Bug! - this code should never get executed in file %s line %d\n", __FILE__, __LINE__);
exit(EXIT_FAILURE);
break;
case ELEMENTFLOWSTAT:
AddFlowCache(recordHandle);
AddElementStat(recordHandle);
break;
case SORTRECORDS:
InsertFlow(recordHandle);
break;
case WRITEFILE:
dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)process_ptr, process_ptr->size);
break;
case PRINTRECORD:
print_record(stdout, recordHandle, outputParams->doTag);
break;
}

} break;
case ExtensionMapType: {
extension_map_t *map = (extension_map_t *)record_ptr;
Expand All @@ -426,7 +402,7 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i
case ExporterInfoRecordType: {
int ret = AddExporterInfo((exporter_info_record_t *)record_ptr);
if (ret != 0) {
if (write_file && ret == 1) dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)record_ptr, record_ptr->size);
if (nffile_w) dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)record_ptr, record_ptr->size);
} else {
LogError("Failed to add Exporter Record\n");
}
Expand All @@ -440,7 +416,7 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i
case SamplerRecordType: {
int ret = AddSamplerRecord((sampler_record_t *)record_ptr);
if (ret != 0) {
if (write_file && ret == 1) dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)record_ptr, record_ptr->size);
if (nffile_w) dataBlock_w = AppendToBuffer(nffile_w, dataBlock_w, (void *)record_ptr, record_ptr->size);
} else {
LogError("Failed to add Sampler Record\n");
}
Expand Down Expand Up @@ -509,7 +485,7 @@ int main(int argc, char **argv) {
char *print_format;
char *print_order, *query_file, *configFile, *nameserver, *aggr_fmt;
int ffd, element_stat, fdump;
int processType, flow_stat, aggregate, aggregate_mask, bidir;
int flow_stat, aggregate, aggregate_mask, bidir;
int print_stat, gnuplot_stat, syntax_only, compress, worker;
int GuessDir, ModifyCompress;
uint32_t limitRecords;
Expand All @@ -525,7 +501,6 @@ int main(int argc, char **argv) {
aggregate_mask = 0;
bidir = 0;
syntax_only = 0;
processType = 0;
flow_stat = 0;
print_stat = 0;
gnuplot_stat = 0;
Expand Down Expand Up @@ -1023,9 +998,21 @@ int main(int argc, char **argv) {
PrintProlog(outputParams);
}

// evaluate cli flags and define processMode
int processMode = PRINTRECORD;
if (aggregate || flow_stat) {
processMode = FLOWSTAT;
if (element_stat) processMode = ELEMENTFLOWSTAT;
} else if (element_stat) {
processMode = ELEMENTSTAT;
} else if (print_order != NULL) {
processMode = SORTRECORDS;
} else if (wfile) {
processMode = WRITEFILE;
}

nfprof_start(&profile_data);
sum_stat = process_data(engine, wfile, element_stat, aggregate || flow_stat, print_order != NULL, print_record, flist.timeWindow, limitRecords,
outputParams, compress);
sum_stat = process_data(engine, processMode, wfile, print_record, flist.timeWindow, limitRecords, outputParams, compress);
nfprof_end(&profile_data, processed);

if (passed == 0) {
Expand Down

0 comments on commit 0c9293d

Please sign in to comment.