Commit

Replace os.path by pathlib.Path
153957 committed Jun 27, 2024
1 parent 6d64344 commit 3735fa7
Showing 2 changed files with 21 additions and 21 deletions.
11 changes: 4 additions & 7 deletions writer/storage.py
@@ -1,7 +1,5 @@
"""Storage docstrings"""

import os

import tables


@@ -276,12 +274,11 @@ def open_or_create_file(data_dir, date):
     :param date: the event date
     """
-    directory = os.path.join(data_dir, '%d/%d' % (date.year, date.month))
-    file = os.path.join(directory, '%d_%d_%d.h5' % (date.year, date.month, date.day))
+    directory = data_dir / f'{date.year}/{date.month}'
+    file = directory / f'{date.year}_{date.month}_{date.day}.h5'
 
-    if not os.path.exists(directory):
-        # create dir and parent dirs with mode rwxr-xr-x
-        os.makedirs(directory, 0o755)
+    # Ensure dir and parent directories exist with mode rwxr-xr-x
+    directory.mkdir(mode=0o755, parents=True, exist_ok=True)
 
     return tables.open_file(file, 'a')
 
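For illustration, a minimal standalone sketch of the pathlib-based path handling introduced above. The helper name build_event_path is hypothetical and not part of the repository; the comments describe the general behaviour of pathlib.Path.mkdir, not this commit specifically.

    from datetime import date
    from pathlib import Path


    def build_event_path(data_dir: Path, event_date: date) -> Path:
        """Hypothetical helper mirroring the path logic of open_or_create_file."""
        directory = data_dir / f'{event_date.year}/{event_date.month}'
        file = directory / f'{event_date.year}_{event_date.month}_{event_date.day}.h5'
        # parents=True creates any missing parent directories (they get default
        # permissions); exist_ok=True makes the call a no-op when the directory
        # already exists, replacing the explicit os.path.exists() check.
        # The mode is combined with the process umask, as with os.makedirs.
        directory.mkdir(mode=0o755, parents=True, exist_ok=True)
        return file


    # Example: build_event_path(Path('/data'), date(2024, 6, 27))
    # -> Path('/data/2024/6/2024_6_27.h5')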
31 changes: 17 additions & 14 deletions writer/writer_app.py
@@ -8,11 +8,11 @@
 import configparser
 import logging
 import logging.handlers
-import os
 import pickle
-import shutil
 import time
 
+from pathlib import Path
+
 from writer.store_events import store_event_list
 
 LEVELS = {
@@ -53,39 +53,42 @@ def writer(configfile):
     level = LEVELS.get(config.get('General', 'loglevel'), logging.NOTSET)
     logger.setLevel(level=level)
 
-    queue = os.path.join(config.get('General', 'data_dir'), 'incoming')
-    partial_queue = os.path.join(config.get('General', 'data_dir'), 'partial')
+    data_dir = Path(config.get('General', 'data_dir'))
+    queue = data_dir / 'incoming'
+    partial_queue = data_dir / 'partial'
 
+    sleep_duration = config.getint('Writer', 'sleep')
+
     # writer process
     try:
         while True:
-            entries = os.listdir(queue)
+            entries = queue.iterdir()
 
             if not entries:
-                time.sleep(config.getint('Writer', 'sleep'))
+                time.sleep(sleep_duration)
 
             for entry in entries:
-                path = os.path.join(queue, entry)
-                shutil.move(path, partial_queue)
+                partial_path = partial_queue / entry.name
+                entry.rename(partial_path)
+
+                process_data(partial_path, data_dir)
+                partial_path.unlink()
 
-                path = os.path.join(partial_queue, entry)
-                process_data(path)
-                os.remove(path)
     except Exception:
         logger.exception('Exception occured, quitting.')
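The loop above polls the incoming queue, moves each pickle into the partial queue, processes it, and removes it. A minimal, self-contained sketch of that pattern follows; poll_queue and the process callable are hypothetical names, not part of writer_app.py. Note that Path.iterdir() returns a lazy iterator, which is always truthy, so the sketch materialises it before the emptiness check, and that Path.rename only works when source and target are on the same filesystem (unlike shutil.move).

    import time
    from pathlib import Path
    from typing import Callable


    def poll_queue(queue: Path, partial_queue: Path, process: Callable[[Path], None], sleep_duration: int = 10) -> None:
        """Hypothetical, simplified version of the writer loop above."""
        while True:
            # iterdir() yields lazily; turn it into a list so `if not entries`
            # actually detects an empty directory.
            entries = sorted(queue.iterdir())

            if not entries:
                time.sleep(sleep_duration)

            for entry in entries:
                # Move the pickle out of the incoming queue before processing,
                # so it is not picked up twice.
                partial_path = partial_queue / entry.name
                entry.rename(partial_path)

                process(partial_path)
                partial_path.unlink()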


-def process_data(file):
+def process_data(file, data_dir):
     """Read data from a pickled object and store store in raw datastore"""
-    with open(file, 'rb') as handle:
+    with file.open('rb') as handle:
         try:
             data = pickle.load(handle)
         except UnicodeDecodeError:
             logger.debug('Data seems to be pickled using python 2. Decoding.')
             data = decode_object(pickle.load(handle, encoding='bytes'))
 
         logger.debug(f"Processing data for station {data['station_id']}")
-        store_event_list(config.get('General', 'data_dir'), data['station_id'], data['cluster'], data['event_list'])
+        store_event_list(data_dir, data['station_id'], data['cluster'], data['event_list'])
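process_data above opens the pickle via Path.open and falls back to encoding='bytes' for data pickled under Python 2, handing the result to decode_object (defined further down in writer_app.py but not shown in this diff). A hedged sketch of that loading step follows; load_pickle and decode_bytes are hypothetical stand-ins, and the sketch rewinds the file handle before retrying, which this simplified version needs even if the real function is structured differently.

    import pickle
    from pathlib import Path


    def load_pickle(path: Path):
        """Hypothetical sketch of the loading logic used in process_data."""
        with path.open('rb') as handle:
            try:
                return pickle.load(handle)
            except UnicodeDecodeError:
                # Python 2 pickles store str values as bytes, which Python 3
                # cannot decode as ASCII; re-read the stream as raw bytes
                # and decode the result afterwards.
                handle.seek(0)
                return decode_bytes(pickle.load(handle, encoding='bytes'))


    def decode_bytes(obj):
        """Minimal stand-in for decode_object: recursively turn bytes into str."""
        if isinstance(obj, bytes):
            return obj.decode()
        if isinstance(obj, dict):
            return {decode_bytes(k): decode_bytes(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [decode_bytes(v) for v in obj]
        return obj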


def decode_object(o):
