covid/ingest2.py

#!/usr/bin/env python3
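"""Ingest the Johns Hopkins CSSE COVID-19 global time series into InfluxDB.

Reads the recovered, deaths, and confirmed CSVs from a local checkout of the
JHU CSSE "COVID-19" repository (paths in `sources` below) and writes one point
per country/state/day into a local InfluxDB database named "covid".
"""
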
import csv
import datetime
from urllib.parse import urlparse

from influxdb import InfluxDBClient

# JHU CSSE time-series CSVs, keyed by the InfluxDB field each file populates.
sources = {
    "recovered": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv",
    "deaths": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv",
    "cases": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv",
}

# Fixed columns that precede the per-date columns in every CSV.
left_headers = ["Province/State", "Country/Region", "Lat", "Long"]


def load_source(fpath, field_name):
    """Parse one time-series CSV into a list of InfluxDB points."""
    first = True
    dates = None
    data = []
    with open(fpath, "r") as f:
        r = csv.reader(f)
        for line in r:
            if first:
                # The header row carries the date labels after the fixed columns.
                first = False
                dates = line[len(left_headers):]
                continue
            state, country, lat, lon = line[0:4]
            row = line[len(left_headers):]
            for i, date in enumerate(dates):
                date_ob = datetime.datetime.strptime(date, "%m/%d/%y")
                data.append({
                    "measurement": "covid",
                    "tags": {
                        "country": country,
                        "state": state or "ALL",
                    },
                    "time": date_ob.isoformat(),
                    "fields": {
                        # Counts arrive as strings; store them as integers (blank cells as 0).
                        field_name: int(row[i] or 0),
                    },
                })
    return data


def get_payload():
    """Load every source CSV and concatenate the resulting points."""
    payload = []
    for field_name, fpath in sources.items():
        payload.extend(load_source(fpath, field_name))
    return payload


def main():
    influx_uri = urlparse("http://localhost:10019/")
    # Anonymous connection; add username/password arguments here if the server needs auth.
    influx_client = InfluxDBClient(influx_uri.hostname, influx_uri.port)
    # CREATE DATABASE is idempotent on InfluxDB 1.x, so re-running the script is safe.
    influx_client.create_database("covid")
    influx_client.switch_database("covid")
    data = get_payload()
    influx_client.write_points(data)


if __name__ == '__main__':
    main()
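
# A minimal sketch of reading the data back, assuming the ingest above has run
# against the same local InfluxDB instance; the query and country value are
# illustrative, not part of the original script:
#
#     from influxdb import InfluxDBClient
#     client = InfluxDBClient("localhost", 10019, database="covid")
#     result = client.query("SELECT \"cases\" FROM \"covid\" WHERE \"country\" = 'US'")
#     for point in result.get_points():
#         print(point["time"], point["cases"])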