Geoip + elasticsearch setup

This commit is contained in:
dave 2018-07-14 19:23:50 -07:00
parent 25f1e6dcf1
commit 158a5fa20c
18 changed files with 597 additions and 139 deletions

View File

@ -1,24 +1,40 @@
CC=cc
CFLAGS := -g -I. -Wall -Wpedantic -I ../deps/build/usr/local/include/
LDFLAGS = -L ../deps/build/usr/local/lib/
LDLIBS = -ljson-c -lcurl -lpthread
override CFLAGS += -g -I. -Wall -Wpedantic -I ../deps/build/usr/local/include/
LIBPATH = ../deps/build/usr/local/lib/
LDFLAGS = -L $(LIBPATH)
LDLIBS = -ljson-c -lcurl -lpthread -lGeoIP
LDFLAGS += $(LDLIBS)
CFLAGS_STATIC = $(CFLAGS) --static
OBJ=main.o pfparser.o sysparser.o
OBJ=helpers.o pfparser.o sysparser.o msgbuffer.o geo.o elasticsearch.o server.o
TESTS=$(patsubst %.c,%.test,$(wildcard tests/*.c))
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)
csyslog: $(OBJ)
csyslog: $(OBJ) main.o
$(CC) -o $@ $^ $(LDFLAGS)
.PHONY: clean
clean:
rm -vf *.o csyslog
static: $(OBJ)
static: $(OBJ) main.o
$(CC) -o csyslog $^ $(LDFLAGS) --static
.PHONY: docker
docker: static
sudo docker build -t csyslog .
.PHONY: clean
clean: cleantests
rm -vf *.o csyslog
.PHONY: cleantests
cleantests:
rm -vf tests/*.test
$(TESTS): tests/%.test: tests/%.c $(OBJ)
$(CC) -o $@ $(OBJ) tests/$*.c $(CFLAGS) $(LDFLAGS)
.PHONY: tests
tests: $(TESTS)
@bash -c 'for test in $^ ; do printf "======== %-30s ========\n" $$test ; LD_LIBRARY_PATH=$(LIBPATH) ./$$test ; printf "%48s\n" "RC: $$?" ; done'

View File

@ -0,0 +1,73 @@
{
"template": "firewall-*",
"index_patterns": [
"firewall-*"
],
"settings": {
"number_of_replicas": 0,
"number_of_shards": 8
},
"mappings": {
"event": {
"_source": {
"enabled": true
},
"properties": {
"action": {
"type": "keyword"
},
"app": {
"type": "keyword"
},
"date": {
"type": "date"
},
"dest_addr": {
"type": "ip"
},
"dest_port": {
"type": "long"
},
"interface": {
"type": "keyword"
},
"ipversion": {
"type": "short"
},
"length": {
"type": "long"
},
"protocol_id": {
"type": "short"
},
"src_addr": {
"type": "ip"
},
"src_city": {
"type": "keyword"
},
"src_country": {
"type": "keyword"
},
"src_country_code": {
"type": "keyword"
},
"src_port": {
"type": "long"
},
"src_region": {
"type": "keyword"
},
"src_state": {
"type": "keyword"
},
"srcloc": {
"type": "geo_point"
},
"ttl": {
"type": "long"
}
}
}
}
}

70
src/elasticsearch.c Normal file
View File

@ -0,0 +1,70 @@
#include <string.h>
#include <curl/curl.h>
/* Size of the response buffers handed to write_data via CURLOPT_WRITEDATA. */
enum { WRITE_DATA_CAPACITY = 50000 };

/*
 * libcurl write callback: append one received chunk to the caller's response
 * buffer, keeping the buffer NUL-terminated.
 *
 * dbit        chunk delivered by libcurl (not NUL-terminated)
 * size, nmemb the chunk is size * nmemb bytes long
 * user_data   the CURLOPT_WRITEDATA pointer; must be a char buffer of at
 *             least WRITE_DATA_CAPACITY bytes that holds a NUL-terminated
 *             string (initialise it to "" before starting the transfer)
 *
 * Returns size * nmemb so libcurl keeps the transfer going. Bytes that do not
 * fit in the buffer are discarded: the response is only used for diagnostics.
 *
 * The append position is derived from the buffer itself rather than from a
 * function-local static, so every transfer starts at the beginning of its own
 * buffer instead of wherever the previous transfer stopped.
 */
size_t write_data(char *dbit, size_t size, size_t nmemb, void *user_data) {
    char *data = user_data;
    size_t n = size * nmemb;
    size_t used = strlen(data);
    size_t room = used < (size_t)WRITE_DATA_CAPACITY - 1
                      ? (size_t)WRITE_DATA_CAPACITY - 1 - used
                      : 0;
    size_t to_copy = n < room ? n : room;

    if (to_copy > 0) {
        memcpy(data + used, dbit, to_copy);
        data[used + to_copy] = '\0';
    }
    return n;
}
/*
 * Manual connectivity check: GET the Elasticsearch root URL and print the
 * libcurl result code followed by whatever body was received.
 */
void test_curl() {
    char data[50000] = "";

    CURL *curl = curl_easy_init();
    if (curl == NULL) {
        printf("curl_easy_init failed\n");
        return;
    }

    curl_easy_setopt(curl, CURLOPT_URL, "http://192.168.1.120:8298/");
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data);
    /* Numeric options are read as long through varargs; a bare int literal is
       the wrong type on LP64 platforms. */
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, 5L);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, data);

    CURLcode res = curl_easy_perform(curl);

    curl_easy_cleanup(curl);
    curl_global_cleanup();

    printf("%d %s", (int)res, data);
}
int put_events(char* data) {
CURL *curl;
CURLcode res;
char response[50000] = "";
curl = curl_easy_init(); // check this and all of these curl functions
curl_easy_setopt(curl, CURLOPT_URL, "http://192.168.1.120:8298/_bulk");
curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
struct curl_slist *headers=NULL;
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
res = curl_easy_perform(curl);
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
curl_global_cleanup();
if(res != CURLE_OK) {
printf("%d %s\n", res, response);
}
return res == CURLE_OK ? 0 : 1;
}

2
src/elasticsearch.h Normal file
View File

@ -0,0 +1,2 @@
/* Manual connectivity check: fetches the Elasticsearch root URL with libcurl
   and prints the libcurl result code and the response body to stdout. */
void test_curl();
/* POSTs `data` to the Elasticsearch _bulk endpoint with a JSON content type.
   Returns 0 on success and 1 on failure. */
int put_events(char* data);

View File

@ -1 +1 @@
gdb --args ./csyslog 4200
gdb -ex=r --args ./csyslog 4200

68
src/geo.c Normal file
View File

@ -0,0 +1,68 @@
#include <stdio.h>
#include <stdlib.h>
#include "geo.h"
GeoIP *gi = NULL;
GeoIP *gi6 = NULL;
/*
 * Open the IPv4 and IPv6 GeoLite city databases (looked up by relative file
 * name) into the module-level handles. Exits the process with status 1 if
 * either database cannot be opened.
 */
void geo_init() {
    gi = GeoIP_open("GeoLiteCity.dat", GEOIP_INDEX_CACHE);
    gi6 = GeoIP_open("GeoLiteCityv6.dat", GEOIP_INDEX_CACHE);

    if (gi != NULL && gi6 != NULL) {
        return;
    }

    fputs("Error opening geoip databases\n", stderr);
    exit(1);
}
/*
 * Release both GeoIP database handles. The globals are reset to NULL
 * afterwards so a repeated geo_close() does not hand an already-released
 * handle back to the library.
 */
void geo_close() {
    if (gi != NULL) {
        GeoIP_delete(gi);
        gi = NULL;
    }
    if (gi6 != NULL) {
        GeoIP_delete(gi6);
        gi6 = NULL;
    }
}
/*
 * Look up `ip` in the IPv4 city database.
 * The returned record must be freed later with GeoIPRecord_delete().
 */
GeoIPRecord* geo_get(char* ip) {
    const char* address = ip;
    GeoIPRecord* record = GeoIP_record_by_name(gi, address);
    return record;
}
/*
 * Look up `ip` in the IPv6 city database.
 * The returned record must be freed later with GeoIPRecord_delete().
 */
GeoIPRecord* geo_get6(char* ip) {
    const char* address = ip;
    GeoIPRecord* record = GeoIP_record_by_name_v6(gi6, address);
    return record;
}
/*
 * Translate the country code stored in `rec` into a country name.
 *
 * Returns NULL when there is no record or the record carries no country code
 * (other code in this project already treats country_code as possibly NULL),
 * instead of passing a NULL string to the code lookup.
 */
const char* geo_country_name(GeoIPRecord* rec) {
    if (rec == NULL || rec->country_code == NULL) {
        return NULL;
    }
    return GeoIP_country_name_by_id(gi, GeoIP_id_by_code(rec->country_code));
}
#ifdef TEST
/* Substitute the placeholder "N/A" for a missing (NULL) string. */
static const char * _mk_NA( const char * p ){
    if (p == NULL) {
        return "N/A";
    }
    return p;
}
/*
 * Stand-alone self-test (built only with -DTEST): looks up one hard-coded
 * IPv4 address and prints its geo fields as a tab-separated line.
 * Returns 0 on success, 1 when the address has no record.
 */
int main(int argc, char** argv) {
    (void)argc;
    (void)argv;

    geo_init();

    const char* host = "24.4.129.164";
    GeoIPRecord *gir = GeoIP_record_by_name(gi, host);
    if (gir == NULL) {
        /* The result is checked for NULL elsewhere in this project; do the
           same here rather than dereferencing it. */
        fprintf(stderr, "No GeoIP record found for %s\n", host);
        geo_close();
        return 1;
    }

    printf("%s\t%s\t%s\t%s\t%s\t%s\t%f\t%f\t%d\t%d\n", host,
           _mk_NA(gir->country_code),
           _mk_NA(gir->region),
           _mk_NA(GeoIP_region_name_by_code(gir->country_code, gir->region)),
           _mk_NA(gir->city),
           _mk_NA(gir->postal_code),
           gir->latitude,
           gir->longitude,
           gir->metro_code,
           gir->area_code);

    GeoIPRecord_delete(gir);
    geo_close();
    return 0;
}
#endif

8
src/geo.h Normal file
View File

@ -0,0 +1,8 @@
#include <GeoIP.h>
#include <GeoIPCity.h>
/* Opens GeoLiteCity.dat and GeoLiteCityv6.dat (relative file names); prints
   an error and exits with status 1 if either cannot be opened. */
void geo_init();
/* Releases both database handles opened by geo_init(). */
void geo_close();
/* Looks up `ip` in the IPv4 database. The returned record must be freed with
   GeoIPRecord_delete(). */
GeoIPRecord* geo_get(char* ip);
/* Looks up `ip` in the IPv6 database. The returned record must be freed with
   GeoIPRecord_delete(). */
GeoIPRecord* geo_get6(char* ip);
/* Returns the country name matching rec->country_code. */
const char* geo_country_name(GeoIPRecord* rec);

10
src/get-geoip.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
# Download the GeoLite city databases (IPv4 and IPv6) into the current
# directory and unpack them next to the archives.
set -x
set -e

# -N only re-downloads when the remote copy is newer than the local archive.
wget -N http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
wget -N http://geolite.maxmind.com/download/geoip/database/GeoLiteCityv6-beta/GeoLiteCityv6.dat.gz

# Overwrite only the two databases this script owns. Deleting every *.dat
# first would also remove unrelated files, and would leave no usable database
# behind if decompression failed.
gunzip --keep --force GeoLiteCity.dat.gz GeoLiteCityv6.dat.gz

8
src/helpers.c Normal file
View File

@ -0,0 +1,8 @@
#include <stdio.h>
#include <stdlib.h>
/*
 * Report a fatal error: prints `s` together with the description of the
 * current errno value (via perror) and terminates the process with exit
 * status 1. Does not return.
 */
void panic(const char* s) {
perror(s);
exit(1);
}

View File

@ -1,4 +1,3 @@
/*Convert a defined token to a string*/
#define _STR(x) #x
@ -6,3 +5,5 @@
Note: #x does not work in context, 2nd layer is required ??
http://www.decompile.com/cpp/faq/file_and_line_error_string.htm*/
#define STR(x) _STR(x)
void panic(const char* s);

View File

@ -1,148 +1,27 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <signal.h>
#include "helpers.h"
#include "sysparser.h"
#include <time.h>
#include <json-c/json.h>
void panic(const char* s) {
perror(s);
exit(1);
}
#include "server.h"
/*defined here as they are used in conjunction with the shutdown signal handler*/
int running = 1;
int sock_fd;
void sig_handler(int signum) {
printf("\nExiting on signal %s\n", strsignal(signum));
running = 0; /* shut down the main loop */
shutdown(sock_fd, SHUT_RDWR); /* break the listener socket */
close(sock_fd);
}
/*UDP server bits mostly lifted from https://cs.nyu.edu/~mwalfish/classes/16sp/classnotes/handout01.pdf*/
/*
 * Program entry point: validate the single <port> argument and hand control
 * to run_server(), which owns the socket, the signal handlers and the receive
 * loop. The inline copy of that loop that used to live here is gone so the
 * server logic exists in exactly one place.
 *
 * Exits with status 1 and a usage message when the argument count is wrong or
 * the port is not a decimal number in 1..USHRT_MAX.
 */
int main(int argc, char** argv) {
    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(1);
    }

    /* strtol never stores NULL in the end pointer. Input without digits is
       detected by the end pointer still pointing at the start, and trailing
       junk by it not pointing at the terminator. Overflow yields
       LONG_MIN/LONG_MAX, which the range test below rejects as well. */
    char* portend = NULL;
    long portl = strtol(argv[1], &portend, 10);
    if (portend == argv[1] || *portend != '\0' || portl < 1 || portl > USHRT_MAX) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(1);
    }

    run_server((int)portl);
    exit(EXIT_SUCCESS);
}

90
src/msgbuffer.c Normal file
View File

@ -0,0 +1,90 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
struct msgbuffer_entry {
char* data;
struct msgbuffer_entry* next;
};
struct msgbuffer_entry* msgbuffer = NULL;
void buff_push(char* data) {
struct msgbuffer_entry* oldhead = msgbuffer;
msgbuffer = malloc(sizeof(struct msgbuffer_entry));
msgbuffer->data = data; // or strdup(data) if data should be moved to heap
msgbuffer->next = oldhead;
}
char* buff_pop() {
// assert(msgbuffer != NULL); // Popped when no items present
if(msgbuffer == NULL) return NULL;
char* result = msgbuffer->data;
struct msgbuffer_entry* oldhead = msgbuffer;
msgbuffer = msgbuffer->next;
free(oldhead);
return result;
}
int buff_count() {
int len = 0;
struct msgbuffer_entry* current = msgbuffer;
while (current != NULL) {
current = current->next;
len++;
}
return len;
}
/*
 * Empty the message list, freeing every node together with the data pointer
 * it holds. The list is empty (NULL) afterwards.
 */
void buff_freeall() {
    while (msgbuffer != NULL) {
        struct msgbuffer_entry* doomed = msgbuffer;
        msgbuffer = doomed->next;
        free(doomed->data); // the list owns the payloads it still holds
        free(doomed);
    }
}
/*
 * Remove the LAST entry of the list (the end opposite to buff_push/buff_pop)
 * and return its data pointer; the caller becomes responsible for it.
 * Returns NULL when the list is empty.
 */
char* buff_pop_head() {
    if (msgbuffer == NULL) {
        return NULL;
    }

    /* Track the link that points at the current node so the final node can be
       detached the same way whether or not it is the only one. */
    struct msgbuffer_entry** link = &msgbuffer;
    while ((*link)->next != NULL) {
        link = &(*link)->next;
    }

    struct msgbuffer_entry* last = *link;
    char* payload = last->data;
    *link = NULL;
    free(last);
    return payload;
}
/*
 * Append `data` at the END of the list (the end opposite to buff_push). The
 * pointer is stored as is, so the list takes over ownership of it.
 * Terminates the process if the list node cannot be allocated.
 */
void buff_push_head(char* data) {
    struct msgbuffer_entry* entry = malloc(sizeof *entry);
    if (entry == NULL) {
        perror("malloc");
        exit(1);
    }
    entry->data = data;
    entry->next = NULL;

    /* Walk to the terminating NULL link; for an empty list that link is the
       list head itself. */
    struct msgbuffer_entry** link = &msgbuffer;
    while (*link != NULL) {
        link = &(*link)->next;
    }
    *link = entry;
}

12
src/msgbuffer.h Normal file
View File

@ -0,0 +1,12 @@
/* Adds `data` at the front of the list; the pointer is stored, not copied. */
void buff_push(char* data);
/* Removes the front entry and returns its data pointer, or NULL if empty. */
char* buff_pop();
/* Returns the number of entries in the list. */
int buff_count();
/* Frees every entry and the data pointer each one holds. */
void buff_freeall();
/* Removes the LAST entry of the list and returns its data pointer, or NULL
   if the list is empty. */
char* buff_pop_head();
/* Appends `data` at the END of the list; the pointer is stored, not copied. */
void buff_push_head(char* data);

BIN
src/msgbuffer.test Executable file

Binary file not shown.

View File

@ -1,6 +1,7 @@
#include <stdio.h>
#include <string.h>
#include "pfparser.h"
#include "geo.h"
int pfdata_parse(char* message, pf_data* result) {
@ -310,6 +311,17 @@ void add_strfield(json_object* obj, char* name, char* value) {
}
/* Attach `value` to `obj` under the key `name` as a JSON double. */
void add_doublefield(json_object* obj, char* name, double value) {
    json_object_object_add(obj, name, json_object_new_double(value));
}
/* Substitute the placeholder "unknown" for a missing (NULL) string. */
const char* null_unknown(const char* p){
    if (p == NULL) {
        return "unknown";
    }
    return p;
}
int pfdata_to_json(pf_data* data, json_object* obj) {
/*
Populate the passed json_object obj with data from from pf_data data.
@ -352,5 +364,21 @@ int pfdata_to_json(pf_data* data, json_object* obj) {
}
}
GeoIPRecord* ginfo = (data->ipversion == 4 ? geo_get(data->src_addr)
: geo_get6(data->src_addr));
if(ginfo != NULL) {
json_object* srcloc = json_object_new_object();
json_object_object_add(obj, "srcloc", srcloc);
add_doublefield(srcloc, "lat", ginfo->latitude);
add_doublefield(srcloc, "lon", ginfo->longitude);
add_strfield(obj, "src_country", (char*)null_unknown(geo_country_name(ginfo)));
add_strfield(obj, "src_country_code", (char*)null_unknown(ginfo->country_code));
add_strfield(obj, "src_region", (char*)null_unknown(ginfo->region));
add_strfield(obj, "src_state", (char*)null_unknown(GeoIP_region_name_by_code(ginfo->country_code, ginfo->region)));
add_strfield(obj, "src_city", (char*)null_unknown(ginfo->city));
}
GeoIPRecord_delete(ginfo);
return 0;
}

View File

@ -0,0 +1,6 @@
#!/bin/sh
# Install the firewall index template into Elasticsearch.
set -e
set -x

# The header must be quoted: unquoted, the shell splits it into an empty
# "Content-Type:" header plus a stray "application/json" argument that curl
# treats as a second URL.
curl -X PUT "http://homeapps1:8298/_template/firewall" -H "Content-Type: application/json" -d @elasticsearch-template.json

186
src/server.c Normal file
View File

@ -0,0 +1,186 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#include <json-c/json.h>
#include "helpers.h"
#include "sysparser.h"
#include "msgbuffer.h"
#include "geo.h"
#include "elasticsearch.h"
#define BUFF_MAX 5
/*defined here as they are used in conjunction with the shutdown signal handler*/
int running = 1;
int sock_fd;
/*
 * Shutdown handler for SIGINT/SIGTERM: announces the exit, clears `running`
 * so the receive loop stops, and breaks the listener socket so a blocked
 * recvfrom() returns.
 *
 * Only write(), shutdown() and close() are called here. printf() and
 * strsignal() are not async-signal-safe and can deadlock or corrupt stdio
 * state if the signal arrives while the main loop is inside stdio or malloc.
 * The messages for the two registered signals use the descriptions glibc
 * gives for them.
 *
 * NOTE(review): `running` is a plain int; volatile sig_atomic_t would be the
 * strictly correct type for a flag written from a handler.
 */
void sig_handler(int signum) {
    static const char msg_int[] = "\nExiting on signal Interrupt\n";
    static const char msg_term[] = "\nExiting on signal Terminated\n";
    static const char msg_other[] = "\nExiting on signal\n";

    const char* notice = msg_other;
    size_t notice_len = sizeof(msg_other) - 1;
    if (signum == SIGINT) {
        notice = msg_int;
        notice_len = sizeof(msg_int) - 1;
    } else if (signum == SIGTERM) {
        notice = msg_term;
        notice_len = sizeof(msg_term) - 1;
    }

    /* Best effort: nothing useful can be done if the notice cannot be written. */
    ssize_t written = write(STDOUT_FILENO, notice, notice_len);
    (void)written;

    running = 0; /* shut down the main loop */
    shutdown(sock_fd, SHUT_RDWR); /* break the listener socket */
    close(sock_fd);
}
time_t cur_t = {0};
struct tm cur_time = {0};
/*
 * Parse one raw syslog datagram, convert it to a JSON event and queue that
 * event in the message buffer for the next bulk upload.
 *
 * msg  NUL-terminated datagram text; it is passed first to sysmsg_parse() and
 *      then to pfdata_parse().
 *
 * Messages that fail either parser, carry date fields that do not fit the
 * timestamp, or cannot be copied are reported on stdout and dropped.
 * Always returns 0.
 *
 * The timestamp year comes from the local clock; month, day and time of day
 * come from the parsed message.
 * NOTE(review): the timestamp is suffixed with "Z" although the year is read
 * via localtime() -- confirm the intended time zone.
 */
int handle_message(char* msg) {
    struct SysMessage result;
    memset(&result, 0, sizeof(result)); /* keeps valgrind happy about the partially filled struct */

    /* parse syslog message into fields */
    if (sysmsg_parse(&result, msg) != 0) {
        printf("Failed to parse message: %s", msg);
        return 0;
    }

    /* parse MSG field into pfsense data */
    pf_data fwdata = {0};
    if (pfdata_parse(msg, &fwdata) != 0) {
        printf("Failed to parse pfsense data: %s\n\n", msg);
        return 0;
    }

    cur_t = time(NULL);
    struct tm* now = localtime(&cur_t);
    if (now == NULL) {
        printf("Failed to read the local time; dropping message\n");
        return 0;
    }
    cur_time = *now;

    /* The date fields originate from network input, so bound the write. */
    char date_formtted[32];
    int written = snprintf(date_formtted, sizeof(date_formtted),
                           "%04d-%02d-%02dT%02d:%02d:%02dZ",
                           cur_time.tm_year + 1900,
                           month2num(result.date.month),
                           result.date.day,
                           result.date.hour,
                           result.date.minute,
                           result.date.second);
    if (written < 0 || (size_t)written >= sizeof(date_formtted)) {
        printf("Dropping message with out-of-range date fields: %s\n", msg);
        return 0;
    }

    json_object* jobj = json_object_new_object();
    if (jobj == NULL) {
        printf("Failed to allocate JSON object; dropping message\n");
        return 0;
    }
    add_strfield(jobj, "date", date_formtted);
    add_strfield(jobj, "app", result.application);
    pfdata_to_json(&fwdata, jobj);

    /* The serialized string belongs to jobj, so copy it to the heap before
       releasing the object. Never queue a NULL entry. */
    const char* json_msg = json_object_to_json_string(jobj);
    char* queued = json_msg != NULL ? strdup(json_msg) : NULL;
    if (queued != NULL) {
        buff_push(queued);
    } else {
        printf("Failed to copy event; dropping message\n");
    }
    json_object_put(jobj);

    return 0;
}
/*
 * Drain the message buffer and send its contents to Elasticsearch as one
 * bulk request: every event is preceded by an index action line and followed
 * by a newline.
 *
 * Does nothing when the buffer is empty. The drained messages are freed
 * whether or not the upload succeeds, so a failed upload loses them; the
 * outcome is reported on stdout. Terminates the process via panic() if memory
 * for the payload cannot be allocated.
 */
void clear_buffer() {
    static const char header[] = "{\"index\": {\"_index\": \"firewall-test\", \"_type\": \"event\"}}\n";
    const size_t header_size = sizeof(header) - 1;

    int num_messages = buff_count();
    if (num_messages <= 0) {
        return; /* nothing queued */
    }

    /* Heap allocation instead of a variable-length array, so the size is
       checked and not limited by the stack. */
    char** messages = calloc((size_t)num_messages, sizeof *messages);
    if (messages == NULL) {
        panic("calloc");
    }

    /* Pop everything first so the payload size is known before allocating. */
    size_t payload_size = 0;
    for (int i = 0; i < num_messages; i++) {
        messages[i] = buff_pop();
        if (messages[i] != NULL) {
            payload_size += header_size + strlen(messages[i]) + 1; /* 1 newline */
        }
    }

    char* payload = malloc(payload_size + 1);
    if (payload == NULL) {
        panic("malloc");
    }

    /* Append at a moving cursor rather than rescanning with strcat. */
    char* cursor = payload;
    int num_sent = 0;
    for (int i = 0; i < num_messages; i++) {
        if (messages[i] == NULL) {
            continue;
        }
        size_t len = strlen(messages[i]);
        memcpy(cursor, header, header_size);
        cursor += header_size;
        memcpy(cursor, messages[i], len);
        cursor += len;
        *cursor++ = '\n';
        free(messages[i]);
        num_sent++;
    }
    *cursor = '\0';
    free(messages);

    if (num_sent > 0) {
        if (put_events(payload) == 0) {
            printf("Pushed %d messages\n", num_sent);
        } else {
            printf("Failed to post messages!\n");
        }
    }
    free(payload);
}
/*UDP server bits mostly lifted from https://cs.nyu.edu/~mwalfish/classes/16sp/classnotes/handout01.pdf*/
/*
 * Run the UDP syslog listener on `port` until SIGINT or SIGTERM arrives.
 *
 * Opens the GeoIP databases, installs the shutdown handlers, binds an IPv4
 * UDP socket on all interfaces and then loops: receive one datagram, hand it
 * to handle_message(), and upload the queue once it holds more than BUFF_MAX
 * events. On shutdown any still-queued events are freed without being sent
 * and the GeoIP databases are closed.
 *
 * Terminates the process via panic() if the socket cannot be created or
 * bound, or if recvfrom() fails while the server is still running.
 * Returns 1 after a signal-driven shutdown.
 */
int run_server(int port) {
    geo_init();

    signal(SIGTERM, sig_handler);
    signal(SIGINT, sig_handler);

    /*Create socket*/
    if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
        panic("socket");

    /*Set socket options*/
    int one = 1;
    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
        perror("setsockopt"); /* not fatal: the bind below may still succeed */

    /*Bind socket*/
    struct sockaddr_in my_addr, my_peer_addr;
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_addr.s_addr = INADDR_ANY;
    my_addr.sin_port = htons((unsigned short)port); /*host to network endianness for a short*/

    if (bind(sock_fd, (struct sockaddr*)&my_addr, sizeof(struct sockaddr_in)) < 0)
        panic("bind failed");

    char msg[4096];

    while (running) {
        /* recvfrom() overwrites the address length on every call, so it has
           to be reset each time round. */
        socklen_t addrlen = sizeof(my_peer_addr);

        /* Hold one byte back for the terminator: a datagram that fills the
           buffer is truncated instead of aborting the server on an assert or
           writing one byte past the end of msg. */
        ssize_t size_recvd = recvfrom(sock_fd,                         /* socket */
                                      msg,                             /* buffer */
                                      sizeof(msg) - 1,                 /* buffer length */
                                      0,                               /* no flags */
                                      (struct sockaddr*)&my_peer_addr, /* whos sending */
                                      &addrlen);                       /* length of buffer to receive peer info */
        if (size_recvd < 0) {
            if (running) panic("recvfrom");
            break; /*sock was closed by exit signal*/
        }
        if (!running) {
            break; /* woken up by the shutdown in the signal handler */
        }

        msg[size_recvd] = '\0'; /*We receive 1 full string at a time*/

        (void)handle_message(msg);
        printf(".");
        fflush(stdout);

        if (buff_count() > BUFF_MAX) {
            printf("\n");
            clear_buffer();
        }
    }

    printf("Clearing buffer, freeing %d entries\n", buff_count());
    buff_freeall();
    geo_close();
    return 1;
}

1
src/server.h Normal file
View File

@ -0,0 +1 @@
/* Runs the UDP syslog listener on `port` until SIGINT or SIGTERM is received,
   then frees the queued messages, closes the GeoIP databases and returns 1. */
int run_server(int port);