My tools for viewing COVID data.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

178 lines
4.3 KiB

  1. #!/usr/bin/env python3
  2. import csv
  3. from collections import namedtuple
  4. from influxdb import InfluxDBClient
  5. from urllib.parse import urlparse
  6. import datetime
  7. row_fields = ["State", "Country", "Last_Update", "Confirmed", "Deaths", "Recovered", "Active", "Latitude", "Longitude"]
  8. Row = namedtuple("Row", " ".join(row_fields))
  9. FIELD_INFO = {
  10. "State": {
  11. "aliases": {
  12. "Province/State",
  13. "Province_State",
  14. "\ufeffProvince/State",
  15. },
  16. "type": str,
  17. "empty": ""
  18. },
  19. "Country": {
  20. "aliases": {
  21. "Country/Region",
  22. "Country_Region",
  23. },
  24. "type": str,
  25. "empty": ""
  26. },
  27. "Last_Update": {
  28. "aliases": {
  29. "Last Update",
  30. "Last_Update",
  31. },
  32. "type": str,
  33. "empty": ""
  34. },
  35. "Confirmed": {
  36. "aliases": set(),
  37. "type": int,
  38. "empty": -1
  39. },
  40. "Deaths": {
  41. "aliases": set(),
  42. "type": int,
  43. "empty": -1
  44. },
  45. "Recovered": {
  46. "aliases": set(),
  47. "type": int,
  48. "empty": -1
  49. },
  50. "Active": {
  51. "aliases": {"Active", },
  52. "type": int,
  53. "empty": -1
  54. },
  55. "Latitude": {
  56. "aliases": {"Lat", },
  57. "type": float,
  58. "empty": 0.0
  59. },
  60. "Longitude": {
  61. "aliases": {"Long_", },
  62. "type": float,
  63. "empty": 0.0
  64. },
  65. }
  66. for key in FIELD_INFO.keys():
  67. FIELD_INFO[key]["aliases"].update([key])
  68. # fields we allow to missing
  69. optional_fields = frozenset(["Active", "Latitude", "Longitude"])
  70. def csv_row_to_row(header_names, row):
  71. """
  72. Given a list of header names and list of row fields,
  73. Convert the row into a Row object.
  74. The name_aliases table above is used to alias names
  75. """
  76. data = {header_names[i]: row[i] for i in range(0, len(header_names))}
  77. row_field_values = []
  78. for field_name in row_fields:
  79. field_info = FIELD_INFO[field_name]
  80. valid_names = field_info["aliases"]
  81. value = None
  82. for name in valid_names:
  83. try:
  84. value = data[name]
  85. # print(f"{field_name} -> {name}")
  86. break
  87. except KeyError:
  88. continue
  89. if value is None:
  90. if field_name in optional_fields:
  91. # print(f"Zeroing '{field_name}")
  92. value = -1
  93. else:
  94. # import pdb ;pdb.set_trace()
  95. raise Exception(f"Not matching field found for {field_name}, headers were: {header_names}")
  96. try:
  97. value = field_info["type"](value)
  98. except ValueError:
  99. print(f"{field_name}: '{value}' -> {field_info['empty']}")
  100. value = field_info['empty']
  101. row_field_values.append(value)
  102. return Row(*row_field_values)
  103. def get_rows(fpath):
  104. first = True
  105. headers = None
  106. with open(fpath, "r") as f:
  107. r = csv.reader(f)
  108. for line in r:
  109. if first:
  110. first = False
  111. headers = line
  112. continue
  113. yield csv_row_to_row(headers, line)
  114. def get_data_for_influx(fpath):
  115. data = []
  116. for row in get_rows(fpath):
  117. data.append({
  118. "measurement": "covid",
  119. "tags": {
  120. "state": row.State,
  121. "country": row.Country
  122. },
  123. "time": row.Last_Update, # TODO
  124. "fields": {
  125. "confirmed": row.Confirmed,
  126. "deaths": row.Deaths,
  127. "recovered": row.Recovered,
  128. "active": row.Active,
  129. }
  130. })
  131. return data
  132. def ingest_file(influx_client, fname):
  133. d = get_data_for_influx(fname)
  134. influx_client.write_points(d)
  135. def main():
  136. influx_uri = urlparse("http://localhost:10019/")
  137. influx_client = InfluxDBClient(influx_uri.hostname, str(influx_uri.port)) # user, password)
  138. influx_client.create_database("covid")
  139. influx_client.switch_database("covid")
  140. when = datetime.date(month=1, day=22, year=2020)
  141. now = datetime.date.today()
  142. while when < now:
  143. daystring = when.strftime("%m-%d-%Y")
  144. fname = f"COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{daystring}.csv"
  145. print(fname)
  146. ingest_file(influx_client, fname)
  147. when = when + datetime.timedelta(days=1)
  148. if __name__ == '__main__':
  149. main()