From 6fc31b8ee8e3ee3198cd434e55058ae18951e2f0 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 15 Jul 2018 15:23:52 -0700 Subject: [PATCH] offload event sending --- src/server.c | 137 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 91 insertions(+), 46 deletions(-) diff --git a/src/server.c b/src/server.c index f847b7d..aa3a99b 100644 --- a/src/server.c +++ b/src/server.c @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -17,11 +18,17 @@ #include "elasticsearch.h" -#define BUFF_MAX 5 - -/*defined here as they are used in conjunction with the shutdown signal handler*/ +/*setting running to 0 will break main loops, exiting the program*/ int running = 1; +/*socket listener for udp syslog messages*/ int sock_fd; +/*buffer flush thread*/ +pthread_t bufwatch; +/*lock protecting the buffer*/ +pthread_mutex_t buflock; +/*occasionally we lookup the time and cache it*/ +time_t cur_t = {0}; +struct tm cur_time = {0}; void sig_handler(int signum) { @@ -32,8 +39,79 @@ void sig_handler(int signum) { } -time_t cur_t = {0}; -struct tm cur_time = {0}; +void clear_buffer() { + char header[72]; + sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n", + cur_time.tm_year + 1900, + cur_time.tm_mon + 1, + cur_time.tm_mday); + + // Calculate how large the payload will be + int header_size = strlen(header); + int num_messages = buff_count(); + if(num_messages == 0) return; + char* messages[num_messages]; + int message_size = 0; + for(int i=0; i= 10 || now - last_flush > 5) { + printf("\n"); + clear_buffer(); + last_flush = now; + } + pthread_mutex_unlock(&buflock); + } + return NULL; +} + + +void start_bufwatch() { + if (pthread_mutex_init(&buflock, NULL) != 0) { + printf("\n mutex init failed\n"); + exit(1); + } + if(pthread_create(&bufwatch, NULL, buffer_watch, NULL) != 0) { + printf("Could not create thread\n"); + exit(1); + } +} + + +void bufwatch_cleanup() { + pthread_join(bufwatch, NULL); + pthread_mutex_destroy(&buflock); +} int handle_message(char* msg) { @@ -88,7 +166,11 @@ int handle_message(char* msg) { pfdata_to_json(&fwdata, jobj); const char* json_msg = json_object_to_json_string(jobj); // printf("%s\n", json_msg); - buff_push(strdup(json_msg)); // Copy message to heap and push to buffer + { + pthread_mutex_lock(&buflock); + buff_push(strdup(json_msg)); // Copy message to heap and push to buffer + pthread_mutex_unlock(&buflock); + } json_object_put(jobj); } } @@ -96,42 +178,6 @@ int handle_message(char* msg) { } -void clear_buffer() { - char header[72]; - sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n", - cur_time.tm_year + 1900, - cur_time.tm_mon + 1, - cur_time.tm_mday); - - // Calculate how large the payload will be - int header_size = strlen(header); - int num_messages = buff_count(); - char* messages[num_messages]; - int message_size = 0; - for(int i=0; i= BUFF_MAX) { - printf("\n"); - clear_buffer(); - } } + bufwatch_cleanup(); buff_freeall(); geo_close(); return 1;