Source code for writer.writer_app

""" datastore writer application

    This module empties the station data `incoming` queue and writes the
    data into HDF5 files using PyTables.

import configparser
import logging
import logging.handlers
import os
import pickle as pickle
import shutil
import time

from writer.store_events import store_event_list

LEVELS = {'debug': logging.DEBUG,
          'info': logging.INFO,
          'warning': logging.WARNING,
          'error': logging.ERROR,
          'critical': logging.CRITICAL}

logger = logging.getLogger('writer')
formatter = logging.Formatter('%(asctime)s %(name)s[%(process)d]'
                              '.%(funcName)s.%(levelname)s: %(message)s')

[docs]def writer(configfile): """hisparc datastore writer application This script polls ``/datatore/frome/incoming`` for incoming data written by the WSGI app. It then store the data into the raw datastore. Configuration is read from the datastore configuation file (usually `config.ini`): .. include:: ../examples/config.ini :literal: """ # set up config global config config = configparser.ConfigParser() # set up logger file = config.get('General', 'log') + '-writer' handler = logging.handlers.TimedRotatingFileHandler(file, when='midnight', backupCount=14) handler.setFormatter(formatter) logger.addHandler(handler) level = LEVELS.get(config.get('General', 'loglevel'), logging.NOTSET) logger.setLevel(level=level) queue = os.path.join(config.get('General', 'data_dir'), 'incoming') partial_queue = os.path.join(config.get('General', 'data_dir'), 'partial') # writer process try: while True: entries = os.listdir(queue) if not entries: time.sleep(config.getint('Writer', 'sleep')) for entry in entries: path = os.path.join(queue, entry) shutil.move(path, partial_queue) path = os.path.join(partial_queue, entry) process_data(path) os.remove(path) except Exception: logger.exception('Exception occured, quitting.')
[docs]def process_data(file): """Read data from a pickled object and store store in raw datastore""" with open(file, 'rb') as handle: try: data = pickle.load(handle) except UnicodeDecodeError: logger.debug('Data seems to be pickled using python 2. Decoding.') data = decode_object(pickle.load(handle, encoding='bytes')) logger.debug('Processing data for station %d' % data['station_id']) store_event_list(config.get('General', 'data_dir'), data['station_id'], data['cluster'], data['event_list'])
[docs]def decode_object(o): """recursively decode all bytestrings in object""" if type(o) is bytes: return o.decode() elif type(o) is dict: return {decode_object(k): decode_object(v) for k, v in o.items()} elif type(o) is list: return [decode_object(obj) for obj in o] else: return o