Source code for snews_cs.snews_db
import pymongo
from . import cs_utils
import os
[docs]class Storage:
"""
This class sets up the SNEWS database using Mongo DB
Parameters
----------
env : `str`, optional
Path to env file, defaults to './auxlilary/test-config.env'
drop_db : `bool`, optional
drops all items in the DB every time Storage is initialized, defaults to False
use_local_db : `bool`, optional
Tells Storage to set up a local DB, defaults to False.
"""
def __init__(self, env=None, drop_db=True, use_local_db=False):
cs_utils.set_env(env)
self.mgs_expiration = int(os.getenv('MSG_EXPIRATION'))
self.coinc_threshold = int(os.getenv('COINCIDENCE_THRESHOLD'))
self.mongo_server = os.getenv('DATABASE_SERVER')
if use_local_db:
self.client = pymongo.MongoClient('mongodb://localhost:27017/') #, replicaset='rs0')
else:
self.client = pymongo.MongoClient(self.mongo_server)
self.db = self.client.snews_db
self.all_mgs = self.db.all_mgs
self.false_warnings = self.db.false_warnings
self.sig_tier_archive = self.db.sig_tier_archive
self.time_tier_archive = self.db.time_tier_archive
self.coincidence_tier_archive = self.db.coincidence_tier_archive
self.coincidence_tier_alerts = self.db.coincidence_tier_alerts
self.time_tier_alerts = self.db.time_tier_alerts
self.sig_tier_alerts = self.db.sig_tier_alerts
if drop_db:
# kill all old colls
self.all_mgs.delete_many({})
self.coincidence_tier_archive.delete_many({})
self.coincidence_tier_alerts.delete_many({})
self.false_warnings.delete_many({})
self.time_tier_archive.delete_many({})
self.sig_tier_archive.delete_many({})
self.time_tier_alerts.delete_many({})
self.sig_tier_alerts.delete_many({})
# drop the old index
self.all_mgs.drop_indexes()
self.coincidence_tier_archive.drop_indexes()
self.false_warnings.drop_indexes()
self.coincidence_tier_alerts.drop_indexes()
self.time_tier_archive.drop_indexes()
self.sig_tier_archive.drop_indexes()
self.time_tier_alerts.drop_indexes()
self.sig_tier_alerts.drop_indexes()
# set index
self.all_mgs.create_index('received_time')
self.coincidence_tier_archive.create_index('received_time', expireAfterSeconds=self.mgs_expiration)
self.sig_tier_archive.create_index('received_time', expireAfterSeconds=self.mgs_expiration)
self.time_tier_archive.create_index('received_time', expireAfterSeconds=self.mgs_expiration)
self.false_warnings.create_index('received_time')
self.coincidence_tier_alerts.create_index('received_time')
self.sig_tier_alerts.create_index('received_time')
self.time_tier_alerts.create_index('received_time')
self.coll_list = {
'CoincidenceTier': self.coincidence_tier_archive,
'SigTier':self.sig_tier_archive,
'TimeTier':self.time_tier_archive,
'Retraction': self.false_warnings,
'CoincidenceTierAlert': self.coincidence_tier_alerts,
'SigTierAlert': self.sig_tier_alerts,
'TimeTierAlert': self.time_tier_alerts,
}
[docs] def insert_mgs(self, mgs):
""" This method inserts a SNEWS message to its corresponding collection
Parameters
----------
mgs : `dict`
dictionary of the SNEWS message
"""
mgs_type = mgs['_id'].split('_')[1]
specific_coll = self.coll_list[mgs_type]
specific_coll.insert_one(mgs)
self.all_mgs.insert_one(mgs)
[docs] def get_all_messages(self, sort_order=pymongo.ASCENDING):
""" Returns a list of all messages in the 'all-messages' collection
Parameters
----------
sort_order : `object`, optional
default to `pymongo.ASCENDING`
Returns
-------
result : `list`
A list containing all items inside 'All-Messages'
"""
return self.all_mgs.find().sort('received_time', sort_order)
[docs] def get_coincidence_tier_archive(self, sort_order=pymongo.ASCENDING):
""" Returns a list of all messages in the 'cache' collection
Parameters
----------
sort_order : `object`, optional
default to `pymong.ASCENDING`
Returns
-------
result : `list`
A list containing all items inside 'Coincidence Tier Cache'
"""
return self.coincidence_tier_archive.find().sort('received_time', sort_order)
[docs] def get_false_warnings(self, sort_order=pymongo.ASCENDING):
""" Returns a list of all messages in the 'cache' collection
Parameters
----------
sort_order : `object`, optional
default to `pymong.ASCENDING`
Returns
-------
result : `list`
A list containing all items inside 'Coincidence Tier Cahce'
"""
return self.false_warnings.find().sort('received_time', sort_order)
[docs] def empty_retractions(self):
""" Returns True of if false warnings is empty
"""
if self.false_warnings.count() == 0:
return True
else:
return False
[docs] def empty_coinc_archive(self):
""" Returns True of if coincidence cache is empty
"""
if self.coincidence_tier_archive.count() <= 1:
return True
else:
return False
[docs] def purge_archive(self, coll):
""" Erases all items in a collection
Parameters
----------
coll : `str`
name of collection
"""
self.coll_list[coll].delete_many({})
[docs] def get_alert_collection(self, which_tier):
"""Gives a Mongo cursor for a specifc alert collection
Parameters
----------
which_tier : 'str'
Name of OBS tier
Returns
-------
collection cursor: 'mongo cursor object'
An ordered list of all documents inside the collection.
"""
sort_order = pymongo.ASCENDING
return self.coll_list[f'{which_tier}Alert'].find().sort('received_time', sort_order)