From 3f8cde16df66d159f98e76ca68224fcf6e7b252e Mon Sep 17 00:00:00 2001
From: dave
Date: Fri, 31 Jul 2020 10:27:48 -0700
Subject: [PATCH] initial commit

---
 .gitignore          |   1 +
 .gitmodules         |   3 +
 COVID-19            |   1 +
 Makefile            |  65 ++++++++++++++++
 README.md           |  12 +++
 headers.py          |  26 +++++++
 ingest.py           | 178 ++++++++++++++++++++++++++++++++++++++++++++
 ingest2.py          |  67 +++++++++++++++++
 ingest3.py          | 173 +++++++++++++++++++++++++++++++++++++++++++
 requirements.txt    |   9 +++
 testdata/.gitignore |   1 +
 11 files changed, 536 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 .gitmodules
 create mode 160000 COVID-19
 create mode 100644 Makefile
 create mode 100644 README.md
 create mode 100644 headers.py
 create mode 100755 ingest.py
 create mode 100755 ingest2.py
 create mode 100644 ingest3.py
 create mode 100644 requirements.txt
 create mode 100644 testdata/.gitignore

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..264b772
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/testenv
\ No newline at end of file
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..23a7dd4
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "COVID-19"]
+	path = COVID-19
+	url = https://github.com/CSSEGISandData/COVID-19.git
diff --git a/COVID-19 b/COVID-19
new file mode 160000
index 0000000..21f8615
--- /dev/null
+++ b/COVID-19
@@ -0,0 +1 @@
+Subproject commit 21f8615831b38706587f6cc1c8bc4e4f4b0f128e
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a3c045d
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,65 @@
+INFLUX_DATA_PATH := ${CURDIR}/testdata/influx
+GRAFANA_DATA_PATH := ${CURDIR}/testdata/grafana
+VENV := testenv
+
+INFLUX := http://localhost:10019/covid
+
+
+.PHONY: local-venv-setup
+local-venv-setup:
+	virtualenv -p python3 $(VENV)
+	./$(VENV)/bin/python3 -m pip install -r requirements.txt
+
+
+.PHONY: local-venv-wipe
+local-venv-wipe:
+	rm -rf $(VENV)
+
+
+.PHONY: run-local-influx
+run-local-influx:
+	-docker rm covid-influx
+	mkdir -p $(INFLUX_DATA_PATH)
+	docker run -d --name covid-influx -p 10019:8086 -v ${INFLUX_DATA_PATH}:/var/lib/influxdb influxdb:1.7
+	echo "http://localhost:10019"
+
+
+.PHONY: stop-local-influx
+stop-local-influx:
+	-docker stop covid-influx
+
+
+.PHONY: wipe-local-influx
+wipe-local-influx:
+	-rm -rf $(INFLUX_DATA_PATH)
+
+
+.PHONY: local-influx-cli
+local-influx-cli:
+	docker exec -it covid-influx influx
+
+
+.PHONY: run-local-grafana
+run-local-grafana:
+	-docker rm covid-grafana
+	mkdir -p $(GRAFANA_DATA_PATH)
+	docker run -d --name covid-grafana --link covid-influx:influx -p 10020:3000 -v ${GRAFANA_DATA_PATH}:/var/lib/grafana grafana/grafana
+	echo "http://localhost:10020"
+
+
+.PHONY: stop-local-grafana
+stop-local-grafana:
+	-docker stop covid-grafana
+
+
+.PHONY: wipe-local-grafana
+wipe-local-grafana:
+	-rm -rf $(GRAFANA_DATA_PATH)
+
+.PHONY: ingest
+ingest:
+	./$(VENV)/bin/python3 ingest.py
+
+.PHONY: pull
+pull:
+	cd COVID-19/; git pull
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6994283
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+The amount of energy needed to refute bullshit is an order of magnitude bigger than to produce it.
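+
+## Usage
+
+A rough local workflow, using the Makefile targets in this repo (assumes
+Docker and virtualenv are installed):
+
+    make local-venv-setup    # create the python3 virtualenv and install deps
+    make run-local-influx    # start InfluxDB on http://localhost:10019
+    make run-local-grafana   # start Grafana on http://localhost:10020
+    make pull                # update the COVID-19 submodule data
+    make ingest              # load the daily reports into the "covid" database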
diff --git a/headers.py b/headers.py
new file mode 100644
index 0000000..80834e1
--- /dev/null
+++ b/headers.py
@@ -0,0 +1,26 @@
+import os
+import csv
+
+
+basedir = "COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us"
+
+headers = []
+
+for fname in os.listdir(basedir):
+    fpath = os.path.join(basedir, fname)
+    with open(fpath) as f:
+        c = csv.reader(f)
+        for row in c:
+            headers.append(set(row))
+            break
+
+
+for headerset in headers:
+    for other in headers:
+        if headerset != other:
+            print(headerset)
+            print(other)
+
+print("ok")
+
+print(list(headers[0]))
diff --git a/ingest.py b/ingest.py
new file mode 100755
index 0000000..b1a2c7d
--- /dev/null
+++ b/ingest.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+
+import csv
+from collections import namedtuple
+from influxdb import InfluxDBClient
+from urllib.parse import urlparse
+import datetime
+
+row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"]
+Row = namedtuple("Row", " ".join(row_fields))
+
+
+FIELD_INFO = {
+    "State": {
+        "aliases": {
+            "Province/State",
+            "Province_State",
+            "\ufeffProvince/State",
+        },
+        "type": str,
+        "empty": ""
+    },
+    "Country": {
+        "aliases": {
+            "Country/Region",
+            "Country_Region",
+        },
+        "type": str,
+        "empty": ""
+    },
+    "Last_Update": {
+        "aliases": {
+            "Last Update",
+            "Last_Update",
+        },
+        "type": str,
+        "empty": ""
+    },
+    "Confirmed": {
+        "aliases": set(),
+        "type": int,
+        "empty": -1
+    },
+    "Deaths": {
+        "aliases": set(),
+        "type": int,
+        "empty": -1
+    },
+    "Recovered": {
+        "aliases": set(),
+        "type": int,
+        "empty": -1
+    },
+    "Active": {
+        "aliases": {"Active", },
+        "type": int,
+        "empty": -1
+    },
+    "Latitude": {
+        "aliases": {"Lat", },
+        "type": float,
+        "empty": 0.0
+    },
+    "Longitude": {
+        "aliases": {"Long_", },
+        "type": float,
+        "empty": 0.0
+    },
+}
+
+for key in FIELD_INFO.keys():
+    FIELD_INFO[key]["aliases"].update([key])
+
+
+# fields we allow to be missing
+optional_fields = frozenset(["Active", "Latitude", "Longitude"])
+
+
+def csv_row_to_row(header_names, row):
+    """
+    Given a list of header names and a list of row field values,
+    convert the row into a Row object.
+    The FIELD_INFO table above is used to resolve header name aliases.
+    """
+    data = {header_names[i]: row[i] for i in range(0, len(header_names))}
+
+    row_field_values = []
+
+    for field_name in row_fields:
+        field_info = FIELD_INFO[field_name]
+        valid_names = field_info["aliases"]
+        value = None
+        for name in valid_names:
+            try:
+                value = data[name]
+                # print(f"{field_name} -> {name}")
+                break
+            except KeyError:
+                continue
+
+        if value is None:
+            if field_name in optional_fields:
+                # print(f"Zeroing '{field_name}'")
+                value = field_info["empty"]
+            else:
+                # import pdb ;pdb.set_trace()
+                raise Exception(f"No matching field found for {field_name}, headers were: {header_names}")
+
+        try:
+            value = field_info["type"](value)
+        except ValueError:
+            print(f"{field_name}: '{value}' -> {field_info['empty']}")
+            value = field_info['empty']
+
+        row_field_values.append(value)
+
+    return Row(*row_field_values)
+
+
+def get_rows(fpath):
+    first = True
+    headers = None
+    with open(fpath, "r") as f:
+        r = csv.reader(f)
+        for line in r:
+            if first:
+                first = False
+                headers = line
+                continue
+            yield csv_row_to_row(headers, line)
+
+
+def get_data_for_influx(fpath):
+    data = []
+    for row in get_rows(fpath):
+        data.append({
+            "measurement": "covid",
+            "tags": {
+                "state": row.State,
+                "country": row.Country
+            },
+            "time": row.Last_Update,  # TODO
+            "fields": {
+                "confirmed": row.Confirmed,
+                "deaths": row.Deaths,
+                "recovered": row.Recovered,
+                "active": row.Active,
+            }
+        })
+    return data
+
+
+def ingest_file(influx_client, fname):
+    d = get_data_for_influx(fname)
+    influx_client.write_points(d)
+
+
+def main():
+    influx_uri = urlparse("http://localhost:10019/")
+    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
+    influx_client.create_database("covid")
+    influx_client.switch_database("covid")
+
+    when = datetime.date(month=1, day=22, year=2020)
+    now = datetime.date.today()
+
+    while when < now:
+        daystring = when.strftime("%m-%d-%Y")
+        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
+        print(fname)
+
+        ingest_file(influx_client, fname)
+
+        when = when + datetime.timedelta(days=1)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/ingest2.py b/ingest2.py
new file mode 100755
index 0000000..5e7e6ef
--- /dev/null
+++ b/ingest2.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+
+import csv
+from influxdb import InfluxDBClient
+from urllib.parse import urlparse
+import datetime
+
+sources = {
+    "recovered": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv",
+    "deaths": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv",
+    "cases": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv",
+}
+
+left_headers = ["Province/State", "Country/Region", "Lat", "Long"]
+
+
+def load_source(fpath, field_name):
+    first = True
+    dates = None
+    data = []
+
+    with open(fpath, "r") as f:
+        r = csv.reader(f)
+        for line in r:
+            if first:
+                first = False
+                dates = line[len(left_headers):]
+                continue
+
+            state, country, lat, lon = line[0:4]
+            row = line[len(left_headers):]
+
+            for i, date in enumerate(dates):
+                date_ob = datetime.datetime.strptime(date, "%m/%d/%y")
+                data.append({
+                    "measurement": "covid",
+                    "tags": {
+                        "country": country,
+                        "state": state or "ALL"
+                    },
+                    "time": date_ob.isoformat(),
+                    "fields": {
+                        field_name: int(float(row[i] or 0))
+                    }
+                })
+    return data
+
+
+def get_payload():
+    payload = []
+    for field_name, fpath in sources.items():
+        payload.extend(load_source(fpath, field_name))
+    return payload
+
+
+def main():
+    influx_uri = urlparse("http://localhost:10019/")
+    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
+    influx_client.create_database("covid")
+    influx_client.switch_database("covid")
+
+    data = get_payload()
+    influx_client.write_points(data)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/ingest3.py b/ingest3.py
new file mode 100644
index 0000000..053a24f
--- /dev/null
+++ b/ingest3.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+
+import csv
+from influxdb import InfluxDBClient
+from urllib.parse import urlparse
+import datetime
+
+# Converter for each CSV column; None drops the column from the row.
+
+f2int = lambda x: int(float(x))
+
+row_fields = {
+    'Hospitalization_Rate': float,
+    'People_Hospitalized': f2int,
+    'Incident_Rate': float,
+    'Province_State': str,
+    'FIPS': f2int,
+    'People_Tested': f2int,
+    'Lat': float,
+    'Long_': float,
+    'ISO3': str,
+    'Testing_Rate': float,
+    'Deaths': f2int,
+    'Mortality_Rate': float,
+    'Recovered': f2int,
+    'Confirmed': f2int,
+    'UID': f2int,
+    'Last_Update': None,
+    'Active': f2int,
+    'Country_Region': str,
+}
+
+# https://www.nytimes.com/elections/2016/results/president
+states = {
+    'red': {
+        'Georgia',
+        'Ohio',
+        'Montana',
+        'Pennsylvania',
+        'South Dakota',
+        'Tennessee',
+        'Nebraska',
+        'North Dakota',
+        'Mississippi',
+        'Utah',
+        'Missouri',
+        'Alaska',
+        'Idaho',
+        'Arkansas',
+        'Wyoming',
+        'Alabama',
+        'Indiana',
+        'Kentucky',
+        'Louisiana',
+        'Kansas',
+        'Florida',
+        'Iowa',
+        'Oklahoma',
+        'Texas',
+        'West Virginia',
+        'Arizona',
+        'South Carolina',
+        'Wisconsin',
+        'North Carolina',
+        'Michigan',
+    },
+    'blue': {
+        'Minnesota',
+        'New Mexico',
+        'Oregon',
+        'Nevada',
+        'New Jersey',
+        'Colorado',
+        'Washington',
+        'New Hampshire',
+        'District of Columbia',
+        'Maryland',
+        'Virginia',
+        'California',
+        'Hawaii',
+        'Massachusetts',
+        'New York',
+        'Rhode Island',
+        'Vermont',
+        'Connecticut',
+        'Delaware',
+        'Illinois',
+        'Maine',
+    },
+    'other': {
+        'American Samoa',
+        'Guam',
+        'Puerto Rico',
+        'Diamond Princess',
+        'Virgin Islands',
+        'Grand Princess',
+        'Northern Mariana Islands',
+    }
+}
+
+states_bycolor = {}
+for color, state_set in states.items():
+    for state in state_set:
+        states_bycolor[state] = color
+
+
+def convert(func, inp):
+    if inp == "":
+        return func(0)
+    return func(inp)
+
+
+def get_rows(fpath):
+    first = True
+    headers = None
+    with open(fpath, "r") as f:
+        r = csv.reader(f)
+        for line in r:
+            if first:
+                first = False
+                headers = line
+                continue
+            yield {headers[i]: convert(row_fields[headers[i]], line[i])
+                   for i in range(0, len(headers))
+                   if row_fields[headers[i]]}
+
+
+def get_data_for_influx(fpath, assigned_date=None):
+    data = []
+    for row in get_rows(fpath):
+        if row["Province_State"] == "Recovered":
+            continue
+        data.append({
+            "measurement": "covid",
+            "tags": {
+                "state": row["Province_State"],
+                "iso3": row["ISO3"],
+                "color": states_bycolor[row["Province_State"]]
+            },
+            "time": assigned_date or row["Last_Update"],
+            "fields": row
+        })
+    return data
+
+
+def ingest_file(influx_client, fname, assigned_date):
+    d = get_data_for_influx(fname, assigned_date)
+    # import json
+    # print(json.dumps(d, indent=4))
+    influx_client.write_points(d)
+
+
+def main():
+    influx_uri = urlparse("http://localhost:10019/")
+    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
+    influx_client.create_database("covid")
+    influx_client.switch_database("covid")
+
+    when = datetime.date(month=4, day=12, year=2020)
+    now = datetime.date.today()
+
+    while when < now:
+        daystring = when.strftime("%m-%d-%Y")
+        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us/{daystring}.csv"
+        print(fname)
+
+        ingest_file(influx_client, fname, when.strftime("%Y-%m-%dT%H:%M:%SZ"))
+
+        when = when + datetime.timedelta(days=1)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..eab05c6
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+certifi==2019.11.28
+chardet==3.0.4
+idna==2.9
+influxdb==5.2.3
+python-dateutil==2.8.1
+pytz==2019.3
+requests==2.23.0
+six==1.14.0
+urllib3==1.25.8
diff --git a/testdata/.gitignore b/testdata/.gitignore
new file mode 100644
index 0000000..f59ec20
--- /dev/null
+++ b/testdata/.gitignore
@@ -0,0 +1 @@
+*
\ No newline at end of file