Source code for writer.store_events

import base64
import calendar
import logging

from writer import storage
from writer.upload_codes import eventtype_upload_codes

logger = logging.getLogger('writer.store_events')

MINIMUM_YEAR = 2020


def store_event(datafile, cluster, station_id, event):
    """Stores an event in the h5 filesystem

    :param datafile: the h5 data file
    :param cluster: the name of the cluster to which the station belongs
    :param station_id: the id of the station this event belongs to
    :param event: the event to store

    """
    eventheader = event['header']
    eventdatalist = event['datalist']

    eventtype = eventheader['eventtype_uploadcode']
    try:
        upload_codes = eventtype_upload_codes[eventtype]
    except KeyError:
        logger.error(f'Unknown event type: {eventtype}, discarding event (station: {station_id})')
        return

    parentnode = storage.get_or_create_station_group(datafile, cluster, station_id)
    table = storage.get_or_create_node(datafile, parentnode, upload_codes['_tablename'])
    blobs = storage.get_or_create_node(datafile, parentnode, 'blobs')

    row = table.row
    row['event_id'] = table.nrows + 1

    # make a unix-like timestamp
    timestamp = calendar.timegm(eventheader['datetime'].utctimetuple())
    nanoseconds = eventheader['nanoseconds']
    # make an extended timestamp, which is the number of nanoseconds since
    # epoch
    ext_timestamp = timestamp * 1_000_000_000 + nanoseconds
    row['timestamp'] = timestamp
    if upload_codes['_has_ext_time']:
        # This is e.g. a HiSPARC coincidence or comparator message,
        # extended timing information is available
        row['nanoseconds'] = nanoseconds
        row['ext_timestamp'] = ext_timestamp

    # get default values for the data
    data = {}
    for key, value in upload_codes.items():
        if key[0] != '_':
            # private meta information starts with a _ (e.g. _tablename)
            data[key] = row[value]

    # process event data
    for item in eventdatalist:
        # uploadcode: EVENTRATE, PH1, IN3, etc.
        uploadcode = item['data_uploadcode']
        # value: actual data value
        value = item['data']

        if data_is_blob(uploadcode, upload_codes['_blobs']):
            # data should be stored inside the blob array, ...
            if uploadcode[:-1] == 'TR':
                # traces are base64 encoded
                value = base64.decodebytes(value.encode('iso-8859-1'))
            else:
                # blobs are bytestrings
                value = value.encode('iso-8859-1')
            blobs.append(value)
            # ... with a pointer stored in the event table
            value = len(blobs) - 1

        if uploadcode[-1].isdigit():
            # uploadcode: PH1, IN3, etc.
            key, index = uploadcode[:-1], int(uploadcode[-1]) - 1
            if key in data:
                data[key][index] = value
            else:
                logger.warning(f'Datatype not known on server side: {key} ({eventtype})')
        elif uploadcode in data:
            # uploadcode: EVENTRATE, RED, etc.
            data[uploadcode] = value
        else:
            logger.warning(f'Datatype not known on server side: {uploadcode} ({eventtype})')

    # write data values to row
    for key, value in upload_codes.items():
        if key[0] != '_':
            # private meta information starts with a _ (e.g. _tablename)
            row[value] = data[key]

    row.append()
    table.flush()
    blobs.flush()
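
# Illustrative sketch (not part of the original module): an event dict in the
# shape store_event() consumes. The eventtype code 'CIC', the data upload
# codes, and the cluster/station values are assumptions for illustration;
# real codes are defined in writer.upload_codes.eventtype_upload_codes.
def _example_store_event(datafile):
    from datetime import datetime

    example_event = {
        'header': {
            'eventtype_uploadcode': 'CIC',  # must be a key of eventtype_upload_codes
            'datetime': datetime(2024, 1, 1, 12, 0, 0),
            'nanoseconds': 123_456_789,
        },
        'datalist': [
            {'data_uploadcode': 'PH1', 'data': 501},  # numbered code -> stored at data['PH'][0]
            {'data_uploadcode': 'EVENTRATE', 'data': 0.5},  # plain scalar code
        ],
    }
    store_event(datafile, 'amsterdam', 501, example_event)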
def data_is_blob(uploadcode, blob_types):
    """Determine if data is a variable length binary value (blob)"""
    if uploadcode[-1].isdigit():
        if uploadcode[:-1] in blob_types:
            return True
    elif uploadcode in blob_types:
        return True
    return False
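
# Illustrative sketch (not part of the original module): how data_is_blob()
# resolves numbered versus plain upload codes. The blob_types value here is
# an assumption; in store_event() it comes from upload_codes['_blobs'].
def _example_data_is_blob():
    blob_types = ['TR']  # assumed: trace data is stored as blobs
    assert data_is_blob('TR2', blob_types)  # numbered code, base 'TR' is a blob type
    assert not data_is_blob('PH1', blob_types)  # numbered code, base 'PH' is not
    assert not data_is_blob('EVENTRATE', blob_types)  # plain code, not a blob type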
def store_event_list(data_dir, station_id, cluster, event_list):
    """Store a list of events"""
    prev_date = None
    datafile = None
    for event in event_list:
        try:
            timestamp = event['header']['datetime']
            if timestamp:
                date = timestamp.date()
                if date.year < MINIMUM_YEAR:
                    logger.error(f'Old event ({date}), discarding event (station: {station_id})')
                    continue
                if date != prev_date:
                    if datafile:
                        datafile.close()
                    datafile = storage.open_or_create_file(data_dir, date)
                    prev_date = date
                store_event(datafile, cluster, station_id, event)
            else:
                logger.error(f'Strange event (no timestamp!), discarding event (station: {station_id})')
        except Exception:
            logger.exception(f'Cannot process event, discarding event (station: {station_id})')
    if datafile:
        datafile.close()
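
# Illustrative sketch (not part of the original module): feeding a batch of
# parsed events to store_event_list(). It opens one data file per event date
# via storage.open_or_create_file(); the directory, station id and cluster
# name below are assumptions for illustration.
def _example_store_event_list(event_list):
    # events older than MINIMUM_YEAR or without a timestamp are discarded;
    # the rest are written to date-based files under the given directory
    store_event_list('/tmp/datastore', 501, 'amsterdam', event_list)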