snews_cs.snews_coinc ==================== .. py:module:: snews_cs.snews_coinc Attributes ---------- .. autoapisummary:: snews_cs.snews_coinc.log Classes ------- .. autoapisummary:: snews_cs.snews_coinc.CacheManager snews_cs.snews_coinc.CoincidenceDistributor Functions --------- .. autoapisummary:: snews_cs.snews_coinc.np_datetime_delta_sec Module Contents --------------- .. py:data:: log .. py:function:: np_datetime_delta_sec(t_1, t_2) Return the time difference between two numpy datetime64 objects in seconds Returns: float (seconds) .. rubric:: Notes t_1 is expected to be the earlier time (no absolute value is taken) .. py:class:: CacheManager This class handles all the incoming data to the SNEWS CS Cache, adding messages, organizing sub-groups, retractions and updating cache entries .. py:attribute:: cache .. py:attribute:: updated :value: [] .. py:attribute:: msg_state :value: None .. py:attribute:: sub_group_state .. py:method:: add_to_cache(message) Takes in SNEWS message and checks if it is a retraction, update or new addition to cache. This is the 'core' function of CoincidenceDataHandler :param message: SNEWS Message, must be PT valid :type message: dict .. py:method:: _manage_cache(message) This method will add a new message to cache, checks if: A)It is an initial message (to the entire cache) or if it: B)Forms a new sub-group (sends message to _check_coinc_in_subgroups) C)Is confident to a sub-group (sends message to _check_coinc_in_subgroups) :param message: .. py:method:: _check_coinc_in_subgroups(message) This method either: A)Adds Message to an existing sub-group, if coincident with the initial signal B) If NOT coincident with any sub groups it creates two new sub groups, setting the message as their initial time. The new groups consist of coincident signals with earlier arrival time and later arrival times, respectively. Once created the new groups are checked to see if they are redundant, and if so then they are not added to the main cache. :param message: SNEWS message :type message: dict .. py:method:: _check_for_redundancies(sub_cache) Checks if sub cache is redundant :param sub_cache: New sub group :type sub_cache: dataframe :returns: True if sub group is redundant False if sub cache is unique :rtype: bool .. py:method:: _organize_cache(sub_cache) This method makes sure that the nu_delta_times are not negative, recalculates new deltas using the proper initial time :param sub_cache: Sub group :type sub_cache: dataframe .. py:method:: _fix_deltas(sub_df) This method fixes the deltas of the sub group by resetting the initial nu time :param sub_df: Sub cache :type sub_df: Dataframe :returns: **sub_df** -- Sub cache with fixed nu time deltas :rtype: Dataframe .. py:method:: _update_message(message) If triggered this method updates the p_val and neutrino time of a detector in cache. :param message: SNEWS message :type message: dict .. py:method:: cache_retraction(retraction_message) This method handles message retraction by parsing the cache and dumping any instance of the target detector :param retraction_message: SNEWS retraction message :type retraction_message: dict .. py:class:: CoincidenceDistributor(env_path=None, drop_db=False, firedrill_mode=True, hb_path=None, server_tag=None, send_email=False, send_slack=True, show_table=False) .. py:attribute:: show_table :value: False .. py:attribute:: send_email :value: False .. py:attribute:: send_slack :value: True .. py:attribute:: hb_path :value: None .. py:attribute:: server_tag :value: None .. py:attribute:: storage .. py:attribute:: topic_type :value: 'CoincidenceTier' .. py:attribute:: coinc_threshold .. py:attribute:: cache_expiration :value: 86400 .. py:attribute:: retriable_error_count :value: 0 .. py:attribute:: max_retriable_errors :value: 20 .. py:attribute:: exit_on_error :value: False .. py:attribute:: initial_set :value: False .. py:attribute:: alert .. py:attribute:: test_alert .. py:attribute:: test_topic .. py:attribute:: alert_schema .. py:attribute:: store_heartbeat .. py:attribute:: heartbeat .. py:attribute:: stash_time :value: 86400 .. py:attribute:: coinc_data .. py:attribute:: test_coinc_data .. py:attribute:: message_count .. py:attribute:: test_message_count .. py:method:: clear_cache(is_test=False) When a reset cache is passed, recreate the CoincidenceDataHandler instance .. py:method:: display_table(is_test=False) Display each sub list individually using a markdown table. .. py:method:: send_alert(sub_group_tag, alert_type, is_test=False) .. py:method:: alert_decider(is_test=False) This method will publish an alert every time a new detector submits an observation message. .. py:method:: deal_with_the_cache(snews_message) Check if the message is a test or not, then add it to the cache and run the alert decider :param snews_message: :type snews_message: dict read from the Kafka stream. :rtype: adds messages to cache and runs the coincidence decider .. py:method:: run_coincidence() As the name states this method runs the coincidence system. Starts by subscribing to the hop observation_topic. * If a CoincidenceTier message is received then it is passed to _check_coincidence. * other commands include "test-connection", "test-scenarios", "hard-reset", "Retraction", **** Reconnect logic and retryable errors thanks to Spencer Nelson (https://github.com/spenczar) https://github.com/scimma/hop-client/issues/140