from . import cs_utils
from .snews_db import Storage
import os, click
from datetime import datetime
from .alert_pub import AlertPublisher
import numpy as np
import pandas as pd
from hop import Stream
from . import snews_bot
from .cs_alert_schema import CoincidenceTierAlert
[docs]class CoincDecider:
def __init__(self, env_path=None, use_local_db=True, is_test=True, drop_db=False, firedrill_mode=True):
"""Coincidence Decider class constructor
Parameters
----------
env_path : str
path to env file, defaults to '/auxiliary/test-config.env'
use_local_db:
tells CoincDecider to use local MongoClient, defaults to True
is_test: bool
tells CoincDecider if it's running in test mode,
"""
cs_utils.set_env(env_path)
self.hype_mode_ON = True
self.storage = Storage(drop_db=drop_db, use_local_db=use_local_db)
self.topic_type = "CoincidenceTier"
self.coinc_threshold = float(os.getenv('COINCIDENCE_THRESHOLD'))
self.cache_expiration = 86400
self.alert = AlertPublisher(env_path=env_path, use_local=use_local_db, firedrill_mode=firedrill_mode)
self.times = cs_utils.TimeStuff(env_path)
if firedrill_mode:
self.observation_topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC")
else:
self.observation_topic = os.getenv("OBSERVATION_TOPIC")
self.column_names = ["_id", "detector_name", "received_time", "machine_time", "neutrino_time",
"p_val", "meta", "sub_list_num", "nu_delta_t"]
self.cache_df = pd.DataFrame(columns=self.column_names)
self.alert_schema = CoincidenceTierAlert(env_path)
self.is_test = is_test
self.in_coincidence = False
self.in_list_already = False
self.stash_time = 86400
# ------------------------------------------------------------------------------------------------------------------
[docs] def _is_old_message(self, message):
"""
Checks if snews message is too old.
Parameters
----------
message: dict
incoming SNEWS message
Returns
-------
True is message is older than stash time (24hrs)
"""
curr_t = datetime.utcnow()
nu_t = self.times.str_to_datetime(message['neutrino_time'], fmt='%y/%m/%d %H:%M:%S:%f')
del_t = (curr_t - nu_t).total_seconds()
if del_t >= self.stash_time:
return True
else:
return False
# ------------------------------------------------------------------------------------------------------------------
[docs] def append_message_to_df(self, message, delta_t, sub_list_num):
"""
Appends cache df when there is a coincident signal
Parameters
----------
message : `dict`
dictionary of the SNEWS message
delta_t : 'float'
value for time difference between message nu time and initial (0 if message set initial)
sub_list_num : 'int'
numeric label for coincidence sub list
"""
message['sub_list_num'] = sub_list_num
message['nu_delta_t'] = delta_t
self.cache_df = self.cache_df.append(message, ignore_index=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def reset_df(self):
"""
Resets coincidence arrays if coincidence is broken
"""
del self.cache_df
self.cache_df = pd.DataFrame(columns=self.column_names)
self.initial_set = False
# ------------------------------------------------------------------------------------------------------------------
[docs] def _coincident_with_whole_list(self, message, sub_list_num, ):
"""
As the name states this backend method checks if the message is coincident
with the whole sub list (within 10secs of each message)
Parameters
----------
message: dict
incoming message
sub_list_num: int
number of current sub list that _check_coincidence is on.
"""
# first check if detector already in sublist
sub_list = self.cache_df.query(f'sub_list_num=={sub_list_num}')
sub_list_detectors = list(sub_list['detector_name'])
if message['detector_name'] in sub_list_detectors:
self.in_coincidence = True
return 'ALREADY_IN_LIST',
# compare the current nu time with all other on the sublist
message_nu_time = self.times.str_to_datetime(message['neutrino_time'], fmt='%y/%m/%d %H:%M:%S:%f')
nu_times = pd.to_datetime(sub_list.neutrino_time, format='%y/%m/%d %H:%M:%S:%f')
delta_ts = ((message_nu_time - nu_times).dt.total_seconds()).values # numpy array
# check if signal is NOT coincident with the whole list
if all(abs_del_t > self.coinc_threshold for abs_del_t in np.abs(delta_ts)):
self.in_coincidence = False
return 'NOT_COINCIDENT',
# check if signal is coincident with the whole list and arrives earlier
elif all(0 > del_t >= -self.coinc_threshold for del_t in delta_ts):
self.in_coincidence = True
return 'EARLY_COINCIDENT', np.insert(np.sort(np.abs(delta_ts)), 0, 0)
# check if signal is coincident with the whole list
elif all(abs_del_t <= self.coinc_threshold for abs_del_t in np.abs(delta_ts)):
self.in_coincidence = True
return 'COINCIDENT', delta_ts[0]
# not sure if I need this
else:
self.in_coincidence = False
return 'NOT_COINCIDENT',
# ------------------------------------------------------------------------------------------------------------------
[docs] def _concat_to_cache(self, new_list, new_sub_list_num):
""" Performs a concat on cache df ( appends the new sub list into it, bottom join)
Parameters
----------
new_list: pandas DataFrame
new sub list
new_sub_list_num: int
label of new list
"""
self.cache_df = pd.concat([self.cache_df.query(f'sub_list_num!={new_sub_list_num}'), new_list],
ignore_index=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def _new_list_find_coincidences(self, message, new_sub_list):
"""
Checks for coincident messages with the new sub_list.
Compares nu times to the sub list's initial nu time.
(1)If a message is coincident with the ENTIRE new sub list,
has an eralier time than the initial nu time:
The early coincident time message will have its nu time
as the initial time for the list. The message is appended to the sub list.
A new set of nu_delta_t 's is maded and passed to the sub list.
Finally, the sublist is sorted by 'nu times' and the indeces are rest
(2)If a message is coincident with the ENTIRE new sub list:
Message is appended to the sub list.
Then sub list is concatenated to the cache
(3)If a message is not coincident it is ignored.
Parameters
----------
initial_time: datetime object
initial time set my first neutrino signal
detector_name: str
Name of the detector that sets initial time
new_sub_list: int
label of new sub list
"""
other_df = self.cache_df.query(f'sub_list_num!={new_sub_list}').sort_values(by='neutrino_time').drop_duplicates(
subset=['_id'])
detector_name = message['detector_name']
initial_time = self.times.str_to_datetime(message['neutrino_time'], fmt='%y/%m/%d %H:%M:%S:%f')
other_df = other_df.query(f'detector_name != "{detector_name}"')
new_list = self.cache_df.query(f'sub_list_num=={new_sub_list}')
for index, row in other_df.iterrows():
# print(f'{index} {row["detector_name"]} this is my ini time {initial_time}')
if row['detector_name'] in list(new_list['detector_name']):
continue
nu_time = self.times.str_to_datetime(row['neutrino_time'], fmt='%y/%m/%d %H:%M:%S:%f')
del_t = (nu_time - initial_time).total_seconds()
# (1)
# make sure the signal is not an initial that its nu is earlier than new list's initial
if float(row['nu_delta_t']) != 0 and (0 > del_t >= -self.coinc_threshold):
if self._coincident_with_whole_list(message=row.copy(deep=False),
sub_list_num=new_sub_list, )[0] == 'EARLY_COINCIDENT':
# print(f'{row["detector_name"]} is making new ini {del_t}')
initial_time = nu_time
new_row = row.copy(deep=False)
new_row['sub_list_num'] = new_sub_list
new_list = new_list.append(new_row, ignore_index=True)
new_nu_time = pd.to_datetime(new_list.neutrino_time, format='%y/%m/%d %H:%M:%S:%f')
new_list['nu_delta_t'] = ((new_nu_time - initial_time).dt.total_seconds()).values
new_list = new_list.sort_values(by='neutrino_time')
self._concat_to_cache(new_list, new_sub_list)
else:
pass
# (2)
if 0 < del_t <= self.coinc_threshold:
# print(f'delta within coinc_threshold, my del t is {del_t}')
print(self._coincident_with_whole_list(message=row.copy(deep=False),
sub_list_num=new_sub_list, ))
if self._coincident_with_whole_list(message=row.copy(deep=False),
sub_list_num=new_sub_list, )[0] == 'COINCIDENT':
# print(f'appending to {row["detector_name"]} new list {new_list} {del_t}')
new_row = row.copy(deep=False)
new_row['sub_list_num'] = new_sub_list
new_row['nu_delta_t'] = del_t
new_list = new_list.append(new_row, ignore_index=True)
new_list = new_list.sort_values(by='neutrino_time')
self._concat_to_cache(new_list, new_sub_list)
else:
pass
# (3)
else:
continue
# reset the index
self.cache_df = self.cache_df.reset_index(drop=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def _dump_redundant_list(self):
"""Gets rid of a sub list if the whole list is contained in anther list.
"""
for sub_list in list(self.cache_df['sub_list_num'].unique()):
curr_ids = list(self.cache_df.query(f'sub_list_num=={sub_list}')['_id'])
for other_sub_list in list(self.cache_df['sub_list_num'].unique()):
other_ids = list(self.cache_df.query(f'sub_list_num=={other_sub_list}')['_id'])
if sub_list == other_sub_list:
continue
elif len(curr_ids) < len(other_ids) and set(curr_ids).issubset(other_ids):
curr_index = list(self.cache_df.query(f'sub_list_num=={sub_list}').index)
self.cache_df = self.cache_df.drop(curr_index)
self.cache_df = self.cache_df.reset_index(drop=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def _check_sub_lists(self, message, sub_list):
"""Checks _coincident_with_whole_list's return and applies actions on sub list (append, give new initial time)
Parameters
----------
message: dict
incoming message
sub_list: int
current sub list that _check_coincidence is on
"""
coinc_with_list = self._coincident_with_whole_list(message, sub_list)
if coinc_with_list[0] == 'ALREADY_IN_LIST':
# print('ALREADY_IN_LIST')
self.in_list_already = True
pass
elif coinc_with_list[0] == 'NOT_COINCIDENT':
pass
elif coinc_with_list[0] == 'COINCIDENT':
# print('COINCIDENT')
delta_t = coinc_with_list[1]
self.append_message_to_df(message, delta_t, sub_list)
# print(f'appending to {sub_list}')
elif coinc_with_list[0] == 'EARLY_COINCIDENT':
self.append_message_to_df(message, 0, sub_list)
_sub_list = self.cache_df.query(f'sub_list_num=={sub_list}')
_sub_list = _sub_list.sort_values(by='neutrino_time') # first sort! dt's are sorted too
# re-assign dt making the early message first
_sub_list['nu_delta_t'] = coinc_with_list[1]
self.cache_df = pd.concat([_sub_list, self.cache_df.query(f'sub_list_num!={sub_list}')],
ignore_index=True)
self.cache_df = self.cache_df.reset_index(drop=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def _check_coincidence(self, message, ):
"""
Parent method of coincidence, calls a support methods to check if a message is coincident.
Appending actions are made by support methods.
(1) Checks subs lists to see if messages is coincident with the whole list.
(2) If messages is not coincident with any lists make a new subs list and looks for other messages
in the cache to append to the new sub list.
(3) If a new detector has been added to any subs list proceed to send an alert (both SNEWS alert and Slack post)
Parameters
----------
message: dict
incoming message
"""
if len(self.cache_df) == 0:
self.append_message_to_df(message, 0, 0)
pass
subs_list_nums = list(self.cache_df['sub_list_num'].unique())
self.in_coincidence = False
self.in_list_already = False
# (1)
for sub_list in subs_list_nums:
# print(f'Checking sub list: {sub_list}')
self._check_sub_lists(message=message, sub_list=sub_list)
# (2)
# print(f'in_coincidence: {self.in_coincidence}')
if not self.in_coincidence:
# print('making new list')
new_sub_list = max(subs_list_nums,
default=-1) + 1 # -1 is there for the very first msg when sub_list_nums empty
self.append_message_to_df(message, 0, new_sub_list)
self._new_list_find_coincidences(message=message, new_sub_list=new_sub_list)
# (3)
# unique detector has been added to a sub_list
# else:
# # not coincidence but maybe already in list
if not self.in_list_already:
print('we got something publishing an alert !')
self._dump_redundant_list()
self.cache_df = self.cache_df.sort_values(by=['sub_list_num', 'received_time'])
self.hype_mode_publish()
self.display_table()
# ----------------------------------------------------------------------------------------------------------------
[docs] def display_table(self):
"""
Display each sub list individually using a markdown table and sends to slack channel.
"""
click.secho(
f'Here is the current coincident table\n',
fg='magenta', bold=True, )
for sub_list in self.cache_df['sub_list_num'].unique():
sub_df = self.cache_df.query(f'sub_list_num=={sub_list}')
sub_df = sub_df.drop(columns=['meta', 'machine_time', 'schema_version'])
sub_df = sub_df.sort_values(by=['received_time'])
# snews_bot.send_table(sub_df)
print(sub_df.to_markdown())
print('=' * 168)
# TODO: 27/02 update for new df format (REWORK)
# ------------------------------------------------------------------------------------------------------------------
[docs] def _retract_from_cache(self, retrc_message):
"""
Parses retraction message, will delete 'n' latest messages. The list is sorted by 'received_time', the latest message
will be at the top of the list.
Parameters
----------
retrc_message: dict
retraction message.
"""
drop_detector = retrc_message['detector_name']
delete_n_many = retrc_message['N_retract_latest']
if retrc_message['N_retract_latest'] == 'ALL':
delete_n_many = self.cache_df.groupby(by='detector_name').size().to_dict()[drop_detector]
print(f'\nDropping latest message(s) from {drop_detector}\nRetracting: {delete_n_many} messages')
sorted_df = self.cache_df.sort_values(by='received_time')
for i in sorted_df.index:
if delete_n_many > 0 and self.cache_df.loc[i, 'detector_name'] == drop_detector:
self.cache_df.drop(index=i, inplace=True)
delete_n_many -= 1
self.cache_df = self.cache_df.reset_index(drop=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def hype_mode_publish(self):
"""
This method will publish an alert every time a new detector
submits an observation message
Parameters
----------
n_old_unique_count : `int`
the least number of detectors required for the hype publish
"""
click.secho(f'{"=" * 100}', fg='bright_red')
for sub_list in list(self.cache_df['sub_list_num'].unique()):
_sub_df = self.cache_df.query(f'sub_list_num=={sub_list}')
p_vals = _sub_df['p_val'].to_list()
p_vals_avg = _sub_df['p_val'].mean()
nu_times = _sub_df['neutrino_time'].to_list()
detector_names = _sub_df['detector_name'].to_list()
alert_data = cs_utils.data_cs_alert(p_vals=p_vals, p_val_avg=p_vals_avg, sub_list_num=sub_list,
nu_times=nu_times, detector_names=detector_names, )
with self.alert as pub:
alert = self.alert_schema.get_cs_alert_schema(data=alert_data)
pub.send(alert)
click.secho(f'{"NEW COINCIDENT DETECTOR.. ".upper():^100}\n', bg='bright_green', fg='red')
click.secho(f'{"Published an Alert!!!".upper():^100}\n', bg='bright_green', fg='red')
click.secho(f'{"=" * 100}', fg='bright_red')
# snews_bot.send_table(self.cache_df, self.is_test)
# ------------------------------------------------------------------------------------------------------------------
[docs] def dump_old_messages(self, message):
"""
WIP
Checks the time sent by the Updater, if any messages have a 24hrs difference from the updater time
they are thrown out of the df.
Parameters
----------
message: dict
Updater message
"""
current_sent_time = message['sent_time']
ind = 0
for latest_sent_time in self.cache_df['sent_time']:
latest_sent_time = datetime.strptime(latest_sent_time, '%d/%m/%y %H:%M:%S')
current_sent_time = datetime.strptime(current_sent_time, '%d/%m/%y %H:%M:%S')
del_t = (current_sent_time - latest_sent_time).total_seconds()
if del_t >= self.stash_time:
self.cache_df.drop(ind, inplace=True)
ind += 1
self.cache_df = self.cache_df.reset_index(drop=True)
# ------------------------------------------------------------------------------------------------------------------
[docs] def check_rights(self, message):
""" check if the requested user has rights
e.g. to reset the cache
"""
if message['pass'] == os.getenv('snews_cs_admin_pass'):
self.reset_df()
click.secho('Cache restarted', fg='yellow')
else:
click.secho('The user has no right to reset the cache', fg='yellow')
pass
# ------------------------------------------------------------------------------------------------------------------
[docs] def run_coincidence(self):
"""
As the name states this method runs the coincidence system.
Starts by subscribing to the hop observation_topic.
Filters out any messages that don't belong to either CoincidenceTier or Retraction.
* IF a CoincidenceTier message is received then it is passed to _check_coincidence.
* IF a Retraction message is received then it is passed to _retract_from_cache.
* IF a hard-reset is passed then cache is reset.
"""
stream = Stream(until_eos=False)
with stream.open(self.observation_topic, "r") as s:
print(f'Running Coincidence System for {self.observation_topic}\n'
f'Nothing here, please wait...')
for snews_message in s:
# Check for Coincidence
# if it is a reset message, reset and continue
if snews_message['_id'].split('_')[0] == 'hard-reset':
self.check_rights(snews_message)
continue
# if it is an old message, continue
if self._is_old_message(message=snews_message):
continue
# Handle topic messages
if snews_message['_id'].split('_')[1] == self.topic_type:
snews_message['received_time'] = datetime.utcnow().strftime("%y/%m/%d %H:%M:%S:%f")
self.storage.insert_mgs(snews_message)
click.secho(f'{"-" * 57}', fg='bright_blue')
click.secho(f'Incoming message from: {snews_message["detector_name"]}'.upper(), bold=True, fg='red')
self._check_coincidence(message=snews_message)
# Check for Retraction (NEEDS WORK)
elif snews_message['_id'].split('_')[1] == 'Retraction':
if snews_message['which_tier'] == 'CoincidenceTier' or snews_message['which_tier'] == 'ALL':
snews_message['received_time'] = datetime.utcnow().strftime("%y/%m/%d %H:%M:%S:%f")
self._retract_from_cache(snews_message)
self.storage.insert_mgs(snews_message)
else:
pass
# Does not follow snews_pt convention
else:
click.secho(f"Attempted to submit a message that does not follow "
f"snews_pt convention. \nThis is not supported now", fg='red')
print(f"Message id received; \n{snews_message['_id']}\n")