import json
import os
import pickle
import random
import sys
import time
from datetime import datetime
import adc.errors
import click
import numpy as np
import pandas as pd
from hop import Stream
from . import cs_utils, snews_bot
from .alert_pub import AlertPublisher
from .core.logging import getLogger
from .cs_alert_schema import CoincidenceTierAlert
from .cs_email import send_email
from .cs_remote_commands import CommandHandler
from .cs_stats import cache_false_alarm_rate
from .snews_hb import HeartBeat
from .snews_sql import Storage
[docs]
log = getLogger(__name__)
# needs more work. Vectorization converts the datatype to object in the dataframe and crashes
# to_numpy_datetime = lambda x: np.datetime64(x) if not isinstance(x, np.datetime64) else x
# check if they are already numpy datetime64 objects (failsafe)
# t_1, t_2 = to_numpy_datetime(t_1), to_numpy_datetime(t_2)
# @np.vectorize
[docs]
def np_datetime_delta_sec(t_1, t_2):
"""Return the time difference between two numpy datetime64 objects in seconds
Returns: float (seconds)
Notes
-----
t_1 is expected to be the earlier time (no absolute value is taken)
"""
total_seconds = (t_2 - t_1) / np.timedelta64(1, "s") # Convert to seconds
return total_seconds
# TODO: duplicate for a test-cache. Do not drop actual cache each time there are tests
[docs]
class CacheManager:
"""
This class handles all the incoming data to the SNEWS CS Cache,
adding messages, organizing sub-groups, retractions and updating cache entries
"""
def __init__(self):
# define the col names of the cache df
[docs]
self.cache = pd.DataFrame(
columns=[
"id",
"detector_name",
"received_time",
"machine_time_utc",
"neutrino_time_utc",
"neutrino_time_as_datetime",
"p_val",
"meta",
"sub_group",
"neutrino_time_delta",
]
)
# keep track of updated sub groups
# this dict is used to store the current state of each sub group in the cache,
# UPDATE, COINCIDENT, None, RETRACTION.
[docs]
self.sub_group_state = {}
[docs]
def add_to_cache(self, message):
"""
Takes in SNEWS message and checks if it is a retraction, update or new addition to cache.
This is the 'core' function of CoincidenceDataHandler
Parameters
----------
message : dict
SNEWS Message, must be PT valid
"""
# retraction
if "retract_latest" in message.keys():
print("RETRACTING MESSAGE FROM")
self.cache_retraction(retraction_message=message)
return None # break if message is meant for retraction
message["neutrino_time_as_datetime"] = np.datetime64(
message["neutrino_time_utc"]
)
# update
if message["detector_name"] in self.cache["detector_name"].to_list():
self._update_message(message)
# regular add
else:
self._manage_cache(message)
self.cache = self.cache.sort_values(
by=["sub_group", "neutrino_time_delta"], ignore_index=True
)
self.cache = self.cache.reset_index(drop=True)
[docs]
def _manage_cache(self, message):
"""
This method will add a new message to cache, checks if:
A)It is an initial message (to the entire cache) or if it:
B)Forms a new sub-group (sends message to _check_coinc_in_subgroups)
C)Is confident to a sub-group (sends message to _check_coinc_in_subgroups)
Parameters
----------
message
"""
# if the cache is empty add the message to cache, declare state of sub group 0 as INITIAL
if len(self.cache) == 0:
print("Initial Message!!")
message["neutrino_time_delta"] = 0
message["sub_group"] = 0
self.sub_group_state[0] = "INITIAL"
self.cache = pd.DataFrame([message])
# if the cache is not empty, check if the message is coincident with other sub groups
else:
self._check_coinc_in_subgroups(message)
[docs]
def _check_coinc_in_subgroups(self, message):
"""This method either:
A)Adds Message to an existing sub-group, if coincident with the initial signal
B) If NOT coincident with any sub groups it creates two new sub groups,
setting the message as their initial time.
The new groups consist of coincident signals with earlier arrival time and
later arrival times, respectively.
Once created the new groups are checked to see if they are redundant,
and if so then they are not added to the main cache.
Parameters
----------
message : dict
SNEWS message
"""
# grab the current sub group tags
sub_group_tags = self.cache["sub_group"].unique()
# this boolean declares whether if the message is not coincident
is_coinc = False
for tag in sub_group_tags:
# query the cache, select the current sub group
sub_cache = self.cache.query("sub_group==@tag")
# reset the index, for the sake of keeping things organized
sub_cache = sub_cache.reset_index(drop=True)
# select the initial nu time of the sub group
sub_ini_t = sub_cache["neutrino_time_as_datetime"].min()
# make the nu time delta series
delta = np_datetime_delta_sec(
t_2=message["neutrino_time_as_datetime"], t_1=sub_ini_t
)
# if the message's nu time is within the coincidence window
if 0 < delta <= 10.0:
# to the message add the corresponding sub group and nu time delta
message["sub_group"] = tag
message["neutrino_time_delta"] = delta
# turn the message into a pd df, this is for concatenating it to the cache
temp = pd.DataFrame([message])
# concat the message df to the cahce
self.cache = pd.concat([self.cache, temp], ignore_index=True)
# set the message as coinc
is_coinc = True
# declare the state the sub group to COINC_MSG
self.sub_group_state[tag] = "COINC_MSG"
# if the message is not coincident with any of the sub groups create a new sub group
if not is_coinc:
# set the message's nu time, as the initial nu time
new_ini_t = message["neutrino_time_as_datetime"]
# create the sub group tag
new_sub_tag = len(sub_group_tags)
# turn the message into a df
message_as_cache = pd.DataFrame([message])
# create a temp cache concat the message
temp_cache = pd.concat([self.cache, message_as_cache], ignore_index=True)
# drop dublicates of detector name and nu time
temp_cache = temp_cache.drop_duplicates(
subset=["detector_name", "neutrino_time_utc"]
)
# create a new time delta
temp_cache["neutrino_time_delta"] = np_datetime_delta_sec(
t_1=new_ini_t, t_2=temp_cache["neutrino_time_as_datetime"]
)
# Make two subgroup one for early signal and post
new_sub_group_early = temp_cache.query("-10 <= neutrino_time_delta <= 0")
new_sub_group_post = temp_cache.query("0 <= neutrino_time_delta <= 10.0")
# drop old sub-group col or pandas will scream at you
new_sub_group_post = new_sub_group_post.drop(columns="sub_group", axis=0)
new_sub_group_early = new_sub_group_early.drop(columns="sub_group", axis=0)
# make new sub-group tag
new_sub_group_early["sub_group"] = new_sub_tag
new_sub_group_post["sub_group"] = int(new_sub_tag + 1)
# sort sub-group by nu time
new_sub_group_early = new_sub_group_early.sort_values(
by="neutrino_time_as_datetime"
)
new_sub_group_post = new_sub_group_post.sort_values(
by="neutrino_time_as_datetime"
)
# check if new sub groups are the same:
# if so, drop the later one
if (
new_sub_group_early["id"].to_list()
== new_sub_group_post["id"].to_list()
):
new_sub_group_post = new_sub_group_post.drop(
columns="sub_group", axis=0
)
new_sub_group_post["sub_group"] = new_sub_tag
self._organize_cache(sub_cache=new_sub_group_post)
# organize the cache
else:
self._organize_cache(sub_cache=new_sub_group_post)
self._organize_cache(sub_cache=new_sub_group_early)
[docs]
def _check_for_redundancies(self, sub_cache):
"""Checks if sub cache is redundant
Parameters
----------
sub_cache : dataframe
New sub group
Returns
-------
bool
True if sub group is redundant
False if sub cache is unique
"""
# create a series of the ids in the sub group
ids = sub_cache["id"]
# if this sub group only contains a single message and the detector name is already
# present in the cache return True
if (
len(sub_cache) == 1
and sub_cache["id"].to_list()[0] in self.cache["id"].to_list()
):
return True
# loop through the other sub group tags
for sub_tag in self.cache["sub_group"].unique():
# save the other sub groups as a df
other_sub = self.cache.query("sub_group == @sub_tag")
# check if the current sub group's ids are in the other sub group
check_ids = ids.isin(other_sub["id"])
# if the ids are in the other sub group, return True
if check_ids.eq(True).all():
return True
return False
[docs]
def _organize_cache(self, sub_cache):
"""
This method makes sure that the nu_delta_times are not negative,
recalculates new deltas using the proper initial time
Parameters
----------
sub_cache : dataframe
Sub group
"""
# if the sub is redundant then return out of the
if self._check_for_redundancies(sub_cache):
return
# for the sake of keeping things organized reset the index of the sub group
sub_cache = sub_cache.reset_index(drop=True)
# if the initial nu time is negative then fix it by passing the sub group to fix_deltas
if sub_cache["neutrino_time_delta"][0] < 0:
sub_cache = self._fix_deltas(sub_df=sub_cache)
if len(sub_cache) > 1:
# set the state of the sub group to COINC_MSG_STAGGERED
self.sub_group_state[sub_cache["sub_group"][0]] = "COINC_MSG_STAGGERED"
else:
self.sub_group_state[sub_cache["sub_group"][0]] = None
# concat to the cache
self.cache = pd.concat([self.cache, sub_cache], ignore_index=True)
# sort the values of the cache by their sub group and nu time ( ascending order)
self.cache = self.cache.sort_values(
by=["sub_group", "neutrino_time_as_datetime"]
).reset_index(drop=True)
[docs]
def _fix_deltas(self, sub_df):
"""
This method fixes the deltas of the sub group by resetting the initial nu time
Parameters
----------
sub_df : Dataframe
Sub cache
Returns
-------
sub_df : Dataframe
Sub cache with fixed nu time deltas
"""
# find the new initial nu time
initial_time = sub_df["neutrino_time_as_datetime"].min()
# drop the old delta col
sub_df = sub_df.drop(columns="neutrino_time_delta", axis=0)
# make the new delta col
sub_df["neutrino_time_delta"] = np_datetime_delta_sec(
t_1=initial_time, t_2=sub_df["neutrino_time_as_datetime"]
)
# sort the nu times by ascending order
sub_df = sub_df.sort_values(by=["neutrino_time_as_datetime"])
return sub_df
[docs]
def _update_message(self, message):
"""If triggered this method updates the p_val and neutrino time of a detector in cache.
Parameters
----------
message : dict
SNEWS message
Returns
-------
"""
# declare the name of the detector that will be updated
update_detector = message["detector_name"]
# announce that an update is happening
update_message = f"\t> UPDATING MESSAGE FROM: {update_detector}"
log.info(update_message)
# get indices of where the detector name is present
detector_ind = self.cache.query(
"detector_name==@update_detector"
).index.to_list()
# loop through the indices
for ind in detector_ind:
# get the sub tag
sub_tag = self.cache["sub_group"][ind]
# declare the state of the sub group as UPDATE
self.sub_group_state[sub_tag] = "UPDATE"
# get the initial nu time of the sub group
initial_time = self.cache.query("sub_group==@sub_tag")[
"neutrino_time_as_datetime"
].min()
# ignore update if the updated message is outside the coincident window
if (
abs(
np_datetime_delta_sec(
t_2=message["neutrino_time_as_datetime"], t_1=initial_time
)
)
> 10.0
):
continue
# update the message if it is coincident with the current sub group
else:
# find the ind to be updated and replace its contents with
for key in message.keys():
self.cache.at[ind, key] = message[key]
self.cache.at[ind, "neutrino_time_delta"] = np_datetime_delta_sec(
t_2=message["neutrino_time_as_datetime"], t_1=initial_time
)
# append the updated list
self.updated.append(self.cache["sub_group"][ind])
# if there are any updated sub groups reorganize them
if len(self.updated) != 0:
# loop through updated sub group list
for sub_tag in self.updated:
# make a sub group df
sub_df = self.cache.query("sub_group == @sub_tag")
# dump the unorganized subgroup
self.cache = self.cache.query("sub_group != @sub_tag")
# fix deltas of updated sub group
sub_df = self._fix_deltas(sub_df)
# concat the organized sub group with the rest of the cache
self.cache = pd.concat([self.cache, sub_df], ignore_index=True)
# sort the values of the cache by sub group nu time
self.cache = self.cache.sort_values(
by=["sub_group", "neutrino_time_as_datetime"]
).reset_index(drop=True)
[docs]
def cache_retraction(self, retraction_message):
"""
This method handles message retraction by parsing the cache and dumping any instance
of the target detector
Parameters
----------
retraction_message : dict
SNEWS retraction message
"""
retracted_name = retraction_message["detector_name"]
self.cache = self.cache.query("detector_name!=@retracted_name")
logstr = retracted_name
# in case retracted message was an initial
if len(self.cache) == 0:
return 0
for sub_tag in self.cache["sub_group"].unique():
self.sub_group_state[sub_tag] = "RETRACTION"
other_sub = self.cache.query("sub_group == @sub_tag")
if other_sub["neutrino_time_delta"].min() != 0.0:
if len(other_sub) == 1:
other_sub = other_sub.drop(columns=["neutrino_time_delta"])
other_sub["neutrino_time_delta"] = [0]
else:
# set new initial nu time
new_initial_time = other_sub["neutrino_time_as_datetime"].min()
# drop the old delta
other_sub = other_sub.drop(columns=["neutrino_time_delta"])
# make new delta
other_sub["neutrino_time_delta"] = np_datetime_delta_sec(
t_2=other_sub["neutrino_time_as_datetime"], t_1=new_initial_time
)
# concat retracted sub group to the cache
self.cache = self.cache.query("sub_group!=@sub_tag")
self.cache = pd.concat([self.cache, other_sub], ignore_index=True)
self.cache = self.cache.sort_values(by="neutrino_time_utc").reset_index()
# log retraction to log file
log.info(f"\t> Retracted {logstr} from sub-group {sub_tag}")
[docs]
class CoincidenceDistributor:
def __init__(
self,
env_path=None,
drop_db=False,
firedrill_mode=True,
hb_path=None,
server_tag=None,
send_email=False,
send_slack=True,
show_table=False,
):
"""This class is in charge of sending alerts to SNEWS when CS is triggered
Parameters
----------
env_path : `str`
path to env file, defaults to '/etc/test-config.env'
send_slack: `bool`
Whether to send alerts on slack
"""
log.debug("Initializing CoincDecider\n")
cs_utils.set_env(env_path)
[docs]
self.show_table = show_table
[docs]
self.send_email = send_email
[docs]
self.send_slack = send_slack
# name of your sever, used for alerts
[docs]
self.server_tag = server_tag
# initialize local MongoDB
[docs]
self.storage = Storage(drop_db=drop_db)
# declare topic type, used for alerts
[docs]
self.topic_type = "CoincidenceTier"
# from the env var get the coinc thresh, 10sec
[docs]
self.coinc_threshold = float(os.getenv("COINCIDENCE_THRESHOLD"))
# lifetime of case (sec) = 24hr
[docs]
self.cache_expiration = 86400
# Some Kafka errors are retryable.
[docs]
self.retriable_error_count = 0
[docs]
self.max_retriable_errors = 20
[docs]
self.exit_on_error = False # True
[docs]
self.initial_set = False
[docs]
self.alert = AlertPublisher(env_path=env_path, firedrill_mode=firedrill_mode)
[docs]
self.test_alert = AlertPublisher(
env_path=env_path, is_test=True
) # overwrites with connection test topic
if firedrill_mode:
self.observation_topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC")
else:
self.observation_topic = os.getenv("OBSERVATION_TOPIC")
# for testing, the alerts will be sent to this topic
[docs]
self.test_topic = os.getenv("CONNECTION_TEST_TOPIC")
[docs]
self.alert_schema = CoincidenceTierAlert(env_path)
# handle heartbeat
[docs]
self.store_heartbeat = bool(os.getenv("STORE_HEARTBEAT", "True"))
[docs]
self.heartbeat = HeartBeat(env_path=env_path, firedrill_mode=firedrill_mode)
[docs]
self.stash_time = 86400
[docs]
self.coinc_data = CacheManager()
[docs]
self.test_coinc_data = CacheManager() # a separate cache for testing
[docs]
self.message_count = {}
[docs]
self.test_message_count = {}
# don't use a storage for the test cache
[docs]
def clear_cache(self, is_test=False):
"""When a reset cache is passed, recreate the
CoincidenceDataHandler instance
"""
if not is_test:
log.info("\t > [RESET] Resetting the cache.")
del self.coinc_data
self.coinc_data = CacheManager()
else:
del self.test_coinc_data
self.test_coinc_data = CacheManager()
# ----------------------------------------------------------------------------------------------------------------
[docs]
def display_table(self, is_test=False):
"""
Display each sub list individually using a markdown table.
"""
click.secho(
"Here is the current coincident table\n",
fg="magenta",
bold=True,
)
if not is_test:
cache_data = self.coinc_data
else:
cache_data = self.test_coinc_data
for sub_list in cache_data.cache["sub_group"].unique():
sub_df = cache_data.cache.query(f"sub_group=={sub_list}")
sub_df = sub_df.drop(
columns=[
"meta",
"machine_time_utc",
"schema_version",
"neutrino_time_as_datetime",
]
)
sub_df = sub_df.sort_values(by=["neutrino_time_utc"])
# snews_bot.send_table(sub_df) # no need to print the table on the server.
# Logs have the full content
print(sub_df.to_markdown())
print("=" * 168)
[docs]
def send_alert(self, sub_group_tag, alert_type, is_test=False):
if not is_test:
sub_df = self.coinc_data.cache.query("sub_group==@sub_group_tag")
try:
false_alarm_prob = cache_false_alarm_rate(
cache_sub_list=sub_df, hb_cache=self.heartbeat.cache_df
)
except Exception:
false_alarm_prob = "(couldn't compute)"
alert_publisher = self.alert
else:
sub_df = self.test_coinc_data.cache.query("sub_group==@sub_group_tag")
false_alarm_prob = "N/A"
alert_publisher = self.test_alert
p_vals = sub_df["p_val"].to_list()
p_vals_avg = np.round(sub_df["p_val"].mean(), decimals=5)
nu_times = sub_df["neutrino_time_utc"].to_list()
detector_names = sub_df["detector_name"].to_list()
alert_data = dict(
p_vals=p_vals,
p_val_avg=p_vals_avg,
sub_list_num=int(sub_group_tag),
neutrino_times=nu_times,
detector_names=detector_names,
false_alarm_prob=false_alarm_prob,
server_tag=self.server_tag,
alert_type=alert_type,
)
with alert_publisher as pub:
alert = self.alert_schema.get_cs_alert_schema(
data=alert_data, is_test=is_test
)
pub.send(alert)
# only check to see if email or slack should be sent if the alert is not a test alert
if not is_test:
if self.send_email:
send_email(alert)
if self.send_slack:
snews_bot.send_table(
alert_data, alert, is_test=True, topic=self.observation_topic
)
# ------------------------------------------------------------------------------------------------------------------
[docs]
def alert_decider(self, is_test=False):
"""
This method will publish an alert every time a new detector submits an observation message.
"""
click.secho(f'{"=" * 100}', fg="bright_red")
# Determine which cache to use
cache_data = self.test_coinc_data if is_test else self.coinc_data
_message_count = self.test_message_count if is_test else self.message_count
log_info = " [TEST] " if is_test else " "
def publish_alert(sub_group_tag, state, message):
click.secho(
f"SUB GROUP {sub_group_tag}: {message:^100}".upper(),
bg="bright_green",
fg="red",
)
click.secho(
f'{"Publishing an Alert!!!".upper():^100}', bg="bright_green", fg="red"
)
click.secho(f'{"=" * 100}', fg="bright_red")
log.info(f"\t> {log_info} An alert was published: {state} !")
self.send_alert(
sub_group_tag=sub_group_tag, alert_type=state, is_test=is_test
)
for sub_group_tag, state in cache_data.sub_group_state.items():
print(f"CHECKING FOR ALERTS IN SUB GROUP: {sub_group_tag}")
if state is None:
print(f"NO ALERTS IN SUB GROUP: {sub_group_tag}")
continue
message_count = len(cache_data.cache.query("sub_group==@sub_group_tag"))
if state == "COINC_MSG_STAGGERED":
publish_alert(sub_group_tag, state, "COINCIDENT DETECTOR..")
elif (
state == "RETRACTION" and message_count < _message_count[sub_group_tag]
):
publish_alert(sub_group_tag, state, "RETRACTION HAS BEEN MADE")
elif state == "INITIAL":
log.debug(
f"\t> {log_info} Initial message in sub group:{sub_group_tag}"
)
click.secho(
f'SUB GROUP {sub_group_tag}: {"Initial message received".upper():^100}',
bg="bright_green",
fg="red",
)
click.secho(f'{"=" * 100}', fg="bright_red")
elif state == "UPDATE" and message_count == _message_count[sub_group_tag]:
click.secho(
f'SUB GROUP {sub_group_tag}: {"A MESSAGE HAS BEEN UPDATED".upper():^100}',
bg="bright_green",
fg="red",
)
log.debug(f"\t> {log_info} An UPDATE message is received")
if message_count > 1:
publish_alert(
sub_group_tag, state, "Publishing an updated Alert!!!"
)
log.debug(f"\t> {log_info} An alert is updated!")
elif state == "COINC_MSG" and message_count > _message_count[sub_group_tag]:
publish_alert(sub_group_tag, state, "NEW COINCIDENT DETECTOR..")
# ------------------------------------------------------------------------------------------------------------------
[docs]
def deal_with_the_cache(self, snews_message):
"""Check if the message is a test or not, then add it to the cache and run the alert decider
Parameters
----------
snews_message: dict read from the Kafka stream.
Returns
-------
adds messages to cache and runs the coincidence decider
"""
if "is_test" in snews_message.keys():
is_test = snews_message["is_test"]
elif (
"meta" in snews_message.keys() and "is_test" in snews_message["meta"].keys()
):
is_test = snews_message["meta"]["is_test"]
else:
is_test = False
if not is_test:
self.coinc_data.add_to_cache(message=snews_message)
# run the search
self.alert_decider(is_test=is_test)
# update message count
for sub_group_tag in self.coinc_data.cache["sub_group"].unique():
self.message_count[sub_group_tag] = len(
self.coinc_data.cache.query("sub_group==@sub_group_tag")
)
self.coinc_data.sub_group_state[sub_group_tag] = None
self.coinc_data.updated = []
# do not have a storage for the tests
if not is_test:
self.storage.insert_coinc_cache(self.coinc_data.cache)
sys.stdout.flush()
self.coinc_data.updated = []
if self.show_table:
self.display_table("main") # don't display on the server
else:
self.test_coinc_data.add_to_cache(message=snews_message)
# run the search
self.alert_decider(is_test=is_test)
# update message count
for sub_group_tag in self.test_coinc_data.cache["sub_group"].unique():
self.test_message_count[sub_group_tag] = len(
self.test_coinc_data.cache.query("sub_group==@sub_group_tag")
)
self.test_coinc_data.sub_group_state[sub_group_tag] = None
self.test_coinc_data.updated = []
# do not have a storage for the tests
sys.stdout.flush()
self.test_coinc_data.updated = []
if self.show_table:
self.display_table("test") # don't display on the server
# -------------------------------------------------------------------------------------------------------------------
[docs]
def run_coincidence(self):
"""
As the name states this method runs the coincidence system.
Starts by subscribing to the hop observation_topic.
* If a CoincidenceTier message is received then it is passed to _check_coincidence.
* other commands include "test-connection", "test-scenarios",
"hard-reset", "Retraction",
****
Reconnect logic and retryable errors thanks to Spencer Nelson (https://github.com/spenczar)
https://github.com/scimma/hop-client/issues/140
"""
fatal_error = True
while True:
try:
stream = Stream(until_eos=False)
with stream.open(self.observation_topic, "r") as s:
click.secho(
f"{datetime.utcnow().isoformat()} (re)Initializing Coincidence System for "
f"{self.observation_topic}\n"
)
for snews_message in s:
# check for the hop version
try:
snews_message = snews_message.content
except Exception as e:
log.error(
f"A message with older hop version is found. {e}\n{snews_message}"
)
snews_message = snews_message
# Unpack the message
if type(snews_message) is bytes:
snews_message = pickle.loads(snews_message)
snews_message = snews_message.model_dump()
elif type(snews_message) is str:
snews_message = json.loads(snews_message)
else:
log.error("Message is not parsable:")
log.error(snews_message)
continue
# handle the input message
handler = CommandHandler(snews_message)
# if a coincidence tier message (or retraction) run through the logic
if handler.handle(self):
snews_message["received_time"] = np.datetime_as_string(
np.datetime64(datetime.utcnow().isoformat()), unit="ns"
)
# print info on the servers terminal, (important info is logged)
terminal_output = click.style(
f'{"-" * 57}\n', fg="bright_blue"
)
terminal_output += click.style(
f'{"Coincidence Tier Message Received":^57}\n',
fg="bright_blue",
)
terminal_output += click.style(
"\t>"
f"{snews_message['detector_name']}"
f"{snews_message['received_time']}",
fg="bright_blue",
)
click.secho(terminal_output)
# add to cache
# if actual observation, use coincidence cache, else if testing use
# test cache
self.deal_with_the_cache(snews_message)
# for each read message reduce the retriable err count
if self.retriable_error_count > 1:
self.retriable_error_count -= 1
# handle a keyboard interrupt (ctrl+c)
except KeyboardInterrupt:
print("Caught a keyboard interrupt. Goodbye world!")
log.error("(2) Caught a keyboard interrupt. Exiting.\n")
fatal_error = True
self.exit_on_error = True
sys.exit(0)
# if there is a KafkaException, check if retriable
except adc.errors.KafkaException as e:
if e.retriable:
self.retriable_error_count += 1
if self.retriable_error_count >= self.max_retriable_errors:
log.error(
"Max retryable errors exceeded. "
f"Here is the most recent exception:\n{e}\n"
)
fatal_error = True
else:
log.error(f"Retryable error! \n{e}\n")
# sleep with exponential backoff and a bit of jitter.
time.sleep(
(1.5**self.retriable_error_count)
* (1 + random.random())
/ 2
)
else:
log.error(
"(1) Something crashed the server, not a retriable error, "
f"here is the Exception raised\n{e}\n"
)
fatal_error = True
# any other exception is logged, but not fatal (?)
except Exception as e:
log.error(
f"(2) Something crashed the server, here is the Exception raised\n{e}\n"
)
fatal_error = False # True # maybe not a fatal error?
finally:
# if we are breaking on errors and there is a fatal error, break
if self.exit_on_error and fatal_error:
break
# otherwise continue by re-initiating
continue