-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #51 from Yicong-Huang/env-crawl-rebase
env crawler
- Loading branch information
Showing
6 changed files
with
279 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import logging | ||
import os | ||
from datetime import timedelta, date | ||
from ftplib import FTP, error_perm | ||
from typing import List, Optional | ||
|
||
import rootpath | ||
|
||
rootpath.append() | ||
|
||
from paths import PRISM_DATA_PATH | ||
from backend.data_preparation.crawler.crawlerbase import CrawlerBase | ||
|
||
logger = logging.getLogger('TaskManager') | ||
|
||
|
||
class PRISMCrawler(CrawlerBase):
    """Download daily PRISM archives from the PRISM FTP server.

    An anonymous FTP session is opened when the crawler is constructed and is
    reused by every call to :meth:`crawl`.
    """

    FTP_SERVER = 'prism.nacse.org'
    VARIABLES = ['ppt', 'tmax', 'vpdmax']
    # Stages are tried in this order; the first archive found on the server wins.
    STAGES = ['stable', 'provisional', 'early']
    # Per-variable code that is embedded in the archive file name.
    ADDITIONAL_CODES = {
        'ppt': '4kmD2',
        'tmax': '4kmD1',
        'vpdmax': '4kmD1'
    }

    def __init__(self):
        super().__init__()

        self.ftp = FTP(PRISMCrawler.FTP_SERVER)
        self.ftp.login('anonymous')
        # Chunks received from the server for the download currently in progress.
        self.buffer: List[bytes] = list()

    def crawl(self, target_date: date, variable: str) -> Optional[str]:
        """Download the archive of one variable for one day.

        The file is looked up under ``/daily/<variable>/<year>`` on the server
        and written into ``PRISM_DATA_PATH`` (created if missing).

        :param target_date: day whose archive should be fetched.
        :param variable: key of ``ADDITIONAL_CODES`` (see ``VARIABLES``).
        :return: path of the written zip file, or ``None`` when the server has
            no archive for that day in any stage (or no directory for that year).
        :raises KeyError: if ``variable`` is not a key of ``ADDITIONAL_CODES``.
        """
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(PRISM_DATA_PATH, exist_ok=True)

        try:
            self.ftp.cwd(f'/daily/{variable}/{target_date.strftime("%Y")}')
        except error_perm as e:
            # The year directory is not on the server: nothing to crawl.
            logger.info(e)
            return None

        day_tag = target_date.strftime("%Y%m%d")
        for stage in PRISMCrawler.STAGES:
            filename = f'PRISM_{variable}_{stage}_{PRISMCrawler.ADDITIONAL_CODES[variable]}' \
                       f'_{day_tag}_bil.zip'
            # Start every attempt from an empty buffer so bytes left over from
            # an earlier failed transfer can never end up in this file.
            self.buffer.clear()
            try:
                self.ftp.retrbinary(f"RETR {filename}", self.buffer.append)
            except error_perm as e:
                # This stage is not available for the day; try the next one.
                logger.info(e)
                continue

            target_path = os.path.join(PRISM_DATA_PATH, filename)
            try:
                with open(target_path, 'wb') as f:
                    f.write(b''.join(self.buffer))
            finally:
                # Release the downloaded chunks even if the write failed.
                self.buffer.clear()
            logger.info(f'file-written: {filename}')
            return target_path

        # return None if not crawled
        return None
|
||
|
||
if __name__ == '__main__':
    # Manual run: log to the console and fetch yesterday's 'ppt' archive.
    console = logging.StreamHandler()
    logger.setLevel(logging.INFO)
    logger.addHandler(console)
    crawler = PRISMCrawler()
    yesterday = date.today() - timedelta(days=1)
    crawler.crawl(yesterday, 'ppt')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import datetime | ||
import logging | ||
|
||
import numpy as np | ||
import psycopg2.errors | ||
import psycopg2.extras | ||
import rootpath | ||
|
||
rootpath.append() | ||
from backend.data_preparation.connection import Connection | ||
from backend.data_preparation.dumper.dumperbase import DumperBase | ||
|
||
logger = logging.getLogger('TaskManager') | ||
|
||
|
||
class PRISMDumper(DumperBase):
    """Write one PRISM variable for one day into the ``prism`` table.

    Each statement is an upsert keyed on ``(date, gid)``, so loading the same
    day again overwrites only the column of the variable being loaded.
    """

    # Bulk upserts; the single %s is expanded by psycopg2.extras.execute_values.
    INSERT_SQLS = {
        'ppt': '''
            insert into prism (date, gid, ppt) values %s
            ON CONFLICT (date, gid) DO UPDATE SET
            ppt=EXCLUDED.ppt
        ''',
        'tmax': '''
            insert into prism (date, gid, tmax) values %s
            ON CONFLICT (date, gid) DO UPDATE SET
            tmax=EXCLUDED.tmax
        ''',
        'vpdmax': '''
            insert into prism (date, gid, vpdmax) values %s
            ON CONFLICT (date, gid) DO UPDATE SET
            vpdmax=EXCLUDED.vpdmax
        '''
    }
    # Bookkeeping upserts into prism_info, one row per date and one column per variable.
    INSERT_INFOS = {
        'ppt': 'insert into prism_info (date, ppt) values (%s, %s) '
               'on conflict(date) do update set ppt=EXCLUDED.ppt',
        'tmax': 'insert into prism_info (date, tmax) values (%s, %s) '
                'on conflict(date) do update set tmax=EXCLUDED.tmax',
        'vpdmax': 'insert into prism_info (date, vpdmax) values (%s, %s) '
                  'on conflict(date) do update set vpdmax=EXCLUDED.vpdmax'
    }

    def insert(self, date: datetime.date, _data: np.ndarray, var_type: str):
        """Upsert all values of one variable for one day, then flag the day as loaded.

        :param date: day the values belong to.
        :param _data: values to store; the position of a value becomes its ``gid``.
            NOTE(review): callers appear to pass a 1-D array -- confirm.
        :param var_type: key of ``INSERT_SQLS`` / ``INSERT_INFOS``.
        :raises KeyError: if ``var_type`` is not a known variable.
        """
        with Connection() as conn:
            cur = conn.cursor()
            try:
                psycopg2.extras.execute_values(cur, PRISMDumper.INSERT_SQLS[var_type],
                                               PRISMDumper.record_generator(date, _data),
                                               template=None, page_size=10000)
                # Record 1 for this date/variable in prism_info.
                cur.execute(PRISMDumper.INSERT_INFOS[var_type], (date, 1))
                conn.commit()
            finally:
                # Close the cursor even when a statement or the commit fails.
                cur.close()

    @staticmethod
    def record_generator(date: datetime.date, _data):
        """Yield ``(date, gid, value)`` rows, with ``gid`` counting from 0 in array order."""
        for gid, val in enumerate(_data.tolist()):
            yield (date, gid, val)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import logging | ||
import os | ||
import zipfile | ||
from typing import Optional | ||
|
||
import numpy as np | ||
import rootpath | ||
|
||
rootpath.append() | ||
|
||
from backend.data_preparation.extractor.extractorbase import ExtractorBase | ||
|
||
logger = logging.getLogger('TaskManager') | ||
|
||
class BILFormat:
    """Container for the arrays read from one BIL raster.

    Both attributes default to ``None`` so that reading them on a fresh
    instance does not raise ``AttributeError``.
    """

    def __init__(self, ndarray: Optional[np.ndarray] = None, flattened: Optional[np.ndarray] = None):
        # 2-D view of the raster
        self.ndarray: Optional[np.ndarray] = ndarray
        # 1-D version of the raster values
        self.flattened: Optional[np.ndarray] = flattened
|
||
|
||
class BILExtractor(ExtractorBase):
    """Read the ``.hdr`` / ``.bil`` pair packed in a PRISM zip archive into numpy arrays."""

    # Crop window applied to the full grid:
    # rows CROP_TOP:CROP_BOTTOM (228 rows), columns CROP_LEFT:CROP_RIGHT (248 columns).
    CROP_TOP = 191
    CROP_BOTTOM = 419
    CROP_LEFT = 14
    CROP_RIGHT = 262

    def extract(self, filepath) -> Optional[BILFormat]:
        """Unpack and parse one PRISM archive.

        The ``.hdr`` and ``.bil`` members named after the archive are extracted
        next to it, parsed, and removed again whether or not parsing succeeded.

        :param filepath: path of the ``*_bil.zip`` archive.
        :return: the parsed raster, or ``None`` when the archive is missing,
            is not a valid zip, or lacks an expected member / header field.
        """
        directory, filename = os.path.split(filepath)
        stem = os.path.splitext(filename)[0]
        header_path = os.path.join(directory, stem + '.hdr')
        bil_path = os.path.join(directory, stem + '.bil')

        try:
            # The context manager closes the archive handle on every path.
            with zipfile.ZipFile(filepath) as zf:
                zf.extract(stem + '.hdr', directory)
                zf.extract(stem + '.bil', directory)

            # read header and BIL
            return BILExtractor.read_prism_bil(header_path, bil_path)
        except FileNotFoundError:
            logger.error('[PRISM-Extractor][FileNotFoundError]')
            return None
        except (KeyError, zipfile.BadZipFile) as e:
            # KeyError: a member is missing from the archive or a field is
            # missing from the header; BadZipFile: the download is corrupt.
            logger.error('[PRISM-Extractor][%s] %s', type(e).__name__, e)
            return None
        finally:
            # clean up whatever was extracted, also when parsing failed
            for leftover in (header_path, bil_path):
                try:
                    os.remove(leftover)
                except FileNotFoundError:
                    pass

    # TODO: export numpy array file
    def export(self, file_type: str, file_name: str) -> None:
        pass

    @staticmethod
    def read_prism_bil(hdr_path, bil_path) -> BILFormat:
        """Read an array from ESRI BIL raster file.

        :param hdr_path: path of the ``.hdr`` file describing the raster.
        :param bil_path: path of the ``.bil`` file, read as ``float32`` values.
        :return: ``BILFormat`` whose ``ndarray`` is the cropped grid and whose
            ``flattened`` is the full grid in 1-D.
        :raises KeyError: if NROWS, NCOLS or NODATA is missing from the header.
        """
        hdr_dict = BILExtractor.read_prism_hdr(hdr_path)
        # For now, only use NROWS, NCOLS, and NODATA
        # Eventually use NBANDS, BYTEORDER, LAYOUT, PIXELTYPE, NBITS

        prism_array = np.fromfile(bil_path, dtype=np.float32)  # type: np.ndarray
        prism_array = prism_array.reshape(
            int(hdr_dict['NROWS']), int(hdr_dict['NCOLS']))

        # replace the header's NODATA value with np.nan
        prism_array[prism_array == float(hdr_dict['NODATA'])] = np.nan

        bil = BILFormat()
        bil.ndarray = prism_array[BILExtractor.CROP_TOP:BILExtractor.CROP_BOTTOM,
                                  BILExtractor.CROP_LEFT:BILExtractor.CROP_RIGHT]
        # NOTE(review): flattened is built from the full grid, not the cropped
        # one -- confirm this is what consumers expect.
        bil.flattened = prism_array.flatten()
        return bil

    @staticmethod
    def read_prism_hdr(hdr_path: str):
        """Read an ESRI BIL HDR file into a ``{field: value}`` dict of strings.

        Each line is split at its first run of whitespace. Blank lines and
        lines holding a field name without a value are skipped.
        """
        fields = {}
        with open(hdr_path, 'r') as input_f:
            for line in input_f:
                parts = line.split(None, 1)
                if len(parts) == 2:
                    fields[parts[0]] = parts[1].strip()
        return fields
|
||
|
||
if __name__ == '__main__':
    # Manual run: log to the console and print the result of parsing one local archive.
    console = logging.StreamHandler()
    logger.setLevel(logging.INFO)
    logger.addHandler(console)

    sample_archive = 'E:\\Projects\\Wildfires\\data\\PRISM\\PRISM_ppt_early_4kmD2_20190802_bil.zip'
    ext = BILExtractor()
    parsed = ext.extract(sample_archive)
    print(parsed)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import logging | ||
import os | ||
import time | ||
from datetime import datetime, timedelta, timezone | ||
from typing import List | ||
|
||
import rootpath | ||
|
||
rootpath.append() | ||
from backend.task.runnable import Runnable | ||
from backend.data_preparation.connection import Connection | ||
from backend.data_preparation.crawler.prism_crawler import PRISMCrawler | ||
from backend.data_preparation.extractor.bil_extractor import BILExtractor, BILFormat | ||
from backend.data_preparation.dumper.prism_dumper import PRISMDumper | ||
|
||
logger = logging.getLogger('TaskManager') | ||
|
||
|
||
class DataFromPRISM(Runnable):
    """Crawl, extract and store PRISM data day by day, walking backwards in time."""

    def __init__(self):
        self.crawler = PRISMCrawler()
        self.extractor = BILExtractor()
        self.dumper = PRISMDumper()
        self.buffer: List[bytes] = list()

    # NOTE: `datetime` is the class imported from the datetime module, so the
    # annotation below is kept only for interface compatibility; the value
    # is compared against date objects in the loop.
    def run(self, end_clause: datetime.date = None):
        """Load every missing variable from yesterday (UTC) back to ``end_clause``.

        Variables that ``prism_info`` already flags for a day are skipped.
        Each downloaded archive is deleted after processing, including when
        extraction or insertion raises.

        :param end_clause: earliest day (inclusive) to process. ``None`` means
            only yesterday is processed.
        """
        current_date = datetime.now(timezone.utc).date()
        newest_day = current_date - timedelta(days=1)
        if end_clause is None:
            # Comparing a date with None would raise TypeError in the loop below.
            end_clause = newest_day

        with Connection() as conn:
            cur = conn.cursor()
            try:
                cur.execute('select date, ppt, tmax, vpdmax from prism_info')
                exist_list = cur.fetchall()
            finally:
                cur.close()

        # day -> flags in the same order as PRISMCrawler.VARIABLES
        exist_dict = {day: (ppt, tmax, vpdmax) for day, ppt, tmax, vpdmax in exist_list}

        day = newest_day
        while day >= end_clause:
            logger.info(f'fetch: {day}')
            for var_idx, var in enumerate(PRISMCrawler.VARIABLES):
                # skip if exist
                if day in exist_dict and exist_dict[day][var_idx]:
                    logger.info(f'skip: {day}-{var}')
                    continue

                saved_filepath = self.crawler.crawl(day, var)
                if not saved_filepath:
                    continue

                try:
                    bil = self.extractor.extract(saved_filepath)  # type: BILFormat
                    if bil:
                        self.dumper.insert(day, bil.flattened, var)
                finally:
                    # clean up the downloaded archive even if processing failed
                    os.remove(saved_filepath)

            # finish crawling a day
            day = day - timedelta(days=1)
|
||
|
||
if __name__ == '__main__':
    # Standalone scheduler: every six hours, load the last seven days with a fresh task object.
    console = logging.StreamHandler()
    logger.setLevel(logging.INFO)
    logger.addHandler(console)
    while True:
        oldest_day = datetime.now().date() - timedelta(days=7)
        DataFromPRISM().run(oldest_day)
        time.sleep(3600 * 6)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters