Module src.MyUtils.MyUtils
Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import random
import time
import logging
import sys
import requests
import traceback
from termcolor import colored
def eprint(*args, **kwargs):
    """Print to standard error, forwarding every argument to ``print``.

    Args:
        *args: Objects to print.
        **kwargs: Keyword arguments accepted by ``print``. An explicit
            ``file`` is honoured; when omitted, output goes to ``sys.stderr``.
    """
    # setdefault (instead of a hard-coded file=) avoids the
    # "multiple values for keyword argument 'file'" TypeError.
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
# DON'T NEED TO TEST
def resilient_beat(db_handler, service_name, max_tries=10):
    """Refresh the heartbeat fields of a service, retrying on failure.

    The service is read with ``db_handler.get_service``, its ``heartbeat``
    and ``heartbeat_human`` fields are set to the current time and it is
    written back with ``db_handler.update_service``.

    Args:
        db_handler: Object exposing ``get_service`` and ``update_service``.
        service_name: Name of the service to beat.
        max_tries: Number of retries allowed after the first failed attempt.

    Raises:
        requests.exceptions.HTTPError, ValueError: The last error seen once
            the retries are exhausted.
    """
    retries_left = max_tries
    while True:
        try:
            service = db_handler.get_service(service_name)
            service["heartbeat_human"] = time.strftime("%D %H:%M:%S", time.localtime())
            service["heartbeat"] = time.time()
            db_handler.update_service(service)
            return
        except (requests.exceptions.HTTPError, ValueError) as error:
            if retries_left <= 0:
                raise error
            # Wait about one second plus a random jitter before retrying
            backoff = (1000 + random.randint(1, 200)) / 1000
            time.sleep(backoff)
            retries_left -= 1
# DON'T NEED TO TEST
def beat(db_handler, service_name):
    """Send a heartbeat for ``service_name``, allowing up to 5 retries."""
    retries = 5
    resilient_beat(db_handler, service_name, max_tries=retries)
class MyConfig:
    """Configuration holder that falls back to a mapping of default values.

    ``config`` stays ``None`` until ``set_config`` or ``set_value`` is called.
    """

    # Class-level defaults, shadowed per instance once assigned
    DEFAULTS_CONFIG = None
    config = None

    def __init__(self, DEFAULTS_CONFIG):
        """Store the mapping used when a key is missing from ``config``."""
        self.DEFAULTS_CONFIG = DEFAULTS_CONFIG

    def get_config(self):
        """Return the current configuration mapping (``None`` if never set)."""
        return self.config

    def set_config(self, config):
        """Replace the whole configuration mapping."""
        self.config = config

    def get_value(self, key):
        """Return ``config[key]``, or ``DEFAULTS_CONFIG[key]`` as a fallback.

        Raises:
            KeyError: If the key is in neither mapping.
        """
        # config may not have been set yet; go straight to the defaults then
        if self.config is not None:
            try:
                return self.config[key]
            except KeyError:
                pass
        return self.DEFAULTS_CONFIG[key]

    def set_value(self, key, value):
        """Set ``key`` in the configuration, creating the mapping if needed."""
        if self.config is None:
            self.config = {}
        self.config[key] = value
# DON'T NEED TO TEST
def get_config_value(config, default_config, key):
    """Return ``config[key]``, falling back to ``default_config[key]``.

    Raises:
        KeyError: If ``key`` is missing from both mappings.
    """
    try:
        value = config[key]
    except KeyError:
        value = default_config[key]
    return value
# DON'T NEED TO TEST
def log_info(message, debug):
    """Log ``message`` at INFO level and, if ``debug`` is truthy, print it.

    The printed line is prefixed with the current time.
    """
    logging.info(message)
    if not debug:
        return
    stamp = get_time_now_string()
    print("[{0}] INFO: {1}".format(stamp, message))
# DON'T NEED TO TEST
def log_warning(message, debug):
    """Log ``message`` at WARNING level and, if ``debug``, print it in yellow.

    The printed line is prefixed with the current time.
    """
    logging.warning(message)
    if not debug:
        return
    line = "[{0}] WARN: {1}".format(get_time_now_string(), message)
    print(colored(line, "yellow"))
# DON'T NEED TO TEST
def log_error(message, debug):
    """Log ``message`` at ERROR level and, if ``debug``, print it in red.

    The printed line is prefixed with the current time.
    """
    logging.error(message)
    if not debug:
        return
    line = "[{0}] ERROR: {1}".format(get_time_now_string(), message)
    print(colored(line, "red"))
# DON'T NEED TO TEST
def get_time_now_string():
    """Return the current local time formatted as ``HH:MM:SS``."""
    # strftime uses the current local time when no time tuple is given
    return time.strftime("%H:%M:%S")
def get_host_containers(container_host_ip, container_host_port, rescaler_http_session, debug):
    """Fetch the ``/container/`` listing of a host over HTTP.

    Args:
        container_host_ip: Address used to build the request URL.
        container_host_port: Port used to build the request URL.
        rescaler_http_session: Object whose ``get`` performs the request.
        debug: Forwarded to ``log_error``.

    Returns:
        dict built from the JSON body on a 200 response, otherwise ``None``.
        An ``HTTPError`` raised by ``raise_for_status`` is logged, not raised.
    """
    url = "http://{0}:{1}/container/".format(container_host_ip, container_host_port)
    try:
        response = rescaler_http_session.get(url, headers={'Accept': 'application/json'})
        if response.status_code != 200:
            response.raise_for_status()
        else:
            return dict(response.json())
    except requests.exceptions.HTTPError as error:
        log_error(
            "Error trying to get container info {0} {1}".format(str(error), traceback.format_exc()),
            debug)
        return None
    # Non-200 status that raise_for_status() does not treat as an error
    return None
# CAN'T TEST
def get_container_resources(container, rescaler_http_session, debug):
    """Fetch one container's ``/container/<name>`` document over HTTP.

    Args:
        container: Mapping with ``name``, ``host_rescaler_ip`` and
            ``host_rescaler_port`` keys.
        rescaler_http_session: Object whose ``get`` performs the request.
        debug: Forwarded to ``log_error``.

    Returns:
        dict built from the JSON body on a 200 response, otherwise ``None``.
        An ``HTTPError`` raised by ``raise_for_status`` is logged, not raised.
    """
    container_name = container["name"]
    url = "http://{0}:{1}/container/{2}".format(
        container["host_rescaler_ip"], container["host_rescaler_port"], container_name)
    try:
        response = rescaler_http_session.get(url, headers={'Accept': 'application/json'})
        if response.status_code != 200:
            response.raise_for_status()
        else:
            return dict(response.json())
    except requests.exceptions.HTTPError as error:
        log_error(
            "Error trying to get container {0} info {1} {2}".format(container_name, str(error), traceback.format_exc()),
            debug)
        return None
    # Non-200 status that raise_for_status() does not treat as an error
    return None
# CAN'T TEST
def register_service(db_handler, service):
    """Register ``service``, replacing any existing entry with the same name.

    A ``ValueError`` while looking up or deleting the previous entry is
    ignored; it is taken to mean the service was not registered yet.
    """
    try:
        # Service is registered, remove it
        db_handler.delete_service(db_handler.get_service(service["name"]))
    except ValueError:
        # Service is not registered, everything is fine
        pass
    db_handler.add_service(service)
# CAN'T TEST
def get_service(db_handler, service_name, max_allowed_failures=10, time_backoff_seconds=2):
    """Retrieve a service document, retrying on failure.

    Args:
        db_handler: Object exposing ``get_service``.
        service_name: Name of the service to retrieve.
        max_allowed_failures: Number of failed attempts after which to give up.
        time_backoff_seconds: Seconds to sleep between attempts.

    Returns:
        The service document, guaranteed to contain a ``config`` key.

    Raises:
        Exception: If the service could not be retrieved within the allowed
            failures, or it is empty or has no ``config`` key.
    """
    failures = 0
    while True:
        try:
            service = db_handler.get_service(service_name)
            break
        except (requests.exceptions.HTTPError, ValueError):
            # An error might have been thrown because database was recently updated or created
            # try again up to a maximum number of retries
            failures += 1
            if failures >= max_allowed_failures:
                message = "Fatal error, couldn't retrieve service."
                log_error(message, True)
                raise Exception(message)
            time.sleep(time_backoff_seconds)

    if service and "config" in service:
        return service

    message = "Fatal error, couldn't retrieve service configuration."
    log_error(message, True)
    raise Exception(message)
# TESTED
# Translate something like '2-5,7' to ['2', '3', '4', '5', '7'] (ranges are inclusive)
def get_cpu_list(cpu_num_string):
    """Expand a CPU list string such as '2-5,7' into individual core ids.

    Ranges are inclusive, so '2-5,7' yields ['2', '3', '4', '5', '7'].

    Args:
        cpu_num_string: Comma-separated core ids and/or 'first-last' ranges.
            Whitespace around entries and empty entries are ignored.

    Returns:
        list of str, one entry per core, in the order given. An empty or
        blank string yields an empty list.

    Raises:
        ValueError: If a range bound is not an integer.
    """
    cpu_list = []
    for part in cpu_num_string.split(","):
        part = part.strip()
        if not part:
            # Empty input or stray comma: nothing to add
            continue
        bounds = part.split("-")
        if len(bounds) == 1:
            # Single core, no range (e.g., '5')
            cpu_list.append(part)
        else:
            # Inclusive range (e.g., '4-7' -> 4,5,6,7)
            first, last = int(bounds[0]), int(bounds[-1])
            cpu_list.extend(str(core) for core in range(first, last + 1))
    return cpu_list
def copy_structure_base(structure):
    """Return a new dict with only the identifying fields of ``structure``.

    Copies ``_id``, ``_rev``, ``type``, ``subtype`` and ``name``, plus
    ``host`` when the subtype is ``container``.

    Raises:
        KeyError: If any of the copied keys is missing.
    """
    base_keys = ["_id", "_rev", "type", "subtype", "name"]
    # TODO FIX, some structures types have specific fields, fix accordingly
    if structure["subtype"] == "container":
        base_keys.append("host")
    return {key: structure[key] for key in base_keys}
def valid_resource(resource):
    """Return True if ``resource`` is cpu, mem, disk, net or energy."""
    known_resources = ("cpu", "mem", "disk", "net", "energy")
    return resource in known_resources
# DON'T NEED TO TEST
def get_resource(structure, resource):
    """Return the ``resource`` entry under the structure's ``resources`` key."""
    resources = structure["resources"]
    return resources[resource]
# CAN'T TEST
def update_structure(structure, db_handler, debug, max_tries=10):
    """Persist ``structure`` through ``db_handler`` and log the outcome.

    An ``HTTPError`` raised by the update is logged and not re-raised.
    """
    try:
        db_handler.update_structure(structure, max_tries=max_tries)
    except requests.exceptions.HTTPError:
        log_error("Error updating container " + structure["name"] + " " + traceback.format_exc(), debug)
    else:
        subtype_label = structure["subtype"].capitalize()
        log_info("{0} {1} -> updated".format(subtype_label, structure["name"]), debug)
def update_user(user, db_handler, debug, max_tries=10):
    """Persist ``user`` through ``db_handler`` and log the outcome.

    An ``HTTPError`` raised by the update is logged and not re-raised.
    """
    try:
        db_handler.update_user(user, max_tries=max_tries)
    except requests.exceptions.HTTPError:
        log_error("Error updating user " + user["name"] + " " + traceback.format_exc(), debug)
    else:
        log_info("User {0} -> updated".format(user["name"]), debug)
# CAN'T TEST
def get_structures(db_handler, debug, subtype="application"):
    """Return the structures of ``subtype`` provided by ``db_handler``.

    Returns ``None``, after logging a warning, when the lookup raises
    ``HTTPError`` or ``ValueError``.
    """
    try:
        structures = db_handler.get_structures(subtype=subtype)
    except (requests.exceptions.HTTPError, ValueError):
        log_warning("Couldn't retrieve " + subtype + " info.", debug=debug)
        structures = None
    return structures
def start_epoch(debug):
    """Log the start-of-epoch banner and return the current timestamp."""
    for line in ("----------------------", "Starting Epoch"):
        log_info(line, debug)
    return time.time()
def end_epoch(debug, window_difference, t0):
    """Log how long the epoch that started at ``t0`` took.

    Args:
        debug: Forwarded to ``log_info``.
        window_difference: Seconds reported as sleeping time; subtracted
            from the elapsed time to get the processing time.
        t0: Timestamp at which the epoch started.
    """
    elapsed = time.time() - t0
    time_total = "%.2f" % elapsed
    time_proc = "%.2f" % (elapsed - window_difference)
    summary = "Epoch processed in {0} seconds ({1} processing and {2} sleeping)".format(
        time_total, time_proc, str(window_difference))
    log_info(summary, debug)
    log_info("----------------------\n", debug)
def wait_operation_thread(thread, debug):
    """Block until a still-running operation thread finishes.

    Does nothing when ``thread`` is falsy or no longer alive. Otherwise
    warnings are logged, the thread is joined and the delay is reported.

    Args:
        thread (Python Thread): The thread to wait for.
        debug: Forwarded to ``log_warning``.
    """
    if not thread or not thread.is_alive():
        return
    log_warning("Previous thread didn't finish and next poll should start now", debug)
    log_warning("Going to wait until thread finishes before proceeding", debug)
    wait_started = time.time()
    thread.join()
    waited = time.time() - wait_started
    log_warning("Resulting delay of: {0} seconds".format(str(waited)), debug)
# TESTED
def generate_request_name(amount, resource):
    """Build the request name for rescaling ``resource`` by ``amount``.

    Returns:
        ``<Resource>RescaleDown`` for a negative amount and
        ``<Resource>RescaleUp`` for a positive one.

    Raises:
        ValueError: If ``amount`` is zero, ``None`` or truncates to zero.
    """
    if amount == 0:
        raise ValueError("Amount is zero")
    if amount is None:
        raise ValueError("Amount is missing")
    delta = int(amount)
    if delta == 0:
        raise ValueError("Invalid amount")
    suffix = "RescaleDown" if delta < 0 else "RescaleUp"
    return resource.title() + suffix
def structure_is_application(structure):
    """Return True if the structure's ``subtype`` is ``application``."""
    subtype = structure["subtype"]
    return subtype == "application"
def structure_is_container(structure):
    """Return True if the structure's ``subtype`` is ``container``."""
    subtype = structure["subtype"]
    return subtype == "container"
# TESTED
def generate_event_name(event, resource):
    """Build the event name for ``resource`` from the event's scale counts.

    Returns:
        ``<Resource>Bottleneck`` when only the ``up`` count is positive, or
        when both are positive and ``up`` is greater than ``down``;
        ``<Resource>Underuse`` otherwise, as long as ``down`` is positive.

    Raises:
        ValueError: If ``scale`` is missing, holds neither ``up`` nor
            ``down``, or neither count is positive.
    """
    if "scale" not in event:
        raise ValueError("Missing 'scale' key")
    scale = event["scale"]
    if "up" not in scale and "down" not in scale:
        raise ValueError("Must have an 'up' or 'down count")

    up_active = "up" in scale and scale["up"] > 0
    down_active = "down" in scale and scale["down"] > 0

    if up_active and down_active:
        # SPECIAL CASE OF HEAVY HYSTERESIS
        # Both counts are positive: the larger one wins, ties go to underuse
        suffix = "Bottleneck" if scale["up"] > scale["down"] else "Underuse"
    elif down_active:
        suffix = "Underuse"
    elif up_active:
        suffix = "Bottleneck"
    else:
        raise ValueError("Error generating event name")
    return resource.title() + suffix
def generate_structure_usage_metric(resource):
    """Return the usage metric name ``structure.<resource>.usage``."""
    metric_template = "structure.{0}.usage"
    return metric_template.format(resource)
Functions
def beat(db_handler, service_name)
-
Expand source code
def beat(db_handler, service_name):
    """Send a heartbeat for ``service_name``, allowing up to 5 retries."""
    retries = 5
    resilient_beat(db_handler, service_name, max_tries=retries)
def copy_structure_base(structure)
-
Expand source code
def copy_structure_base(structure):
    """Return a new dict with only the identifying fields of ``structure``.

    Copies ``_id``, ``_rev``, ``type``, ``subtype`` and ``name``, plus
    ``host`` when the subtype is ``container``.

    Raises:
        KeyError: If any of the copied keys is missing.
    """
    base_keys = ["_id", "_rev", "type", "subtype", "name"]
    # TODO FIX, some structures types have specific fields, fix accordingly
    if structure["subtype"] == "container":
        base_keys.append("host")
    return {key: structure[key] for key in base_keys}
def end_epoch(debug, window_difference, t0)
-
Expand source code
def end_epoch(debug, window_difference, t0):
    """Log how long the epoch that started at ``t0`` took.

    Args:
        debug: Forwarded to ``log_info``.
        window_difference: Seconds reported as sleeping time; subtracted
            from the elapsed time to get the processing time.
        t0: Timestamp at which the epoch started.
    """
    elapsed = time.time() - t0
    time_total = "%.2f" % elapsed
    time_proc = "%.2f" % (elapsed - window_difference)
    summary = "Epoch processed in {0} seconds ({1} processing and {2} sleeping)".format(
        time_total, time_proc, str(window_difference))
    log_info(summary, debug)
    log_info("----------------------\n", debug)
def eprint(*args, **kwargs)
-
Expand source code
def eprint(*args, **kwargs):
    """Print to standard error, forwarding every argument to ``print``.

    Args:
        *args: Objects to print.
        **kwargs: Keyword arguments accepted by ``print``. An explicit
            ``file`` is honoured; when omitted, output goes to ``sys.stderr``.
    """
    # setdefault (instead of a hard-coded file=) avoids the
    # "multiple values for keyword argument 'file'" TypeError.
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
def generate_event_name(event, resource)
-
Expand source code
def generate_event_name(event, resource):
    """Build the event name for ``resource`` from the event's scale counts.

    Returns:
        ``<Resource>Bottleneck`` when only the ``up`` count is positive, or
        when both are positive and ``up`` is greater than ``down``;
        ``<Resource>Underuse`` otherwise, as long as ``down`` is positive.

    Raises:
        ValueError: If ``scale`` is missing, holds neither ``up`` nor
            ``down``, or neither count is positive.
    """
    if "scale" not in event:
        raise ValueError("Missing 'scale' key")
    scale = event["scale"]
    if "up" not in scale and "down" not in scale:
        raise ValueError("Must have an 'up' or 'down count")

    up_active = "up" in scale and scale["up"] > 0
    down_active = "down" in scale and scale["down"] > 0

    if up_active and down_active:
        # SPECIAL CASE OF HEAVY HYSTERESIS
        # Both counts are positive: the larger one wins, ties go to underuse
        suffix = "Bottleneck" if scale["up"] > scale["down"] else "Underuse"
    elif down_active:
        suffix = "Underuse"
    elif up_active:
        suffix = "Bottleneck"
    else:
        raise ValueError("Error generating event name")
    return resource.title() + suffix
def generate_request_name(amount, resource)
-
Expand source code
def generate_request_name(amount, resource):
    """Build the request name for rescaling ``resource`` by ``amount``.

    Returns:
        ``<Resource>RescaleDown`` for a negative amount and
        ``<Resource>RescaleUp`` for a positive one.

    Raises:
        ValueError: If ``amount`` is zero, ``None`` or truncates to zero.
    """
    if amount == 0:
        raise ValueError("Amount is zero")
    if amount is None:
        raise ValueError("Amount is missing")
    delta = int(amount)
    if delta == 0:
        raise ValueError("Invalid amount")
    suffix = "RescaleDown" if delta < 0 else "RescaleUp"
    return resource.title() + suffix
def generate_structure_usage_metric(resource)
-
Expand source code
def generate_structure_usage_metric(resource):
    """Return the usage metric name ``structure.<resource>.usage``."""
    metric_template = "structure.{0}.usage"
    return metric_template.format(resource)
def get_config_value(config, default_config, key)
-
Expand source code
def get_config_value(config, default_config, key):
    """Return ``config[key]``, falling back to ``default_config[key]``.

    Raises:
        KeyError: If ``key`` is missing from both mappings.
    """
    try:
        value = config[key]
    except KeyError:
        value = default_config[key]
    return value
def get_container_resources(container, rescaler_http_session, debug)
-
Expand source code
def get_container_resources(container, rescaler_http_session, debug):
    """Fetch one container's ``/container/<name>`` document over HTTP.

    Args:
        container: Mapping with ``name``, ``host_rescaler_ip`` and
            ``host_rescaler_port`` keys.
        rescaler_http_session: Object whose ``get`` performs the request.
        debug: Forwarded to ``log_error``.

    Returns:
        dict built from the JSON body on a 200 response, otherwise ``None``.
        An ``HTTPError`` raised by ``raise_for_status`` is logged, not raised.
    """
    container_name = container["name"]
    url = "http://{0}:{1}/container/{2}".format(
        container["host_rescaler_ip"], container["host_rescaler_port"], container_name)
    try:
        response = rescaler_http_session.get(url, headers={'Accept': 'application/json'})
        if response.status_code != 200:
            response.raise_for_status()
        else:
            return dict(response.json())
    except requests.exceptions.HTTPError as error:
        log_error(
            "Error trying to get container {0} info {1} {2}".format(container_name, str(error), traceback.format_exc()),
            debug)
        return None
    # Non-200 status that raise_for_status() does not treat as an error
    return None
def get_cpu_list(cpu_num_string)
-
Expand source code
def get_cpu_list(cpu_num_string):
    """Expand a CPU list string such as '2-5,7' into individual core ids.

    Ranges are inclusive, so '2-5,7' yields ['2', '3', '4', '5', '7'].

    Args:
        cpu_num_string: Comma-separated core ids and/or 'first-last' ranges.
            Whitespace around entries and empty entries are ignored.

    Returns:
        list of str, one entry per core, in the order given. An empty or
        blank string yields an empty list.

    Raises:
        ValueError: If a range bound is not an integer.
    """
    cpu_list = []
    for part in cpu_num_string.split(","):
        part = part.strip()
        if not part:
            # Empty input or stray comma: nothing to add
            continue
        bounds = part.split("-")
        if len(bounds) == 1:
            # Single core, no range (e.g., '5')
            cpu_list.append(part)
        else:
            # Inclusive range (e.g., '4-7' -> 4,5,6,7)
            first, last = int(bounds[0]), int(bounds[-1])
            cpu_list.extend(str(core) for core in range(first, last + 1))
    return cpu_list
def get_host_containers(container_host_ip, container_host_port, rescaler_http_session, debug)
-
Expand source code
def get_host_containers(container_host_ip, container_host_port, rescaler_http_session, debug):
    """Fetch the ``/container/`` listing of a host over HTTP.

    Args:
        container_host_ip: Address used to build the request URL.
        container_host_port: Port used to build the request URL.
        rescaler_http_session: Object whose ``get`` performs the request.
        debug: Forwarded to ``log_error``.

    Returns:
        dict built from the JSON body on a 200 response, otherwise ``None``.
        An ``HTTPError`` raised by ``raise_for_status`` is logged, not raised.
    """
    url = "http://{0}:{1}/container/".format(container_host_ip, container_host_port)
    try:
        response = rescaler_http_session.get(url, headers={'Accept': 'application/json'})
        if response.status_code != 200:
            response.raise_for_status()
        else:
            return dict(response.json())
    except requests.exceptions.HTTPError as error:
        log_error(
            "Error trying to get container info {0} {1}".format(str(error), traceback.format_exc()),
            debug)
        return None
    # Non-200 status that raise_for_status() does not treat as an error
    return None
def get_resource(structure, resource)
-
Expand source code
def get_resource(structure, resource):
    """Return the ``resource`` entry under the structure's ``resources`` key."""
    resources = structure["resources"]
    return resources[resource]
def get_service(db_handler, service_name, max_allowed_failures=10, time_backoff_seconds=2)
-
Expand source code
def get_service(db_handler, service_name, max_allowed_failures=10, time_backoff_seconds=2):
    """Retrieve a service document, retrying on failure.

    Args:
        db_handler: Object exposing ``get_service``.
        service_name: Name of the service to retrieve.
        max_allowed_failures: Number of failed attempts after which to give up.
        time_backoff_seconds: Seconds to sleep between attempts.

    Returns:
        The service document, guaranteed to contain a ``config`` key.

    Raises:
        Exception: If the service could not be retrieved within the allowed
            failures, or it is empty or has no ``config`` key.
    """
    failures = 0
    while True:
        try:
            service = db_handler.get_service(service_name)
            break
        except (requests.exceptions.HTTPError, ValueError):
            # An error might have been thrown because database was recently updated or created
            # try again up to a maximum number of retries
            failures += 1
            if failures >= max_allowed_failures:
                message = "Fatal error, couldn't retrieve service."
                log_error(message, True)
                raise Exception(message)
            time.sleep(time_backoff_seconds)

    if service and "config" in service:
        return service

    message = "Fatal error, couldn't retrieve service configuration."
    log_error(message, True)
    raise Exception(message)
def get_structures(db_handler, debug, subtype='application')
-
Expand source code
def get_structures(db_handler, debug, subtype="application"):
    """Return the structures of ``subtype`` provided by ``db_handler``.

    Returns ``None``, after logging a warning, when the lookup raises
    ``HTTPError`` or ``ValueError``.
    """
    try:
        structures = db_handler.get_structures(subtype=subtype)
    except (requests.exceptions.HTTPError, ValueError):
        log_warning("Couldn't retrieve " + subtype + " info.", debug=debug)
        structures = None
    return structures
def get_time_now_string()
-
Expand source code
def get_time_now_string():
    """Return the current local time formatted as ``HH:MM:SS``."""
    # strftime uses the current local time when no time tuple is given
    return time.strftime("%H:%M:%S")
def log_error(message, debug)
-
Expand source code
def log_error(message, debug):
    """Log ``message`` at ERROR level and, if ``debug``, print it in red.

    The printed line is prefixed with the current time.
    """
    logging.error(message)
    if not debug:
        return
    line = "[{0}] ERROR: {1}".format(get_time_now_string(), message)
    print(colored(line, "red"))
def log_info(message, debug)
-
Expand source code
def log_info(message, debug):
    """Log ``message`` at INFO level and, if ``debug`` is truthy, print it.

    The printed line is prefixed with the current time.
    """
    logging.info(message)
    if not debug:
        return
    stamp = get_time_now_string()
    print("[{0}] INFO: {1}".format(stamp, message))
def log_warning(message, debug)
-
Expand source code
def log_warning(message, debug):
    """Log ``message`` at WARNING level and, if ``debug``, print it in yellow.

    The printed line is prefixed with the current time.
    """
    logging.warning(message)
    if not debug:
        return
    line = "[{0}] WARN: {1}".format(get_time_now_string(), message)
    print(colored(line, "yellow"))
def register_service(db_handler, service)
-
Expand source code
def register_service(db_handler, service):
    """Register ``service``, replacing any existing entry with the same name.

    A ``ValueError`` while looking up or deleting the previous entry is
    ignored; it is taken to mean the service was not registered yet.
    """
    try:
        # Service is registered, remove it
        db_handler.delete_service(db_handler.get_service(service["name"]))
    except ValueError:
        # Service is not registered, everything is fine
        pass
    db_handler.add_service(service)
def resilient_beat(db_handler, service_name, max_tries=10)
-
Expand source code
def resilient_beat(db_handler, service_name, max_tries=10):
    """Refresh the heartbeat fields of a service, retrying on failure.

    The service is read with ``db_handler.get_service``, its ``heartbeat``
    and ``heartbeat_human`` fields are set to the current time and it is
    written back with ``db_handler.update_service``.

    Args:
        db_handler: Object exposing ``get_service`` and ``update_service``.
        service_name: Name of the service to beat.
        max_tries: Number of retries allowed after the first failed attempt.

    Raises:
        requests.exceptions.HTTPError, ValueError: The last error seen once
            the retries are exhausted.
    """
    retries_left = max_tries
    while True:
        try:
            service = db_handler.get_service(service_name)
            service["heartbeat_human"] = time.strftime("%D %H:%M:%S", time.localtime())
            service["heartbeat"] = time.time()
            db_handler.update_service(service)
            return
        except (requests.exceptions.HTTPError, ValueError) as error:
            if retries_left <= 0:
                raise error
            # Wait about one second plus a random jitter before retrying
            backoff = (1000 + random.randint(1, 200)) / 1000
            time.sleep(backoff)
            retries_left -= 1
def start_epoch(debug)
-
Expand source code
def start_epoch(debug):
    """Log the start-of-epoch banner and return the current timestamp."""
    for line in ("----------------------", "Starting Epoch"):
        log_info(line, debug)
    return time.time()
def structure_is_application(structure)
-
Expand source code
def structure_is_application(structure):
    """Return True if the structure's ``subtype`` is ``application``."""
    subtype = structure["subtype"]
    return subtype == "application"
def structure_is_container(structure)
-
Expand source code
def structure_is_container(structure):
    """Return True if the structure's ``subtype`` is ``container``."""
    subtype = structure["subtype"]
    return subtype == "container"
def update_structure(structure, db_handler, debug, max_tries=10)
-
Expand source code
def update_structure(structure, db_handler, debug, max_tries=10):
    """Persist ``structure`` through ``db_handler`` and log the outcome.

    An ``HTTPError`` raised by the update is logged and not re-raised.
    """
    try:
        db_handler.update_structure(structure, max_tries=max_tries)
    except requests.exceptions.HTTPError:
        log_error("Error updating container " + structure["name"] + " " + traceback.format_exc(), debug)
    else:
        subtype_label = structure["subtype"].capitalize()
        log_info("{0} {1} -> updated".format(subtype_label, structure["name"]), debug)
def update_user(user, db_handler, debug, max_tries=10)
-
Expand source code
def update_user(user, db_handler, debug, max_tries=10):
    """Persist ``user`` through ``db_handler`` and log the outcome.

    An ``HTTPError`` raised by the update is logged and not re-raised.
    """
    try:
        db_handler.update_user(user, max_tries=max_tries)
    except requests.exceptions.HTTPError:
        log_error("Error updating user " + user["name"] + " " + traceback.format_exc(), debug)
    else:
        log_info("User {0} -> updated".format(user["name"]), debug)
def valid_resource(resource)
-
Expand source code
def valid_resource(resource):
    """Return True if ``resource`` is cpu, mem, disk, net or energy."""
    known_resources = ("cpu", "mem", "disk", "net", "energy")
    return resource in known_resources
def wait_operation_thread(thread, debug)
-
This is used in services like the snapshotters or the Guardian that use threads to carry out operations. A main thread is launched that spawns the needed threads to carry out the operations. The service waits for this thread to finish.
Args
thread
:Python Thread
- The thread that has spawned the basic threads that carry out operations as needed
Expand source code
def wait_operation_thread(thread, debug):
    """Block until a still-running operation thread finishes.

    Does nothing when ``thread`` is falsy or no longer alive. Otherwise
    warnings are logged, the thread is joined and the delay is reported.

    Args:
        thread (Python Thread): The thread to wait for.
        debug: Forwarded to ``log_warning``.
    """
    if not thread or not thread.is_alive():
        return
    log_warning("Previous thread didn't finish and next poll should start now", debug)
    log_warning("Going to wait until thread finishes before proceeding", debug)
    wait_started = time.time()
    thread.join()
    waited = time.time() - wait_started
    log_warning("Resulting delay of: {0} seconds".format(str(waited)), debug)
Classes
class MyConfig (DEFAULTS_CONFIG)
-
Expand source code
class MyConfig:
    """Configuration holder that falls back to a mapping of default values.

    ``config`` stays ``None`` until ``set_config`` or ``set_value`` is called.
    """

    # Class-level defaults, shadowed per instance once assigned
    DEFAULTS_CONFIG = None
    config = None

    def __init__(self, DEFAULTS_CONFIG):
        """Store the mapping used when a key is missing from ``config``."""
        self.DEFAULTS_CONFIG = DEFAULTS_CONFIG

    def get_config(self):
        """Return the current configuration mapping (``None`` if never set)."""
        return self.config

    def set_config(self, config):
        """Replace the whole configuration mapping."""
        self.config = config

    def get_value(self, key):
        """Return ``config[key]``, or ``DEFAULTS_CONFIG[key]`` as a fallback.

        Raises:
            KeyError: If the key is in neither mapping.
        """
        # config may not have been set yet; go straight to the defaults then
        if self.config is not None:
            try:
                return self.config[key]
            except KeyError:
                pass
        return self.DEFAULTS_CONFIG[key]

    def set_value(self, key, value):
        """Set ``key`` in the configuration, creating the mapping if needed."""
        if self.config is None:
            self.config = {}
        self.config[key] = value
Class variables
var DEFAULTS_CONFIG
var config
Methods
def get_config(self)
-
Expand source code
def get_config(self):
    """Return the current configuration mapping held in ``self.config``."""
    current = self.config
    return current
def get_value(self, key)
-
Expand source code
def get_value(self, key):
    """Return ``config[key]``, or ``DEFAULTS_CONFIG[key]`` as a fallback.

    Raises:
        KeyError: If the key is in neither mapping.
    """
    # config may not have been set yet; go straight to the defaults then
    if self.config is not None:
        try:
            return self.config[key]
        except KeyError:
            pass
    return self.DEFAULTS_CONFIG[key]
def set_config(self, config)
-
Expand source code
def set_config(self, config):
    """Replace the whole configuration mapping with ``config``."""
    new_config = config
    self.config = new_config
def set_value(self, key, value)
-
Expand source code
def set_value(self, key, value):
    """Set ``key`` to ``value`` in the configuration mapping.

    The mapping is created first when ``self.config`` is still ``None``,
    which would otherwise raise a TypeError on item assignment.
    """
    if self.config is None:
        self.config = {}
    self.config[key] = value