Source code for test_02_alert

# -*- coding: utf-8 -*-
""" Initialization unit tests for the snews_cs alert tests
"""

import os
import unittest

from hop import Stream
from hop.io import StartPosition
from snews_pt.messages import SNEWSMessageBuilder
from snews_pt.remote_commands import reset_cache

from snews_cs.snews_coinc import CoincidenceDistributor


[docs] class TestServer(unittest.TestCase):
[docs] def test_alerts(self): coin1 = SNEWSMessageBuilder(detector_name='KamLAND', machine_time_utc='2012-06-09T15:30:00.000501', neutrino_time='2012-06-09T15:31:08.891011', firedrill_mode=False, p_val=0.98, is_test=True) coin2 = SNEWSMessageBuilder(detector_name='XENONnT', machine_time_utc='2012-06-09T15:30:00.000501', neutrino_time='2012-06-09T15:31:07.465011', firedrill_mode=False, p_val=0.98, is_test=True) # make sure the test cache is empty reset_cache(is_test=True) # First, send two coinciding messages for coin in [coin1, coin2]: try: coin.send_messages() except Exception as exc: print('test_alert test failed trying to send messages with SNEWSMessageBuilder.send_messages() !\n') assert False, f"Exception raised:\n {exc}" # we need to think of a way for github actions to run the server # coincidence_searcher = CoincidenceDistributor(env_path='/etc/test-config.env', firedrill_mode=False, server_tag='test') # Next, manually open the stream and search for coincidences # this tests the coincidence logic ALREADY RUNNING on the server default_connection_topic = "kafka://kafka.scimma.org/snews.connection-testing" test_alert_topic = os.getenv("CONNECTION_TEST_TOPIC", default_connection_topic) _start_at = StartPosition.LATEST #if start_at=="LATEST" else StartPosition.EARLIEST substream = Stream(until_eos=True, auth=True, start_at=_start_at) message_expected = {"False Alarm Prob": "N/A", "id": "SNEWS_Coincidence_ALERT XXXXXXXXXXX", "alert_type": "TEST COINC_MSG", "detector_names": ["KamLAND", "XENONnT"], "neutrino_times": ["2012-06-09T15:31:08.891011000", "2012-06-09T15:31:07.465011000"], "p_values": [0.98, 0.98], "p_values average": 0.98, "sent_time": "XXXXXXX", "server_tag": "XXXXXXXX", "sub list number": 0} fields_must_match = ["False Alarm Prob", "alert_type", "detector_names", "neutrino_times", "p_values", "p_values average"] with substream.open(test_alert_topic, "r") as ss: for read in ss: read = read.content for field in fields_must_match: self.assertTrue(read[field] == message_expected[field], f"Field {field} does not match!") # clear the cache again afterwards reset_cache(is_test=True)