Source code for wsgi.wsgi_app

import configparser
import csv
import hashlib
import logging
import logging.handlers
import os
import pickle
import shutil
import tempfile
import urllib.parse

from pathlib import Path

from . import rcodes

LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}

logger = logging.getLogger('wsgi_app')
formatter = logging.Formatter('%(asctime)s %(name)s[%(process)d].%(funcName)s.%(levelname)s: %(message)s')

MINIMUM_YEAR = 2020


def application(environ, start_response, configfile):
    """The HiSPARC upload application

    This handler is called by uWSGI whenever someone requests our URL.

    First, we generate a dictionary of POSTed variables and try to read
    out the station_id, password, checksum and data. When we do a
    readline(), we already read out the entire datastream. I don't know
    if we can first check on station_id/password combinations before
    reading out the datastream without setting up a bidirectional
    communication channel.

    When the checksum matches, we unpickle the event_list and pass
    everything on to store_data.

    """
    do_init(configfile)

    # start http response
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)

    # read data from the POST variables
    post_input = environ['wsgi.input'].readline().decode()
    post_data = urllib.parse.parse_qs(post_input)

    # process POST data
    try:
        data = post_data['data'][0]
        checksum = post_data['checksum'][0]
        station_id = int(post_data['station_id'][0])
        password = post_data['password'][0]
    except (KeyError, EOFError):
        logger.debug('POST (vars) error')
        return [rcodes.RC_ISE_INV_POSTDATA]

    try:
        cluster, station_password = station_list[station_id]
    except KeyError:
        logger.debug(f'Station {station_id} is unknown')
        return [rcodes.RC_PE_INV_STATIONID]

    if station_password != password:
        logger.debug(f'Station {station_id}: password mismatch: {password}')
        return [rcodes.RC_PE_INV_AUTHCODE]
    else:
        our_checksum = hashlib.md5(data.encode('iso-8859-1')).hexdigest()
        if our_checksum != checksum:
            logger.debug(f'Station {station_id}: checksum mismatch')
            return [rcodes.RC_PE_INV_INPUT]
        else:
            try:
                try:
                    event_list = pickle.loads(data.encode('iso-8859-1'))
                except UnicodeDecodeError:
                    # string was probably pickled on python 2.
                    # decode as bytes and decode all bytestrings to string.
                    logger.debug('UnicodeDecodeError on python 2 pickle. Decoding bytestrings.')
                    event_list = decode_object(pickle.loads(data.encode('iso-8859-1'), encoding='bytes'))
            except (pickle.UnpicklingError, AttributeError):
                logger.debug(f'Station {station_id}: pickling error')
                return [rcodes.RC_PE_PICKLING_ERROR]

            store_data(station_id, cluster, event_list)
            logger.debug(f'Station {station_id}: successfully completed')
            return [rcodes.RC_OK]
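
A matching client upload could look like the sketch below. Only the POST
variable names, the single-line body consumed by readline(), and the
MD5-over-`iso-8859-1` checksum follow from the handler above; the URL,
station number, password and event contents are made-up placeholders::

    import hashlib
    import pickle
    import urllib.parse
    import urllib.request
    from datetime import datetime

    # Minimal event structure; see is_data_suspicious below for the
    # only field the server actually inspects.
    event_list = [{'header': {'datetime': datetime.now()}}]

    # iso-8859-1 maps arbitrary pickle bytes 1:1 onto a str, mirroring
    # the data.encode('iso-8859-1') round trip on the server side.
    data = pickle.dumps(event_list).decode('iso-8859-1')
    checksum = hashlib.md5(data.encode('iso-8859-1')).hexdigest()

    body = urllib.parse.urlencode({
        'station_id': 501,         # placeholder station number
        'password': 'secret',      # placeholder station password
        'data': data,
        'checksum': checksum,
    })
    # Placeholder URL; urlencode percent-encodes newlines, so the body
    # is a single line as readline() expects.
    urllib.request.urlopen('https://example.org/upload', data=body.encode())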
def do_init(configfile):
    """Load configuration and passwords and set up a logger handler

    This function will do one-time initialization. By using global
    variables, we eliminate the need to reread configuration and
    passwords on every request.

    Configuration is read from the datastore configuration file
    (usually `config.ini`):

    .. include:: ../examples/config.ini
       :literal:

    Station information is read from the `station_list` config variable.
    (`station_list.csv` on frome)

    """
    # set up config
    global config
    try:
        config
    except NameError:
        config = configparser.ConfigParser()
        config.read(configfile)

    # set up logger
    if not logger.handlers:
        file = config.get('General', 'log') + f'-wsgi.{os.getpid()}'
        handler = logging.handlers.TimedRotatingFileHandler(file, when='midnight', backupCount=14)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        level = LEVELS.get(config.get('General', 'loglevel'), logging.NOTSET)
        logger.setLevel(level=level)

    # read station list
    global station_list
    try:
        station_list
    except NameError:
        station_list = {}
        station_list_path = Path(config.get('General', 'station_list'))
        with station_list_path.open() as file:
            reader = csv.reader(file)
            for station in reader:
                if station:
                    num, cluster, password = station
                    num = int(num)
                    station_list[num] = (cluster, password)
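
The `General` section keys read above (and `data_dir`, read in
store_data below) imply a configuration file along these lines; all
paths and values here are illustrative::

    [General]
    log = /var/log/hisparc/hisparc
    loglevel = debug
    station_list = /path/to/station_list.csv
    data_dir = /path/to/datastore

and a matching `station_list.csv`, headerless, one station per row
(number, cluster, password)::

    501,Amsterdam,not_a_real_password
    502,Alphen aan den Rijn,not_a_real_password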
def store_data(station_id, cluster, event_list):
    """Store verified event data to temporary storage"""
    logger.debug(f'Storing data for station {station_id}')

    data_dir = Path(config.get('General', 'data_dir'))
    directory = data_dir / 'incoming'
    tmp_dir = data_dir / 'tmp'

    if is_data_suspicious(event_list):
        logger.debug('Event list marked as suspicious.')
        directory = data_dir / 'suspicious'

    with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as file:
        logger.debug(f'Filename: {file.name}')
        data = {'station_id': station_id, 'cluster': cluster, 'event_list': event_list}
        pickle.dump(data, file)

    shutil.move(file.name, directory)
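
Files are written in `tmp` and only then moved into `incoming` (or
`suspicious`); since both live under the same `data_dir`, the move is
typically a rename and consumers never observe half-written pickles. A
hypothetical consumer might read the files back like this, assuming the
same `data_dir` as in the configuration::

    import pickle
    from pathlib import Path

    data_dir = Path('/path/to/datastore')  # must match the config value
    for path in (data_dir / 'incoming').iterdir():
        with path.open('rb') as file:
            data = pickle.load(file)
        # keys as written by store_data above
        print(data['station_id'], data['cluster'], len(data['event_list']))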
def is_data_suspicious(event_list):
    """Check data for suspiciousness

    Suspiciousness, a previously unknown quantum number that may signify
    the actual birth of the universe and the reweaving of past fates
    into current events, has come to haunt us and our beloved data.

    Note: Apr 7, 2019 0:00 is the default time after a cold start
    without GPS signal. The DAQ will happily send events even when no
    GPS signal has been acquired (yet). Events with timestamp
    Apr 7, 2019 are most probably caused by no or bad GPS signal. Such
    events must be eigenstates of suspiciousness.

    """
    for event in event_list:
        if event['header']['datetime'].year < MINIMUM_YEAR:
            logger.debug('Date < 2020: Timestamp has high suspiciousness.')
            return True
    return False
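
For example, the GPS cold-start date from the note above is flagged
while a current timestamp passes, assuming the minimal event structure
the loop expects::

    from datetime import datetime

    cold_start = [{'header': {'datetime': datetime(2019, 4, 7)}}]
    recent = [{'header': {'datetime': datetime(2024, 1, 1)}}]

    assert is_data_suspicious(cold_start) is True
    assert is_data_suspicious(recent) is False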
def decode_object(o):
    """Recursively decode all bytestrings in object"""
    if isinstance(o, bytes):
        return o.decode()
    elif isinstance(o, dict):
        return {decode_object(key): decode_object(value) for key, value in o.items()}
    elif isinstance(o, list):
        return [decode_object(obj) for obj in o]
    else:
        return o
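
A Python 2 pickle loaded with `encoding='bytes'` yields bytes for every
former string; decode_object restores them recursively. For example
(values illustrative)::

    >>> decode_object({b'header': {b'cluster': b'Amsterdam'}})
    {'header': {'cluster': 'Amsterdam'}}
    >>> decode_object([b'a', [b'b', 1]])
    ['a', ['b', 1]]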