#!/usr/bin/env python3
"""Ingest Johns Hopkins CSSE COVID-19 daily-report CSVs into InfluxDB.

Walks every daily report from 2020-01-22 up to today, parses each CSV
(tolerating the several header spellings the dataset has used over time),
and writes the case counts as points in a local InfluxDB "covid" database.
"""
import csv
import datetime
from collections import namedtuple
from urllib.parse import urlparse

from influxdb import InfluxDBClient

# Canonical column order for a parsed report row.
row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths",
              "Recovered", "Active", "Latitude", "Longitude"]
Row = namedtuple("Row", row_fields)

# Per-field parsing info:
#   aliases - every header spelling this field has had across the dataset's
#             history (the BOM-prefixed one appears in early files)
#   type    - coercion applied to the raw cell value
#   empty   - fallback used when coercion fails or an optional field is absent
FIELD_INFO = {
    "State": {
        "aliases": {"Province/State", "Province_State", "\ufeffProvince/State"},
        "type": str,
        "empty": "",
    },
    "Country": {
        "aliases": {"Country/Region", "Country_Region"},
        "type": str,
        "empty": "",
    },
    "Last_Update": {
        "aliases": {"Last Update", "Last_Update"},
        "type": str,
        "empty": "",
    },
    "Confirmed": {"aliases": set(), "type": int, "empty": -1},
    "Deaths": {"aliases": set(), "type": int, "empty": -1},
    "Recovered": {"aliases": set(), "type": int, "empty": -1},
    "Active": {"aliases": {"Active"}, "type": int, "empty": -1},
    "Latitude": {"aliases": {"Lat"}, "type": float, "empty": 0.0},
    "Longitude": {"aliases": {"Long_"}, "type": float, "empty": 0.0},
}
# Every canonical field name is also an acceptable header spelling.
for key in FIELD_INFO:
    FIELD_INFO[key]["aliases"].add(key)

# Fields we allow to be missing (older report formats lack them).
optional_fields = frozenset(["Active", "Latitude", "Longitude"])


def csv_row_to_row(header_names, row):
    """Convert one CSV data row into a Row.

    header_names: list of column headers from the file.
    row: list of cell values, parallel to header_names.

    The FIELD_INFO alias sets above map the dataset's historical header
    spellings onto the canonical field names. Values are coerced to the
    field's declared type; on coercion failure (or an absent optional
    field) the field's declared "empty" value is used instead.

    Raises ValueError when a required field has no matching header.
    """
    data = dict(zip(header_names, row))
    row_field_values = []
    for field_name in row_fields:
        field_info = FIELD_INFO[field_name]
        value = None
        for name in field_info["aliases"]:
            if name in data:
                value = data[name]
                break
        if value is None:
            if field_name in optional_fields:
                # Use the field's declared empty value (0.0 for lat/long,
                # -1 for Active) rather than a hard-coded -1, so missing
                # coordinates do not become -1.0.
                value = field_info["empty"]
            else:
                raise ValueError(
                    f"No matching field found for {field_name}, "
                    f"headers were: {header_names}")
        try:
            value = field_info["type"](value)
        except ValueError:
            # Unparseable cell (e.g. empty string for an int column).
            print(f"{field_name}: '{value}' -> {field_info['empty']}")
            value = field_info["empty"]
        row_field_values.append(value)
    return Row(*row_field_values)


def get_rows(fpath):
    """Yield one Row per data line of the CSV at fpath (header row skipped)."""
    # newline="" is required for correct csv-module newline handling.
    with open(fpath, "r", newline="") as f:
        reader = csv.reader(f)
        headers = next(reader, None)
        for line in reader:
            yield csv_row_to_row(headers, line)


def get_data_for_influx(fpath):
    """Build the list of InfluxDB point dicts for one daily-report CSV."""
    return [
        {
            "measurement": "covid",
            "tags": {
                "state": row.State,
                "country": row.Country,
            },
            "time": row.Last_Update,  # TODO: normalize timestamp format
            "fields": {
                "confirmed": row.Confirmed,
                "deaths": row.Deaths,
                "recovered": row.Recovered,
                "active": row.Active,
            },
        }
        for row in get_rows(fpath)
    ]


def ingest_file(influx_client, fname):
    """Parse one daily-report CSV and write its points to InfluxDB."""
    influx_client.write_points(get_data_for_influx(fname))


def main():
    """Create/select the 'covid' database and ingest every daily report
    from 2020-01-22 (the dataset's first day) up to, but not including,
    today."""
    influx_uri = urlparse("http://localhost:10019/")
    # urlparse already yields the port as an int; pass it through directly.
    influx_client = InfluxDBClient(influx_uri.hostname, influx_uri.port)
    influx_client.create_database("covid")  # no-op if it already exists
    influx_client.switch_database("covid")

    when = datetime.date(month=1, day=22, year=2020)
    now = datetime.date.today()
    one_day = datetime.timedelta(days=1)
    while when < now:
        daystring = when.strftime("%m-%d-%Y")
        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
        print(fname)
        ingest_file(influx_client, fname)
        when = when + one_day


if __name__ == '__main__':
    main()