-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
116 lines (95 loc) · 3.37 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import csv
import sys
import logging
import requests
import json
from pydantic import BaseSettings
from abc import ABC, abstractmethod
from requests import Response
from pathlib import Path
from urllib.parse import urlencode
from io import StringIO
class Settings(BaseSettings):
    """Runtime configuration; every field can be overridden via an
    environment variable of the same name (pydantic ``BaseSettings``)."""

    # Directory the cleaned CSV/JSON exports are written into.
    OUTPUT_DIR: Path = Path("/mnt/apache_nas_data/public/export_json_csv")
    # Columns stripped from CSV output. Annotated as ``tuple`` to match the
    # immutable defaults (the original ``list`` annotation contradicted them).
    UNWANTED_CSV_COLUMNS: tuple = ("FID", "the_geom")
    # Top-level keys stripped from each GeoJSON feature.
    UNWANTED_JSON_COLUMNS: tuple = ("bbox",)
    # Cap on features per WFS GetFeature request.
    MAX_FEATURES: int = 5000
    GEOSERVER_WFS_URL: str = "https://www.geo2france.fr/geoserver/wfs"
    # Layers to export, as workspace-qualified names.
    GEOSERVER_LAYERS: tuple = ("cr_hdf:epci",)
    LOG_LEVEL: str = "INFO"
class Process(ABC):
    """
    Common interface for fetching data from a GeoServer WFS endpoint in a
    given output format, cleaning it, and storing it to disk.
    """

    def __init__(self, settings: "Settings", layer: str) -> None:
        self.layer = layer
        self.settings = settings

    @abstractmethod
    def run(self) -> None:
        """Download, clean and store the layer in the concrete format."""

    def download(self, output_format: str) -> Response:
        """Fetch the layer via a WFS 1.0.0 GetFeature request.

        Raises ``requests.HTTPError`` on a non-2xx response and
        ``requests.Timeout`` if the server stalls.
        """
        qs = urlencode(
            {
                "service": "WFS",
                "request": "GetFeature",
                "typeName": self.layer,
                # The WFS 1.0.0 parameter is spelled "maxFeatures"; the
                # original "maxFeature" was not a recognized parameter, so
                # the configured cap was silently ignored by the server.
                "maxFeatures": self.settings.MAX_FEATURES,
                "outputFormat": output_format,
                "version": "1.0.0",
            }
        )
        url = f"{self.settings.GEOSERVER_WFS_URL}?{qs}"
        # A timeout keeps this batch job from hanging forever on a stalled
        # or unreachable server.
        r = requests.get(url, timeout=60)
        r.raise_for_status()
        return r

    @abstractmethod
    def clean(self, data: str) -> str:
        """Strip unwanted columns/keys from the raw payload."""

    @abstractmethod
    def store(self, data: str, path: Path) -> None:
        """Write the cleaned payload under ``settings.OUTPUT_DIR``."""
class ProcessCsv(Process):
    """
    Concrete implementation fetching a layer as CSV from the GeoServer WFS
    and stripping the columns listed in ``UNWANTED_CSV_COLUMNS``.
    """

    def run(self) -> None:
        # Named `response` — the original bound it to `csv`, shadowing the
        # csv module used elsewhere in this class.
        response = self.download("csv")
        cleaned = self.clean(response.text)
        self.store(cleaned, f"result_{self.layer}.csv")

    def clean(self, data: str) -> StringIO:
        """Return a StringIO of *data* with unwanted columns removed from
        both the header and every row."""
        # Geometry columns (WKT) can exceed the default 128 KiB field limit.
        csv.field_size_limit(sys.maxsize)
        output = StringIO()
        reader = csv.DictReader(StringIO(data))
        # `fieldnames` is None for an empty payload; the original crashed
        # with a TypeError in that case.
        kept = [
            name
            for name in (reader.fieldnames or [])
            if name not in self.settings.UNWANTED_CSV_COLUMNS
        ]
        writer = csv.DictWriter(output, fieldnames=kept)
        writer.writeheader()
        for row in reader:
            writer.writerow(
                {k: v for k, v in row.items() if k not in self.settings.UNWANTED_CSV_COLUMNS}
            )
        return output

    def store(self, data: StringIO, output_file: str) -> None:
        """Write the cleaned CSV buffer into OUTPUT_DIR / *output_file*."""
        # newline="" stops the platform from translating csv's \r\n line
        # endings; explicit encoding avoids locale-dependent output.
        with open(
            self.settings.OUTPUT_DIR / output_file, "w", encoding="utf-8", newline=""
        ) as f:
            data.seek(0)
            f.write(data.read())
class ProcessJson(Process):
    """
    Concrete implementation fetching a layer as GeoJSON from the GeoServer
    WFS and stripping the keys listed in ``UNWANTED_JSON_COLUMNS`` from
    each feature.
    """

    def run(self) -> None:
        res = self.download("json")
        cleaned = self.clean(res.json())
        self.store(cleaned, f"result_{self.layer}.json")

    def clean(self, geojson: dict) -> list:
        """Remove unwanted keys from every feature and return the feature
        list.

        NOTE(review): only the ``features`` array is returned, so the stored
        file is not a full FeatureCollection — presumably intentional for
        downstream consumers; confirm before changing.
        """
        # Singular name: each iteration handles one feature dict.
        for feature in geojson["features"]:
            for unwanted in self.settings.UNWANTED_JSON_COLUMNS:
                feature.pop(unwanted, None)
        # Return type fixed to ``list`` (the original annotation said dict).
        return geojson["features"]

    def store(self, data: list, output_file: str) -> None:
        """Write the cleaned feature list into OUTPUT_DIR / *output_file*."""
        # Explicit encoding avoids locale-dependent output.
        with open(self.settings.OUTPUT_DIR / output_file, "w", encoding="utf-8") as f:
            json.dump(data, f)
if __name__ == "__main__":
    settings = Settings()
    logging.basicConfig(level=settings.LOG_LEVEL)
    logger = logging.getLogger(__name__)
    for layer in settings.GEOSERVER_LAYERS:
        # One failing layer/format must not abort the whole export batch:
        # log the traceback and continue with the next export.
        for process_cls in (ProcessCsv, ProcessJson):
            try:
                process_cls(settings, layer).run()
            except Exception:
                logger.exception(
                    "Export failed for layer %s (%s)", layer, process_cls.__name__
                )