Module src.Orchestrator.structures
Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.
import requests
from flask import Blueprint
from flask import abort
from flask import jsonify
from flask import request
import time
import src.Scaler.Scaler
from src.MyUtils import MyUtils
from src.MyUtils.MyUtils import valid_resource, get_host_containers
from src.Orchestrator.utils import get_db, BACK_OFF_TIME_MS, MAX_TRIES
from src.Scaler import Scaler
structure_routes = Blueprint('structures', __name__)
def retrieve_structure(structure_name):
try:
return get_db().get_structure(structure_name)
except ValueError:
return abort(404)
@structure_routes.route("/structure/", methods=['GET'])
def get_structures():
return jsonify(get_db().get_structures())
@structure_routes.route("/structure/<structure_name>", methods=['GET'])
def get_structure(structure_name):
return jsonify(retrieve_structure(structure_name))
@structure_routes.route("/structure/<structure_name>/resources", methods=['GET'])
def get_structure_resources(structure_name):
return jsonify(retrieve_structure(structure_name)["resources"])
@structure_routes.route("/structure/<structure_name>/resources/<resource>", methods=['GET'])
def get_structure_resource(structure_name, resource):
try:
return jsonify(retrieve_structure(structure_name)["resources"][resource])
except KeyError:
return abort(404)
@structure_routes.route("/structure/<structure_name>/resources/<resource>/<parameter>", methods=['GET'])
def get_structure_parameter_of_resource(structure_name, resource, parameter):
try:
return jsonify(retrieve_structure(structure_name)["resources"][resource][parameter])
except KeyError:
return abort(404)
@structure_routes.route("/structure/<structure_name>/resources/<resource>/<parameter>", methods=['PUT'])
def set_structure_parameter_of_resource(structure_name, resource, parameter):
if not valid_resource(resource):
return abort(400, {"message": "Resource '{0}' is not valid".format(resource)})
if parameter not in ["max", "min"]:
return abort(400, {"message": "Invalid parameter state"})
try:
value = int(request.json["value"])
if value < 0:
return abort(400)
except KeyError:
return abort(400)
structure = retrieve_structure(structure_name)
if parameter in structure["resources"][resource] and structure["resources"][resource][parameter] == value:
return jsonify(201)
put_done = False
tries = 0
while not put_done:
tries += 1
structure = retrieve_structure(structure_name)
structure["resources"][resource][parameter] = value
get_db().update_structure(structure)
time.sleep(BACK_OFF_TIME_MS / 1000)
structure = retrieve_structure(structure_name)
put_done = structure["resources"][resource][parameter] == value
if tries >= MAX_TRIES:
return abort(400, {"message": "MAX_TRIES updating database document"})
return jsonify(201)
def set_structure_to_guarded_state(structure_name, state):
if state not in [True, False]:
return abort(400, {"message": "Invalid guarded state"})
structure = retrieve_structure(structure_name)
if "guard" in structure and structure["guard"] == state:
return jsonify(201)
put_done = False
tries = 0
while not put_done:
tries += 1
structure = retrieve_structure(structure_name)
structure["guard"] = state
get_db().update_structure(structure)
time.sleep(BACK_OFF_TIME_MS / 1000)
structure = retrieve_structure(structure_name)
put_done = structure["guard"] == state
if tries >= MAX_TRIES:
return abort(400, {"message": "MAX_TRIES updating database document"})
return jsonify(201)
@structure_routes.route("/structure/<structure_name>/guard", methods=['PUT'])
def set_structure_to_guarded(structure_name):
return set_structure_to_guarded_state(structure_name, True)
@structure_routes.route("/structure/<structure_name>/unguard", methods=['PUT'])
def set_structure_to_unguarded(structure_name):
return set_structure_to_guarded_state(structure_name, False)
@structure_routes.route("/structure/<structure_name>/resources/<resource>/guard", methods=['PUT'])
def set_structure_resource_to_guarded(structure_name, resource):
return set_structure_multiple_resources_to_guard_state(structure_name, [resource], True)
@structure_routes.route("/structure/<structure_name>/resources/<resource>/unguard", methods=['PUT'])
def set_structure_resource_to_unguarded(structure_name, resource):
return set_structure_multiple_resources_to_guard_state(structure_name, [resource], False)
def set_structure_multiple_resources_to_guard_state(structure_name, resources, state):
structure = retrieve_structure(structure_name)
if "resources" not in structure:
return abort(400, {"message": "Structure '{0}' has no resources to configure".format(structure_name)})
else:
for resource in resources:
if not valid_resource(resource):
return abort(400, {"message": "Resource '{0}' is not valid".format(resource)})
elif resource not in structure["resources"]:
return abort(400, {"message": "Resource '{0}' is missing in structure {1}".format(resource, structure_name)})
# 1st check, in case nothing has to be done really
put_done = True
structure = retrieve_structure(structure_name)
for resource in resources:
put_done = put_done and structure["resources"][resource]["guard"] == state
tries = 0
while not put_done:
tries += 1
structure = retrieve_structure(structure_name)
for resource in resources:
structure["resources"][resource]["guard"] = state
get_db().update_structure(structure)
time.sleep(BACK_OFF_TIME_MS / 1000)
structure = retrieve_structure(structure_name)
put_done = True
for resource in resources:
put_done = put_done and structure["resources"][resource]["guard"] == state
if tries >= MAX_TRIES:
return abort(400, {"message": "MAX_TRIES updating database document"})
return jsonify(201)
def get_resources_to_change_guard_from_request(request):
resources = None
data = request.json
if not data:
abort(400, {"message": "empty content"})
try:
resources = data["resources"]
if not isinstance(resources, (list, str)):
abort(400, {"message": "invalid content, resources must be a list or a string"})
except (KeyError, TypeError):
abort(400, {"message": "invalid content, must be a json object with resources as key"})
return resources
@structure_routes.route("/structure/<structure_name>/resources/guard", methods=['PUT'])
def set_structure_multiple_resources_to_guarded(structure_name):
resources = get_resources_to_change_guard_from_request(request)
return set_structure_multiple_resources_to_guard_state(structure_name, resources, True)
@structure_routes.route("/structure/<structure_name>/resources/unguard", methods=['PUT'])
def set_structure_multiple_resources_to_unguarded(structure_name):
resources = get_resources_to_change_guard_from_request(request)
return set_structure_multiple_resources_to_guard_state(structure_name, resources, False)
@structure_routes.route("/structure/<structure_name>/limits", methods=['GET'])
def get_structure_limits(structure_name):
try:
return jsonify(get_db().get_limits(retrieve_structure(structure_name))["resources"])
except ValueError:
return abort(404)
@structure_routes.route("/structure/<structure_name>/limits/<resource>", methods=['GET'])
def get_structure_resource_limits(structure_name, resource):
try:
return jsonify(get_db().get_limits(retrieve_structure(structure_name))["resources"][resource])
except ValueError:
return abort(404)
@structure_routes.route("/structure/<structure_name>/limits/<resource>/boundary", methods=['PUT'])
def set_structure_resource_limit_boundary(structure_name, resource):
if not valid_resource(resource):
return abort(400, {"message": "Resource '{0}' is not valid".format(resource)})
try:
value = int(request.json["value"])
if value < 0:
return abort(400)
except ValueError:
return abort(500)
structure = retrieve_structure(structure_name)
structure_limits = get_db().get_limits(structure)
if "boundary" not in structure_limits["resources"][resource]:
current_boundary = -1
else:
current_boundary = structure_limits["resources"][resource]["boundary"]
if current_boundary == value:
pass
else:
put_done = False
tries = 0
while not put_done:
tries += 1
structure_limits["resources"][resource]["boundary"] = value
get_db().update_limit(structure_limits)
time.sleep(BACK_OFF_TIME_MS / 1000)
structure = retrieve_structure(structure_name)
structure_limits = get_db().get_limits(structure)
put_done = structure_limits["resources"][resource]["boundary"] == value
if tries >= MAX_TRIES:
return abort(400, {"message": "MAX_TRIES updating database document"})
return jsonify(201)
def disable_scaler(scaler_service):
scaler_service["config"]["ACTIVE"] = False
get_db().update_service(scaler_service)
# Wait a little bit, half the polling time of the Scaler
polling_freq = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "POLLING_FREQUENCY")
time.sleep(int(polling_freq))
def restore_scaler_state(scaler_service, previous_state):
scaler_service["config"]["ACTIVE"] = previous_state
get_db().update_service(scaler_service)
@structure_routes.route("/structure/container/<structure_name>/<app_name>", methods=['PUT'])
def subscribe_container_to_app(structure_name, app_name):
structure = retrieve_structure(structure_name)
app = retrieve_structure(app_name)
cont_name = structure["name"]
# Look for any application that hosts this container, to make sure it is not already subscribed
apps = get_db().get_structures(subtype="application")
for app in apps:
if cont_name in app["containers"]:
return abort(400, {"message": "Container '{0}' already subscribed in app '{1}'".format(cont_name, app_name)})
app["containers"].append(cont_name)
get_db().update_structure(app)
return jsonify(201)
@structure_routes.route("/structure/container/<structure_name>/<app_name>", methods=['DELETE'])
def desubscribe_container_from_app(structure_name, app_name):
container = retrieve_structure(structure_name)
app = retrieve_structure(app_name)
cont_name = container["name"]
if cont_name not in app["containers"]:
return abort(400, {"message": "Container '{0}' missing in app '{1}'".format(cont_name, app_name)})
else:
app["containers"].remove(cont_name)
get_db().update_structure(app)
return jsonify(201)
@structure_routes.route("/structure/container/<structure_name>", methods=['DELETE'])
def desubscribe_container(structure_name):
structure = retrieve_structure(structure_name)
cont_name = structure["name"]
# Look for any application that hosts this container, and remove it from the list
apps = get_db().get_structures(subtype="application")
for app in apps:
if cont_name in app["containers"]:
desubscribe_container_from_app(structure_name, app["name"])
# Disable the Scaler as we will modify the core mapping of a host
scaler_service = get_db().get_service(src.Scaler.Scaler.SERVICE_NAME)
previous_state = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "ACTIVE")
if previous_state:
disable_scaler(scaler_service)
# Free resources
# CPU
# Get the core map of the container's host and free the allocated shares for this container
cont_host = structure["host"]
host = get_db().get_structure(cont_host)
core_map = host["resources"]["cpu"]["core_usage_mapping"]
freed_shares = 0
for core in core_map:
if cont_name in core_map[core]:
core_shares = core_map[core][cont_name]
freed_shares += core_shares
core_map[core][cont_name] = 0
core_map[core]["free"] += core_shares
host["resources"]["cpu"]["core_usage_mapping"] = core_map
host["resources"]["cpu"]["free"] += freed_shares
# MEM
host["resources"]["mem"]["free"] += structure["resources"]["mem"]["current"]
get_db().update_structure(host)
# Delete the document for this structure
get_db().delete_structure(structure)
# Restore the previous state of the Scaler service
restore_scaler_state(scaler_service, previous_state)
return jsonify(201)
@structure_routes.route("/structure/container/<structure_name>", methods=['PUT'])
def subscribe_container(structure_name):
req_cont = request.json["container"]
req_limits = request.json["limits"]
node_scaler_session = requests.Session()
# Check that all the needed data is present on the requestes container
container = {}
for key in ["name", "host_rescaler_ip", "host_rescaler_port", "host", "guard", "subtype"]:
if key not in req_cont:
return abort(400, {"message": "Missing key '{0}'".format(key)})
else:
container[key] = req_cont[key]
# Check that all the needed data for resources is present on the requested container
container["resources"] = {}
if "resources" not in req_cont:
return abort(400, {"message": "Missing resource information"})
elif "cpu" not in req_cont["resources"] or "mem" not in req_cont["resources"]:
return abort(400, {"message": "Missing cpu or mem resource information"})
else:
container["resources"] = {"cpu": {}, "mem": {}}
for key in ["max", "min", "current", "guard"]:
if key not in req_cont["resources"]["cpu"] or key not in req_cont["resources"]["mem"]:
return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)})
else:
container["resources"]["cpu"][key] = req_cont["resources"]["cpu"][key]
container["resources"]["mem"][key] = req_cont["resources"]["mem"][key]
# Check that the endpoint requested container name matches with the one in the request
if container["name"] != structure_name:
return abort(400, {"message": "Name mismatch".format(key)})
# Check if the container already exists
try:
cont = get_db().get_structure(structure_name)
if cont:
return abort(400, {"message": "Container with this name already exists".format(key)})
except ValueError:
pass
# Check that its supposed host exists and that it reports this container
try:
get_db().get_structure(container["host"])
except ValueError:
return abort(400, {"message": "Container host does not exist".format(key)})
host_containers = get_host_containers(container["host_rescaler_ip"], container["host_rescaler_port"], node_scaler_session, True)
if container["name"] not in host_containers:
return abort(400, {"message": "Container host does not report any container named '{0}'".format(container["name"])})
container["type"] = "structure"
# Check that all the needed data for resources is present on the requested container LIMITS
limits = {}
if "resources" not in req_limits:
return abort(400, {"message": "Missing resource information for the limits"})
elif "cpu" not in req_limits["resources"] or "mem" not in req_limits["resources"]:
return abort(400, {"message": "Missing cpu or mem resource information for the limits"})
else:
limits["resources"] = {"cpu": {}, "mem": {}}
for key in ["boundary"]:
if key not in req_limits["resources"]["cpu"] or key not in req_limits["resources"]["mem"]:
return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)})
else:
limits["resources"]["cpu"][key] = req_limits["resources"]["cpu"][key]
limits["resources"]["mem"][key] = req_limits["resources"]["mem"][key]
limits["type"] = 'limit'
limits["name"] = container["name"]
#### ALL looks good up to this point, proceed
# Disable the Scaler as we will modify the core mapping of a host
scaler_service = get_db().get_service(src.Scaler.Scaler.SERVICE_NAME)
previous_state = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "ACTIVE")
if previous_state:
disable_scaler(scaler_service)
# Get the host info
cont_host = container["host"]
cont_name = container["name"]
host = get_db().get_structure(cont_host)
# CPU
# Look for resource shares on the container's host
needed_shares = container["resources"]["cpu"]["current"]
if host["resources"]["cpu"]["free"] < needed_shares:
return abort(400, {"message": "Host does not have enough shares".format(key)})
core_map = host["resources"]["cpu"]["core_usage_mapping"]
host_max_cores = int(host["resources"]["cpu"]["max"] / 100)
host_cpu_list = [str(i) for i in range(host_max_cores)]
pending_shares = needed_shares
used_cores = list()
# Try to satisfy the request by looking and adding a single core
for core in host_cpu_list:
if core_map[core]["free"] >= pending_shares:
core_map[core]["free"] -= pending_shares
core_map[core][cont_name] += pending_shares
pending_shares = 0
used_cores.append(core)
break
# Finally, if unsuccessful, add as many cores as necessary, starting with the ones with the largest free shares to avoid too much spread
if pending_shares > 0:
l = list()
for core in host_cpu_list:
l.append((core, core_map[core]["free"]))
l.sort(key=lambda tup: tup[1], reverse=True)
less_used_cores = [i[0] for i in l]
for core in less_used_cores:
# If this core has free shares
if core_map[core]["free"] > 0 and pending_shares > 0:
if cont_name not in core_map[core]:
core_map[core][cont_name] = 0
# If it has more free shares than needed, assign them and finish
if core_map[core]["free"] >= pending_shares:
core_map[core]["free"] -= pending_shares
core_map[core][cont_name] += pending_shares
pending_shares = 0
used_cores.append(core)
break
else:
# Otherwise, assign as many as possible and continue
core_map[core][cont_name] += core_map[core]["free"]
pending_shares -= core_map[core]["free"]
core_map[core]["free"] = 0
used_cores.append(core)
if pending_shares > 0:
return abort(400, {"message": "Container host does not have enough free CPU shares as requested"})
host["resources"]["cpu"]["core_usage_mapping"] = core_map
host["resources"]["cpu"]["free"] -= needed_shares
# MEM
needed_memory = container["resources"]["mem"]["current"]
host_memory = host["resources"]["mem"]["free"]
if needed_memory > host_memory:
return abort(400, {"message": "Container host does not have enough free memory requested"})
host["resources"]["mem"]["free"] -= needed_memory
resource_dict = {"cpu": {"cpu_num": ",".join(used_cores), "cpu_allowance_limit": needed_shares},
"mem": {"mem_limit": needed_memory}}
Scaler.set_container_resources(node_scaler_session, container, resource_dict, True)
get_db().add_structure(container)
get_db().add_limit(limits)
get_db().update_structure(host)
# Restore the previous state of the Scaler service
restore_scaler_state(scaler_service, previous_state)
return jsonify(201)
@structure_routes.route("/structure/host/<structure_name>", methods=['PUT'])
def subscribe_host(structure_name):
data = request.json
node_scaler_session = requests.Session()
# Check that all the needed data is present on the request
host = {}
for key in ["name", "host", "subtype", "host_rescaler_ip", "host_rescaler_port"]:
if key not in data:
return abort(400, {"message": "Missing key '{0}'".format(key)})
else:
host[key] = data[key]
# Check that all the needed data for resources is present on the request
host["resources"] = {}
if "resources" not in data:
return abort(400, {"message": "Missing resource information"})
elif "cpu" not in data["resources"] or "mem" not in data["resources"]:
return abort(400, {"message": "Missing cpu or mem resource information"})
else:
host["resources"] = {"cpu": {}, "mem": {}}
for key in ["max", "free"]:
if key not in data["resources"]["cpu"] or key not in data["resources"]["mem"]:
return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)})
else:
host["resources"]["cpu"][key] = data["resources"]["cpu"][key]
host["resources"]["mem"][key] = data["resources"]["mem"][key]
host["resources"]["cpu"]["core_usage_mapping"] = {}
for n in range(0, int(host["resources"]["cpu"]["max"] / 100)):
host["resources"]["cpu"]["core_usage_mapping"][n] = {"free": 100}
if host["name"] != structure_name:
return abort(400, {"message": "Name mismatch".format(key)})
# Check if the host already exists
try:
host = get_db().get_structure(structure_name)
if host:
return abort(400, {"message": "Host with this name already exists".format(key)})
except ValueError:
pass
# Check that this supposed host exists and that it reports this container
try:
host_containers = get_host_containers(host["host_rescaler_ip"], host["host_rescaler_port"], node_scaler_session, True)
if not host_containers:
raise RuntimeError()
except Exception:
return abort(400, {"message": "Could not connect to this host, is it up and has its node scaler up?"})
# Host looks good, insert it into the database
host["type"] = "structure"
get_db().add_structure(host)
return jsonify(201)
@structure_routes.route("/structure/host/<structure_name>", methods=['DELETE'])
def desubscribe_host(structure_name):
host = retrieve_structure(structure_name)
# Delete the document for this structure
get_db().delete_structure(host)
return jsonify(201)
@structure_routes.route("/structure/apps/<structure_name>", methods=['PUT'])
def subscribe_app(structure_name):
req_app = request.json["app"]
req_limits = request.json["limits"]
# Check that all the needed data is present on the request
app = {}
for key in ["name", "guard", "subtype", "resources"]:
if key not in req_app:
return abort(400, {"message": "Missing key '{0}'".format(key)})
else:
app[key] = req_app[key]
# Check that all the needed data for resources is present on the request
app["resources"] = {}
if "resources" not in req_app:
return abort(400, {"message": "Missing resource information"})
elif "cpu" not in req_app["resources"] or "mem" not in req_app["resources"]:
return abort(400, {"message": "Missing cpu or mem resource information"})
else:
app["resources"] = {"cpu": {}, "mem": {}}
for key in ["max", "min", "guard"]:
if key not in req_app["resources"]["cpu"] or key not in req_app["resources"]["mem"]:
return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)})
else:
app["resources"]["cpu"][key] = req_app["resources"]["cpu"][key]
app["resources"]["mem"][key] = req_app["resources"]["mem"][key]
if app["name"] != structure_name:
return abort(400, {"message": "Name mismatch".format(key)})
# Check if the app already exists
try:
app = get_db().get_structure(structure_name)
if app:
return abort(400, {"message": "App with this name already exists".format(key)})
except ValueError:
pass
#### ALL looks good up to this point, proceed
app["containers"] = list()
app["type"] = "structure"
# Check that all the needed data for resources is present on the requested container LIMITS
limits = {}
if "resources" not in req_limits:
return abort(400, {"message": "Missing resource information for the limits"})
elif "cpu" not in req_limits["resources"] or "mem" not in req_limits["resources"]:
return abort(400, {"message": "Missing cpu or mem resource information for the limits"})
else:
limits["resources"] = {"cpu": {}, "mem": {}}
for key in ["boundary"]:
if key not in req_limits["resources"]["cpu"] or key not in req_limits["resources"]["mem"]:
return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)})
else:
limits["resources"]["cpu"][key] = req_limits["resources"]["cpu"][key]
limits["resources"]["mem"][key] = req_limits["resources"]["mem"][key]
limits["type"] = 'limit'
limits["name"] = app["name"]
get_db().add_structure(app)
get_db().add_limit(limits)
return jsonify(201)
@structure_routes.route("/structure/apps/<structure_name>", methods=['DELETE'])
def desubscribe_app(structure_name):
app = retrieve_structure(structure_name)
# Delete the document for this structure
get_db().delete_structure(app)
return jsonify(201)
Functions
def desubscribe_app(structure_name)
-
Expand source code
@structure_routes.route("/structure/apps/<structure_name>", methods=['DELETE']) def desubscribe_app(structure_name): app = retrieve_structure(structure_name) # Delete the document for this structure get_db().delete_structure(app) return jsonify(201)
def desubscribe_container(structure_name)
-
Expand source code
@structure_routes.route("/structure/container/<structure_name>", methods=['DELETE']) def desubscribe_container(structure_name): structure = retrieve_structure(structure_name) cont_name = structure["name"] # Look for any application that hosts this container, and remove it from the list apps = get_db().get_structures(subtype="application") for app in apps: if cont_name in app["containers"]: desubscribe_container_from_app(structure_name, app["name"]) # Disable the Scaler as we will modify the core mapping of a host scaler_service = get_db().get_service(src.Scaler.Scaler.SERVICE_NAME) previous_state = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "ACTIVE") if previous_state: disable_scaler(scaler_service) # Free resources # CPU # Get the core map of the container's host and free the allocated shares for this container cont_host = structure["host"] host = get_db().get_structure(cont_host) core_map = host["resources"]["cpu"]["core_usage_mapping"] freed_shares = 0 for core in core_map: if cont_name in core_map[core]: core_shares = core_map[core][cont_name] freed_shares += core_shares core_map[core][cont_name] = 0 core_map[core]["free"] += core_shares host["resources"]["cpu"]["core_usage_mapping"] = core_map host["resources"]["cpu"]["free"] += freed_shares # MEM host["resources"]["mem"]["free"] += structure["resources"]["mem"]["current"] get_db().update_structure(host) # Delete the document for this structure get_db().delete_structure(structure) # Restore the previous state of the Scaler service restore_scaler_state(scaler_service, previous_state) return jsonify(201)
def desubscribe_container_from_app(structure_name, app_name)
-
Expand source code
@structure_routes.route("/structure/container/<structure_name>/<app_name>", methods=['DELETE']) def desubscribe_container_from_app(structure_name, app_name): container = retrieve_structure(structure_name) app = retrieve_structure(app_name) cont_name = container["name"] if cont_name not in app["containers"]: return abort(400, {"message": "Container '{0}' missing in app '{1}'".format(cont_name, app_name)}) else: app["containers"].remove(cont_name) get_db().update_structure(app) return jsonify(201)
def desubscribe_host(structure_name)
-
Expand source code
@structure_routes.route("/structure/host/<structure_name>", methods=['DELETE']) def desubscribe_host(structure_name): host = retrieve_structure(structure_name) # Delete the document for this structure get_db().delete_structure(host) return jsonify(201)
def disable_scaler(scaler_service)
-
Expand source code
def disable_scaler(scaler_service): scaler_service["config"]["ACTIVE"] = False get_db().update_service(scaler_service) # Wait a little bit, half the polling time of the Scaler polling_freq = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "POLLING_FREQUENCY") time.sleep(int(polling_freq))
def get_resources_to_change_guard_from_request(request)
-
Expand source code
def get_resources_to_change_guard_from_request(request): resources = None data = request.json if not data: abort(400, {"message": "empty content"}) try: resources = data["resources"] if not isinstance(resources, (list, str)): abort(400, {"message": "invalid content, resources must be a list or a string"}) except (KeyError, TypeError): abort(400, {"message": "invalid content, must be a json object with resources as key"}) return resources
def get_structure(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>", methods=['GET']) def get_structure(structure_name): return jsonify(retrieve_structure(structure_name))
def get_structure_limits(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/limits", methods=['GET']) def get_structure_limits(structure_name): try: return jsonify(get_db().get_limits(retrieve_structure(structure_name))["resources"]) except ValueError: return abort(404)
def get_structure_parameter_of_resource(structure_name, resource, parameter)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/<resource>/<parameter>", methods=['GET']) def get_structure_parameter_of_resource(structure_name, resource, parameter): try: return jsonify(retrieve_structure(structure_name)["resources"][resource][parameter]) except KeyError: return abort(404)
def get_structure_resource(structure_name, resource)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/<resource>", methods=['GET']) def get_structure_resource(structure_name, resource): try: return jsonify(retrieve_structure(structure_name)["resources"][resource]) except KeyError: return abort(404)
def get_structure_resource_limits(structure_name, resource)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/limits/<resource>", methods=['GET']) def get_structure_resource_limits(structure_name, resource): try: return jsonify(get_db().get_limits(retrieve_structure(structure_name))["resources"][resource]) except ValueError: return abort(404)
def get_structure_resources(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources", methods=['GET']) def get_structure_resources(structure_name): return jsonify(retrieve_structure(structure_name)["resources"])
def get_structures()
-
Expand source code
@structure_routes.route("/structure/", methods=['GET']) def get_structures(): return jsonify(get_db().get_structures())
def restore_scaler_state(scaler_service, previous_state)
-
Expand source code
def restore_scaler_state(scaler_service, previous_state): scaler_service["config"]["ACTIVE"] = previous_state get_db().update_service(scaler_service)
def retrieve_structure(structure_name)
-
Expand source code
def retrieve_structure(structure_name): try: return get_db().get_structure(structure_name) except ValueError: return abort(404)
def set_structure_multiple_resources_to_guard_state(structure_name, resources, state)
-
Expand source code
def set_structure_multiple_resources_to_guard_state(structure_name, resources, state): structure = retrieve_structure(structure_name) if "resources" not in structure: return abort(400, {"message": "Structure '{0}' has no resources to configure".format(structure_name)}) else: for resource in resources: if not valid_resource(resource): return abort(400, {"message": "Resource '{0}' is not valid".format(resource)}) elif resource not in structure["resources"]: return abort(400, {"message": "Resource '{0}' is missing in structure {1}".format(resource, structure_name)}) # 1st check, in case nothing has to be done really put_done = True structure = retrieve_structure(structure_name) for resource in resources: put_done = put_done and structure["resources"][resource]["guard"] == state tries = 0 while not put_done: tries += 1 structure = retrieve_structure(structure_name) for resource in resources: structure["resources"][resource]["guard"] = state get_db().update_structure(structure) time.sleep(BACK_OFF_TIME_MS / 1000) structure = retrieve_structure(structure_name) put_done = True for resource in resources: put_done = put_done and structure["resources"][resource]["guard"] == state if tries >= MAX_TRIES: return abort(400, {"message": "MAX_TRIES updating database document"}) return jsonify(201)
def set_structure_multiple_resources_to_guarded(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/guard", methods=['PUT']) def set_structure_multiple_resources_to_guarded(structure_name): resources = get_resources_to_change_guard_from_request(request) return set_structure_multiple_resources_to_guard_state(structure_name, resources, True)
def set_structure_multiple_resources_to_unguarded(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/unguard", methods=['PUT']) def set_structure_multiple_resources_to_unguarded(structure_name): resources = get_resources_to_change_guard_from_request(request) return set_structure_multiple_resources_to_guard_state(structure_name, resources, False)
def set_structure_parameter_of_resource(structure_name, resource, parameter)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/<resource>/<parameter>", methods=['PUT']) def set_structure_parameter_of_resource(structure_name, resource, parameter): if not valid_resource(resource): return abort(400, {"message": "Resource '{0}' is not valid".format(resource)}) if parameter not in ["max", "min"]: return abort(400, {"message": "Invalid parameter state"}) try: value = int(request.json["value"]) if value < 0: return abort(400) except KeyError: return abort(400) structure = retrieve_structure(structure_name) if parameter in structure["resources"][resource] and structure["resources"][resource][parameter] == value: return jsonify(201) put_done = False tries = 0 while not put_done: tries += 1 structure = retrieve_structure(structure_name) structure["resources"][resource][parameter] = value get_db().update_structure(structure) time.sleep(BACK_OFF_TIME_MS / 1000) structure = retrieve_structure(structure_name) put_done = structure["resources"][resource][parameter] == value if tries >= MAX_TRIES: return abort(400, {"message": "MAX_TRIES updating database document"}) return jsonify(201)
def set_structure_resource_limit_boundary(structure_name, resource)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/limits/<resource>/boundary", methods=['PUT']) def set_structure_resource_limit_boundary(structure_name, resource): if not valid_resource(resource): return abort(400, {"message": "Resource '{0}' is not valid".format(resource)}) try: value = int(request.json["value"]) if value < 0: return abort(400) except ValueError: return abort(500) structure = retrieve_structure(structure_name) structure_limits = get_db().get_limits(structure) if "boundary" not in structure_limits["resources"][resource]: current_boundary = -1 else: current_boundary = structure_limits["resources"][resource]["boundary"] if current_boundary == value: pass else: put_done = False tries = 0 while not put_done: tries += 1 structure_limits["resources"][resource]["boundary"] = value get_db().update_limit(structure_limits) time.sleep(BACK_OFF_TIME_MS / 1000) structure = retrieve_structure(structure_name) structure_limits = get_db().get_limits(structure) put_done = structure_limits["resources"][resource]["boundary"] == value if tries >= MAX_TRIES: return abort(400, {"message": "MAX_TRIES updating database document"}) return jsonify(201)
def set_structure_resource_to_guarded(structure_name, resource)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/<resource>/guard", methods=['PUT']) def set_structure_resource_to_guarded(structure_name, resource): return set_structure_multiple_resources_to_guard_state(structure_name, [resource], True)
def set_structure_resource_to_unguarded(structure_name, resource)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/resources/<resource>/unguard", methods=['PUT']) def set_structure_resource_to_unguarded(structure_name, resource): return set_structure_multiple_resources_to_guard_state(structure_name, [resource], False)
def set_structure_to_guarded(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/guard", methods=['PUT']) def set_structure_to_guarded(structure_name): return set_structure_to_guarded_state(structure_name, True)
def set_structure_to_guarded_state(structure_name, state)
-
Expand source code
def set_structure_to_guarded_state(structure_name, state): if state not in [True, False]: return abort(400, {"message": "Invalid guarded state"}) structure = retrieve_structure(structure_name) if "guard" in structure and structure["guard"] == state: return jsonify(201) put_done = False tries = 0 while not put_done: tries += 1 structure = retrieve_structure(structure_name) structure["guard"] = state get_db().update_structure(structure) time.sleep(BACK_OFF_TIME_MS / 1000) structure = retrieve_structure(structure_name) put_done = structure["guard"] == state if tries >= MAX_TRIES: return abort(400, {"message": "MAX_TRIES updating database document"}) return jsonify(201)
def set_structure_to_unguarded(structure_name)
-
Expand source code
@structure_routes.route("/structure/<structure_name>/unguard", methods=['PUT']) def set_structure_to_unguarded(structure_name): return set_structure_to_guarded_state(structure_name, False)
def subscribe_app(structure_name)
-
Expand source code
@structure_routes.route("/structure/apps/<structure_name>", methods=['PUT']) def subscribe_app(structure_name): req_app = request.json["app"] req_limits = request.json["limits"] # Check that all the needed data is present on the request app = {} for key in ["name", "guard", "subtype", "resources"]: if key not in req_app: return abort(400, {"message": "Missing key '{0}'".format(key)}) else: app[key] = req_app[key] # Check that all the needed data for resources is present on the request app["resources"] = {} if "resources" not in req_app: return abort(400, {"message": "Missing resource information"}) elif "cpu" not in req_app["resources"] or "mem" not in req_app["resources"]: return abort(400, {"message": "Missing cpu or mem resource information"}) else: app["resources"] = {"cpu": {}, "mem": {}} for key in ["max", "min", "guard"]: if key not in req_app["resources"]["cpu"] or key not in req_app["resources"]["mem"]: return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)}) else: app["resources"]["cpu"][key] = req_app["resources"]["cpu"][key] app["resources"]["mem"][key] = req_app["resources"]["mem"][key] if app["name"] != structure_name: return abort(400, {"message": "Name mismatch".format(key)}) # Check if the app already exists try: app = get_db().get_structure(structure_name) if app: return abort(400, {"message": "App with this name already exists".format(key)}) except ValueError: pass #### ALL looks good up to this point, proceed app["containers"] = list() app["type"] = "structure" # Check that all the needed data for resources is present on the requested container LIMITS limits = {} if "resources" not in req_limits: return abort(400, {"message": "Missing resource information for the limits"}) elif "cpu" not in req_limits["resources"] or "mem" not in req_limits["resources"]: return abort(400, {"message": "Missing cpu or mem resource information for the limits"}) else: limits["resources"] = {"cpu": {}, "mem": {}} for key in ["boundary"]: if key not in req_limits["resources"]["cpu"] or key not in req_limits["resources"]["mem"]: return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)}) else: limits["resources"]["cpu"][key] = req_limits["resources"]["cpu"][key] limits["resources"]["mem"][key] = req_limits["resources"]["mem"][key] limits["type"] = 'limit' limits["name"] = app["name"] get_db().add_structure(app) get_db().add_limit(limits) return jsonify(201)
def subscribe_container(structure_name)
-
Expand source code
@structure_routes.route("/structure/container/<structure_name>", methods=['PUT']) def subscribe_container(structure_name): req_cont = request.json["container"] req_limits = request.json["limits"] node_scaler_session = requests.Session() # Check that all the needed data is present on the requestes container container = {} for key in ["name", "host_rescaler_ip", "host_rescaler_port", "host", "guard", "subtype"]: if key not in req_cont: return abort(400, {"message": "Missing key '{0}'".format(key)}) else: container[key] = req_cont[key] # Check that all the needed data for resources is present on the requested container container["resources"] = {} if "resources" not in req_cont: return abort(400, {"message": "Missing resource information"}) elif "cpu" not in req_cont["resources"] or "mem" not in req_cont["resources"]: return abort(400, {"message": "Missing cpu or mem resource information"}) else: container["resources"] = {"cpu": {}, "mem": {}} for key in ["max", "min", "current", "guard"]: if key not in req_cont["resources"]["cpu"] or key not in req_cont["resources"]["mem"]: return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)}) else: container["resources"]["cpu"][key] = req_cont["resources"]["cpu"][key] container["resources"]["mem"][key] = req_cont["resources"]["mem"][key] # Check that the endpoint requested container name matches with the one in the request if container["name"] != structure_name: return abort(400, {"message": "Name mismatch".format(key)}) # Check if the container already exists try: cont = get_db().get_structure(structure_name) if cont: return abort(400, {"message": "Container with this name already exists".format(key)}) except ValueError: pass # Check that its supposed host exists and that it reports this container try: get_db().get_structure(container["host"]) except ValueError: return abort(400, {"message": "Container host does not exist".format(key)}) host_containers = get_host_containers(container["host_rescaler_ip"], container["host_rescaler_port"], node_scaler_session, True) if container["name"] not in host_containers: return abort(400, {"message": "Container host does not report any container named '{0}'".format(container["name"])}) container["type"] = "structure" # Check that all the needed data for resources is present on the requested container LIMITS limits = {} if "resources" not in req_limits: return abort(400, {"message": "Missing resource information for the limits"}) elif "cpu" not in req_limits["resources"] or "mem" not in req_limits["resources"]: return abort(400, {"message": "Missing cpu or mem resource information for the limits"}) else: limits["resources"] = {"cpu": {}, "mem": {}} for key in ["boundary"]: if key not in req_limits["resources"]["cpu"] or key not in req_limits["resources"]["mem"]: return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)}) else: limits["resources"]["cpu"][key] = req_limits["resources"]["cpu"][key] limits["resources"]["mem"][key] = req_limits["resources"]["mem"][key] limits["type"] = 'limit' limits["name"] = container["name"] #### ALL looks good up to this point, proceed # Disable the Scaler as we will modify the core mapping of a host scaler_service = get_db().get_service(src.Scaler.Scaler.SERVICE_NAME) previous_state = MyUtils.get_config_value(scaler_service["config"], src.Scaler.Scaler.CONFIG_DEFAULT_VALUES, "ACTIVE") if previous_state: disable_scaler(scaler_service) # Get the host info cont_host = container["host"] cont_name = container["name"] host = get_db().get_structure(cont_host) # CPU # Look for resource shares on the container's host needed_shares = container["resources"]["cpu"]["current"] if host["resources"]["cpu"]["free"] < needed_shares: return abort(400, {"message": "Host does not have enough shares".format(key)}) core_map = host["resources"]["cpu"]["core_usage_mapping"] host_max_cores = int(host["resources"]["cpu"]["max"] / 100) host_cpu_list = [str(i) for i in range(host_max_cores)] pending_shares = needed_shares used_cores = list() # Try to satisfy the request by looking and adding a single core for core in host_cpu_list: if core_map[core]["free"] >= pending_shares: core_map[core]["free"] -= pending_shares core_map[core][cont_name] += pending_shares pending_shares = 0 used_cores.append(core) break # Finally, if unsuccessful, add as many cores as necessary, starting with the ones with the largest free shares to avoid too much spread if pending_shares > 0: l = list() for core in host_cpu_list: l.append((core, core_map[core]["free"])) l.sort(key=lambda tup: tup[1], reverse=True) less_used_cores = [i[0] for i in l] for core in less_used_cores: # If this core has free shares if core_map[core]["free"] > 0 and pending_shares > 0: if cont_name not in core_map[core]: core_map[core][cont_name] = 0 # If it has more free shares than needed, assign them and finish if core_map[core]["free"] >= pending_shares: core_map[core]["free"] -= pending_shares core_map[core][cont_name] += pending_shares pending_shares = 0 used_cores.append(core) break else: # Otherwise, assign as many as possible and continue core_map[core][cont_name] += core_map[core]["free"] pending_shares -= core_map[core]["free"] core_map[core]["free"] = 0 used_cores.append(core) if pending_shares > 0: return abort(400, {"message": "Container host does not have enough free CPU shares as requested"}) host["resources"]["cpu"]["core_usage_mapping"] = core_map host["resources"]["cpu"]["free"] -= needed_shares # MEM needed_memory = container["resources"]["mem"]["current"] host_memory = host["resources"]["mem"]["free"] if needed_memory > host_memory: return abort(400, {"message": "Container host does not have enough free memory requested"}) host["resources"]["mem"]["free"] -= needed_memory resource_dict = {"cpu": {"cpu_num": ",".join(used_cores), "cpu_allowance_limit": needed_shares}, "mem": {"mem_limit": needed_memory}} Scaler.set_container_resources(node_scaler_session, container, resource_dict, True) get_db().add_structure(container) get_db().add_limit(limits) get_db().update_structure(host) # Restore the previous state of the Scaler service restore_scaler_state(scaler_service, previous_state) return jsonify(201)
def subscribe_container_to_app(structure_name, app_name)
-
Expand source code
@structure_routes.route("/structure/container/<structure_name>/<app_name>", methods=['PUT']) def subscribe_container_to_app(structure_name, app_name): structure = retrieve_structure(structure_name) app = retrieve_structure(app_name) cont_name = structure["name"] # Look for any application that hosts this container, to make sure it is not already subscribed apps = get_db().get_structures(subtype="application") for app in apps: if cont_name in app["containers"]: return abort(400, {"message": "Container '{0}' already subscribed in app '{1}'".format(cont_name, app_name)}) app["containers"].append(cont_name) get_db().update_structure(app) return jsonify(201)
def subscribe_host(structure_name)
-
Expand source code
@structure_routes.route("/structure/host/<structure_name>", methods=['PUT']) def subscribe_host(structure_name): data = request.json node_scaler_session = requests.Session() # Check that all the needed data is present on the request host = {} for key in ["name", "host", "subtype", "host_rescaler_ip", "host_rescaler_port"]: if key not in data: return abort(400, {"message": "Missing key '{0}'".format(key)}) else: host[key] = data[key] # Check that all the needed data for resources is present on the request host["resources"] = {} if "resources" not in data: return abort(400, {"message": "Missing resource information"}) elif "cpu" not in data["resources"] or "mem" not in data["resources"]: return abort(400, {"message": "Missing cpu or mem resource information"}) else: host["resources"] = {"cpu": {}, "mem": {}} for key in ["max", "free"]: if key not in data["resources"]["cpu"] or key not in data["resources"]["mem"]: return abort(400, {"message": "Missing key '{0}' for cpu or mem resource".format(key)}) else: host["resources"]["cpu"][key] = data["resources"]["cpu"][key] host["resources"]["mem"][key] = data["resources"]["mem"][key] host["resources"]["cpu"]["core_usage_mapping"] = {} for n in range(0, int(host["resources"]["cpu"]["max"] / 100)): host["resources"]["cpu"]["core_usage_mapping"][n] = {"free": 100} if host["name"] != structure_name: return abort(400, {"message": "Name mismatch".format(key)}) # Check if the host already exists try: host = get_db().get_structure(structure_name) if host: return abort(400, {"message": "Host with this name already exists".format(key)}) except ValueError: pass # Check that this supposed host exists and that it reports this container try: host_containers = get_host_containers(host["host_rescaler_ip"], host["host_rescaler_port"], node_scaler_session, True) if not host_containers: raise RuntimeError() except Exception: return abort(400, {"message": "Could not connect to this host, is it up and has its node scaler up?"}) # Host looks good, insert it into the database host["type"] = "structure" get_db().add_structure(host) return jsonify(201)