Skip to content

Commit

Permalink
Cleanup hash code for nfstat and nflowcache
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed May 3, 2024
1 parent 885f7df commit d222bc0
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 279 deletions.
225 changes: 64 additions & 161 deletions src/nfdump/blocksort.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
/*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of the author nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/

#include "blocksort.h"

#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
Expand All @@ -6,18 +37,13 @@
#include <sys/time.h>
#include <unistd.h>

typedef struct SortRecord {
void *record;
uint64_t count;
} SortRecord_t;

void blocksort(SortRecord_t *data, int len);
void blocksort(SortElement_t *data, int len);

#define swap(a, b) \
{ \
SortRecord_t _h = (a); \
(a) = (b); \
(b) = _h; \
#define swap(a, b) \
{ \
SortElement_t _h = (a); \
(a) = (b); \
(b) = _h; \
}

#define min(a, b) ((a) < (b) ? (a) : (b))
Expand All @@ -28,7 +54,7 @@ void blocksort(SortRecord_t *data, int len);
if ((c).count < (b).count) { \
swap(a, c); \
} else { \
SortRecord_t h = (a); \
SortElement_t h = (a); \
(a) = (b); \
(b) = (c); \
(c) = h; \
Expand All @@ -39,7 +65,7 @@ void blocksort(SortRecord_t *data, int len);
} else { \
if ((c).count < (b).count) { \
if ((c).count < (a).count) { \
SortRecord_t h = (c); \
SortElement_t h = (c); \
(c) = (b); \
(b) = (a); \
(a) = h; \
Expand All @@ -54,27 +80,27 @@ static int n_threads;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// static void init(SortRecord_t *data, int len);
// static void init(SortElement_t *data, int len);

static void qusort(SortRecord_t *left, SortRecord_t *right);
static void qusort(SortElement_t *left, SortElement_t *right);

static void insert_sort(SortRecord_t *left, SortRecord_t *right);
static void insert_sort(SortElement_t *left, SortElement_t *right);

static void partition(SortRecord_t *left0, SortRecord_t *right0, SortRecord_t **l1, SortRecord_t **r1, SortRecord_t **l2, SortRecord_t **r2);
static void partition(SortElement_t *left0, SortElement_t *right0, SortElement_t **l1, SortElement_t **r1, SortElement_t **l2, SortElement_t **r2);

static void *sort_thr(void *arg);

void insert_sort(SortRecord_t *left, SortRecord_t *right) {
void insert_sort(SortElement_t *left, SortElement_t *right) {
// put minimum to left position, so we can save
// one inner loop comparison for insert sort
for (SortRecord_t *pi = left + 1; pi <= right; pi++) {
for (SortElement_t *pi = left + 1; pi <= right; pi++) {
if (pi->count < left->count) {
swap(*pi, *left);
}
}
for (SortRecord_t *pi = left + 2; pi <= right; pi++) {
SortRecord_t h = *pi;
SortRecord_t *pj = pi - 1;
for (SortElement_t *pi = left + 2; pi <= right; pi++) {
SortElement_t h = *pi;
SortElement_t *pj = pi - 1;
while (h.count < pj->count) {
*(pj + 1) = *pj;
pj -= 1;
Expand All @@ -83,12 +109,12 @@ void insert_sort(SortRecord_t *left, SortRecord_t *right) {
}
}

static void partition(SortRecord_t *left0, SortRecord_t *right0, SortRecord_t **l1, SortRecord_t **r1, SortRecord_t **l2, SortRecord_t **r2) {
SortRecord_t *left = left0 + 1;
SortRecord_t *right = right0;
static void partition(SortElement_t *left0, SortElement_t *right0, SortElement_t **l1, SortElement_t **r1, SortElement_t **l2, SortElement_t **r2) {
SortElement_t *left = left0 + 1;
SortElement_t *right = right0;

SortRecord_t *mid = left0 + (right0 - left0) / 2;
SortRecord_t piv = *mid;
SortElement_t *mid = left0 + (right0 - left0) / 2;
SortElement_t piv = *mid;
*mid = *left;
sort3fast(*left0, piv, *right0);
*left = piv;
Expand Down Expand Up @@ -117,85 +143,8 @@ static void partition(SortRecord_t *left0, SortRecord_t *right0, SortRecord_t **
}
}

/*
static void partition(SortRecord_t *left0, SortRecord_t *right0,
SortRecord_t **l1, SortRecord_t **r1,
SortRecord_t **l2, SortRecord_t **r2) {
SortRecord_t *mid = left0 + (right0 - left0) / 2;
SortRecord_t piv = *mid;
*mid = *(left0 + 1);
sort3fast(*left0, piv, *right0);
*(left0 + 1) = piv;
SortRecord_t *left, *right;
#define BSZ 256
if (right0 - left0 > 2 * BSZ + 3) {
left = left0 + 2;
right = right0 - 1;
SortRecord_t *offl[BSZ];
SortRecord_t *offr[BSZ];
SortRecord_t **ol = offl;
SortRecord_t **or = offr;
do {
if (ol == offl) {
SortRecord_t *pd = left;
do {
*ol = pd;
ol += (piv.count < pd->count);
pd += 1;
}
while (pd < left + BSZ);
}
if (or == offr) {
SortRecord_t* pd = right;
do {
*or = pd;
or += (piv.count > pd->count);
pd -= 1;
}
while (pd > right - BSZ);
}
int min = min(ol - offl, or - offr);
ol -= min;
or -= min;
for (int i = 0; i < min; i++) {
swap(**(ol + i), **(or + i));
}
if (ol == offl) left += BSZ;
if (or == offr) right -= BSZ;
}
while (right - left > 2 * BSZ);
left -= 1;
right += 1;
}
else {
left = left0 + 1;
right = right0;
}
while (1) {
do left += 1; while(left->count < piv.count);
do right -= 1; while (right->count > piv.count);
if (left >= right) break;
swap(*left, *right);
}
*(left0 + 1) = *right;
*right = piv;
if (right < mid) {
*l1 = left0; *r1 = right - 1;
*l2 = right + 1; *r2 = right0;
}
else {
*l1 = right + 1; *r1 = right0;
*l2 = left0; *r2 = right - 1;
}
}
*/

static void *sort_thr(void *arg) {
SortRecord_t **par = (SortRecord_t **)arg;
SortElement_t **par = (SortElement_t **)arg;
qusort(par[0], par[1]);
free(arg);
pthread_mutex_lock(&mutex);
Expand All @@ -207,24 +156,24 @@ static void *sort_thr(void *arg) {
return NULL;
}

static void qusort_single(SortRecord_t *left, SortRecord_t *right) {
SortRecord_t *l, *r;
static void qusort_single(SortElement_t *left, SortElement_t *right) {
SortElement_t *l, *r;
while (right - left >= 50) {
partition(left, right, &l, &r, &left, &right);
qusort(l, r);
}
insert_sort(left, right);
}

static void qusort(SortRecord_t *left, SortRecord_t *right) {
static void qusort(SortElement_t *left, SortElement_t *right) {
while (right - left >= 50) {
SortRecord_t *l, *r;
SortElement_t *l, *r;
partition(left, right, &l, &r, &left, &right);

if (right - left > 100000 && n_threads < max_threads) {
// start a new thread - max_threads is a soft limit
pthread_t thread;
SortRecord_t **param = (SortRecord_t **)malloc(2 * sizeof(SortRecord_t *));
SortElement_t **param = (SortElement_t **)malloc(2 * sizeof(SortElement_t *));
if (!param) abort();
param[0] = left;
param[1] = right;
Expand All @@ -241,11 +190,11 @@ static void qusort(SortRecord_t *left, SortRecord_t *right) {
insert_sort(left, right);
}

void blocksort(SortRecord_t *data, int len) {
void blocksort(SortElement_t *data, int len) {
// shortcut for few entries
if (len < 50) {
SortRecord_t *left = data;
SortRecord_t *right = data + len - 1;
SortElement_t *left = data;
SortElement_t *right = data + len - 1;
qusort_single(left, right);
return;
}
Expand All @@ -257,7 +206,7 @@ void blocksort(SortRecord_t *data, int len) {
max_threads = 8;

pthread_t thread;
SortRecord_t **param = (SortRecord_t **)malloc(2 * sizeof(SortRecord_t *));
SortElement_t **param = (SortElement_t **)malloc(2 * sizeof(SortElement_t *));
if (!param) abort();
param[0] = data;
param[1] = data + len - 1;
Expand All @@ -267,51 +216,5 @@ void blocksort(SortRecord_t *data, int len) {
n_threads++;
while (n_threads > 0) pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
}

/*
static double t(void) {
static double t0;
struct timeval tv;
gettimeofday(&tv, NULL);
double h = t0;
t0 = tv.tv_sec + tv.tv_usec / 1000000.0;
return t0 - h;
}
static void init(SortRecord_t *data, int len) {
for (int i = 0; i < len; i++) {
data[i].count = rand();
}
}

static void test(SortRecord_t *data, int len) {
for (int i = 1; i < len; i++) {
if (data[i].count < data[i - 1].count) {
printf("ERROR\n");
break;
}
}
}
int main(void) {
size_t size = 50 * 1000000;
SortRecord_t *data = malloc(size * sizeof(SortRecord_t));
if ( !data ) {
perror("malloc() failed: ");
exit(255);
}
init(data, size);
printf("Sorting %lu million numbers with Quicksort ...\n", size / 1000000);
t();
blocksort(data, size);
printf("%.2fs\n", t());
test(data, size);
return 0;
}
*/
} // End of blocksort
17 changes: 8 additions & 9 deletions src/nfdump/blocksort.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009-2020, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -31,18 +31,17 @@
#ifndef _BLOCKSORT_H
#define _BLOCKSORT_H 1

#include <sys/types.h>

#include "config.h"
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
#include <sys/types.h>

typedef struct SortRecord {
// generic sort type
// count - value to be sorted
// record - ptr to flow/stat reocrd
typedef struct SortElement_s {
void *record;
uint64_t count;
} SortRecord_t;
} SortElement_t;

void blocksort(SortRecord_t *data, int len);
void blocksort(SortElement_t *data, int len);

#endif //_BLOCKSORT_H
4 changes: 1 addition & 3 deletions src/nfdump/compat_1_6_x/nfx.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,10 @@ extension_map_list_t *InitExtensionMaps(int AllocateList) {
} // End of InitExtensionMaps

void FreeExtensionMaps(extension_map_list_t *extension_map_list) {
extension_info_t *l;

if (extension_map_list == NULL) return;

// free all extension infos
l = extension_map_list->map_list;
extension_info_t *l = extension_map_list->map_list;
while (l) {
extension_info_t *tmp = l;
l = l->next;
Expand Down
1 change: 1 addition & 0 deletions src/nfdump/memhandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ static void nfalloc_free(void) {
MemHandler->MaxBlocks = 0;
free((void *)MemHandler);

MemHandler = NULL;
} // End of nfalloc_free

static inline void *nfmalloc(size_t size) {
Expand Down
1 change: 1 addition & 0 deletions src/nfdump/nfdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ static stat_record_t process_data(void *engine, int processwMode, char *wfile, R
DisposeFile(nffile_w);
}

free(recordHandle);
FreeDataBlock(dataBlock_r);
DisposeFile(nffile_r);
return stat_record;
Expand Down

0 comments on commit d222bc0

Please sign in to comment.