Source code for snews_cs.alert_pub

"""
An interface for SNEWS alert publisher

Created: 
August 2021
Authors: 
Melih Kara
Sebastian Torres-Lara
"""
import hop, sys, time, os, json, click
from hop import Stream
from datetime import datetime
from collections import namedtuple
from dotenv import load_dotenv
from . import cs_utils
from .snews_db import Storage


class AlertPublisher:
    """Publish SNEWS supernova alerts to a hop stream.

    Intended to be used as a context manager: the write stream is opened in
    ``__enter__`` and closed in ``__exit__``.

    Example
    -------
    >>> with AlertPublisher() as pub:   # doctest: +SKIP
    ...     pub.send(alert_message)
    """

    def __init__(self, env_path=None, verbose=True, auth=True, use_local=False,
                 firedrill_mode=True):
        """Alert publisher constructor.

        Parameters
        ----------
        env_path: str
            Path to the env file, handed to ``cs_utils.set_env``.
            Defaults to None.
        verbose: bool
            Print each alert after it is sent, defaults to True.
        auth: bool
            Forwarded to ``hop.Stream`` as its ``auth`` argument
            (hop-auth credentials), defaults to True.
        use_local: bool
            Forwarded to ``Storage`` as ``use_local_db``, defaults to False.
        firedrill_mode: bool
            When True the alert topic is read from the
            ``FIREDRILL_ALERT_TOPIC`` environment variable, otherwise from
            ``ALERT_TOPIC``. Defaults to True.
        """
        # The environment must be set up first: the broker and topic are
        # read from environment variables right below.
        cs_utils.set_env(env_path)
        self.auth = auth
        self.verbose = verbose
        self.broker = os.getenv("HOP_BROKER")

        topic_variable = "FIREDRILL_ALERT_TOPIC" if firedrill_mode else "ALERT_TOPIC"
        self.alert_topic = os.getenv(topic_variable)

        self.times = cs_utils.TimeStuff()
        # Kept as a lambda so that `self.times` is looked up at call time.
        self.time_str = lambda: self.times.get_snews_time()
        self.storage = Storage(drop_db=False, use_local_db=use_local)

    def __enter__(self):
        """Open the alert topic for writing and return the publisher."""
        self.stream = Stream(until_eos=True, auth=self.auth).open(self.alert_topic, 'w')
        return self

    def __exit__(self, *args):
        """Close the stream opened by ``__enter__``.

        Nothing is returned, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        self.stream.close()

    def send(self, message):
        """Write the message to the open stream, then display it.

        Must be called while the stream is open, i.e. inside the ``with``
        block.

        Parameters
        ----------
        message: dict
            The alert message to publish. It is written to the stream as-is.
        """
        self.stream.write(message)
        self.display_message(message)

    def display_message(self, message):
        """Print the message to stdout when ``verbose`` is set.

        Parameters
        ----------
        message: dict
            The alert message; its ``_id`` (if any) is printed first,
            followed by every key/value pair.
        """
        if not self.verbose:
            return

        # This runs after the alert was already written to the stream, so a
        # missing '_id' must not raise from a purely cosmetic printout.
        print(message.get('_id'))
        # NOTE(review): the label is fixed regardless of firedrill_mode;
        # confirm that this is intended.
        tier = 'TEST ALERT'
        click.secho('-' * 64, fg='bright_blue')
        click.secho(f'Sending {tier}', fg='bright_red')
        for key, value in message.items():
            # str() keeps the '<20s' format spec valid for non-string keys.
            print(f'{str(key):<20s}:{value}')