302 lines
9.6 KiB
C
302 lines
9.6 KiB
C
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <sys/socket.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <signal.h>
|
|
#include <time.h>
|
|
#include <assert.h>
|
|
#include <pthread.h>
|
|
#include <arpa/inet.h>
|
|
#include <json-c/json.h>
|
|
|
|
#include "helpers.h"
|
|
#include "sysparser.h"
|
|
#include "msgbuffer.h"
|
|
#include "geo.h"
|
|
#include "elasticsearch.h"
|
|
|
|
|
|
/* Main-loop flag: while non-zero the listener and flush loops keep running.
   The signal handler clears it to make the program exit. */
int running = 1;

/* File descriptor of the UDP socket that receives syslog datagrams. */
int sock_fd;

/* Handle of the background thread that flushes the message buffer. */
pthread_t bufwatch;

/* Mutex guarding access to the message buffer. */
pthread_mutex_t buflock;

/* Cached wall-clock time: raw timestamp and its broken-down form.
   Refreshed each time a message is handled. */
time_t cur_t = 0;
struct tm cur_time = {0};

/* Elasticsearch endpoint that events are posted to; set by run_server(). */
char* es_url = NULL;
|
|
|
|
|
|
void sig_handler(int signum) {
|
|
printf("\nExiting on signal %s\n", strsignal(signum));
|
|
running = 0; /* shut down the main loops */
|
|
shutdown(sock_fd, SHUT_RDWR); /* break the listener socket */
|
|
close(sock_fd);
|
|
}
|
|
|
|
|
|
int submit_events(char* message) {
|
|
if(elastic_put_events(message, es_url) == 0) {
|
|
return 0;
|
|
} else {
|
|
printf("Failed to post messages!\n");
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
|
|
char* collect_buffer(int max_size, int* howmany) {
|
|
/*
|
|
Pop up to $howmany items from the message buffer and allocate a buffer of at most $max_size bytes containing them.
|
|
Returns a char pointer to the buffer
|
|
*/
|
|
char header[72];
|
|
// sprintf(header, "{\"index\": {\"_index\": \"firewall-test\", \"_type\": \"event\"}}\n");
|
|
sprintf(header, "{\"index\": {\"_index\": \"firewall-%04d.%02d.%02d\", \"_type\": \"event\"}}\n",
|
|
cur_time.tm_year + 1900,
|
|
cur_time.tm_mon + 1,
|
|
cur_time.tm_mday);
|
|
|
|
// Calculate how large the payload will be
|
|
int header_size = strlen(header);
|
|
int num_messages = buff_count();
|
|
if(num_messages == 0)
|
|
return NULL;
|
|
char* messages[num_messages];
|
|
int message_size = 0;
|
|
for(int i=0; i<num_messages; i++) {
|
|
messages[i] = buff_pop();
|
|
int item_size = strlen(messages[i]);
|
|
if(item_size + message_size > max_size) {
|
|
buff_push(messages[i]);
|
|
num_messages = i;
|
|
break;
|
|
}
|
|
message_size += item_size + header_size + 1; // 1 newline
|
|
}
|
|
// Allocate and build the message
|
|
char* message = calloc(1, message_size + 1);
|
|
for(int i=0; i<num_messages; i++) {
|
|
strcat(message, header);
|
|
strcat(message, messages[i]);
|
|
strcat(message, "\n");
|
|
free(messages[i]);
|
|
}
|
|
memcpy(howmany, &num_messages, sizeof(num_messages));
|
|
return message;
|
|
}
|
|
|
|
|
|
void* buffer_watch() {
|
|
/*
|
|
Threaded task that flushes the buffer when it is larger than 10 messages or older than 5 seconds
|
|
*/
|
|
time_t last_flush = time(NULL);
|
|
char* buffer = NULL;
|
|
while(running) {
|
|
nanosleep(&(const struct timespec){0, 1000000}, NULL);
|
|
time_t now = time(NULL);
|
|
|
|
pthread_mutex_lock(&buflock);
|
|
int total_messages = buff_count();
|
|
|
|
if(total_messages > 0 && (total_messages >= 10 || now - last_flush > 5)) {
|
|
int messages = 0;
|
|
do {
|
|
int howmany = 0;
|
|
buffer = collect_buffer(16*1024, &howmany);
|
|
pthread_mutex_unlock(&buflock);
|
|
|
|
submit_events(buffer);
|
|
printf("\nPushed %d messages\n", howmany);
|
|
free(buffer);
|
|
last_flush = now;
|
|
|
|
/*more events can be received as we unlock the buffer while flushing.
|
|
stop flushing once we've flushed the whole buffer once.*/
|
|
total_messages -= howmany;
|
|
|
|
pthread_mutex_lock(&buflock);
|
|
} while(total_messages > 0 && (messages = buff_count()) > 0);
|
|
}
|
|
pthread_mutex_unlock(&buflock);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
|
|
void start_bufwatch() {
|
|
/*
|
|
Start the bufwatch thread
|
|
*/
|
|
if (pthread_mutex_init(&buflock, NULL) != 0) {
|
|
printf("\n mutex init failed\n");
|
|
exit(1);
|
|
}
|
|
if(pthread_create(&bufwatch, NULL, buffer_watch, NULL) != 0) {
|
|
printf("Could not create thread\n");
|
|
exit(1);
|
|
}
|
|
}
|
|
|
|
|
|
void bufwatch_cleanup() {
|
|
pthread_join(bufwatch, NULL);
|
|
pthread_mutex_destroy(&buflock);
|
|
}
|
|
|
|
|
|
int handle_message(char* msg, struct sockaddr_in* sender) {
|
|
/*TODO should we check that msg[size_recvd] == \0 ?
|
|
printf("From host %s src port %d got message %.*s\n",
|
|
inet_ntoa(my_peer_addr.sin_addr), ntohs(my_peer_addr.sin_port), size_recvd, msg);*/
|
|
struct SysMessage result;
|
|
memset(&result, 0, sizeof(result)); /* Doing this or setting result above to `= {};` makes valgrind happy */
|
|
/*printf("\nsize: %lu\n\n", sizeof(result)); // curious how big the struct gets
|
|
// printf("msg[size_recvd] is: %d", msg[size_recvd]);*/
|
|
|
|
/*parse syslog message into fields*/
|
|
if(sysmsg_parse(&result, msg) != 0) {
|
|
printf("Failed to parse message: %s", msg);
|
|
return 1;
|
|
}
|
|
|
|
/*printf("syslog message is valid:\n\tpriority: %d\n\tapplication: %s\n\tDate: %s %d %02d:%02d:%02d\n",
|
|
result.priority,
|
|
result.application,
|
|
result.date.month,
|
|
result.date.day,
|
|
result.date.hour,
|
|
result.date.minute,
|
|
result.date.second);*/
|
|
|
|
/*parse MSG field into pfsense data*/
|
|
pf_data fwdata = {0};
|
|
//memset(&fwdata, 0, sizeof(fwdata));
|
|
if(pfdata_parse(msg, &fwdata) != 0) {
|
|
printf("Failed to parse pfsense data: %s\n\n", msg);
|
|
return 1;
|
|
}
|
|
// pfdata_print(&fwdata);
|
|
cur_t = time(NULL);
|
|
cur_time = *localtime(&cur_t);
|
|
|
|
char date_formtted[32];
|
|
sprintf(date_formtted, "%04d-%02d-%02dT%02d:%02d:%02dZ",
|
|
cur_time.tm_year + 1900,
|
|
month2num(result.date.month),
|
|
result.date.day,
|
|
result.date.hour,
|
|
result.date.minute,
|
|
result.date.second);
|
|
|
|
char time_now[sizeof "2018-07-15T13:49:05Z"];
|
|
strftime(time_now, sizeof time_now, "%FT%TZ", gmtime(&cur_t));
|
|
|
|
json_object* jobj = json_object_new_object();
|
|
add_strfield(jobj, "date", time_now);
|
|
add_strfield(jobj, "log_date", date_formtted);
|
|
add_strfield(jobj, "app", result.application);
|
|
|
|
char sender_ip[64]; // 40
|
|
inet_ntop(AF_INET, &sender->sin_addr, sender_ip, sizeof(sender_ip));
|
|
add_strfield(jobj, "endpoint", sender_ip);
|
|
|
|
pfdata_to_json(&fwdata, jobj);
|
|
|
|
GeoIPRecord* ginfo = (fwdata.ipversion == 4 ? geo_get(fwdata.src_addr)
|
|
: geo_get6(fwdata.src_addr));
|
|
if(ginfo != NULL) {
|
|
json_object* srcloc = json_object_new_object();
|
|
json_object_object_add(jobj, "srcloc", srcloc);
|
|
add_doublefield(srcloc, "lat", ginfo->latitude);
|
|
add_doublefield(srcloc, "lon", ginfo->longitude);
|
|
add_strfield(jobj, "src_country", (char*)null_unknown(geo_country_name(ginfo)));
|
|
add_strfield(jobj, "src_country_code", (char*)null_unknown(ginfo->country_code));
|
|
add_strfield(jobj, "src_region", (char*)null_unknown(ginfo->region));
|
|
add_strfield(jobj, "src_state", (char*)null_unknown(GeoIP_region_name_by_code(ginfo->country_code, ginfo->region)));
|
|
add_strfield(jobj, "src_city", (char*)null_unknown(ginfo->city));
|
|
}
|
|
|
|
GeoIPRecord_delete(ginfo);
|
|
|
|
const char* json_msg = json_object_to_json_string(jobj);
|
|
// printf("%s\n", json_msg);
|
|
{
|
|
pthread_mutex_lock(&buflock);
|
|
buff_push(strdup(json_msg)); // Copy message to heap and push to buffer
|
|
pthread_mutex_unlock(&buflock);
|
|
}
|
|
json_object_put(jobj);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*UDP server bits mostly lifted from https://cs.nyu.edu/~mwalfish/classes/16sp/classnotes/handout01.pdf*/
|
|
int run_server(int port, char* url) {
|
|
signal(SIGTERM, sig_handler);
|
|
signal(SIGINT, sig_handler);
|
|
|
|
if(elastic_check(url) != EXIT_SUCCESS)
|
|
die("Failed to contact elasticsearch");
|
|
|
|
geo_init();
|
|
es_url = url;
|
|
|
|
/*Create socket*/
|
|
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
|
|
panic("socket");
|
|
|
|
/*Set socket options*/
|
|
int one = 1;
|
|
setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
|
|
|
|
/*Bind socket*/
|
|
struct sockaddr_in peer_addr;
|
|
struct sockaddr_in my_addr = {AF_INET, htons(port), (struct in_addr){INADDR_ANY}};
|
|
if (bind(sock_fd, (struct sockaddr*)&my_addr, sizeof(struct sockaddr_in)) < 0)
|
|
panic("bind failed");
|
|
|
|
start_bufwatch();
|
|
|
|
socklen_t addrlen = sizeof(struct sockaddr_in);
|
|
char msg[4096];
|
|
while (running) {
|
|
int size_recvd;
|
|
if ((size_recvd = recvfrom(sock_fd, /* socket */
|
|
msg, /* buffer */
|
|
sizeof(msg), /* buffer length */
|
|
0, /* no flags */
|
|
(struct sockaddr*)&peer_addr, /* who's sending */
|
|
&addrlen /* length of buffer to receive peer info */
|
|
)) < 0) {
|
|
if (running) panic("recvfrom");
|
|
else break; /*sock was closed by exit signal*/
|
|
}
|
|
|
|
assert(size_recvd < sizeof(msg)); /*messages can't be longer than our buffer. TODO if they are longer we should
|
|
dump it and wait until the next loop. if the next buffer is some portion of a too-long message, we can expect
|
|
the various parsing below to fail.*/
|
|
|
|
assert(addrlen == sizeof(struct sockaddr_in));
|
|
msg[size_recvd] = '\0'; /*We receive 1 full string at a time*/
|
|
/*printf("\nGot message: %s\n", msg);*/
|
|
|
|
handle_message(msg, &peer_addr);
|
|
|
|
printf(".");
|
|
fflush(stdout);
|
|
}
|
|
|
|
bufwatch_cleanup();
|
|
buff_freeall();
|
|
geo_close();
|
|
return 0;
|
|
}
|