commit 3f8cde16df66d159f98e76ca68224fcf6e7b252e Author: dave Date: Fri Jul 31 10:27:48 2020 -0700 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..264b772 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/testenv \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..23a7dd4 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "COVID-19"] + path = COVID-19 + url = https://github.com/CSSEGISandData/COVID-19.git diff --git a/COVID-19 b/COVID-19 new file mode 160000 index 0000000..21f8615 --- /dev/null +++ b/COVID-19 @@ -0,0 +1 @@ +Subproject commit 21f8615831b38706587f6cc1c8bc4e4f4b0f128e diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a3c045d --- /dev/null +++ b/Makefile @@ -0,0 +1,65 @@ +INFLUX_DATA_PATH := ${CURDIR}/testdata/influx +GRAFANA_DATA_PATH := ${CURDIR}/testdata/grafana +VENV := testenv + +INFLUX := http://localhost:10019/covid + + +.PHONY: local-venv-setup +local-venv-setup: + virtualenv -p python3 $(VENV) + ./$(VENV)/bin/python3 -m pip install -r requirements.txt + + +.PHONY: local-venv-wipe +local-venv-wipe: + rm -rf $(VENV) + + +.PHONY: run-local-influx +run-local-influx: + -docker rm covid-influx + mkdir -p $(INFLUX_DATA_PATH) + docker run -d --name covid-influx -p 10019:8086 -v ${INFLUX_DATA_PATH}:/var/lib/influxdb influxdb:1.7 + echo "http://localhost:10019" + + +.PHONY: stop-local-influx +stop-local-influx: + -docker stop covid-influx + + +.PHONY: wipe-local-influx +wipe-local-influx: + -rm -rf $(INFLUX_DATA_PATH) + + +.PHONY: local-influx-cli +local-influx-cli: + docker exec -it covid-influx influx + + +.PHONY: run-local-grafana +run-local-grafana: + -docker rm covid-grafana + mkdir -p $(GRAFANA_DATA_PATH) + docker run -d --name covid-grafana --link covid-influx:influx -p 10020:3000 -v ${GRAFANA_DATA_PATH}:/var/lib/grafana grafana/grafana + echo "http://localhost:10020" + + +.PHONY: stop-local-grafana +stop-local-grafana: + -docker stop covid-grafana + + +.PHONY: wipe-local-grafana +wipe-local-grafana: + -rm -rf $(GRAFANA_DATA_PATH) + +.PHONY: ingest +ingest: + python3 ingest.py + +.PHONY: pull +pull: + cd COVID-19/; git pull diff --git a/README.md b/README.md new file mode 100644 index 0000000..6994283 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +The amount of energy needed to refute bullshit is an order of magnitude bigger than to produce it. \ No newline at end of file diff --git a/headers.py b/headers.py new file mode 100644 index 0000000..80834e1 --- /dev/null +++ b/headers.py @@ -0,0 +1,26 @@ +import os +import csv + + +basedir = "COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us" + +headers = [] + +for fname in os.listdir(basedir): + fpath = os.path.join(basedir, fname) + with open(fpath) as f: + c = csv.reader(f) + for row in c: + headers.append(set(row)) + break + + +for headerset in headers: + for other in headers: + if headerset != other: + print(headerset) + print(other) + +print("ok") + +print(list(headers[0])) diff --git a/ingest.py b/ingest.py new file mode 100755 index 0000000..b1a2c7d --- /dev/null +++ b/ingest.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +import csv +from collections import namedtuple +from influxdb import InfluxDBClient +from urllib.parse import urlparse +import datetime + +row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"] +Row = namedtuple("Row", " ".join(row_fields)) + + +FIELD_INFO = { + "State": { + "aliases": { + "Province/State", + "Province_State", + "\ufeffProvince/State", + }, + "type": str, + "empty": "" + }, + "Country": { + "aliases": { + "Country/Region", + "Country_Region", + }, + "type": str, + "empty": "" + }, + "Last_Update": { + "aliases": { + "Last Update", + "Last_Update", + }, + "type": str, + "empty": "" + }, + "Confirmed": { + "aliases": set(), + "type": int, + "empty": -1 + }, + "Deaths": { + "aliases": set(), + "type": int, + "empty": -1 + }, + "Recovered": { + "aliases": set(), + "type": int, + "empty": -1 + }, + "Active": { + "aliases": {"Active", }, + "type": int, + "empty": -1 + }, + "Latitude": { + "aliases": {"Lat", }, + "type": float, + "empty": 0.0 + }, + "Longitude": { + "aliases": {"Long_", }, + "type": float, + "empty": 0.0 + }, +} + +for key in FIELD_INFO.keys(): + FIELD_INFO[key]["aliases"].update([key]) + + +# fields we allow to missing +optional_fields = frozenset(["Active", "Latitude", "Longitude"]) + + +def csv_row_to_row(header_names, row): + """ + Given a list of header names and list of row fields, + Convert the row into a Row object. + The name_aliases table above is used to alias names + """ + data = {header_names[i]: row[i] for i in range(0, len(header_names))} + + row_field_values = [] + + for field_name in row_fields: + field_info = FIELD_INFO[field_name] + valid_names = field_info["aliases"] + value = None + for name in valid_names: + try: + value = data[name] + # print(f"{field_name} -> {name}") + break + except KeyError: + continue + + if value is None: + if field_name in optional_fields: + # print(f"Zeroing '{field_name}") + value = -1 + else: + # import pdb ;pdb.set_trace() + raise Exception(f"Not matching field found for {field_name}, headers were: {header_names}") + + try: + value = field_info["type"](value) + except ValueError: + print(f"{field_name}: '{value}' -> {field_info['empty']}") + value = field_info['empty'] + + row_field_values.append(value) + + return Row(*row_field_values) + + +def get_rows(fpath): + first = True + headers = None + with open(fpath, "r") as f: + r = csv.reader(f) + for line in r: + if first: + first = False + headers = line + continue + yield csv_row_to_row(headers, line) + + +def get_data_for_influx(fpath): + data = [] + for row in get_rows(fpath): + data.append({ + "measurement": "covid", + "tags": { + "state": row.State, + "country": row.Country + }, + "time": row.Last_Update, # TODO + "fields": { + "confirmed": row.Confirmed, + "deaths": row.Deaths, + "recovered": row.Recovered, + "active": row.Active, + } + }) + return data + + +def ingest_file(influx_client, fname): + d = get_data_for_influx(fname) + influx_client.write_points(d) + + +def main(): + influx_uri = urlparse("http://localhost:10019/") + influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port)) # user, password) + influx_client.create_database("covid") + influx_client.switch_database("covid") + + when = datetime.date(month=1, day=22, year=2020) + now = datetime.date.today() + + while when < now: + daystring = when.strftime("%m-%d-%Y") + fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv" + print(fname) + + ingest_file(influx_client, fname) + + when = when + datetime.timedelta(days=1) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/ingest2.py b/ingest2.py new file mode 100755 index 0000000..5e7e6ef --- /dev/null +++ b/ingest2.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +import csv +from influxdb import InfluxDBClient +from urllib.parse import urlparse +import datetime + +sources = { + "recovered": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv", + "deaths": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv", + "cases": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv", +} + +left_headers = ["Province/State", "Country/Region", "Lat", "Long"] + + +def load_source(fpath, field_name): + first = True + dates = None + data = [] + + with open(fpath, "r") as f: + r = csv.reader(f) + for line in r: + if first: + first = False + dates = line[len(left_headers):] + continue + + state, country, lat, lon = line[0:4] + row = line[len(left_headers):] + + for i, date in enumerate(dates): + date_ob = datetime.datetime.strptime(date, "%m/%d/%y") + data.append({ + "measurement": "covid", + "tags": { + "country": country, + "state": state or "ALL" + }, + "time": date_ob.isoformat(), + "fields": { + field_name: row[i] + } + }) + return data + + +def get_payload(): + payload = [] + for field_name, fpath in sources.items(): + payload.extend(load_source(fpath, field_name)) + return payload + + +def main(): + influx_uri = urlparse("http://localhost:10019/") + influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port)) # user, password) + influx_client.create_database("covid") + influx_client.switch_database("covid") + + data = get_payload() + influx_client.write_points(data) + + +if __name__ == '__main__': + main() diff --git a/ingest3.py b/ingest3.py new file mode 100644 index 0000000..053a24f --- /dev/null +++ b/ingest3.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 + +import csv +from influxdb import InfluxDBClient +from urllib.parse import urlparse +import datetime + +row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"] + +f2int = lambda x: int(float(x)) + +row_fields = { + 'Hospitalization_Rate': float, + 'People_Hospitalized': f2int, + 'Incident_Rate': float, + 'Province_State': str, + 'FIPS': f2int, + 'People_Tested': f2int, + 'Lat': float, + 'Long_': float, + 'ISO3': str, + 'Testing_Rate': float, + 'Deaths': f2int, + 'Mortality_Rate': float, + 'Recovered': f2int, + 'Confirmed': f2int, + 'UID': f2int, + 'Last_Update': None, + 'Active': f2int, + 'Country_Region': str, +} + +# https://www.nytimes.com/elections/2016/results/president +states = { + 'red': { + 'Georgia', + 'Ohio', + 'Montana', + 'Pennsylvania', + 'South Dakota', + 'Tennessee', + 'Nebraska', + 'North Dakota', + 'Mississippi', + 'Utah', + 'Missouri', + 'Alaska', + 'Idaho', + 'Arkansas', + 'Wyoming', + 'Alabama', + 'Indiana', + 'Kentucky', + 'Louisiana', + 'Kansas', + 'Florida', + 'Iowa', + 'Oklahoma', + 'Texas', + 'West Virginia', + 'Arizona', + 'South Carolina', + 'Wisconsin', + 'North Carolina', + 'Michigan', + }, + 'blue': { + 'Minnesota', + 'New Mexico', + 'Oregon', + 'Nevada', + 'New Jersey', + 'Colorado', + 'Washington', + 'New Hampshire', + 'District of Columbia', + 'Maryland', + 'Virginia', + 'California', + 'Hawaii', + 'Massachusetts', + 'New York', + 'Rhode Island', + 'Vermont', + 'Connecticut', + 'Delaware', + 'Illinois', + 'Maine', + }, + 'other': { + 'American Samoa', + 'Guam', + 'Puerto Rico', + 'Diamond Princess', + 'Virgin Islands', + 'Grand Princess', + 'Northern Mariana Islands', + } +} + +states_bycolor = {} +for color, states in states.items(): + for state in states: + states_bycolor[state] = color + + +def convert(func, inp): + if inp == "": + return func(0) + return func(inp) + + +def get_rows(fpath): + first = True + headers = None + with open(fpath, "r") as f: + r = csv.reader(f) + for line in r: + if first: + first = False + headers = line + continue + yield {headers[i]: convert(row_fields[headers[i]], line[i]) + for i in range(0, len(headers)) + if row_fields[headers[i]]} + + +def get_data_for_influx(fpath, assigned_date=None): + data = [] + for row in get_rows(fpath): + if row["Province_State"] == "Recovered": + continue + data.append({ + "measurement": "covid", + "tags": { + "state": row["Province_State"], + "iso3": row["ISO3"], + "color": states_bycolor[row["Province_State"]] + }, + "time": assigned_date or row["Last_Update"], + "fields": row + }) + return data + + +def ingest_file(influx_client, fname, assigned_date): + d = get_data_for_influx(fname, assigned_date) + # import json + # print(json.dumps(d, indent=4)) + influx_client.write_points(d) + + +def main(): + influx_uri = urlparse("http://localhost:10019/") + influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port)) # user, password) + influx_client.create_database("covid") + influx_client.switch_database("covid") + + when = datetime.date(month=4, day=12, year=2020) + now = datetime.date.today() + + while when < now: + daystring = when.strftime("%m-%d-%Y") + fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us/{daystring}.csv" + print(fname) + + ingest_file(influx_client, fname, when.strftime("%Y-%m-%dT%H:%M:%SZ")) + + when = when + datetime.timedelta(days=1) + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eab05c6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +certifi==2019.11.28 +chardet==3.0.4 +idna==2.9 +influxdb==5.2.3 +python-dateutil==2.8.1 +pytz==2019.3 +requests==2.23.0 +six==1.14.0 +urllib3==1.25.8 diff --git a/testdata/.gitignore b/testdata/.gitignore new file mode 100644 index 0000000..f59ec20 --- /dev/null +++ b/testdata/.gitignore @@ -0,0 +1 @@ +* \ No newline at end of file