initial commit

master · dave · 3 months ago · commit 3f8cde16df
11 changed files with 525 additions and 0 deletions
  1. .gitignore (+1, -0)
  2. .gitmodules (+3, -0)
  3. COVID-19 (+1, -0)
  4. Makefile (+65, -0)
  5. README.md (+1, -0)
  6. headers.py (+26, -0)
  7. ingest.py (+178, -0)
  8. ingest2.py (+67, -0)
  9. ingest3.py (+173, -0)
  10. requirements.txt (+9, -0)
  11. testdata/.gitignore (+1, -0)

.gitignore (+1, -0)

@@ -0,0 +1 @@
/testenv

.gitmodules (+3, -0)

@@ -0,0 +1,3 @@
[submodule "COVID-19"]
	path = COVID-19
	url = https://github.com/CSSEGISandData/COVID-19.git

COVID-19 (+1, -0)

@@ -0,0 +1 @@
Subproject commit 21f8615831b38706587f6cc1c8bc4e4f4b0f128e

Makefile (+65, -0)

@@ -0,0 +1,65 @@
INFLUX_DATA_PATH := ${CURDIR}/testdata/influx
GRAFANA_DATA_PATH := ${CURDIR}/testdata/grafana
VENV := testenv

INFLUX := http://localhost:10019/covid


.PHONY: local-venv-setup
local-venv-setup:
	virtualenv -p python3 $(VENV)
	./$(VENV)/bin/python3 -m pip install -r requirements.txt


.PHONY: local-venv-wipe
local-venv-wipe:
	rm -rf $(VENV)


.PHONY: run-local-influx
run-local-influx:
	-docker rm covid-influx
	mkdir -p $(INFLUX_DATA_PATH)
	docker run -d --name covid-influx -p 10019:8086 -v ${INFLUX_DATA_PATH}:/var/lib/influxdb influxdb:1.7
	echo "http://localhost:10019"


.PHONY: stop-local-influx
stop-local-influx:
	-docker stop covid-influx


.PHONY: wipe-local-influx
wipe-local-influx:
	-rm -rf $(INFLUX_DATA_PATH)


.PHONY: local-influx-cli
local-influx-cli:
	docker exec -it covid-influx influx


.PHONY: run-local-grafana
run-local-grafana:
	-docker rm covid-grafana
	mkdir -p $(GRAFANA_DATA_PATH)
	docker run -d --name covid-grafana --link covid-influx:influx -p 10020:3000 -v ${GRAFANA_DATA_PATH}:/var/lib/grafana grafana/grafana
	echo "http://localhost:10020"


.PHONY: stop-local-grafana
stop-local-grafana:
	-docker stop covid-grafana


.PHONY: wipe-local-grafana
wipe-local-grafana:
	-rm -rf $(GRAFANA_DATA_PATH)

.PHONY: ingest
ingest:
	python3 ingest.py

.PHONY: pull
pull:
	cd COVID-19/; git pull
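
A quick way to confirm the container from `run-local-influx` is reachable before running the ingest targets — a minimal sketch, assuming the port mapping above and the `influxdb` client from requirements.txt:

#!/usr/bin/env python3
# Minimal sanity check for the InfluxDB container started by
# `make run-local-influx` (host port 10019 -> container port 8086).
from influxdb import InfluxDBClient

client = InfluxDBClient("localhost", 10019)
print(client.ping())  # prints the server version string on success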

README.md (+1, -0)

@@ -0,0 +1 @@
The amount of energy needed to refute bullshit is an order of magnitude bigger than to produce it.

headers.py (+26, -0)

@@ -0,0 +1,26 @@
import os
import csv


basedir = "COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us"

headers = []

for fname in os.listdir(basedir):
    fpath = os.path.join(basedir, fname)
    with open(fpath) as f:
        c = csv.reader(f)
        for row in c:
            headers.append(set(row))
            break


for headerset in headers:
    for other in headers:
        if headerset != other:
            print(headerset)
            print(other)

print("ok")

print(list(headers[0]))
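
The pairwise loop above prints every mismatched pair twice; a sketch of the same consistency check collapsed to the distinct header sets (no assumptions beyond the `headers` list built above):

# Same check, deduplicated: more than one distinct header set means
# the daily report files disagree on their columns.
unique_headers = {frozenset(h) for h in headers}
if len(unique_headers) > 1:
    for h in unique_headers:
        print(sorted(h))
else:
    print("ok")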

ingest.py (+178, -0)

@@ -0,0 +1,178 @@
#!/usr/bin/env python3

import csv
from collections import namedtuple
from influxdb import InfluxDBClient
from urllib.parse import urlparse
import datetime

row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"]
Row = namedtuple("Row", " ".join(row_fields))


FIELD_INFO = {
    "State": {
        "aliases": {
            "Province/State",
            "Province_State",
            "\ufeffProvince/State",
        },
        "type": str,
        "empty": ""
    },
    "Country": {
        "aliases": {
            "Country/Region",
            "Country_Region",
        },
        "type": str,
        "empty": ""
    },
    "Last_Update": {
        "aliases": {
            "Last Update",
            "Last_Update",
        },
        "type": str,
        "empty": ""
    },
    "Confirmed": {
        "aliases": set(),
        "type": int,
        "empty": -1
    },
    "Deaths": {
        "aliases": set(),
        "type": int,
        "empty": -1
    },
    "Recovered": {
        "aliases": set(),
        "type": int,
        "empty": -1
    },
    "Active": {
        "aliases": {"Active", },
        "type": int,
        "empty": -1
    },
    "Latitude": {
        "aliases": {"Lat", },
        "type": float,
        "empty": 0.0
    },
    "Longitude": {
        "aliases": {"Long_", },
        "type": float,
        "empty": 0.0
    },
}

for key in FIELD_INFO.keys():
    FIELD_INFO[key]["aliases"].update([key])


# fields we allow to be missing
optional_fields = frozenset(["Active", "Latitude", "Longitude"])


def csv_row_to_row(header_names, row):
    """
    Given a list of header names and a list of row fields,
    convert the row into a Row object.
    The FIELD_INFO alias table above is used to resolve header name variants.
    """
    data = {header_names[i]: row[i] for i in range(0, len(header_names))}

    row_field_values = []

    for field_name in row_fields:
        field_info = FIELD_INFO[field_name]
        valid_names = field_info["aliases"]
        value = None
        for name in valid_names:
            try:
                value = data[name]
                # print(f"{field_name} -> {name}")
                break
            except KeyError:
                continue

        if value is None:
            if field_name in optional_fields:
                # print(f"Zeroing '{field_name}'")
                value = -1
            else:
                # import pdb; pdb.set_trace()
                raise Exception(f"No matching field found for {field_name}, headers were: {header_names}")

        try:
            value = field_info["type"](value)
        except ValueError:
            print(f"{field_name}: '{value}' -> {field_info['empty']}")
            value = field_info['empty']

        row_field_values.append(value)

    return Row(*row_field_values)


def get_rows(fpath):
    first = True
    headers = None
    with open(fpath, "r") as f:
        r = csv.reader(f)
        for line in r:
            if first:
                first = False
                headers = line
                continue
            yield csv_row_to_row(headers, line)


def get_data_for_influx(fpath):
    data = []
    for row in get_rows(fpath):
        data.append({
            "measurement": "covid",
            "tags": {
                "state": row.State,
                "country": row.Country
            },
            "time": row.Last_Update,  # TODO
            "fields": {
                "confirmed": row.Confirmed,
                "deaths": row.Deaths,
                "recovered": row.Recovered,
                "active": row.Active,
            }
        })
    return data


def ingest_file(influx_client, fname):
    d = get_data_for_influx(fname)
    influx_client.write_points(d)


def main():
    influx_uri = urlparse("http://localhost:10019/")
    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
    influx_client.create_database("covid")
    influx_client.switch_database("covid")

    when = datetime.date(month=1, day=22, year=2020)
    now = datetime.date.today()

    while when < now:
        daystring = when.strftime("%m-%d-%Y")
        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
        print(fname)

        ingest_file(influx_client, fname)

        when = when + datetime.timedelta(days=1)

if __name__ == '__main__':
    main()
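
csv_row_to_row resolves each canonical field through its alias set, so one code path can read daily reports whose headers changed spelling over time. An illustration with a hypothetical header row in the older spelling (the sample values are invented):

# Illustration only -- invented values, older header spelling.
old_headers = ["Province/State", "Country/Region", "Last Update",
               "Confirmed", "Deaths", "Recovered"]
sample = ["Hubei", "Mainland China", "2020-01-22 17:00", "444", "17", "28"]
row = csv_row_to_row(old_headers, sample)
print(row.Confirmed, row.Active)  # 444 -1  (Active is optional, defaults to -1)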

ingest2.py (+67, -0)

@@ -0,0 +1,67 @@
#!/usr/bin/env python3

import csv
from influxdb import InfluxDBClient
from urllib.parse import urlparse
import datetime

sources = {
    "recovered": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv",
    "deaths": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv",
    "cases": "COVID-19/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv",
}

left_headers = ["Province/State", "Country/Region", "Lat", "Long"]


def load_source(fpath, field_name):
    first = True
    dates = None
    data = []

    with open(fpath, "r") as f:
        r = csv.reader(f)
        for line in r:
            if first:
                first = False
                dates = line[len(left_headers):]
                continue

            state, country, lat, lon = line[0:4]
            row = line[len(left_headers):]

            for i, date in enumerate(dates):
                date_ob = datetime.datetime.strptime(date, "%m/%d/%y")
                data.append({
                    "measurement": "covid",
                    "tags": {
                        "country": country,
                        "state": state or "ALL"
                    },
                    "time": date_ob.isoformat(),
                    "fields": {
                        # the CSV yields strings; cast so InfluxDB stores a
                        # numeric field that can be aggregated
                        field_name: int(row[i] or 0)
                    }
                })
    return data


def get_payload():
    payload = []
    for field_name, fpath in sources.items():
        payload.extend(load_source(fpath, field_name))
    return payload


def main():
    influx_uri = urlparse("http://localhost:10019/")
    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
    influx_client.create_database("covid")
    influx_client.switch_database("covid")

    data = get_payload()
    influx_client.write_points(data)


if __name__ == '__main__':
    main()
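
load_source reshapes the wide time-series layout (one column per date) into one point per country/state/day. A toy trace of that reshaping with an invented two-date row:

# Illustration only -- invented line in the wide time-series layout.
dates = ["1/22/20", "1/23/20"]                        # header tail
line = ["", "Afghanistan", "33.0", "65.0", "0", "5"]  # left_headers, then counts
state, country, lat, lon = line[0:4]
for i, date in enumerate(dates):
    print(country, date, line[len(left_headers) + i])
# Afghanistan 1/22/20 0
# Afghanistan 1/23/20 5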

ingest3.py (+173, -0)

@@ -0,0 +1,173 @@
#!/usr/bin/env python3

import csv
from influxdb import InfluxDBClient
from urllib.parse import urlparse
import datetime

row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"]

f2int = lambda x: int(float(x))

row_fields = {
    'Hospitalization_Rate': float,
    'People_Hospitalized': f2int,
    'Incident_Rate': float,
    'Province_State': str,
    'FIPS': f2int,
    'People_Tested': f2int,
    'Lat': float,
    'Long_': float,
    'ISO3': str,
    'Testing_Rate': float,
    'Deaths': f2int,
    'Mortality_Rate': float,
    'Recovered': f2int,
    'Confirmed': f2int,
    'UID': f2int,
    'Last_Update': None,
    'Active': f2int,
    'Country_Region': str,
}

# https://www.nytimes.com/elections/2016/results/president
states = {
    'red': {
        'Georgia',
        'Ohio',
        'Montana',
        'Pennsylvania',
        'South Dakota',
        'Tennessee',
        'Nebraska',
        'North Dakota',
        'Mississippi',
        'Utah',
        'Missouri',
        'Alaska',
        'Idaho',
        'Arkansas',
        'Wyoming',
        'Alabama',
        'Indiana',
        'Kentucky',
        'Louisiana',
        'Kansas',
        'Florida',
        'Iowa',
        'Oklahoma',
        'Texas',
        'West Virginia',
        'Arizona',
        'South Carolina',
        'Wisconsin',
        'North Carolina',
        'Michigan',
    },
    'blue': {
        'Minnesota',
        'New Mexico',
        'Oregon',
        'Nevada',
        'New Jersey',
        'Colorado',
        'Washington',
        'New Hampshire',
        'District of Columbia',
        'Maryland',
        'Virginia',
        'California',
        'Hawaii',
        'Massachusetts',
        'New York',
        'Rhode Island',
        'Vermont',
        'Connecticut',
        'Delaware',
        'Illinois',
        'Maine',
    },
    'other': {
        'American Samoa',
        'Guam',
        'Puerto Rico',
        'Diamond Princess',
        'Virgin Islands',
        'Grand Princess',
        'Northern Mariana Islands',
    }
}

states_bycolor = {}
for color, color_states in states.items():
    for state in color_states:
        states_bycolor[state] = color


def convert(func, inp):
    if inp == "":
        return func(0)
    return func(inp)


def get_rows(fpath):
    first = True
    headers = None
    with open(fpath, "r") as f:
        r = csv.reader(f)
        for line in r:
            if first:
                first = False
                headers = line
                continue
            yield {headers[i]: convert(row_fields[headers[i]], line[i])
                   for i in range(0, len(headers))
                   if row_fields[headers[i]]}


def get_data_for_influx(fpath, assigned_date=None):
    data = []
    for row in get_rows(fpath):
        if row["Province_State"] == "Recovered":
            continue
        data.append({
            "measurement": "covid",
            "tags": {
                "state": row["Province_State"],
                "iso3": row["ISO3"],
                "color": states_bycolor[row["Province_State"]]
            },
            "time": assigned_date or row["Last_Update"],
            "fields": row
        })
    return data


def ingest_file(influx_client, fname, assigned_date):
    d = get_data_for_influx(fname, assigned_date)
    # import json
    # print(json.dumps(d, indent=4))
    influx_client.write_points(d)


def main():
    influx_uri = urlparse("http://localhost:10019/")
    influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port))  # user, password)
    influx_client.create_database("covid")
    influx_client.switch_database("covid")

    when = datetime.date(month=4, day=12, year=2020)
    now = datetime.date.today()

    while when < now:
        daystring = when.strftime("%m-%d-%Y")
        fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports_us/{daystring}.csv"
        print(fname)

        ingest_file(influx_client, fname, when.strftime("%Y-%m-%dT%H:%M:%SZ"))

        when = when + datetime.timedelta(days=1)


if __name__ == '__main__':
    main()
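
Once the US daily reports are in, the `color` tag makes the 2016-map split queryable directly. A sketch of one such query with the same client library (the query itself is an assumption, not part of this commit):

# Sketch only: total confirmed cases per electoral-map color,
# against the "covid" database written by ingest3.py.
from influxdb import InfluxDBClient

client = InfluxDBClient("localhost", 10019, database="covid")
result = client.query('SELECT sum("Confirmed") FROM "covid" GROUP BY "color"')
for (measurement, tags), points in result.items():
    print(tags["color"], next(points)["sum"])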

requirements.txt (+9, -0)

@@ -0,0 +1,9 @@
certifi==2019.11.28
chardet==3.0.4
idna==2.9
influxdb==5.2.3
python-dateutil==2.8.1
pytz==2019.3
requests==2.23.0
six==1.14.0
urllib3==1.25.8

testdata/.gitignore (+1, -0)

@@ -0,0 +1 @@
*
