Source code for snews_cs.alert_pub
"""
An interface for SNEWS alert publisher
Created:
August 2021
Authors:
Melih Kara
Sebastian Torres-Lara
"""
import os, click
from hop import Stream
from . import cs_utils
[docs]
class AlertPublisher:
""" Class to publish SNEWS SuperNova Alerts based on coincidence
"""
def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True, is_test=False):
"""
Alert publisher constructor
Parameters
----------
env_path: str
path to env file, defaults to
verbose: bool
Show alert, defaults to True
auth: bool
Use hop-auth credentials, defaults to True
"""
cs_utils.set_env(env_path)
[docs]
self.broker = os.getenv("HOP_BROKER")
if firedrill_mode:
self.alert_topic = os.getenv("FIREDRILL_ALERT_TOPIC")
else:
self.alert_topic = os.getenv("ALERT_TOPIC")
if is_test:
# use a test topic
self.alert_topic = os.getenv("CONNECTION_TEST_TOPIC")
[docs]
def __enter__(self):
self.stream = Stream(until_eos=True, auth=self.auth).open(self.alert_topic, 'w')
return self
[docs]
def __exit__(self, *args):
self.stream.close()
[docs]
def send(self, message):
"""This method will set the sent_time and send the message to the hop broker.
Parameters
----------
message: dict
dict containing observation message.
"""
self.stream.write(message)
self.display_message(message)
[docs]
def display_message(self, message):
if self.verbose:
tier = 'TEST ALERT'
click.secho(f'{"-" * 64}', fg='bright_blue')
click.secho(f'Sending {tier}', fg='bright_red')
for k, v in message.items():
print(f'{k:<35s}:{v}')
# Display message prints out the following on the server logs
#
# SNEWS_Coincidence_ALERT 2022-09-28T08:21:21.954651
# ----------------------------------------------------------------
# Sending TEST ALERT
#
# maybe we can get rid of "Sending TEST ALERT" message