Module src.StateDatabase.couchdb
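This module provides the CouchDBServer class, a thin client for the CouchDB state database used by ServerlessContainers. It wraps the CouchDB HTTP API through a requests session and exposes CRUD helpers for the framework's structures, services, limits, rules, events, requests and users databases.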
Source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.
import random
import time
import requests
import json
class CouchDBServer:

    post_doc_headers = {'content-type': 'application/json'}

    __COUCHDB_URL = "couchdb"
    __COUCHDB_PORT = 5984

    __structures_db_name = "structures"
    __services_db_name = "services"
    __limits_db_name = "limits"
    __rules_db_name = "rules"
    __events_db_name = "events"
    __requests_db_name = "requests"
    __users_db_name = "users"

    __MAX_UPDATE_TRIES = 10  # Note: currently unused, retries are bounded by the max_tries parameters
    __DATABASE_TIMEOUT = 10

    def __init__(self, couchdb_url=None, couchdb_port=None):
        if not couchdb_url:
            couchdb_url = self.__COUCHDB_URL
        if not couchdb_port:
            couchdb_port = self.__COUCHDB_PORT
        else:
            try:
                couchdb_port = int(couchdb_port)
            except ValueError:
                couchdb_port = self.__COUCHDB_PORT
        # TODO admin username and password are hard-coded
        self.server = "http://admin:admin@{0}:{1}".format(couchdb_url, str(couchdb_port))
        self.session = requests.Session()

    def close_connection(self):
        self.session.close()
    def set_database_name(self, database_type, database_name):
        if database_type == "structures":
            self.__structures_db_name = database_name
        elif database_type == "services":
            self.__services_db_name = database_name
        elif database_type == "limits":
            self.__limits_db_name = database_name
        elif database_type == "rules":
            self.__rules_db_name = database_name
        elif database_type == "events":
            self.__events_db_name = database_name
        elif database_type == "requests":
            self.__requests_db_name = database_name
        elif database_type == "profiles":
            self.__profiles_db_name = database_name
        else:
            pass
    def database_exists(self, database):
        r = self.session.head(self.server + "/" + database)
        return r.status_code == 200

    def create_database(self, database):
        r = self.session.put(self.server + "/" + database)
        if r.status_code != 201:
            r.raise_for_status()
        else:
            return True

    def remove_database(self, database):
        r = self.session.delete(self.server + "/" + database)
        if r.status_code != 200:
            r.raise_for_status()
        else:
            return True

    def compact_database(self, database):
        r = self.session.post(self.server + "/" + database + "/_compact", headers=self.post_doc_headers)
        if r.status_code != 202:
            r.raise_for_status()
        else:
            return json.loads(r.text)["ok"]
    def __get_all_database_docs(self, database):
        # TODO Implement pagination
        docs = list()
        r = self.session.get(self.server + "/" + database + "/_all_docs?include_docs=true",
                             timeout=self.__DATABASE_TIMEOUT)
        if r.status_code != 200:
            r.raise_for_status()
        else:
            rows = json.loads(r.text)["rows"]
            for row in rows:
                docs.append(row["doc"])
        return docs

    # PRIVATE CRUD METHODS #
    # def __delete_doc(self, database, docid, rev):
    #     r = self.session.delete("{0}/{1}/{2}?rev={3}".format(self.server, database, str(docid), str(rev)))
    #     if r.status_code != 200:
    #         r.raise_for_status()
    #     else:
    #         return True
    def __add_doc(self, database, doc):
        r = self.session.post(self.server + "/" + database, data=json.dumps(doc), headers=self.post_doc_headers)
        # CouchDB answers a successful document POST with 201 (created) or 202 (accepted),
        # so checking only for 200 would silently return None on success
        if r.status_code not in (200, 201, 202):
            r.raise_for_status()
            return False
        return True

    def __add_bulk_docs(self, database, docs):
        docs_data = {"docs": docs}
        r = self.session.post(self.server + "/" + database + "/_bulk_docs", data=json.dumps(docs_data),
                              headers=self.post_doc_headers)
        if r.status_code != 201:
            r.raise_for_status()
        else:
            return True
    def __resilient_delete_doc(self, database, doc, max_tries=10):
        time_backoff_milliseconds = 100
        i = 0
        while i < max_tries:
            i += 1
            r = self.session.delete("{0}/{1}/{2}?rev={3}".format(self.server, database, str(doc["_id"]), str(doc["_rev"])))
            if r.status_code == 200 or r.status_code == 201:
                return True
            elif r.status_code == 409:
                # Conflict error, document may have been updated (e.g., heartbeat of services),
                # update revision and retry after a slightly random wait
                time.sleep((time_backoff_milliseconds + random.randint(1, 100)) / 1000)
                matches = self.__find_documents_by_matches(database, {"_id": doc["_id"]})
                if len(matches) > 0:
                    new_doc = matches[0]
                    doc["_rev"] = new_doc["_rev"]
            else:
                r.raise_for_status()
        return False

    def __delete_bulk_docs(self, database, docs):
        for doc in docs:
            doc["_deleted"] = True
        self.__add_bulk_docs(database, docs)
    def __merge(self, input_dict, output_dict):
        # Recursively deep-merge input_dict into output_dict, overwriting scalar values
        for key, value in input_dict.items():
            if isinstance(value, dict):
                # get node or create one
                node = output_dict.setdefault(key, {})
                self.__merge(value, node)
            else:
                output_dict[key] = value
        return output_dict
    def __resilient_update_doc(self, database, doc, previous_tries=0, time_backoff_milliseconds=100, max_tries=20):
        r = self.session.post(self.server + "/" + database, data=json.dumps(doc), headers=self.post_doc_headers)
        if r.status_code == 200 or r.status_code == 201:
            return True
        if r.status_code == 409:
            # Conflict error, document may have been updated (e.g., heartbeat of services),
            # update revision and retry after a slightly random wait
            if 0 <= previous_tries < max_tries:
                time.sleep((time_backoff_milliseconds + random.randint(1, 100)) / 1000)
                matches = self.__find_documents_by_matches(database, {"_id": doc["_id"]})
                if len(matches) > 0:
                    new_doc = matches[0]
                    doc["_rev"] = new_doc["_rev"]
                    doc = self.__merge(doc, new_doc)
                return self.__resilient_update_doc(database, doc, previous_tries=previous_tries + 1,
                                                   max_tries=max_tries)
            else:
                r.raise_for_status()
        elif r.status_code == 404:
            # Database may have been reinitialized (deleted and recreated), wait and retry again
            time.sleep((time_backoff_milliseconds + random.randint(1, 200)) / 1000)
            return self.__resilient_update_doc(database, doc, previous_tries=previous_tries + 1,
                                               max_tries=max_tries)
        else:
            r.raise_for_status()
        return False
    # TODO This new method should work, test it in isolation and commit
    # def __resilient_update_doc(self, database, doc, max_tries=10):
    #     time_backoff_milliseconds = 100
    #     tries = 0
    #     while tries < max_tries:
    #         tries += 1
    #         r = self.session.post(self.server + "/" + database, data=json.dumps(doc), headers=self.post_doc_headers)
    #         if r.status_code == 200 or r.status_code == 201:
    #             return True
    #         elif r.status_code == 409:
    #             # Conflict error, document may have been updated (e.g., heartbeat of services),
    #             # update revision and retry after a slightly random wait
    #             time.sleep((time_backoff_milliseconds + random.randint(1, 100)) / 1000)
    #             matches = self.__find_documents_by_matches(database, {"_id": doc["_id"]})
    #             if len(matches) > 0:
    #                 new_doc = matches[0]
    #                 doc["_rev"] = new_doc["_rev"]
    #                 doc = self.__merge(doc, new_doc)
    #         elif r.status_code == 404:
    #             # Database may have been reinitialized (deleted and recreated), wait a little longer and retry
    #             time.sleep((time_backoff_milliseconds + random.randint(1, 200)) / 1000)
    #         else:
    #             r.raise_for_status()
    #     return False
    def __find_documents_by_matches(self, database, selectors):
        # TODO Implement pagination
        query = {"selector": {}, "limit": 50}
        for key in selectors:
            query["selector"][key] = selectors[key]
        req_docs = self.session.post(self.server + "/" + database + "/_find", data=json.dumps(query),
                                     headers={'Content-Type': 'application/json'})
        if req_docs.status_code != 200:
            req_docs.raise_for_status()
        else:
            return req_docs.json()["docs"]

    def __find_document_by_name(self, database, doc_name):
        docs = self.__find_documents_by_matches(database, {"name": doc_name})
        if not docs:
            raise ValueError("Document with name {0} not found in database {1}".format(doc_name, database))
        else:
            # Return the first one, as there should only be one
            return dict(docs[0])
    # STRUCTURES #
    def add_structure(self, structure):
        return self.__add_doc(self.__structures_db_name, structure)

    def get_structure(self, structure_name):
        return self.__find_document_by_name(self.__structures_db_name, structure_name)

    def get_structures(self, subtype=None):
        if subtype is None:
            return self.__get_all_database_docs(self.__structures_db_name)
        else:
            return self.__find_documents_by_matches(self.__structures_db_name, {"subtype": subtype})

    def delete_structure(self, structure):
        self.__resilient_delete_doc(self.__structures_db_name, structure)

    def update_structure(self, structure, max_tries=10):
        return self.__resilient_update_doc(self.__structures_db_name, structure, max_tries=max_tries)

    # EVENTS #
    def add_event(self, event):
        self.__add_doc(self.__events_db_name, event)

    def add_events(self, events):
        self.__add_bulk_docs(self.__events_db_name, events)

    def get_events(self, structure):
        return self.__find_documents_by_matches(self.__events_db_name, {"structure": structure["name"]})

    def delete_num_events_by_structure(self, structure, event_name, event_num):
        events = self.__find_documents_by_matches(self.__events_db_name,
                                                  {"structure": structure["name"], "name": event_name})
        event_num = min(len(events), event_num)
        events_to_delete = events[0:event_num]
        self.__delete_bulk_docs(self.__events_db_name, events_to_delete)

    def delete_event(self, event):
        self.__resilient_delete_doc(self.__events_db_name, event)

    def delete_events(self, events):
        self.__delete_bulk_docs(self.__events_db_name, events)
    # LIMITS #
    def add_limit(self, limit):
        return self.__add_doc(self.__limits_db_name, limit)

    def get_all_limits(self):
        return self.__get_all_database_docs(self.__limits_db_name)

    def get_limits(self, structure):
        # Return just the first item, as there should be only one 'limits' document per structure;
        # raise an error if none exists
        limits = self.__find_documents_by_matches(self.__limits_db_name, {"name": structure["name"]})
        if not limits:
            raise ValueError("Structure with name {0} has no limits".format(structure["name"]))
        else:
            return limits[0]

    def update_limit(self, limit):
        return self.__resilient_update_doc(self.__limits_db_name, limit)

    # REQUESTS #
    def get_requests(self, structure=None):
        if structure is None:
            return self.__get_all_database_docs(self.__requests_db_name)
        else:
            return self.__find_documents_by_matches(self.__requests_db_name, {"structure": structure["name"]})

    def add_request(self, req):
        self.__add_doc(self.__requests_db_name, req)

    def add_requests(self, reqs):
        self.__add_bulk_docs(self.__requests_db_name, reqs)

    def delete_request(self, request):
        self.__resilient_delete_doc(self.__requests_db_name, request)

    def delete_requests(self, requests):
        self.__delete_bulk_docs(self.__requests_db_name, requests)
    # RULES #
    def add_rule(self, rule):
        return self.__add_doc(self.__rules_db_name, rule)

    def get_rule(self, rule_name):
        return self.__find_document_by_name(self.__rules_db_name, rule_name)

    def get_rules(self):
        return self.__get_all_database_docs(self.__rules_db_name)

    def update_rule(self, rule):
        return self.__resilient_update_doc(self.__rules_db_name, rule)

    # USERS #
    def add_user(self, user):
        return self.__add_doc(self.__users_db_name, user)

    def get_users(self):
        return self.__get_all_database_docs(self.__users_db_name)

    def get_user(self, user_name):
        return self.__find_document_by_name(self.__users_db_name, user_name)

    def update_user(self, user, max_tries=10):
        return self.__resilient_update_doc(self.__users_db_name, user, max_tries=max_tries)

    # SERVICES #
    def get_services(self):
        return self.__get_all_database_docs(self.__services_db_name)

    def get_service(self, service_name):
        return self.__find_document_by_name(self.__services_db_name, service_name)

    def add_service(self, service):
        return self.__add_doc(self.__services_db_name, service)

    def update_service(self, service):
        return self.__resilient_update_doc(self.__services_db_name, service)

    def delete_service(self, service):
        return self.__resilient_delete_doc(self.__services_db_name, service)
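A minimal usage sketch, assuming a CouchDB instance reachable at localhost:5984 with the hard-coded admin:admin credentials; the "host0" structure document and its fields are hypothetical:

from src.StateDatabase.couchdb import CouchDBServer

handler = CouchDBServer(couchdb_url="localhost", couchdb_port=5984)

# Create the "structures" database if it does not exist yet
if not handler.database_exists("structures"):
    handler.create_database("structures")

# Add a hypothetical host structure and read it back by name
handler.add_structure({"type": "structure", "subtype": "host", "name": "host0"})
host = handler.get_structure("host0")

# Update it through the conflict-aware resilient update
host["resources"] = {"cpu": {"max": 400}}
handler.update_structure(host)

handler.close_connection()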
Classes
class CouchDBServer (couchdb_url=None, couchdb_port=None)
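Client for the CouchDB instance that backs the ServerlessContainers state database. By default it connects to http://couchdb:5984 with the hard-coded admin:admin credentials (see the TODO in __init__) and keeps a requests session open until close_connection() is called. Updates and deletes are "resilient": on a 409 revision conflict the document revision is refreshed and the operation is retried after a small randomized backoff.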
Class variables
var post_doc_headers
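Default request headers ({'content-type': 'application/json'}) sent with every document POST.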
Methods
def add_event(self, event)
def add_events(self, events)
def add_limit(self, limit)
def add_request(self, req)
def add_requests(self, reqs)
def add_rule(self, rule)
def add_service(self, service)
def add_structure(self, structure)
def add_user(self, user)
def close_connection(self)
def compact_database(self, database)
def create_database(self, database)
def database_exists(self, database)
def delete_event(self, event)
def delete_events(self, events)
def delete_num_events_by_structure(self, structure, event_name, event_num)
def delete_request(self, request)
def delete_requests(self, requests)
def delete_service(self, service)
def delete_structure(self, structure)
def get_all_limits(self)
def get_events(self, structure)
def get_limits(self, structure)
def get_requests(self, structure=None)
def get_rule(self, rule_name)
def get_rules(self)
def get_service(self, service_name)
def get_services(self)
def get_structure(self, structure_name)
def get_structures(self, subtype=None)
def get_user(self, user_name)
def get_users(self)
def remove_database(self, database)
def set_database_name(self, database_type, database_name)
def update_limit(self, limit)
def update_rule(self, rule)
def update_service(self, service)
def update_structure(self, structure, max_tries=10)
def update_user(self, user, max_tries=10)
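The per-type database names can be redirected before any operation, which is useful for pointing the handler at sandbox databases in tests; a brief sketch, where the "structures_test" name is hypothetical:

handler = CouchDBServer()
handler.set_database_name("structures", "structures_test")
if not handler.database_exists("structures_test"):
    handler.create_database("structures_test")
# From here on, all structure operations hit "structures_test"
containers = handler.get_structures(subtype="container")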