unlock while posting events
This commit is contained in:
parent
6fc31b8ee8
commit
ea88f8fea4
46
src/server.c
46
src/server.c
|
@ -33,13 +33,25 @@ struct tm cur_time = {0};
|
||||||
|
|
||||||
void sig_handler(int signum) {
|
void sig_handler(int signum) {
|
||||||
printf("\nExiting on signal %s\n", strsignal(signum));
|
printf("\nExiting on signal %s\n", strsignal(signum));
|
||||||
running = 0; /* shut down the main loop */
|
running = 0; /* shut down the main loops */
|
||||||
shutdown(sock_fd, SHUT_RDWR); /* break the listener socket */
|
shutdown(sock_fd, SHUT_RDWR); /* break the listener socket */
|
||||||
close(sock_fd);
|
close(sock_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void clear_buffer() {
|
int submit_events(char* message) {
|
||||||
|
// Send it
|
||||||
|
if(put_events(message, "http://192.168.1.120:8298") == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
printf("Failed to post messages!\n");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
free(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
char* collect_buffer() {
|
||||||
char header[72];
|
char header[72];
|
||||||
sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n",
|
sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n",
|
||||||
cur_time.tm_year + 1900,
|
cur_time.tm_year + 1900,
|
||||||
|
@ -49,7 +61,7 @@ void clear_buffer() {
|
||||||
// Calculate how large the payload will be
|
// Calculate how large the payload will be
|
||||||
int header_size = strlen(header);
|
int header_size = strlen(header);
|
||||||
int num_messages = buff_count();
|
int num_messages = buff_count();
|
||||||
if(num_messages == 0) return;
|
if(num_messages == 0) return NULL;
|
||||||
char* messages[num_messages];
|
char* messages[num_messages];
|
||||||
int message_size = 0;
|
int message_size = 0;
|
||||||
for(int i=0; i<num_messages; i++) {
|
for(int i=0; i<num_messages; i++) {
|
||||||
|
@ -65,32 +77,28 @@ void clear_buffer() {
|
||||||
strcat(message, "\n");
|
strcat(message, "\n");
|
||||||
free(messages[i]);
|
free(messages[i]);
|
||||||
}
|
}
|
||||||
|
return message;
|
||||||
// Send it
|
|
||||||
if(put_events(message, "http://192.168.1.120:8298") == 0) {
|
|
||||||
printf("Pushed %d messages\n", num_messages);
|
|
||||||
} else {
|
|
||||||
printf("Failed to post messages!\n");
|
|
||||||
}
|
|
||||||
free(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void* buffer_watch() {
|
void* buffer_watch() {
|
||||||
/*flush the buffer when larger than 10 messages or older than 5 seconds*/
|
/*flush the buffer when larger than 10 messages or older than 5 seconds*/
|
||||||
time_t last_flush = time(NULL);
|
time_t last_flush = time(NULL);
|
||||||
time_t now = last_flush;
|
char* buffer = NULL;
|
||||||
|
|
||||||
while(running) {
|
while(running) {
|
||||||
usleep(100 * 1000);
|
usleep(100 * 1000);
|
||||||
pthread_mutex_lock(&buflock); // TODO lock only while clearing the buffer
|
time_t now = time(NULL);;
|
||||||
now = time(NULL);
|
int messages = buff_count();
|
||||||
if(buff_count() >= 10 || now - last_flush > 5) {
|
pthread_mutex_lock(&buflock);
|
||||||
printf("\n");
|
if(messages > 0 && (messages >= 10 || now - last_flush > 5)) {
|
||||||
clear_buffer();
|
buffer = collect_buffer();
|
||||||
|
pthread_mutex_unlock(&buflock);
|
||||||
|
submit_events(buffer);
|
||||||
|
printf("\nPushed %d messages\n", messages);
|
||||||
last_flush = now;
|
last_flush = now;
|
||||||
|
} else {
|
||||||
|
pthread_mutex_unlock(&buflock);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&buflock);
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue