#!/usr/bin/env python3
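"""Ingest the JHU CSSE COVID-19 daily-report CSVs into InfluxDB.

Walks csse_covid_19_daily_reports/ one day at a time, normalises the
CSV headers (which changed format over time), and writes one point per
state/country row into a "covid" measurement. Assumes the COVID-19
repository is checked out alongside this script (see the path in
main()).
"""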

import csv
import datetime
from collections import namedtuple
from urllib.parse import urlparse

from influxdb import InfluxDBClient

# Canonical column order for a normalised report row.
row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths",
              "Recovered", "Active", "Latitude", "Longitude"]
Row = namedtuple("Row", row_fields)


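# FIELD_INFO maps each canonical field to the set of header aliases it
# appears under in the daily reports (the header format changed over
# time, and some files carry a UTF-8 BOM), the Python type to coerce
# values to, and an "empty" fallback used when coercion fails.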
FIELD_INFO = {
    "State": {
        "aliases": {
            "Province/State",
            "Province_State",
            "\ufeffProvince/State",
        },
        "type": str,
        "empty": "",
    },
    "Country": {
        "aliases": {
            "Country/Region",
            "Country_Region",
        },
        "type": str,
        "empty": "",
    },
    "Last_Update": {
        "aliases": {
            "Last Update",
            "Last_Update",
        },
        "type": str,
        "empty": "",
    },
    "Confirmed": {
        "aliases": set(),
        "type": int,
        "empty": -1,
    },
    "Deaths": {
        "aliases": set(),
        "type": int,
        "empty": -1,
    },
    "Recovered": {
        "aliases": set(),
        "type": int,
        "empty": -1,
    },
    "Active": {
        "aliases": set(),
        "type": int,
        "empty": -1,
    },
    "Latitude": {
        "aliases": {"Lat"},
        "type": float,
        "empty": 0.0,
    },
    "Longitude": {
        "aliases": {"Long_"},
        "type": float,
        "empty": 0.0,
    },
}

# Every canonical field name is also a valid alias for itself.
for key, info in FIELD_INFO.items():
    info["aliases"].add(key)


# Fields that are allowed to be missing from a report.
optional_fields = frozenset(["Active", "Latitude", "Longitude"])


def csv_row_to_row(header_names, row):
    """
    Given a list of header names and a list of row values,
    convert the row into a Row object.
    The FIELD_INFO table above is used to resolve header aliases.
    """
    data = dict(zip(header_names, row))

    row_field_values = []

    for field_name in row_fields:
        field_info = FIELD_INFO[field_name]
        value = None
        for name in field_info["aliases"]:
            if name in data:
                value = data[name]
                break

        if value is None:
            if field_name in optional_fields:
                # Optional column absent from this report format.
                value = field_info["empty"]
            else:
                raise ValueError(
                    f"No matching field found for {field_name}, "
                    f"headers were: {header_names}"
                )

        try:
            value = field_info["type"](value)
        except ValueError:
            # Unparseable (usually empty) cell: fall back to the
            # field's "empty" marker.
            print(f"{field_name}: '{value}' -> {field_info['empty']}")
            value = field_info["empty"]

        row_field_values.append(value)

    return Row(*row_field_values)
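

# A minimal sketch of the alias resolution above, with hypothetical
# header and row values (trimmed to the required columns):
#
#   headers = ["Province_State", "Country_Region", "Last_Update",
#              "Confirmed", "Deaths", "Recovered"]
#   values = ["Hubei", "China", "2020-03-01 10:00:00", "100", "5", "20"]
#   csv_row_to_row(headers, values)
#   # -> Row(State='Hubei', Country='China',
#   #        Last_Update='2020-03-01 10:00:00', Confirmed=100, Deaths=5,
#   #        Recovered=20, Active=-1, Latitude=0.0, Longitude=0.0)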


def get_rows(fpath):
    """Yield a Row for each data line of the CSV at fpath."""
    with open(fpath, "r") as f:
        r = csv.reader(f)
        headers = next(r)  # the first line is the header row
        for line in r:
            yield csv_row_to_row(headers, line)


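# InfluxDB schema note: state/country are written as tags (indexed
# metadata, cheap to GROUP BY), while the counts are written as fields
# (the actual time-series values).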
def get_data_for_influx(fpath):
    """Build a list of InfluxDB points from one daily-report CSV."""
    data = []
    for row in get_rows(fpath):
        data.append({
            "measurement": "covid",
            "tags": {
                "state": row.State,
                "country": row.Country,
            },
            # TODO: Last_Update is passed through unmodified; its
            # format varies between report dates and may need
            # normalising before InfluxDB will accept every row.
            "time": row.Last_Update,
            "fields": {
                "confirmed": row.Confirmed,
                "deaths": row.Deaths,
                "recovered": row.Recovered,
                "active": row.Active,
            },
        })
    return data


def ingest_file(influx_client, fname):
    """Parse one daily-report CSV and write its points to InfluxDB."""
    d = get_data_for_influx(fname)
    influx_client.write_points(d)


def main():
    # Assumes an InfluxDB instance listening locally on port 10019;
    # pass username/password to InfluxDBClient if auth is enabled.
    influx_uri = urlparse("http://localhost:10019/")
    influx_client = InfluxDBClient(influx_uri.hostname, influx_uri.port)
    influx_client.create_database("covid")
    influx_client.switch_database("covid")

    # Daily reports start on 2020-01-22; walk forward one file per day
    # up to (but not including) today.
    when = datetime.date(year=2020, month=1, day=22)
    now = datetime.date.today()

    while when < now:
        daystring = when.strftime("%m-%d-%Y")
        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
        print(fname)

        ingest_file(influx_client, fname)

        when = when + datetime.timedelta(days=1)


if __name__ == '__main__':
    main()
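

# Once ingested, the series can be inspected with InfluxQL; e.g. a
# hypothetical query using the tag and field names defined above:
#
#   SELECT SUM("confirmed") FROM "covid"
#   WHERE "country" = 'China' AND time >= '2020-01-22'
#   GROUP BY time(1d)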