covid/ingest.py

#!/usr/bin/env python3
import csv
import datetime
from collections import namedtuple
from urllib.parse import urlparse

from influxdb import InfluxDBClient
row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"]
Row = namedtuple("Row", " ".join(row_fields))
FIELD_INFO = {
"State": {
"aliases": {
"Province/State",
"Province_State",
"\ufeffProvince/State",
},
"type": str,
"empty": ""
},
"Country": {
"aliases": {
"Country/Region",
"Country_Region",
},
"type": str,
"empty": ""
},
"Last_Update": {
"aliases": {
"Last Update",
"Last_Update",
},
"type": str,
"empty": ""
},
"Confirmed": {
"aliases": set(),
"type": int,
"empty": -1
},
"Deaths": {
"aliases": set(),
"type": int,
"empty": -1
},
"Recovered": {
"aliases": set(),
"type": int,
"empty": -1
},
"Active": {
"aliases": {"Active", },
"type": int,
"empty": -1
},
"Latitude": {
"aliases": {"Lat", },
"type": float,
"empty": 0.0
},
"Longitude": {
"aliases": {"Long_", },
"type": float,
"empty": 0.0
},
}

# every field's canonical name is also an accepted alias for itself
for key in FIELD_INFO:
    FIELD_INFO[key]["aliases"].add(key)

# fields we allow to be missing
optional_fields = frozenset(["Active", "Latitude", "Longitude"])
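
# Sketch of how the alias table resolves, assuming a header spelling seen in
# the early 2020 daily reports (illustrative values only):
#
#   >>> "Province/State" in FIELD_INFO["State"]["aliases"]
#   True
#   >>> FIELD_INFO["Confirmed"]["type"]("42")
#   42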


def csv_row_to_row(header_names, row):
    """
    Given a list of header names and a list of row values,
    convert the row into a Row object.

    The FIELD_INFO alias table above is used to map the varying
    CSV header spellings onto the canonical field names.
    """
    data = {header_names[i]: row[i] for i in range(len(header_names))}
    row_field_values = []
    for field_name in row_fields:
        field_info = FIELD_INFO[field_name]
        valid_names = field_info["aliases"]
        value = None
        for name in valid_names:
            try:
                value = data[name]
                break
            except KeyError:
                continue
        if value is None:
            if field_name in optional_fields:
                # optional field absent from this file: use its fallback value
                value = field_info["empty"]
            else:
                raise Exception(f"No matching field found for {field_name}, headers were: {header_names}")
        try:
            value = field_info["type"](value)
        except ValueError:
            print(f"{field_name}: '{value}' -> {field_info['empty']}")
            value = field_info["empty"]
        row_field_values.append(value)
    return Row(*row_field_values)
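
# Example (hypothetical header and row, in the layout the early JHU CSSE daily
# reports used; "Active" is absent here, so it falls back to its default):
#
#   headers = ["Province/State", "Country/Region", "Last Update",
#              "Confirmed", "Deaths", "Recovered", "Lat", "Long_"]
#   row = ["Hubei", "Mainland China", "2020-02-01T11:53:00",
#          "7153", "249", "168", "30.9756", "112.2707"]
#   csv_row_to_row(headers, row)
#   # -> Row(State='Hubei', Country='Mainland China', ..., Active=-1, ...)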


def get_rows(fpath):
    first = True
    headers = None
    with open(fpath, "r") as f:
        r = csv.reader(f)
        for line in r:
            if first:
                first = False
                headers = line
                continue
            yield csv_row_to_row(headers, line)


def get_data_for_influx(fpath):
    data = []
    for row in get_rows(fpath):
        data.append({
            "measurement": "covid",
            "tags": {
                "state": row.State,
                "country": row.Country,
            },
            "time": row.Last_Update,  # TODO
            "fields": {
                "confirmed": row.Confirmed,
                "deaths": row.Deaths,
                "recovered": row.Recovered,
                "active": row.Active,
            },
        })
    return data
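
# Each element of the returned list is one InfluxDB point dict, e.g.
# (hypothetical values):
#
#   {"measurement": "covid",
#    "tags": {"state": "Hubei", "country": "Mainland China"},
#    "time": "2020-02-01T11:53:00",
#    "fields": {"confirmed": 7153, "deaths": 249, "recovered": 168, "active": -1}}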


def ingest_file(influx_client, fname):
    d = get_data_for_influx(fname)
    influx_client.write_points(d)


def main():
    influx_uri = urlparse("http://localhost:10019/")
    influx_client = InfluxDBClient(influx_uri.hostname, influx_uri.port)  # user, password)
    influx_client.create_database("covid")
    influx_client.switch_database("covid")
    # daily reports start on 2020-01-22; ingest every day up to (but not
    # including) today
    when = datetime.date(month=1, day=22, year=2020)
    now = datetime.date.today()
    while when < now:
        daystring = when.strftime("%m-%d-%Y")
        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
        print(fname)
        ingest_file(influx_client, fname)
        when = when + datetime.timedelta(days=1)


if __name__ == '__main__':
    main()
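
# Usage sketch (assumptions: the JHU CSSE COVID-19 repository is cloned into
# ./COVID-19 next to this script, and an InfluxDB instance is reachable on
# localhost:10019):
#
#   git clone https://github.com/CSSEGISandData/COVID-19.git
#   ./ingest.py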