Module src.StateDatabase.opentsdb
Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.
import time
import json
import requests
import gzip
import io
from requests import ReadTimeout
class OpenTSDBServer:
    """Thin HTTP client for an OpenTSDB server.

    Wraps a ``requests.Session`` to push JSON documents to ``api/put`` and to
    run queries against ``api/query``, plus a helper that averages the
    returned data points per metric over a time window.
    """

    # Fallback host and port used when the constructor receives no (or an
    # unusable) value.
    __OPENTSDB_URL = "opentsdb"
    __OPENTSDB_PORT = 4242
    # Value reported for a metric when the query yields no data for it.
    NO_METRIC_DATA_DEFAULT_VALUE = 0  # -1 # TODO this should be set to -1 and everything should work
    # Timeout handed to requests when posting documents to 'api/put'.
    __TIMEOUT = 5

    def __init__(self, opentsdb_url=None, opentsdb_port=None):
        """Build the server base URL and open an HTTP session.

        Args:
            opentsdb_url: Host name of the server; falsy values select the
                class default.
            opentsdb_port: Port of the server; falsy values, or values that
                ``int()`` rejects with ``ValueError``, select the class default.
        """
        if not opentsdb_url:
            opentsdb_url = self.__OPENTSDB_URL
        if not opentsdb_port:
            opentsdb_port = self.__OPENTSDB_PORT
        else:
            try:
                opentsdb_port = int(opentsdb_port)
            except ValueError:
                opentsdb_port = self.__OPENTSDB_PORT
        self.server = "http://{0}:{1}".format(opentsdb_url, str(opentsdb_port))
        self.session = requests.Session()

    def close_connection(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def send_json_documents(self, json_documents):
        """POST gzip-compressed JSON documents to the 'api/put' endpoint.

        Args:
            json_documents: Any JSON-serializable object; it is serialized with
                ``json.dumps`` before being compressed.

        Returns:
            ``(True, {})`` when the server answers 204, otherwise
            ``(False, {"error": ...})`` where the error is the decoded JSON
            body, the raw body text if it is not JSON, ``"Server timeout"`` on
            a read timeout, or the string form of any other exception.
        """
        headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
        out = io.BytesIO()
        with gzip.GzipFile(fileobj=out, mode="w") as f:
            f.write(json.dumps(json_documents).encode())
        try:
            r = self.session.post("{0}/{1}".format(self.server, "api/put"), headers=headers, data=out.getvalue(),
                                  timeout=self.__TIMEOUT)
            if r.status_code != 204:
                try:
                    error = r.json()
                except ValueError:
                    # The error body is not JSON (e.g. a proxy error page):
                    # report what the server actually sent instead of a
                    # misleading JSON-decoding message.
                    error = r.text
                return False, {"error": error}
            return True, {}
        except ReadTimeout:
            return False, {"error": "Server timeout"}
        except Exception as e:
            return False, {"error": str(e)}

    def get_points(self, query, tries=3):
        """Run a query against the 'api/query' endpoint.

        Connection errors are retried until ``tries`` attempts have failed; at
        least one attempt is always made.

        Args:
            query: JSON-serializable query document.
            tries: Number of attempts allowed before the connection error is
                re-raised.

        Returns:
            The decoded JSON body on a 200 response; ``{}`` on a 400 response
            whose error message contains "No such name for 'tagv'"; ``None``
            for any other 400 response or for a status that
            ``raise_for_status()`` does not reject.

        Raises:
            requests.ConnectionError: When every attempt failed to connect.
            Whatever ``Response.raise_for_status()`` raises for statuses other
            than 200 and 400.
        """
        url = "{0}/{1}".format(self.server, "api/query")
        headers = {'content-type': 'application/json', 'Accept': 'application/json'}
        payload = json.dumps(query)
        while True:
            try:
                r = self.session.post(url, data=payload, headers=headers)
                break
            except requests.ConnectionError:
                tries -= 1
                if tries <= 0:
                    raise
                # Loop again; the response of a successful retry is returned
                # below instead of being discarded.

        if r.status_code == 200:
            return json.loads(r.text)
        if r.status_code == 400:
            error_message = json.loads(r.content)["error"]["message"]
            if "No such name for 'tagv'" in error_message:
                return {}
            # Other 400 errors are reported to the caller as "no result".
            return None
        r.raise_for_status()
        return None

    def get_structure_timeseries(self, tags, window_difference, window_delay, retrieve_metrics, generate_metrics,
                                 downsample=5):
        """Average the requested metrics over a window and combine them.

        The window spans ``window_difference`` seconds and ends
        ``window_delay`` seconds before the current time.

        Args:
            tags: Tags placed in every sub-query.
            window_difference: Length of the window, in seconds.
            window_delay: Offset of the window end from now, in seconds.
            retrieve_metrics: Iterable of metric names to query.
            generate_metrics: Mapping from an output name to the metric names
                whose averages are added together to produce it.
            downsample: Downsampling interval; sent as "<downsample>s-avg".

        Returns:
            dict mapping every key of ``generate_metrics`` to the sum of the
            averages of its metrics, starting from
            ``NO_METRIC_DATA_DEFAULT_VALUE``.
        """
        usages = dict()
        subquery = list()
        for metric in retrieve_metrics:
            usages[metric] = self.NO_METRIC_DATA_DEFAULT_VALUE
            subquery.append(dict(aggregator='zimsum', metric=metric, tags=tags, downsample=str(downsample) + "s-avg"))

        # Read the clock once so both window bounds derive from the same
        # instant and the window width cannot drift between two readings.
        now = time.time()
        start = int(now - (window_difference + window_delay))
        end = int(now - window_delay)
        query = dict(start=start, end=end, queries=subquery)
        result = self.get_points(query)

        if result:
            for metric in result:
                dps = metric["dps"]
                summatory = sum(dps.values())
                if len(dps) > 0:
                    average_real = summatory / len(dps)
                else:
                    average_real = 0
                usages[metric["metric"]] = average_real

        final_values = dict()
        for value in generate_metrics:
            final_values[value] = self.NO_METRIC_DATA_DEFAULT_VALUE
            for metric in generate_metrics[value]:
                # Metrics still holding the "no data" marker are not added.
                if metric in usages and usages[metric] != self.NO_METRIC_DATA_DEFAULT_VALUE:
                    final_values[value] += usages[metric]
        return final_values
Classes
class OpenTSDBServer (opentsdb_url=None, opentsdb_port=None)
-
Expand source code
class OpenTSDBServer:
    """Thin HTTP client for an OpenTSDB server.

    Wraps a ``requests.Session`` to push JSON documents to ``api/put`` and to
    run queries against ``api/query``, plus a helper that averages the
    returned data points per metric over a time window.
    """

    # Fallback host and port used when the constructor receives no (or an
    # unusable) value.
    __OPENTSDB_URL = "opentsdb"
    __OPENTSDB_PORT = 4242
    # Value reported for a metric when the query yields no data for it.
    NO_METRIC_DATA_DEFAULT_VALUE = 0  # -1 # TODO this should be set to -1 and everything should work
    # Timeout handed to requests when posting documents to 'api/put'.
    __TIMEOUT = 5

    def __init__(self, opentsdb_url=None, opentsdb_port=None):
        """Build the server base URL and open an HTTP session.

        Falsy arguments, and ports that ``int()`` rejects with ``ValueError``,
        are replaced by the class defaults.
        """
        if not opentsdb_url:
            opentsdb_url = self.__OPENTSDB_URL
        if not opentsdb_port:
            opentsdb_port = self.__OPENTSDB_PORT
        else:
            try:
                opentsdb_port = int(opentsdb_port)
            except ValueError:
                opentsdb_port = self.__OPENTSDB_PORT
        self.server = "http://{0}:{1}".format(opentsdb_url, str(opentsdb_port))
        self.session = requests.Session()

    def close_connection(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def send_json_documents(self, json_documents):
        """POST gzip-compressed JSON documents to the 'api/put' endpoint.

        Returns ``(True, {})`` on a 204 response, otherwise
        ``(False, {"error": ...})`` carrying the decoded JSON body, the raw
        body text when it is not JSON, ``"Server timeout"`` on a read timeout,
        or the string form of any other exception.
        """
        headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
        out = io.BytesIO()
        with gzip.GzipFile(fileobj=out, mode="w") as f:
            f.write(json.dumps(json_documents).encode())
        try:
            r = self.session.post("{0}/{1}".format(self.server, "api/put"), headers=headers, data=out.getvalue(),
                                  timeout=self.__TIMEOUT)
            if r.status_code != 204:
                try:
                    error = r.json()
                except ValueError:
                    # Non-JSON error body: surface the raw text rather than a
                    # JSON-decoding message.
                    error = r.text
                return False, {"error": error}
            return True, {}
        except ReadTimeout:
            return False, {"error": "Server timeout"}
        except Exception as e:
            return False, {"error": str(e)}

    def get_points(self, query, tries=3):
        """Run a query against 'api/query', retrying on connection errors.

        Returns the decoded JSON body on 200, ``{}`` on a 400 whose message
        contains "No such name for 'tagv'", and ``None`` for other 400
        responses or statuses that ``raise_for_status()`` accepts. The
        connection error is re-raised once ``tries`` attempts have failed.
        """
        url = "{0}/{1}".format(self.server, "api/query")
        headers = {'content-type': 'application/json', 'Accept': 'application/json'}
        payload = json.dumps(query)
        while True:
            try:
                r = self.session.post(url, data=payload, headers=headers)
                break
            except requests.ConnectionError:
                tries -= 1
                if tries <= 0:
                    raise
                # Retry; the eventual response is returned below rather than
                # being dropped.

        if r.status_code == 200:
            return json.loads(r.text)
        if r.status_code == 400:
            error_message = json.loads(r.content)["error"]["message"]
            if "No such name for 'tagv'" in error_message:
                return {}
            return None
        r.raise_for_status()
        return None

    def get_structure_timeseries(self, tags, window_difference, window_delay, retrieve_metrics, generate_metrics,
                                 downsample=5):
        """Average the requested metrics over a window and combine them.

        The window spans ``window_difference`` seconds and ends
        ``window_delay`` seconds before now. Each key of ``generate_metrics``
        maps to the sum of the averages of the metrics it lists, starting from
        ``NO_METRIC_DATA_DEFAULT_VALUE``.
        """
        usages = dict()
        subquery = list()
        for metric in retrieve_metrics:
            usages[metric] = self.NO_METRIC_DATA_DEFAULT_VALUE
            subquery.append(dict(aggregator='zimsum', metric=metric, tags=tags, downsample=str(downsample) + "s-avg"))

        # One clock reading for both bounds keeps the window width stable.
        now = time.time()
        start = int(now - (window_difference + window_delay))
        end = int(now - window_delay)
        query = dict(start=start, end=end, queries=subquery)
        result = self.get_points(query)

        if result:
            for metric in result:
                dps = metric["dps"]
                summatory = sum(dps.values())
                if len(dps) > 0:
                    average_real = summatory / len(dps)
                else:
                    average_real = 0
                usages[metric["metric"]] = average_real

        final_values = dict()
        for value in generate_metrics:
            final_values[value] = self.NO_METRIC_DATA_DEFAULT_VALUE
            for metric in generate_metrics[value]:
                # Metrics still holding the "no data" marker are not added.
                if metric in usages and usages[metric] != self.NO_METRIC_DATA_DEFAULT_VALUE:
                    final_values[value] += usages[metric]
        return final_values
Class variables
var NO_METRIC_DATA_DEFAULT_VALUE
Methods
def close_connection(self)
-
Expand source code
def close_connection(self):
    """Release the HTTP session held by this client."""
    http_session = self.session
    http_session.close()
def get_points(self, query, tries=3)
-
Expand source code
def get_points(self, query, tries=3):
    """Run a query against the 'api/query' endpoint.

    Connection errors are retried until ``tries`` attempts have failed; at
    least one attempt is always made.

    Args:
        query: JSON-serializable query document.
        tries: Number of attempts allowed before the connection error is
            re-raised.

    Returns:
        The decoded JSON body on a 200 response; ``{}`` on a 400 response
        whose error message contains "No such name for 'tagv'"; ``None`` for
        any other 400 response or for a status that ``raise_for_status()``
        does not reject.

    Raises:
        requests.ConnectionError: When every attempt failed to connect.
        Whatever ``Response.raise_for_status()`` raises for statuses other
        than 200 and 400.
    """
    url = "{0}/{1}".format(self.server, "api/query")
    headers = {'content-type': 'application/json', 'Accept': 'application/json'}
    payload = json.dumps(query)
    while True:
        try:
            r = self.session.post(url, data=payload, headers=headers)
            break
        except requests.ConnectionError:
            tries -= 1
            if tries <= 0:
                raise
            # Loop again; the response of a successful retry is returned
            # below instead of being discarded.

    if r.status_code == 200:
        return json.loads(r.text)
    if r.status_code == 400:
        error_message = json.loads(r.content)["error"]["message"]
        if "No such name for 'tagv'" in error_message:
            return {}
        # Other 400 errors are reported to the caller as "no result".
        return None
    r.raise_for_status()
    return None
def get_structure_timeseries(self, tags, window_difference, window_delay, retrieve_metrics, generate_metrics, downsample=5)
-
Expand source code
def get_structure_timeseries(self, tags, window_difference, window_delay, retrieve_metrics, generate_metrics,
                             downsample=5):
    """Average the requested metrics over a window and combine them.

    The window spans ``window_difference`` seconds and ends ``window_delay``
    seconds before the current time.

    Args:
        tags: Tags placed in every sub-query.
        window_difference: Length of the window, in seconds.
        window_delay: Offset of the window end from now, in seconds.
        retrieve_metrics: Iterable of metric names to query.
        generate_metrics: Mapping from an output name to the metric names
            whose averages are added together to produce it.
        downsample: Downsampling interval; sent as "<downsample>s-avg".

    Returns:
        dict mapping every key of ``generate_metrics`` to the sum of the
        averages of its metrics, starting from
        ``NO_METRIC_DATA_DEFAULT_VALUE``.
    """
    usages = dict()
    subquery = list()
    for metric in retrieve_metrics:
        usages[metric] = self.NO_METRIC_DATA_DEFAULT_VALUE
        subquery.append(dict(aggregator='zimsum', metric=metric, tags=tags, downsample=str(downsample) + "s-avg"))

    # Read the clock once so both window bounds derive from the same instant
    # and the window width cannot drift between two readings.
    now = time.time()
    start = int(now - (window_difference + window_delay))
    end = int(now - window_delay)
    query = dict(start=start, end=end, queries=subquery)
    result = self.get_points(query)

    if result:
        for metric in result:
            dps = metric["dps"]
            summatory = sum(dps.values())
            if len(dps) > 0:
                average_real = summatory / len(dps)
            else:
                average_real = 0
            usages[metric["metric"]] = average_real

    final_values = dict()
    for value in generate_metrics:
        final_values[value] = self.NO_METRIC_DATA_DEFAULT_VALUE
        for metric in generate_metrics[value]:
            # Metrics still holding the "no data" marker are not added.
            if metric in usages and usages[metric] != self.NO_METRIC_DATA_DEFAULT_VALUE:
                final_values[value] += usages[metric]
    return final_values
def send_json_documents(self, json_documents)
-
Expand source code
def send_json_documents(self, json_documents):
    """POST gzip-compressed JSON documents to the 'api/put' endpoint.

    Args:
        json_documents: Any JSON-serializable object; it is serialized with
            ``json.dumps`` before being compressed.

    Returns:
        ``(True, {})`` when the server answers 204, otherwise
        ``(False, {"error": ...})`` where the error is the decoded JSON body,
        the raw body text if it is not JSON, ``"Server timeout"`` on a read
        timeout, or the string form of any other exception.
    """
    headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="w") as f:
        f.write(json.dumps(json_documents).encode())
    try:
        r = self.session.post("{0}/{1}".format(self.server, "api/put"), headers=headers, data=out.getvalue(),
                              timeout=self.__TIMEOUT)
        if r.status_code != 204:
            try:
                error = r.json()
            except ValueError:
                # The error body is not JSON (e.g. a proxy error page): report
                # what the server actually sent instead of a misleading
                # JSON-decoding message.
                error = r.text
            return False, {"error": error}
        return True, {}
    except ReadTimeout:
        return False, {"error": "Server timeout"}
    except Exception as e:
        return False, {"error": str(e)}