Source code for writer.writer_app

"""datastore writer application

This module empties the station data `incoming` queue and writes the
data into HDF5 files using PyTables.

"""

import configparser
import logging
import logging.handlers
import pickle
import time

from pathlib import Path

from writer.store_events import store_event_list

LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}

logger = logging.getLogger('writer')
formatter = logging.Formatter('%(asctime)s %(name)s[%(process)d].%(funcName)s.%(levelname)s: %(message)s')


[docs] def writer(configfile): """hisparc datastore writer application This script polls ``/datatore/frome/incoming`` for incoming data written by the WSGI app. It then store the data into the raw datastore. Configuration is read from the datastore configuation file (usually `config.ini`): .. include:: ../examples/config.ini :literal: """ # set up config global config config = configparser.ConfigParser() config.read(configfile) # set up logger file = config.get('General', 'log') + '-writer' handler = logging.handlers.TimedRotatingFileHandler(file, when='midnight', backupCount=14) handler.setFormatter(formatter) logger.addHandler(handler) level = LEVELS.get(config.get('General', 'loglevel'), logging.NOTSET) logger.setLevel(level=level) data_dir = Path(config.get('General', 'data_dir')) queue = data_dir / 'incoming' partial_queue = data_dir / 'partial' sleep_duration = config.getint('Writer', 'sleep') # writer process try: while True: entries = queue.iterdir() if not entries: time.sleep(sleep_duration) for entry in entries: partial_path = partial_queue / entry.name entry.rename(partial_path) process_data(partial_path, data_dir) partial_path.unlink() except Exception: logger.exception('Exception occured, quitting.')
[docs] def process_data(file, data_dir): """Read data from a pickled object and store store in raw datastore""" with file.open('rb') as handle: try: data = pickle.load(handle) except UnicodeDecodeError: logger.debug('Data seems to be pickled using python 2. Decoding.') data = decode_object(pickle.load(handle, encoding='bytes')) logger.debug(f"Processing data for station {data['station_id']}") store_event_list(data_dir, data['station_id'], data['cluster'], data['event_list'])
[docs] def decode_object(o): """recursively decode all bytestrings in object""" if isinstance(o, bytes): return o.decode() elif isinstance(o, dict): return {decode_object(k): decode_object(v) for k, v in o.items()} elif isinstance(o, list): return [decode_object(obj) for obj in o] else: return o