offload event sending

This commit is contained in:
dave 2018-07-15 15:23:52 -07:00
parent cf6e2935b4
commit 6fc31b8ee8
1 changed files with 91 additions and 46 deletions

View File

@ -7,6 +7,7 @@
#include <signal.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <json-c/json.h>
@ -17,11 +18,17 @@
#include "elasticsearch.h"
#define BUFF_MAX 5
/*defined here as they are used in conjunction with the shutdown signal handler*/
/*setting running to 0 will break main loops, exiting the program*/
int running = 1;
/*socket listener for udp syslog messages*/
int sock_fd;
/*buffer flush thread*/
pthread_t bufwatch;
/*lock protecting the buffer*/
pthread_mutex_t buflock;
/*occasionally we lookup the time and cache it*/
time_t cur_t = {0};
struct tm cur_time = {0};
void sig_handler(int signum) {
@ -32,8 +39,79 @@ void sig_handler(int signum) {
}
time_t cur_t = {0};
struct tm cur_time = {0};
void clear_buffer() {
char header[72];
sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n",
cur_time.tm_year + 1900,
cur_time.tm_mon + 1,
cur_time.tm_mday);
// Calculate how large the payload will be
int header_size = strlen(header);
int num_messages = buff_count();
if(num_messages == 0) return;
char* messages[num_messages];
int message_size = 0;
for(int i=0; i<num_messages; i++) {
messages[i] = buff_pop();
message_size += strlen(messages[i]) + header_size + 1; // 1 newline
}
// Allocate and build the message
char* message = calloc(1, message_size + 1);
for(int i=0; i<num_messages; i++) {
strcat(message, header);
strcat(message, messages[i]);
strcat(message, "\n");
free(messages[i]);
}
// Send it
if(put_events(message, "http://192.168.1.120:8298") == 0) {
printf("Pushed %d messages\n", num_messages);
} else {
printf("Failed to post messages!\n");
}
free(message);
}
void* buffer_watch() {
/*flush the buffer when larger than 10 messages or older than 5 seconds*/
time_t last_flush = time(NULL);
time_t now = last_flush;
while(running) {
usleep(100 * 1000);
pthread_mutex_lock(&buflock); // TODO lock only while clearing the buffer
now = time(NULL);
if(buff_count() >= 10 || now - last_flush > 5) {
printf("\n");
clear_buffer();
last_flush = now;
}
pthread_mutex_unlock(&buflock);
}
return NULL;
}
void start_bufwatch() {
if (pthread_mutex_init(&buflock, NULL) != 0) {
printf("\n mutex init failed\n");
exit(1);
}
if(pthread_create(&bufwatch, NULL, buffer_watch, NULL) != 0) {
printf("Could not create thread\n");
exit(1);
}
}
void bufwatch_cleanup() {
pthread_join(bufwatch, NULL);
pthread_mutex_destroy(&buflock);
}
int handle_message(char* msg) {
@ -88,7 +166,11 @@ int handle_message(char* msg) {
pfdata_to_json(&fwdata, jobj);
const char* json_msg = json_object_to_json_string(jobj);
// printf("%s\n", json_msg);
buff_push(strdup(json_msg)); // Copy message to heap and push to buffer
{
pthread_mutex_lock(&buflock);
buff_push(strdup(json_msg)); // Copy message to heap and push to buffer
pthread_mutex_unlock(&buflock);
}
json_object_put(jobj);
}
}
@ -96,42 +178,6 @@ int handle_message(char* msg) {
}
void clear_buffer() {
char header[72];
sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n",
cur_time.tm_year + 1900,
cur_time.tm_mon + 1,
cur_time.tm_mday);
// Calculate how large the payload will be
int header_size = strlen(header);
int num_messages = buff_count();
char* messages[num_messages];
int message_size = 0;
for(int i=0; i<num_messages; i++) {
messages[i] = buff_pop();
message_size += strlen(messages[i]) + header_size + 1; // 1 newline
}
// Allocate and build the message
char* message = calloc(1, message_size + 1);
for(int i=0; i<num_messages; i++) {
strcat(message, header);
strcat(message, messages[i]);
strcat(message, "\n");
free(messages[i]);
}
// Send it
if(put_events(message, "http://192.168.1.120:8298") == 0) {
printf("Pushed %d messages\n", num_messages);
} else {
printf("Failed to post messages!\n");
}
free(message);
}
/*UDP server bits mostly lifted from https://cs.nyu.edu/~mwalfish/classes/16sp/classnotes/handout01.pdf*/
int run_server(int port) {
geo_init();
@ -155,6 +201,8 @@ int run_server(int port) {
if (bind(sock_fd, (struct sockaddr*)&my_addr, sizeof(struct sockaddr_in)) < 0)
panic("bind failed");
start_bufwatch();
socklen_t addrlen = sizeof(struct sockaddr_in);
char msg[4096];
while (running) {
@ -181,12 +229,9 @@ int run_server(int port) {
printf(".");
fflush(stdout);
if(buff_count() >= BUFF_MAX) {
printf("\n");
clear_buffer();
}
}
bufwatch_cleanup();
buff_freeall();
geo_close();
return 1;