Source code for snews_cs.snews_hb
"""
This is a module to handle all heartbeat-related work.
"""
import json
import os
from datetime import UTC, datetime
from pathlib import Path
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from .core.logging import getLogger
from .cs_utils import make_beat_directory, set_env
from .database import Database
[docs]
# Module-level logger shared by everything in this file.
log = getLogger(__name__)

# Registered detector names are the top-level keys of
# etc/detector_properties.json, located relative to this module's directory.
detector_file = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "etc/detector_properties.json")
)
# Read the JSON as UTF-8 explicitly so decoding does not depend on the
# platform's default locale encoding.
with open(detector_file, encoding="utf-8") as file:
    snews_detectors = json.load(file)
snews_detectors = list(snews_detectors.keys())

# Absolute path of the "beats" directory, one level above this module's directory.
beats_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../beats"))
# TODO: make a list of internal heartbeats, and send us = SERVER heartbeats.
# How many times a day can server log these heartbeats, (what is the livetime of server)
[docs]
def sanity_checks(message):
    """Check that a heartbeat message is safe to process.

    The following is verified, in order:

    - the keys ``detector_name``, ``detector_status`` and
      ``received_time_utc`` are present
    - ``received_time_utc`` is a ``np.datetime64`` object
    - ``detector_status`` is the string "ON" or "OFF" (case-insensitive)
    - ``detector_name`` is one of the names in ``snews_detectors``

    Every failed check is logged with ``log.error``.

    :param message: `dict`, the heartbeat message to validate
    :return: `bool`, True if all checks pass, False otherwise
    """
    # Common first line of every rejection log entry.
    rejected = f"\t> {message} is received at snews_hb.py but not valid.\n"

    # received_time_utc is indexed below, so it must be checked here as well;
    # otherwise a message without it raises KeyError instead of being rejected.
    for key in ("detector_name", "detector_status", "received_time_utc"):
        if key not in message:
            log.error(f"{rejected}{key} is not in message keys")
            return False

    received = message["received_time_utc"]
    if not isinstance(received, np.datetime64):
        # Report the value that was actually inspected (the key is
        # "received_time_utc"; there is no "received_time" key).
        log.error(f"{rejected}{received} is not a np.datetime64 object")
        return False

    status = message["detector_status"]
    # A non-string status cannot be lower-cased; reject it rather than raise.
    if not isinstance(status, str) or status.lower() not in ("on", "off"):
        log.error(f"{rejected}{status} is neither ON nor OFF")
        return False

    detector = message["detector_name"]
    if detector not in snews_detectors:
        log.error(f"{rejected}{detector} is not a valid detector")
        return False

    return True
[docs]
class HeartBeat:
"""Class to handle heartbeat message stream"""
def __init__(self, env_path=None, store=True, firedrill_mode=True):
"""
:param store: `bool`
"""
log.info("\t> Heartbeat Instance is created.")
set_env(env_path)
[docs]
self.delete_after = float(os.getenv("HB_DELETE_AFTER", "7")) # days
if firedrill_mode:
self.heartbeat_topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC")
else:
self.heartbeat_topic = os.getenv("OBSERVATION_TOPIC")
[docs]
self.column_names = [
"received_time_utc",
"detector",
"stamped_time_utc",
"latency",
"time_after_last",
"status",
]
# Update the alert cache
[docs]
self.cache_engine = Database(
db_file_path=Path(__file__).parent.parent / "snews_cs.db"
).engine
try:
# Try reading cached data from SQL DB
self.cache_df = pd.read_sql_table(
"cached_heartbeats",
self.cache_engine,
parse_dates=["received_time_utc", "stamped_time_utc"],
)
except Exception:
# Fall-through if cache does not exist; create it
self.cache_df = pd.DataFrame(columns=self.column_names)
[docs]
self._last_row = pd.DataFrame(
columns=self.column_names
)
[docs]
def make_entry(self, message):
"""Make an entry in the cache df using new message
# NOTE:
since we create last row separately, the sequence matters
"""
msg = {
"received_time_utc": message["received_time_utc"],
"detector": message["detector_name"],
}
msg["stamped_time_utc"] = np.datetime64(message["sent_time_utc"])
msg["latency"] = (
msg["received_time_utc"] - msg["stamped_time_utc"]
).astype("timedelta64[s]").astype(int)
# check the last message of given detector
detector_df = self.cache_df[self.cache_df["detector"] == msg["detector"]]
if len(detector_df):
msg["time_after_last"] = (
msg["received_time_utc"] - detector_df["received_time_utc"].max()
).total_seconds()
else:
msg["time_after_last"] = 0 # timedelta(0)
msg["status"] = message["detector_status"]
self._last_row = pd.DataFrame([msg])
# add this new entry to cache
if len(self.cache_df) == 0:
self.cache_df = self._last_row
else:
self.cache_df = pd.concat(
[self.cache_df, self._last_row], ignore_index=True
)
[docs]
def drop_old_messages(self):
"""Keep the heartbeats for a time period delta.
Drop the earlier messages from cache
"""
delta = f"{self.delete_after} day"
curr_time = np.datetime64(
datetime.utcnow().isoformat()
) # pd.to_datetime('now', utc=True)
delta_t = curr_time - pd.to_datetime(self.cache_df["received_time_utc"])
select = delta_t < pd.Timedelta(delta)
self.cache_df = self.cache_df[select]
self.cache_df.sort_values(by=["received_time_utc"], inplace=True)
[docs]
def update_cache(self):
"""Update the alert cache with whatever we decided to keep."""
self.cache_df.to_sql(
"cached_heartbeats", self.cache_engine, if_exists="replace", index=False
)
[docs]
def display_table(self):
"""When printed out, these table can be read from the purdue servers
-only- by admins.
"""
print(
f"\nCurrent cache \n{'=' * 133}\n{self.cache_df.to_markdown()}\n{'=' * 133}\n"
)
[docs]
def electrocardiogram(self, message):
try:
message["received_time_utc"] = np.datetime64(
datetime.now(UTC).isoformat()
) # pd.to_datetime('now', utc=True)
if sanity_checks(message):
self.make_entry(message)
self.update_cache()
self.drop_old_messages()
# if all successful, return True. Not logging each time, not to overcrowd
return True
else:
return False
except Exception as e:
log.error(f"\t Some heartbeats didn't make it\n{e}\n")
print(f"Something went terribly wrong \n {e}")
return False