Module src.Snapshoters.StructuresSnapshoter
Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
from threading import Thread
import requests
import time
import traceback
import logging
import src.StateDatabase.couchdb as couchDB
from src.MyUtils.MyUtils import MyConfig, log_error, get_service, beat, log_info, \
update_structure, get_host_containers, get_structures, copy_structure_base, wait_operation_thread, log_warning
# Shared database handle, passed to every database helper used in this module
# (get_structures, update_structure, get_service, beat).
db_handler = couchDB.CouchDBServer()
# HTTP session reused for the per-host requests issued through get_host_containers().
rescaler_http_session = requests.Session()
# Per-resource lookup table. "limit_label" is the key read from the values reported
# for a container in order to obtain its "current" value. "metric" is not read
# anywhere in the visible part of this module.
translate_map = {
"cpu": {"metric": "structure.cpu.current", "limit_label": "effective_cpu_limit"},
"mem": {"metric": "structure.mem.current", "limit_label": "mem_limit"},
"disk": {"metric": "structure.disk.current", "limit_label": "disk_read_limit"}, # FIXME missing write value
"net": {"metric": "structure.net.current", "limit_label": "net_limit"}
}
# Name used to fetch this service's document, to send the heartbeat and to name the log file.
SERVICE_NAME = "structures_snapshoter"
# Defaults handed to MyConfig; POLLING_FREQUENCY is also the fallback used by persist()
# when the configured frequency is rejected.
CONFIG_DEFAULT_VALUES = {"POLLING_FREQUENCY": 5, "DEBUG": True, "PERSIST_APPS": True, "RESOURCES_PERSISTED": ["cpu", "mem"], "ACTIVE": True}
# NOTE(review): not referenced in the visible part of this module.
MAX_FAIL_NUM = 5
# Module-wide flag forwarded to the logging helpers; persist() overwrites it on every
# epoch with the configured DEBUG value.
# NOTE(review): the global 'resources_persisted' read by the functions below is NOT
# defined here, it is only assigned inside persist().
debug = True
def update_container_current_values(container_name, resources):
    """Write the latest "current" value of each persisted resource to a container document.

    For every resource in the module-level ``resources_persisted`` the value stored
    under that resource's ``limit_label`` (see ``translate_map``) in ``resources`` is
    cast to ``int`` and saved as ``structure["resources"][resource]["current"]``.
    A resource that is missing or empty in ``resources`` is logged and saved as 0.

    :param container_name: name used to retrieve the structure from the database.
    :param resources: mapping of resource name to the values reported for the container.
    """
    # Remote database operation
    structure = db_handler.get_structure(container_name).copy()
    if "resources" not in structure:
        structure["resources"] = dict()

    for resource in resources_persisted:
        if resource not in structure["resources"]:
            structure["resources"][resource] = dict()
        resource_entry = structure["resources"][resource]

        if resource in resources and resources[resource]:
            limit_label = translate_map[resource]["limit_label"]
            resource_entry["current"] = resources[resource][limit_label]
        else:
            log_error("Unable to get info for resource {0} for container {1}".format(resource, container_name), debug)
            resource_entry["current"] = 0

        # The stored value is always an integer, whatever was reported
        resource_entry["current"] = int(resource_entry["current"])

    # Remote database operation
    update_structure(structure, db_handler, debug)
def thread_persist_container(container, container_resources_dict):
    """Persist the current resource values of a single container (thread target).

    :param container: container structure; only its ``"name"`` is read here.
    :param container_resources_dict: mapping of container name to a structure whose
        ``"resources"`` entry holds the values reported for that container. May be
        ``None`` or lack the container, in which case an error is logged and nothing
        is persisted.
    """
    container_name = container["name"]

    # Containers whose info could not be retrieved are absent from the dictionary,
    # so look the entry up defensively instead of raising KeyError inside the thread
    container_entry = (container_resources_dict or dict()).get(container_name)
    resources = container_entry.get("resources") if container_entry else None
    if not resources:
        log_error("Couldn't get container's {0} resources".format(container_name), debug)
        return

    # Persist by updating the Database current value
    update_container_current_values(container_name, resources)
def persist_containers(container_resources_dict):
    """Persist the current values of every properly configured container, one thread each.

    A container is skipped when, for any resource in the module-level
    ``resources_persisted``, its document lacks the resource or the resource's
    ``"max"`` value; one error is logged per offending resource.

    :param container_resources_dict: forwarded untouched to ``thread_persist_container``.
    """
    # Remote database operation
    containers = get_structures(db_handler, debug, subtype="container")
    if not containers:
        return

    workers = []
    for container in containers:
        # A document that was not properly initialized must not be touched, otherwise it
        # could end up holding just the "current" value with no way of correcting it
        properly_configured = True
        for resource in resources_persisted:
            if resource in container["resources"] and "max" in container["resources"][resource]:
                continue
            log_error("Container {0} has not a proper config for the resource {1}".format(container["name"], resource), debug)
            properly_configured = False

        if properly_configured:
            worker = Thread(target=thread_persist_container, args=(container, container_resources_dict,))
            worker.start()
            workers.append(worker)

    # Wait until every container has been handled
    for worker in workers:
        worker.join()
def persist_applications(container_resources_dict):
    """Compute and persist each application's current resource values.

    For every application, the "current" value of each resource in the module-level
    ``resources_persisted`` is reset to 0 and then increased with the value reported
    for each of the application's containers (read through the resource's
    ``limit_label`` in ``translate_map``). Missing containers or resources are logged
    and skipped, so the stored value may be partial.

    :param container_resources_dict: mapping of container name to a structure whose
        ``"resources"`` entry holds the reported values. ``None`` is treated as an
        empty mapping.
    """
    # The dictionary may be None when no container info could be collected; the
    # membership tests below would raise TypeError on None
    if container_resources_dict is None:
        container_resources_dict = dict()

    # Try to get the applications, if unavailable, return
    applications = get_structures(db_handler, debug, subtype="application")
    if not applications:
        return

    # Generate the applications current resource values
    for app in applications:
        for resource in resources_persisted:
            if resource not in app["resources"]:
                log_error("Application {0} is missing info of resource {1}".format(app["name"], resource), debug)
            else:
                app["resources"][resource]["current"] = 0

        application_containers = app["containers"]
        for container_name in application_containers:
            if container_name not in container_resources_dict:
                log_error("Container info {0} is missing for app : {1}, app info will not be totally accurate".format(
                    container_name, app["name"]), debug)
                continue

            for resource in resources_persisted:
                try:
                    container_resources = container_resources_dict[container_name]["resources"]
                    if resource not in container_resources or not container_resources[resource]:
                        log_error("Unable to get info for resource {0} for container {1} when computing app {2} resources".format(
                            resource, container_name, app["name"]), debug)
                    else:
                        current_resource_label = translate_map[resource]["limit_label"]
                        app["resources"][resource]["current"] += container_resources[resource][current_resource_label]
                except KeyError:
                    # Raised, among others, when the application itself lacks the resource
                    if "name" in container_resources_dict[container_name] and "name" in app:
                        log_error("Container info {0} is missing for app: {1} and resource {2} resource,".format(
                            container_name, app["name"], resource) + " app info will not be totally accurate", debug)

        # Remote database operation
        update_structure(app, db_handler, debug)
def fill_container_dict(hosts_info, containers):
    """Query every host in parallel and collect the info of the requested containers.

    One thread per host calls ``get_host_containers``; from each answer only the
    containers whose name appears in ``containers`` are kept.

    :param hosts_info: mapping of host name to a dict with the keys
        ``"host_rescaler_ip"`` and ``"host_rescaler_port"``.
    :param containers: iterable of container structures; only ``"name"`` is read.
    :return: dict mapping container name to whatever the host reported for it.
    """
    # A set gives constant-time membership checks for every reported container
    wanted_names = set(c["name"] for c in containers)
    container_info = dict()

    def host_info_request(h, d):
        host_containers = get_host_containers(h["host_rescaler_ip"], h["host_rescaler_port"], rescaler_http_session, debug)
        if not host_containers:
            # Nothing usable came back from this host, its containers are simply left out
            return
        for container_name in host_containers:
            if container_name in wanted_names:
                d[container_name] = host_containers[container_name]

    threads = list()
    for hostname in hosts_info:
        host = hosts_info[hostname]
        t = Thread(target=host_info_request, args=(host, container_info,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    return container_info
def get_container_resources_dict():
    """Build a mapping of container name to its structure, enriched with host-reported info.

    The container structures are read from the database, the hosts they declare are
    queried through ``fill_container_dict`` and, for every container that was found,
    its ``"resources"`` entry is replaced with the info reported by the host.
    Containers that were not found are logged and left out.

    :return: dict of container name to structure; an empty dict when there are no
        containers, so that callers can always iterate or test membership on it.
    """
    # Remote database operation
    containers = get_structures(db_handler, debug, subtype="container")
    if not containers:
        return dict()

    # Get all the different hosts of the containers
    hosts_info = dict()
    for container in containers:
        host = container["host"]
        if host not in hosts_info:
            hosts_info[host] = {
                "host_rescaler_ip": container["host_rescaler_ip"],
                "host_rescaler_port": container["host_rescaler_port"],
            }

    # For each host, retrieve its containers and persist the ones we look for
    container_info = fill_container_dict(hosts_info, containers)

    container_resources_dict = dict()
    for container in containers:
        container_name = container["name"]
        if container_name not in container_info:
            log_warning("Container info for {0} not found, check that it is really living in its supposed host '{1}', and that "
                        "the host is alive and with the Node Scaler service running".format(container_name, container["host"]), debug)
            continue
        container_resources_dict[container_name] = container
        container_resources_dict[container_name]["resources"] = container_info[container_name]

    return container_resources_dict
def persist_thread():
    """Run one snapshot pass and log how long each of its three stages took.

    The stages are: collecting the container info, persisting the applications and
    persisting the containers, in that order.
    """
    started = time.time()
    container_resources_dict = get_container_resources_dict()
    info_collected = time.time()
    persist_applications(container_resources_dict)
    applications_done = time.time()
    persist_containers(container_resources_dict)
    containers_done = time.time()

    stage_timings = (
        ("It took {0} seconds to get container info", info_collected - started),
        ("It took {0} seconds to snapshot applications", applications_done - info_collected),
        ("It took {0} seconds to snapshot containers", containers_done - applications_done),
    )
    for template, elapsed in stage_timings:
        log_info(template.format("%.2f" % elapsed), debug)
def invalid_conf(config):
    """Check whether the configuration holds a value that is likely invalid.

    Only ``POLLING_FREQUENCY`` is checked: it must be a number greater than or
    equal to 3. Values that cannot be compared with a number (e.g. ``None`` or a
    string) are reported as invalid instead of raising.

    :param config: object exposing ``get_value(key)``.
    :return: tuple ``(invalid, message)``; ``message`` is empty when the config is valid.
    """
    # TODO This code is duplicated on the structures and database snapshoters
    for key, num in [("POLLING_FREQUENCY", config.get_value("POLLING_FREQUENCY"))]:
        try:
            # 'not >=' rather than '<' so that NaN is rejected as well
            too_low = not num >= 3
        except TypeError:
            # Non-numeric values cannot be ordered against a number
            too_low = True
        if too_low:
            return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(key, num)
    return False, ""
def persist():
    """Main loop of the service: refresh the config and snapshot structures every epoch.

    On each epoch the service document is read, a heartbeat is sent and the
    module-level ``debug`` and ``resources_persisted`` globals are refreshed from the
    configuration. If the configuration is rejected by ``invalid_conf`` the epoch is
    skipped after a single sleep; otherwise, when the service is active, the snapshot
    runs in a separate thread while this loop sleeps for the polling period.
    This function never returns.
    """
    logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

    global resources_persisted
    global debug

    myConfig = MyConfig(CONFIG_DEFAULT_VALUES)

    while True:
        log_info("----------------------", debug)
        log_info("Starting Epoch", debug)
        t0 = time.time()

        # Get service info
        service = get_service(db_handler, SERVICE_NAME)  # Remote database operation

        # Heartbeat
        beat(db_handler, SERVICE_NAME)  # Remote database operation

        # CONFIG
        myConfig.set_config(service["config"])
        polling_frequency = myConfig.get_value("POLLING_FREQUENCY")
        debug = myConfig.get_value("DEBUG")
        resources_persisted = myConfig.get_value("RESOURCES_PERSISTED")
        SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

        log_info("Going to snapshot resources: {0}".format(resources_persisted), debug)
        log_info("Config is as follows:", debug)
        log_info(".............................................", debug)
        log_info("Polling frequency -> {0}".format(polling_frequency), debug)
        log_info("Resources to be snapshoter are -> {0}".format(resources_persisted), debug)
        log_info(".............................................", debug)

        ## CHECK INVALID CONFIG ##
        # TODO This code is duplicated on the structures and database snapshoters
        invalid, message = invalid_conf(myConfig)
        if invalid:
            log_error(message, debug)
            # Never sleep with the rejected value: time.sleep() raises on a negative or
            # non-numeric argument, which would bring the whole service down. Replace it
            # first and sleep only once.
            if not isinstance(polling_frequency, (int, float)) or not polling_frequency >= 3:
                log_error("Polling frequency is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]), debug)
                polling_frequency = CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]

            log_info("----------------------\n", debug)
            time.sleep(polling_frequency)
            continue

        thread = None
        if SERVICE_IS_ACTIVATED:
            thread = Thread(target=persist_thread, args=())
            thread.start()
        else:
            log_warning("Structure snapshoter is not activated, will not do anything", debug)

        # The snapshot (if any) runs while this loop sleeps for the polling period
        time.sleep(polling_frequency)

        wait_operation_thread(thread, debug)

        t1 = time.time()
        time_proc = "%.2f" % (t1 - t0 - polling_frequency)
        time_total = "%.2f" % (t1 - t0)
        log_info("Epoch processed in {0} seconds ({1} processing and {2} sleeping)".format(time_total, time_proc, str(polling_frequency)), debug)
        log_info("----------------------\n", debug)
def main():
    """Entry point: run the snapshoter loop and log any exception that escapes it."""
    try:
        persist()
    except Exception as e:
        error_text = str(e)
        stack_trace = str(traceback.format_exc())
        log_error("{0} {1}".format(error_text, stack_trace), debug=True)


if __name__ == "__main__":
    main()
Functions
def fill_container_dict(hosts_info, containers)
-
Expand source code
def fill_container_dict(hosts_info, containers):
    """Query every host in parallel and collect the info of the requested containers.

    One thread per host calls ``get_host_containers``; from each answer only the
    containers whose name appears in ``containers`` are kept.

    :param hosts_info: mapping of host name to a dict with the keys
        ``"host_rescaler_ip"`` and ``"host_rescaler_port"``.
    :param containers: iterable of container structures; only ``"name"`` is read.
    :return: dict mapping container name to whatever the host reported for it.
    """
    # A set gives constant-time membership checks for every reported container
    wanted_names = set(c["name"] for c in containers)
    container_info = dict()

    def host_info_request(h, d):
        host_containers = get_host_containers(h["host_rescaler_ip"], h["host_rescaler_port"], rescaler_http_session, debug)
        if not host_containers:
            # Nothing usable came back from this host, its containers are simply left out
            return
        for container_name in host_containers:
            if container_name in wanted_names:
                d[container_name] = host_containers[container_name]

    threads = list()
    for hostname in hosts_info:
        host = hosts_info[hostname]
        t = Thread(target=host_info_request, args=(host, container_info,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    return container_info
def get_container_resources_dict()
-
Expand source code
def get_container_resources_dict():
    """Build a mapping of container name to its structure, enriched with host-reported info.

    The container structures are read from the database, the hosts they declare are
    queried through ``fill_container_dict`` and, for every container that was found,
    its ``"resources"`` entry is replaced with the info reported by the host.
    Containers that were not found are logged and left out.

    :return: dict of container name to structure; an empty dict when there are no
        containers, so that callers can always iterate or test membership on it.
    """
    # Remote database operation
    containers = get_structures(db_handler, debug, subtype="container")
    if not containers:
        return dict()

    # Get all the different hosts of the containers
    hosts_info = dict()
    for container in containers:
        host = container["host"]
        if host not in hosts_info:
            hosts_info[host] = {
                "host_rescaler_ip": container["host_rescaler_ip"],
                "host_rescaler_port": container["host_rescaler_port"],
            }

    # For each host, retrieve its containers and persist the ones we look for
    container_info = fill_container_dict(hosts_info, containers)

    container_resources_dict = dict()
    for container in containers:
        container_name = container["name"]
        if container_name not in container_info:
            log_warning("Container info for {0} not found, check that it is really living in its supposed host '{1}', and that "
                        "the host is alive and with the Node Scaler service running".format(container_name, container["host"]), debug)
            continue
        container_resources_dict[container_name] = container
        container_resources_dict[container_name]["resources"] = container_info[container_name]

    return container_resources_dict
def invalid_conf(config)
-
Expand source code
def invalid_conf(config):
    """Check whether the configuration holds a value that is likely invalid.

    Only ``POLLING_FREQUENCY`` is checked: it must be a number greater than or
    equal to 3. Values that cannot be compared with a number (e.g. ``None`` or a
    string) are reported as invalid instead of raising.

    :param config: object exposing ``get_value(key)``.
    :return: tuple ``(invalid, message)``; ``message`` is empty when the config is valid.
    """
    # TODO This code is duplicated on the structures and database snapshoters
    for key, num in [("POLLING_FREQUENCY", config.get_value("POLLING_FREQUENCY"))]:
        try:
            # 'not >=' rather than '<' so that NaN is rejected as well
            too_low = not num >= 3
        except TypeError:
            # Non-numeric values cannot be ordered against a number
            too_low = True
        if too_low:
            return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(key, num)
    return False, ""
def main()
-
Expand source code
def main():
    """Entry point: run the snapshoter loop and log any exception that escapes it."""
    try:
        persist()
    except Exception as e:
        error_text = str(e)
        stack_trace = str(traceback.format_exc())
        log_error("{0} {1}".format(error_text, stack_trace), debug=True)
def persist()
-
Expand source code
def persist():
    """Main loop of the service: refresh the config and snapshot structures every epoch.

    On each epoch the service document is read, a heartbeat is sent and the
    module-level ``debug`` and ``resources_persisted`` globals are refreshed from the
    configuration. If the configuration is rejected by ``invalid_conf`` the epoch is
    skipped after a single sleep; otherwise, when the service is active, the snapshot
    runs in a separate thread while this loop sleeps for the polling period.
    This function never returns.
    """
    logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

    global resources_persisted
    global debug

    myConfig = MyConfig(CONFIG_DEFAULT_VALUES)

    while True:
        log_info("----------------------", debug)
        log_info("Starting Epoch", debug)
        t0 = time.time()

        # Get service info
        service = get_service(db_handler, SERVICE_NAME)  # Remote database operation

        # Heartbeat
        beat(db_handler, SERVICE_NAME)  # Remote database operation

        # CONFIG
        myConfig.set_config(service["config"])
        polling_frequency = myConfig.get_value("POLLING_FREQUENCY")
        debug = myConfig.get_value("DEBUG")
        resources_persisted = myConfig.get_value("RESOURCES_PERSISTED")
        SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

        log_info("Going to snapshot resources: {0}".format(resources_persisted), debug)
        log_info("Config is as follows:", debug)
        log_info(".............................................", debug)
        log_info("Polling frequency -> {0}".format(polling_frequency), debug)
        log_info("Resources to be snapshoter are -> {0}".format(resources_persisted), debug)
        log_info(".............................................", debug)

        ## CHECK INVALID CONFIG ##
        # TODO This code is duplicated on the structures and database snapshoters
        invalid, message = invalid_conf(myConfig)
        if invalid:
            log_error(message, debug)
            # Never sleep with the rejected value: time.sleep() raises on a negative or
            # non-numeric argument, which would bring the whole service down. Replace it
            # first and sleep only once.
            if not isinstance(polling_frequency, (int, float)) or not polling_frequency >= 3:
                log_error("Polling frequency is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]), debug)
                polling_frequency = CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]

            log_info("----------------------\n", debug)
            time.sleep(polling_frequency)
            continue

        thread = None
        if SERVICE_IS_ACTIVATED:
            thread = Thread(target=persist_thread, args=())
            thread.start()
        else:
            log_warning("Structure snapshoter is not activated, will not do anything", debug)

        # The snapshot (if any) runs while this loop sleeps for the polling period
        time.sleep(polling_frequency)

        wait_operation_thread(thread, debug)

        t1 = time.time()
        time_proc = "%.2f" % (t1 - t0 - polling_frequency)
        time_total = "%.2f" % (t1 - t0)
        log_info("Epoch processed in {0} seconds ({1} processing and {2} sleeping)".format(time_total, time_proc, str(polling_frequency)), debug)
        log_info("----------------------\n", debug)
def persist_applications(container_resources_dict)
-
Expand source code
def persist_applications(container_resources_dict):
    """Compute and persist each application's current resource values.

    For every application, the "current" value of each resource in the module-level
    ``resources_persisted`` is reset to 0 and then increased with the value reported
    for each of the application's containers (read through the resource's
    ``limit_label`` in ``translate_map``). Missing containers or resources are logged
    and skipped, so the stored value may be partial.

    :param container_resources_dict: mapping of container name to a structure whose
        ``"resources"`` entry holds the reported values. ``None`` is treated as an
        empty mapping.
    """
    # The dictionary may be None when no container info could be collected; the
    # membership tests below would raise TypeError on None
    if container_resources_dict is None:
        container_resources_dict = dict()

    # Try to get the applications, if unavailable, return
    applications = get_structures(db_handler, debug, subtype="application")
    if not applications:
        return

    # Generate the applications current resource values
    for app in applications:
        for resource in resources_persisted:
            if resource not in app["resources"]:
                log_error("Application {0} is missing info of resource {1}".format(app["name"], resource), debug)
            else:
                app["resources"][resource]["current"] = 0

        application_containers = app["containers"]
        for container_name in application_containers:
            if container_name not in container_resources_dict:
                log_error("Container info {0} is missing for app : {1}, app info will not be totally accurate".format(
                    container_name, app["name"]), debug)
                continue

            for resource in resources_persisted:
                try:
                    container_resources = container_resources_dict[container_name]["resources"]
                    if resource not in container_resources or not container_resources[resource]:
                        log_error("Unable to get info for resource {0} for container {1} when computing app {2} resources".format(
                            resource, container_name, app["name"]), debug)
                    else:
                        current_resource_label = translate_map[resource]["limit_label"]
                        app["resources"][resource]["current"] += container_resources[resource][current_resource_label]
                except KeyError:
                    # Raised, among others, when the application itself lacks the resource
                    if "name" in container_resources_dict[container_name] and "name" in app:
                        log_error("Container info {0} is missing for app: {1} and resource {2} resource,".format(
                            container_name, app["name"], resource) + " app info will not be totally accurate", debug)

        # Remote database operation
        update_structure(app, db_handler, debug)
def persist_containers(container_resources_dict)
-
Expand source code
def persist_containers(container_resources_dict):
    """Persist the current values of every properly configured container, one thread each.

    A container is skipped when, for any resource in the module-level
    ``resources_persisted``, its document lacks the resource or the resource's
    ``"max"`` value; one error is logged per offending resource.

    :param container_resources_dict: forwarded untouched to ``thread_persist_container``.
    """
    # Remote database operation
    containers = get_structures(db_handler, debug, subtype="container")
    if not containers:
        return

    workers = []
    for container in containers:
        # A document that was not properly initialized must not be touched, otherwise it
        # could end up holding just the "current" value with no way of correcting it
        properly_configured = True
        for resource in resources_persisted:
            if resource in container["resources"] and "max" in container["resources"][resource]:
                continue
            log_error("Container {0} has not a proper config for the resource {1}".format(container["name"], resource), debug)
            properly_configured = False

        if properly_configured:
            worker = Thread(target=thread_persist_container, args=(container, container_resources_dict,))
            worker.start()
            workers.append(worker)

    # Wait until every container has been handled
    for worker in workers:
        worker.join()
def persist_thread()
-
Expand source code
def persist_thread():
    """Run one snapshot pass and log how long each of its three stages took.

    The stages are: collecting the container info, persisting the applications and
    persisting the containers, in that order.
    """
    started = time.time()
    container_resources_dict = get_container_resources_dict()
    info_collected = time.time()
    persist_applications(container_resources_dict)
    applications_done = time.time()
    persist_containers(container_resources_dict)
    containers_done = time.time()

    stage_timings = (
        ("It took {0} seconds to get container info", info_collected - started),
        ("It took {0} seconds to snapshot applications", applications_done - info_collected),
        ("It took {0} seconds to snapshot containers", containers_done - applications_done),
    )
    for template, elapsed in stage_timings:
        log_info(template.format("%.2f" % elapsed), debug)
def thread_persist_container(container, container_resources_dict)
-
Expand source code
def thread_persist_container(container, container_resources_dict):
    """Persist the current resource values of a single container (thread target).

    :param container: container structure; only its ``"name"`` is read here.
    :param container_resources_dict: mapping of container name to a structure whose
        ``"resources"`` entry holds the values reported for that container. May be
        ``None`` or lack the container, in which case an error is logged and nothing
        is persisted.
    """
    container_name = container["name"]

    # Containers whose info could not be retrieved are absent from the dictionary,
    # so look the entry up defensively instead of raising KeyError inside the thread
    container_entry = (container_resources_dict or dict()).get(container_name)
    resources = container_entry.get("resources") if container_entry else None
    if not resources:
        log_error("Couldn't get container's {0} resources".format(container_name), debug)
        return

    # Persist by updating the Database current value
    update_container_current_values(container_name, resources)
def update_container_current_values(container_name, resources)
-
Expand source code
def update_container_current_values(container_name, resources):
    """Write the latest "current" value of each persisted resource to a container document.

    For every resource in the module-level ``resources_persisted`` the value stored
    under that resource's ``limit_label`` (see ``translate_map``) in ``resources`` is
    cast to ``int`` and saved as ``structure["resources"][resource]["current"]``.
    A resource that is missing or empty in ``resources`` is logged and saved as 0.

    :param container_name: name used to retrieve the structure from the database.
    :param resources: mapping of resource name to the values reported for the container.
    """
    # Remote database operation
    structure = db_handler.get_structure(container_name).copy()
    if "resources" not in structure:
        structure["resources"] = dict()

    for resource in resources_persisted:
        if resource not in structure["resources"]:
            structure["resources"][resource] = dict()
        resource_entry = structure["resources"][resource]

        if resource in resources and resources[resource]:
            limit_label = translate_map[resource]["limit_label"]
            resource_entry["current"] = resources[resource][limit_label]
        else:
            log_error("Unable to get info for resource {0} for container {1}".format(resource, container_name), debug)
            resource_entry["current"] = 0

        # The stored value is always an integer, whatever was reported
        resource_entry["current"] = int(resource_entry["current"])

    # Remote database operation
    update_structure(structure, db_handler, debug)