Skip to content

Commit

Permalink
Fix queue_close with multiple producers
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Apr 14, 2024
1 parent cb88b55 commit 18eff82
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/libnfdump/tor/tor.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static char *tmString(time_t time, char *buff, size_t len) {
}

static void printTorNode(torNode_t *node) {
char first[32], last[32], published[32];
char first[64], last[64], published[64];
char ip[32];
uint32_t torIP = ntohl(node->ipaddr);
inet_ntop(PF_INET, &torIP, ip, sizeof(ip));
Expand Down
18 changes: 14 additions & 4 deletions src/libnffile/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ queue_t *queue_init(size_t length) {
return NULL;
}

queue->producers = 1;
queue->length = length;
queue->mask = length - 1;
atomic_init(&queue->c_wait, 0);
Expand All @@ -76,6 +77,11 @@ queue_t *queue_init(size_t length) {

} // End of Queue_init

void queue_producers(queue_t *queue, unsigned producers) {
//
queue->producers = producers;
} // End of queue_producers

void queue_free(queue_t *queue) {
queue_sync(queue);
free(queue);
Expand All @@ -91,9 +97,10 @@ void queue_open(queue_t *queue) {

void queue_close(queue_t *queue) {
pthread_mutex_lock(&(queue->mutex));
queue->closed = 1;
if (queue->num_elements == 0) {
pthread_cond_signal(&(queue->cond));
queue->producers--;
if (queue->producers <= 0) queue->closed = 1;
if (queue->c_wait) {
pthread_cond_broadcast(&(queue->cond));
}
pthread_mutex_unlock(&(queue->mutex));

Expand Down Expand Up @@ -197,7 +204,10 @@ void *queue_pop(queue_t *queue) {
queue->next_avail = (queue->next_avail + 1) & queue->mask;

if (queue->p_wait) {
pthread_cond_signal(&(queue->cond));
pthread_cond_broadcast(&(queue->cond));
}
if (queue->closed && queue->c_wait) {
pthread_cond_broadcast(&(queue->cond));
}
pthread_mutex_unlock(&(queue->mutex));
return data;
Expand Down
3 changes: 3 additions & 0 deletions src/libnffile/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct queue_s {
size_t mask;
unsigned next_free;
unsigned next_avail;
int producers;
_Atomic unsigned c_wait;
_Atomic unsigned p_wait;
size_t num_elements;
Expand All @@ -67,6 +68,8 @@ typedef struct queue_s {

queue_t *queue_init(size_t length);

void queue_producers(queue_t *queue, unsigned producers);

void queue_free(queue_t *queue);

void *queue_push(queue_t *queue, void *data);
Expand Down
4 changes: 4 additions & 0 deletions src/nfdump/nfstat.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ static void *SRC_AS_PreProcess(void *inPtr, recordHandle_t *recordHandle);
static void *DST_AS_PreProcess(void *inPtr, recordHandle_t *recordHandle);
static void *JA3_PreProcess(void *inPtr, recordHandle_t *recordHandle);
static void *JA4_PreProcess(void *inPtr, recordHandle_t *recordHandle);
#ifdef BUILDJA4
static void *JA4S_PreProcess(void *inPtr, recordHandle_t *recordHandle);
#endif

/*
*
Expand Down Expand Up @@ -611,6 +613,7 @@ static inline void *JA4_PreProcess(void *inPtr, recordHandle_t *recordHandle) {

} // End of JA4_PreProcess

#ifdef BUILDJA4
static inline void *JA4S_PreProcess(void *inPtr, recordHandle_t *recordHandle) {
EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID];

Expand Down Expand Up @@ -638,6 +641,7 @@ static inline void *JA4S_PreProcess(void *inPtr, recordHandle_t *recordHandle) {
return ja4;

} // End of JA4S_PreProcess
#endif

void AddElementStat(recordHandle_t *recordHandle) {
EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID];
Expand Down

0 comments on commit 18eff82

Please sign in to comment.