Module src.Scaler.Scaler
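
This module implements the Scaler service of ServerlessContainers. On every polling epoch it sends a heartbeat, reloads its configuration from the CouchDB state database, optionally checks host CPU limits and container core mappings, retrieves the pending rescaling requests and applies them: container requests are translated into REST calls to the NodeScaler daemon of the container's host, while application requests are split into smaller chunks that are mapped onto the application's containers according to their usage, as reported by BDWatchdog (OpenTSDB).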

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.


from __future__ import print_function

import json
import time
from threading import Thread

import requests
import traceback
import logging

from requests import HTTPError

import src.StateDatabase.couchdb as couchDB
import src.StateDatabase.opentsdb as bdwatchdog
from src.Guardian.Guardian import Guardian
from src.Snapshoters.StructuresSnapshoter import get_container_resources_dict

from src.MyUtils.MyUtils import MyConfig, log_error, get_service, beat, log_info, log_warning, \
    get_structures, update_structure, generate_request_name, structure_is_application, structure_is_container
from src.StateDatabase import couchdb

CONFIG_DEFAULT_VALUES = {"POLLING_FREQUENCY": 5, "REQUEST_TIMEOUT": 60, "self.debug": True, "CHECK_CORE_MAP": True, "ACTIVE": True}
SERVICE_NAME = "scaler"

BDWATCHDOG_CONTAINER_METRICS = {"cpu": ['proc.cpu.user', 'proc.cpu.kernel'],
                                "mem": ['proc.mem.resident'],
                                "disk": ['proc.disk.writes.mb', 'proc.disk.reads.mb'],
                                "net": ['proc.net.tcp.in.mb', 'proc.net.tcp.out.mb']}
RESCALER_CONTAINER_METRICS = {'cpu': ['proc.cpu.user', 'proc.cpu.kernel'], 'mem': ['proc.mem.resident'],
                              'disk': ['proc.disk.writes.mb', 'proc.disk.reads.mb'],
                              'net': ['proc.net.tcp.in.mb', 'proc.net.tcp.out.mb']}

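# Application-level rescaling requests are split into chunks of at most this many shares
# before being mapped onto the application's individual containers (see rescale_application)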
APP_SCALING_SPLIT_AMOUNT = 5


def set_container_resources(rescaler_http_session, container, resources, debug):
    rescaler_ip = container["host_rescaler_ip"]
    rescaler_port = container["host_rescaler_port"]
    container_name = container["name"]
    r = rescaler_http_session.put(
        "http://{0}:{1}/container/{2}".format(rescaler_ip, rescaler_port, container_name),
        data=json.dumps(resources),
        headers={'Content-Type': 'application/json', 'Accept': 'application/json'})
    if r.status_code == 201:
        return dict(r.json())
    else:
        log_error(str(json.dumps(r.json())), debug)
        r.raise_for_status()


class Scaler:
    """
    Scaler class that implements the logic for this microservice.
    """

    def __init__(self):
        self.couchdb_handler = couchdb.CouchDBServer()
        self.db_handler = couchDB.CouchDBServer()
        self.rescaler_http_session = requests.Session()
        self.bdwatchdog_handler = bdwatchdog.OpenTSDBServer()
        self.host_info_cache = dict()
        self.container_info_cache = dict()
        self.apply_request_by_resource = {"cpu": self.apply_cpu_request, "mem": self.apply_mem_request, "disk": self.apply_disk_request, "net": self.apply_net_request}

    #### CHECKS ####
    def fix_container_cpu_mapping(self, container, cpu_used_cores, cpu_used_shares):

        resource_dict = {"cpu": {}}
        resource_dict["cpu"]["cpu_num"] = ",".join(cpu_used_cores)
        resource_dict["cpu"]["cpu_allowance_limit"] = int(cpu_used_shares)
        try:
            # TODO FIX: this error should be further diagnosed, in case it affects other modules that use this call too
            set_container_resources(self.rescaler_http_session, container, resource_dict, self.debug)
            return True
        except (Exception, RuntimeError, ValueError, requests.HTTPError) as e:
            log_error("Error when setting container resources: {0}".format(str(e)), self.debug)
            return False

    def check_host_cpu_limits(self):
        errors_detected = False
        for host in self.host_info_cache.values():
            all_accounted_shares = 0
            map = host["resources"]["cpu"]["core_usage_mapping"]
            for core in map.values():
                for container in core:
                    if container != "free":
                        all_accounted_shares += core[container]
            if all_accounted_shares > host["resources"]["cpu"]["max"]:
                log_error("Host {0} has more mapped shares than its maximum".format(host["name"]), self.debug)
                errors_detected = True
        return errors_detected

    def check_host_has_enough_free_resources(self, host_info, needed_resources, resource):
        host_shares = host_info["resources"][resource]["free"]
        if host_shares == 0:
            raise ValueError("No resources available for resource {0} in host {1} ".format(resource, host_info["name"]))
        elif host_shares < needed_resources:
            missing_shares = needed_resources - host_shares
            # raise ValueError("Error in setting {0}, couldn't get the resources needed, missing {1} shares".format(resource, missing_shares))
            log_warning(
                "Beware, there are not enough free shares of resource {0} in the host: there are {1} available, {2} missing".format(resource, host_shares, missing_shares),
                self.debug)

    def check_containers_cpu_limits(self, containers):
        errors_detected = False
        for container in containers:
            database_resources = container["resources"]

            if "max" not in database_resources["cpu"]:
                log_error("container {0} has not a maximum value set, check its configuration".format(container["name"]), self.debug)
                errors_detected = True
                continue

            max_cpu_limit = database_resources["cpu"]["max"]
            real_resources = self.container_info_cache[container["name"]]["resources"]
            try:
                current_cpu_limit = self.get_current_resource_value(real_resources, "cpu")
                if current_cpu_limit > max_cpu_limit:
                    log_error("container {0} has, somehow, more shares ({1}) than the maximum ({2}), check the max "
                              "parameter in its configuration".format(container["name"], current_cpu_limit, max_cpu_limit), self.debug)
                    errors_detected = True
            except ValueError as e:
                log_error("Current value of structure {0} is not valid: {1}".format(container["name"], str(e)), self.debug)
                errors_detected = True

        return errors_detected

    def check_container_cpu_mapping(self, container, host_info, cpu_used_cores, cpu_used_shares):
        host_max_cores = int(host_info["resources"]["cpu"]["max"] / 100)
        host_cpu_list = [str(i) for i in range(host_max_cores)]
        core_usage_map = host_info["resources"]["cpu"]["core_usage_mapping"]

        cpu_accounted_shares = 0
        cpu_accounted_cores = list()
        container_name = container["name"]
        for core in core_usage_map:
            if core not in host_cpu_list:
                continue
            if container_name in core_usage_map[core] and core_usage_map[core][container_name] != 0:
                cpu_accounted_shares += core_usage_map[core][container_name]
                cpu_accounted_cores.append(core)

        if sorted(cpu_used_cores) != sorted(cpu_accounted_cores) or cpu_used_shares != cpu_accounted_shares:
            return False, cpu_accounted_cores, cpu_accounted_shares
        else:
            return True, cpu_accounted_cores, cpu_accounted_shares

    def check_container_core_mapping(self, container, real_resources):
        errors_detected = False
        database_resources = container["resources"]

        if container["host"] not in self.host_info_cache:
            log_error("Host info '{0}' for container {1} is missing".format(container["host"], container["name"]), self.debug)
            return True
        elif "max" not in database_resources["cpu"]:
            # This error should have been previously detected
            return True
        else:
            try:
                current_cpu_limit = self.get_current_resource_value(real_resources, "cpu")
            except ValueError as e:
                log_error(e, self.debug)
                return True

        host_info = self.host_info_cache[container["host"]]
        max_cpu_limit = database_resources["cpu"]["max"]
        cpu_list = self.get_cpu_list(real_resources["cpu"]["cpu_num"])
        c_name = container["name"]

        map_host_valid, actual_used_cores, actual_used_shares = self.check_container_cpu_mapping(container, host_info, cpu_list, current_cpu_limit)

        if not map_host_valid:
            log_error(
                "Detected invalid core mapping for container {0}, has {1}-{2}, should be {3}-{4}".format(c_name, cpu_list, current_cpu_limit, actual_used_cores, actual_used_shares),
                self.debug)
            log_error("trying to automatically fix", self.debug)
            success = self.fix_container_cpu_mapping(container, actual_used_cores, actual_used_shares)
            if success:
                log_error("Succeeded fixing {0} container's core mapping".format(container["name"]), self.debug)
                errors_detected = True
            else:
                log_error("Failed in fixing {0} container's core mapping".format(container["name"]), self.debug)
                errors_detected = False
        return errors_detected

    def check_core_mapping(self, containers):
        errors_detected = False
        for container in containers:
            c_name = container["name"]
            log_info("Checking container {0}".format(c_name), self.debug)
            if c_name not in self.container_info_cache or "resources" not in self.container_info_cache[c_name]:
                log_error("Couldn't get container's {0} resources, can't check its sanity".format(c_name), self.debug)
                continue
            real_resources = self.container_info_cache[c_name]["resources"]
            errors = self.check_container_core_mapping(container, real_resources)
            errors_detected = errors_detected or errors
        return errors_detected

    def check_invalid_resource_value(self, database_resources, amount, current, resource):
        max_resource_limit = int(database_resources["resources"][resource]["max"])
        min_resource_limit = int(database_resources["resources"][resource]["min"])
        resource_limit = int(current + amount)

        if resource_limit < 0:
            raise ValueError("Error in setting {0}, it would be lower than 0".format(resource))
        elif resource_limit < min_resource_limit:
            raise ValueError("Error in setting {0}, new value {1} it would be lower than min {2}".format(resource, resource_limit, min_resource_limit))
        elif resource_limit > max_resource_limit:
            raise ValueError("Error in setting {0}, new value {1} it would be higher than max {2}".format(resource, resource_limit, max_resource_limit))

    ######################################################

    #### REQUEST MANAGEMENT ####
    def filter_requests(self, request_timeout):
        fresh_requests, purged_requests, final_requests = list(), list(), list()
        # Remote database operation
        all_requests = self.db_handler.get_requests()
        purged_counter = 0
        duplicated_counter = 0

        # First purge the old requests
        for request in all_requests:
            if request["timestamp"] < time.time() - request_timeout:
                purged_requests.append(request)
                purged_counter += 1
            else:
                fresh_requests.append(request)

        # Then remove repeated requests for the same structure if found
        structure_requests_dict = {}
        for request in fresh_requests:
            structure = request["structure"]  # The structure name (string), acting as an id
            action = request["action"]  # The action name (string)
            if structure not in structure_requests_dict:
                structure_requests_dict[structure] = {}

            if action not in structure_requests_dict[structure]:
                structure_requests_dict[structure][action] = request
            else:
                # A previous request was found for this structure, remove old one and leave the newer one
                stored_request = structure_requests_dict[structure][action]
                if stored_request["timestamp"] > request["timestamp"]:
                    # The stored request is newer, leave it and mark the retrieved one to be removed
                    purged_requests.append(request)
                else:
                    # The stored request is older, mark it to be removed and keep the retrieved one
                    purged_requests.append(stored_request)
                    structure_requests_dict[structure][action] = request

                duplicated_counter += 1

        self.db_handler.delete_requests(purged_requests)

        for structure in structure_requests_dict:
            for action in structure_requests_dict[structure]:
                final_requests.append(structure_requests_dict[structure][action])

        log_info("Number of purged/duplicated requests was {0}/{1}".format(purged_counter, duplicated_counter), True)
        return final_requests

    def sort_requests(self, new_requests):
        container_reqs, app_reqs = list(), list()
        for r in new_requests:
            if r["structure_type"] == "container":
                container_reqs.append(r)
            elif r["structure_type"] == "application":
                app_reqs.append(r)
            else:
                pass
        return container_reqs, app_reqs

    ######################################################

    #### RESOURCE REQUEST MANAGEMENT ####
    def process_request(self, request, real_resources, database_resources):
        # Create a 'fake' container structure with only the required info
        container = {"host_rescaler_ip": request["host_rescaler_ip"],
                     "host_rescaler_port": request["host_rescaler_port"],
                     "name": request["structure"]}

        # Apply the request and get the new resources to set
        try:
            new_resources = self.apply_request(request, real_resources, database_resources)
            if new_resources:
                log_info("Request: {0} for container : {1} for new resources : {2}".format(
                    request["action"], request["structure"], json.dumps(new_resources)), self.debug)

                # Apply changes through a REST call
                set_container_resources(self.rescaler_http_session, container, new_resources, self.debug)
        except (ValueError) as e:
            log_error("Error with container {0} in applying the request -> {1}".format(request["structure"], str(e)), self.debug)
            return
        except (HTTPError) as e:
            log_error("Error setting container {0} resources -> {1}".format(request["structure"], str(e)), self.debug)
            return
        except (Exception) as e:
            log_error("Error with container {0} -> {1}".format(request["structure"], str(e)), self.debug)
            return

    def apply_request(self, request, real_resources, database_resources):

        amount = int(request["amount"])

        host_info = self.host_info_cache[request["host"]]
        resource = request["resource"]

        # Get the current resource limit; raises ValueError if it is missing or unlimited
        current_resource_limit = self.get_current_resource_value(real_resources, resource)

        # Check that the resource limit is respected, not lower than min or higher than max
        self.check_invalid_resource_value(database_resources, amount, current_resource_limit, resource)

        if amount > 0:
            # If the request is for scale up, check that the host has enough free resources before proceeding
            self.check_host_has_enough_free_resources(host_info, amount, resource)

        fun = self.apply_request_by_resource[resource]
        result = fun(request, database_resources, real_resources, amount)

        return result

    def apply_cpu_request(self, request, database_resources, real_resources, amount):
        resource = request["resource"]
        structure_name = request["structure"]
        host_info = self.host_info_cache[request["host"]]

        core_usage_map = host_info["resources"][resource]["core_usage_mapping"]

        current_cpu_limit = self.get_current_resource_value(real_resources, resource)
        cpu_list = self.get_cpu_list(real_resources["cpu"]["cpu_num"])

        host_max_cores = int(host_info["resources"]["cpu"]["max"] / 100)
        host_cpu_list = [str(i) for i in range(host_max_cores)]
        for core in host_cpu_list:
            if core not in core_usage_map:
                core_usage_map[core] = dict()
                core_usage_map[core]["free"] = 100
            if structure_name not in core_usage_map[core]:
                core_usage_map[core][structure_name] = 0

        used_cores = list(cpu_list)  # copy

        if amount > 0:
            # Rescale up, so look for free shares to assign and maybe add cores
            needed_shares = amount

            # First fill the already used cores so that no additional cores are added unnecessarily
            for core in cpu_list:
                if core_usage_map[core]["free"] > 0:
                    if core_usage_map[core]["free"] > needed_shares:
                        core_usage_map[core]["free"] -= needed_shares
                        core_usage_map[core][structure_name] += needed_shares
                        needed_shares = 0
                        break
                    else:
                        core_usage_map[core][structure_name] += core_usage_map[core]["free"]
                        needed_shares -= core_usage_map[core]["free"]
                        core_usage_map[core]["free"] = 0

            # Next, try to satisfy the request by looking for and adding a single extra core
            if needed_shares > 0:
                for core in host_cpu_list:
                    if core_usage_map[core]["free"] >= needed_shares:
                        core_usage_map[core]["free"] -= needed_shares
                        core_usage_map[core][structure_name] += needed_shares
                        needed_shares = 0
                        used_cores.append(core)
                        break

            # Finally, if unsuccessful, add as many cores as necessary, starting with the ones with the largest free shares to avoid too much spread
            if needed_shares > 0:
                l = list()
                for core in host_cpu_list:
                    l.append((core, core_usage_map[core]["free"]))
                l.sort(key=lambda tup: tup[1], reverse=True)
                less_used_cores = [i[0] for i in l]

                for core in less_used_cores:
                    # If this core has free shares
                    if core_usage_map[core]["free"] > 0 and needed_shares > 0:
                        # If it has more free shares than needed, assign them and finish
                        if core_usage_map[core]["free"] >= needed_shares:
                            core_usage_map[core]["free"] -= needed_shares
                            core_usage_map[core][structure_name] += needed_shares
                            needed_shares = 0
                            used_cores.append(core)
                            break
                        else:
                            # Otherwise, assign as many as possible and continue
                            core_usage_map[core][structure_name] += core_usage_map[core]["free"]
                            needed_shares -= core_usage_map[core]["free"]
                            core_usage_map[core]["free"] = 0
                            used_cores.append(core)

            if needed_shares > 0:
                # raise ValueError("Error in setting cpu, couldn't get the resources needed, missing {0} shares".format(needed_shares))
                log_warning("Structure {0} couldn't get as much CPU shares as intended ({1}), "
                            "instead it got {2}".format(structure_name, amount, amount - needed_shares), self.debug)
                amount = amount - needed_shares
                # FIXME couldn't do rescale up properly as shares to get remain

        elif amount < 0:
            # Rescale down, so free shares, preferring to empty the least-allocated cores so that whole cores can be released
            shares_to_free = abs(amount)

            # First look for the cores where this structure has the fewest allocated shares and free those first
            l = list()
            for core in cpu_list:
                l.append((core, core_usage_map[core][structure_name]))
            l.sort(key=lambda tup: tup[1], reverse=False)
            less_allocated_cores = [i[0] for i in l]

            for core in less_allocated_cores:
                # If this core has no more allocated shares than the amount to free, release it entirely and, if shares remain to be freed, continue
                if core_usage_map[core][structure_name] <= shares_to_free:
                    core_usage_map[core]["free"] += core_usage_map[core][structure_name]
                    shares_to_free -= core_usage_map[core][structure_name]
                    core_usage_map[core][structure_name] = 0
                    used_cores.remove(core)
                    # In the event that the amount to be freed was equal to the allocated one, finish
                    if shares_to_free == 0:
                        break
                # More allocated shares than amount to be freed, reduce allocation and finish
                elif core_usage_map[core][structure_name] > shares_to_free:
                    core_usage_map[core]["free"] += shares_to_free
                    core_usage_map[core][structure_name] -= shares_to_free
                    shares_to_free = 0
                    break

            if shares_to_free > 0:
                raise ValueError("Error in setting cpu, couldn't free the resources properly")

        # No error thrown, so persist the new mapping to the cache
        self.host_info_cache[request["host"]]["resources"]["cpu"]["core_usage_mapping"] = core_usage_map
        self.host_info_cache[request["host"]]["resources"]["cpu"]["free"] -= amount

        resource_dict = {resource: {}}
        resource_dict["cpu"]["cpu_num"] = ",".join(used_cores)
        resource_dict["cpu"]["cpu_allowance_limit"] = int(current_cpu_limit + amount)

        return resource_dict

    def apply_mem_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_mem_limit = self.get_current_resource_value(real_resources, request["resource"])

        # No error thrown, so update the host's free memory in the cache
        self.host_info_cache[request["host"]]["resources"]["mem"]["free"] -= amount

        # Return the dictionary to set the resources
        resource_dict["mem"]["mem_limit"] = str(int(amount + current_mem_limit))

        return resource_dict

    def apply_disk_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_disk_limit = self.get_current_resource_value(real_resources, request["resource"])

        # Return the dictionary to set the resources
        resource_dict["disk"]["disk_read_limit"] = str(int(amount + current_disk_limit))
        resource_dict["disk"]["disk_write_limit"] = str(int(amount + current_disk_limit))

        return resource_dict

    def apply_net_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_net_limit = self.get_current_resource_value(real_resources, request["resource"])

        # Return the dictionary to set the resources
        resource_dict["net"]["net_limit"] = str(int(amount + current_net_limit))

        return resource_dict

    ######################################################

    ##### CONTAINER SCALING ######
    def rescale_container(self, request, structure):
        try:
            # Needed for the resources reported in the database (the 'max, min' values)
            database_resources = structure

            # Get the resources the container is using from its host NodeScaler (the 'current' value)
            c_name = structure["name"]
            if c_name not in self.container_info_cache or "resources" not in self.container_info_cache[c_name]:
                log_error("Couldn't get container's {0} resources, can't rescale".format(c_name), self.debug)
                return
            real_resources = self.container_info_cache[c_name]["resources"]

            # Process the request
            self.process_request(request, real_resources, database_resources)
        except Exception as e:
            log_error(str(e) + " " + str(traceback.format_exc()), self.debug)

    ######################################################

    ##### APPLICATION SCALING ######
    def sort_containers_by_usage_margin(self, container1, container2, resource):
        """
        Parameters:
            container1: dict -> A container structure
            container2: dict -> A container structure
            resource: str -> the resource to be used for sorting
        Returns:
            The tuple of the containers with the (lowest,highest) margin between resources used and resources set
        """
        c1_current_amount = container1["resources"][resource]["current"]
        c1_usage_amount = container1["resources"][resource]["usage"]
        c2_current_amount = container2["resources"][resource]["current"]
        c2_usage_amount = container2["resources"][resource]["usage"]
        if c1_current_amount - c1_usage_amount < c2_current_amount - c2_usage_amount:
            lowest, highest = container1, container2
        else:
            lowest, highest = container2, container1

        return lowest, highest

    def lowest_current_to_usage_margin(self, container1, container2, resource):
        # Return the container with the lowest margin between resources used and resources set (closest bottleneck)
        lowest, _ = self.sort_containers_by_usage_margin(container1, container2, resource)
        return lowest

    def highest_current_to_usage_margin(self, container1, container2, resource):
        # Return the container with the highest margin between resources used and resources set (lowest use)
        _, highest = self.sort_containers_by_usage_margin(container1, container2, resource)
        return highest

    def generate_requests(self, new_requests, app_label):
        rescaled_containers = list()
        total_amount = 0
        for req in new_requests:
            self.db_handler.add_request(req)
            rescaled_containers.append((req["structure"], req["amount"]))
            total_amount += req["amount"]
        log_info("App {0} rescaled {1} shares by rescaling containers: {2}".format(app_label, total_amount, str(rescaled_containers)), self.debug)

    def single_container_rescale(self, request, app_containers, resource_usage_cache):
        amount, resource_label = request["amount"], request["resource"]
        scalable_containers = list()
        resource_shares = abs(amount)

        # Look for containers that can be rescaled
        for container in app_containers:
            usages = resource_usage_cache[container["name"]]
            container["resources"][resource_label]["usage"] = usages[resource_label]
            current_value = container["resources"][resource_label]["current"]

            # Rescale down
            if amount < 0:
                # Check that the container has enough free resource shares
                # available to be released and that it would be able
                # to be rescaled without dropping under the minimum value
                if current_value < resource_shares:
                    # Container doesn't have enough resources to free
                    # ("Container doesn't have enough resources to free", self.debug)
                    pass
                elif current_value + amount < container["resources"][resource_label]["min"]:
                    # Container can't free that amount without dropping under the minimum
                    # log_error("Container {0} can't free that amount without dropping under the minimum".format(container["name"]), self.debug)
                    pass
                else:
                    scalable_containers.append(container)

            # Rescale up
            else:
                # Check that the container's host has enough free resource shares available and that the container
                # could be rescaled without exceeding its maximum value
                container_host = container["host"]

                if self.host_info_cache[container_host]["resources"][resource_label]["free"] < resource_shares:
                    # Container's host doesn't have enough free resources
                    # log_error("Container's host doesn't have enough free resources", self.debug)
                    pass
                elif current_value + amount > container["resources"][resource_label]["max"]:
                    # Container can't get that amount without exceeding the maximum
                    # log_error("Container can't get that amount without exceeding the maximum", self.debug)
                    pass
                else:
                    scalable_containers.append(container)

        # Look for the best fit container for this resource and launch the rescaling request for it
        if scalable_containers:
            best_fit_container = scalable_containers[0]

            for container in scalable_containers[1:]:
                if amount < 0:
                    # If scaling down, look for containers with usages far from the limit (underuse)
                    best_fit_container = self.highest_current_to_usage_margin(container, best_fit_container, resource_label)
                else:
                    # If scaling up, look for containers with usages close to the limit (bottleneck)
                    best_fit_container = self.lowest_current_to_usage_margin(container, best_fit_container, resource_label)

            # Generate the new request
            new_request = Guardian.generate_request(best_fit_container, amount, resource_label)

            return True, best_fit_container, new_request
        else:
            return False, {}, {}

    def rescale_application(self, request, structure):

        # Get container names that this app uses
        app_containers_names = structure["containers"]
        app_containers = list()

        for cont_name in app_containers_names:
            # Get the container
            container = self.db_handler.get_structure(cont_name)
            app_containers.append(container)

            # Retrieve host info and cache it in case other containers or applications need it
            if container["host"] not in self.host_info_cache:
                self.host_info_cache[container["host"]] = self.db_handler.get_structure(container["host"])

        total_amount = request["amount"]

        requests = list()
        remaining_amount = total_amount
        split_amount = APP_SCALING_SPLIT_AMOUNT * (request["amount"] / abs(request["amount"]))  # This sets the sign
        request["amount"] = split_amount

        # Create smaller requests of 'split_amount' size
        while abs(remaining_amount) > 0 and abs(remaining_amount) > abs(split_amount):
            requests.append(dict(request))
            remaining_amount -= split_amount

        # If some remaining amount is left, create the last request
        if abs(remaining_amount) > 0:
            request["amount"] = remaining_amount
            requests.append(dict(request))

        # Get the request usage for all the containers and cache it
        resource_usage_cache = dict()
        for container in app_containers:
            amount, resource = request["amount"], request["resource"]
            metrics_to_retrieve = BDWATCHDOG_CONTAINER_METRICS[resource]
            resource_usage_cache[container["name"]] = self.bdwatchdog_handler.get_structure_timeseries(
                {"host": container["name"]}, 10, 20,
                metrics_to_retrieve, RESCALER_CONTAINER_METRICS)

        success, iterations = True, 0
        generated_requests = dict()

        while success and len(requests) > 0:
            request = requests.pop(0)
            success, container_to_rescale, generated_request = self.single_container_rescale(request, app_containers, resource_usage_cache)
            if success:
                # If rescaling was successful, update the container's resources as they have been rescaled
                for c in app_containers:
                    container_name = c["name"]
                    if container_name == container_to_rescale["name"]:
                        # Initialize
                        if container_name not in generated_requests:
                            generated_requests[container_name] = list()

                        generated_requests[container_name].append(generated_request)
                        container_to_rescale["resources"][request["resource"]]["current"] += request["amount"]
                        app_containers.remove(c)
                        app_containers.append(container_to_rescale)
                        break
            else:
                break

            iterations += 1

        # Collapse all the requests to generate just 1 per container
        final_requests = list()
        for container in generated_requests:
            # Copy the first request as the base request
            flat_request = dict(generated_requests[container][0])
            flat_request["amount"] = 0
            for request in generated_requests[container]:
                flat_request["amount"] += request["amount"]
            final_requests.append(flat_request)
        self.generate_requests(final_requests, structure["name"])

        if len(requests) > 0:
            # Couldn't completely rescale the application as some splits of the major rescaling operation could not be completed
            log_warning("App {0} could not be completely rescaled, only {1} shares of resource {2} have been scaled".format(
                request["structure"], str(int(iterations * split_amount)), request["resource"]), self.debug)

    ######################################################

    ### SERVICE METHODS ####
    def invalid_conf(self, config):
        # TODO This code is duplicated on the structures and database snapshoters
        for key, num in [("POLLING_FREQUENCY", config.get_value("POLLING_FREQUENCY")), ("REQUEST_TIMEOUT", config.get_value("REQUEST_TIMEOUT"))]:
            if num < 5:
                return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(key, num)
        return False, ""

    def get_cpu_list(self, cpu_num_string):
        # Translate something like '2-4,7' to a list of strings like ['2', '3', '4', '7']
        cpu_list = list()
        parts = cpu_num_string.split(",")
        for part in parts:
            ranges = part.split("-")
            if len(ranges) == 1:
                cpu_list.append(ranges[0])
            else:
                for n in range(int(ranges[0]), int(ranges[1]) + 1):
                    cpu_list.append(str(n))
        return cpu_list

    def get_current_resource_value(self, real_resources, resource):
        translation_dict = {"cpu": "cpu_allowance_limit", "mem": "mem_limit"}

        if resource not in translation_dict:
            raise ValueError("Resource '{0}' unknown".format(resource))
        else:
            resource_translated = translation_dict[resource]

        if resource not in real_resources:
            raise ValueError("Resource '{0}' info missing from host".format(resource))

        if resource_translated not in real_resources[resource]:
            raise ValueError("Current value for resource '{0}' missing from host resource info".format(resource))

        current_resource_limit = real_resources[resource][resource_translated]
        if current_resource_limit == -1:
            raise ValueError("Resource {0} has not a 'current' value set, that is, it is unlimited".format(resource))
        else:
            try:
                current_resource_limit = int(current_resource_limit)
            except ValueError:
                raise ValueError("Bad current {0} limit value".format(resource))
        return current_resource_limit

    def process_requests(self, reqs):
        for request in reqs:
            structure_name = request["structure"]

            # Retrieve structure info
            try:
                structure = self.db_handler.get_structure(structure_name)
            except (requests.exceptions.HTTPError, ValueError):
                log_error("Error, couldn't find structure {0} in database".format(structure_name), self.debug)
                continue

            # Rescale the structure accordingly, whether it is a container or an application
            if structure_is_container(structure):
                self.rescale_container(request, structure)
            elif structure_is_application(structure):
                self.rescale_application(request, structure)
            else:
                log_error("Unknown type of structure '{0}'".format(structure["subtype"]), self.debug)

            # Remove the request from the database
            self.db_handler.delete_request(request)

    def split_requests(self, all_requests):
        scale_down, scale_up = list(), list()
        for request in all_requests:
            if "action" not in request or not request["action"]:
                continue
            elif request["action"].endswith("Down"):
                scale_down.append(request)
            elif request["action"].endswith("Up"):
                scale_up.append(request)
        return scale_down, scale_up

    def fill_host_info_cache(self, containers):
        self.host_info_cache = dict()
        for container in containers:
            if container["host"] not in self.host_info_cache:
                self.host_info_cache[container["host"]] = self.db_handler.get_structure(container["host"])
        return

    def persist_new_host_information(self):
        def persist_thread(self, host):
            data = self.host_info_cache[host]
            update_structure(data, self.db_handler, self.debug)

        threads = list()
        for host in self.host_info_cache:
            t = Thread(target=persist_thread, args=(self, host,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

    def scale_structures(self, new_requests):
        log_info("Processing requests", self.debug)

        t0 = time.time()

        # Split the requests between scale down and scale up
        scale_down, scale_up = self.split_requests(new_requests)

        # Process first the requests that free resources, then the ones that use them
        self.process_requests(scale_down)
        self.process_requests(scale_up)

        # Persist the new host information
        self.persist_new_host_information()

        t1 = time.time()
        log_info("It took {0} seconds to process requests".format(str("%.2f" % (t1 - t0))), self.debug)

    ######################################################

    def scale(self):
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)

        # Remove previous requests
        log_info("Purging any previous requests", True)
        self.filter_requests(0)
        log_info("----------------------\n", True)

        while True:
            # Remote database operation
            service = get_service(self.db_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.db_handler, SERVICE_NAME)

            # CONFIG
            myConfig.set_config(service["config"])
            polling_frequency = myConfig.get_value("POLLING_FREQUENCY")
            request_timeout = myConfig.get_value("REQUEST_TIMEOUT")
            self.debug = myConfig.get_value("self.debug")
            CHECK_CORE_MAP = myConfig.get_value("CHECK_CORE_MAP")
            SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

            log_info("----------------------", self.debug)
            log_info("Starting Epoch", self.debug)
            t0 = time.time()

            ## CHECK INVALID CONFIG ##
            # TODO This code is duplicated on the structures and database snapshoters
            invalid, message = self.invalid_conf(myConfig)
            if invalid:
                log_error(message, self.debug)
                time.sleep(polling_frequency)
                if polling_frequency < 5:
                    log_error("Polling frequency is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]), self.debug)
                    polling_frequency = CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]

                log_info("----------------------\n", self.debug)
                time.sleep(polling_frequency)
                continue

            if SERVICE_IS_ACTIVATED:

                # Get the container structures and their resource information as such data is going to be needed
                containers = get_structures(self.db_handler, self.debug, subtype="container")
                try:
                    self.container_info_cache = get_container_resources_dict()  # Reset the cache
                except (Exception, RuntimeError) as e:
                    log_error("Error getting host document, skipping epoch altogether", self.debug)
                    log_error(str(e), self.debug)
                    time.sleep(polling_frequency)
                    continue

                # Fill the host information cache
                log_info("Getting host and container info", self.debug)
                try:
                    self.fill_host_info_cache(containers)
                except (Exception, RuntimeError) as e:
                    log_error("Error getting host document, skipping epoch altogether", self.debug)
                    log_error(str(e), self.debug)
                    time.sleep(polling_frequency)
                    continue

                # Do the core mapping check-up
                if CHECK_CORE_MAP:
                    log_info("Doing container CPU limits check", self.debug)
                    log_info("First hosts", self.debug)
                    errors_detected = self.check_host_cpu_limits()
                    if errors_detected:
                        log_error("Errors detected during host CPU limits check", self.debug)

                    log_info("Second containers", self.debug)
                    errors_detected = self.check_containers_cpu_limits(containers)
                    if errors_detected:
                        log_error("Errors detected during container CPU limits check", self.debug)

                    log_info("Doing core mapping check", self.debug)
                    errors_detected = self.check_core_mapping(containers)
                    if errors_detected:
                        log_error("Errors detected during container CPU map check", self.debug)
                else:
                    log_warning("Core map check has been disabled", self.debug)

                # Get the requests
                new_requests = self.filter_requests(request_timeout)
                container_reqs, app_reqs = self.sort_requests(new_requests)

                # Process first the application requests, as they generate container ones
                if app_reqs:
                    log_info("Processing applications requests", self.debug)
                    self.scale_structures(app_reqs)
                else:
                    log_info("No applications requests", self.debug)

                # Then process container ones
                if container_reqs:
                    log_info("Processing container requests", self.debug)
                    self.scale_structures(container_reqs)
                else:
                    log_info("No container requests", self.debug)

                t1 = time.time()
                log_info("Epoch processed in {0} seconds".format(str("%.2f" % (t1 - t0))), self.debug)

            else:
                log_warning("Scaler service is not activated", self.debug)

            log_info("----------------------\n", self.debug)
            time.sleep(polling_frequency)


def main():
    try:
        scaler = Scaler()
        scaler.scale()
    except Exception as e:
        log_error("{0} {1}".format(str(e), str(traceback.format_exc())), debug=True)


if __name__ == "__main__":
    main()

Functions

def main()
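
Entry point of the service: it instantiates a Scaler and starts its polling loop, logging any top-level exception. A minimal launch sketch, assuming the package root is on the PYTHONPATH:

from src.Scaler.Scaler import main

main()  # blocks, polling the state database every POLLING_FREQUENCY seconds
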
def set_container_resources(rescaler_http_session, container, resources, debug)
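
Applies a new resource configuration to a container by issuing a PUT request to the NodeScaler daemon running on the container's host; on HTTP 201 it returns the response body as a dict, otherwise it logs the error and raises an HTTPError. A usage sketch with illustrative host address and limit values (the resource keys match those built by this module: cpu_num, cpu_allowance_limit, mem_limit):

import requests

# Hypothetical container descriptor; in the Scaler these fields come from the
# request and structure documents stored in CouchDB
container = {"host_rescaler_ip": "192.168.1.10",
             "host_rescaler_port": 8000,
             "name": "node0"}

# Pin the container to cores 0 and 1 with 200 CPU shares and a 2048 MB memory limit
resources = {"cpu": {"cpu_num": "0,1", "cpu_allowance_limit": 200},
             "mem": {"mem_limit": "2048"}}

session = requests.Session()
applied = set_container_resources(session, container, resources, debug=True)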

Classes

class Scaler

Scaler class that implements the logic for this microservice.
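
The class caches host and container information on every epoch, filters and deduplicates pending requests, checks CPU limits and core mappings, and rescales containers directly while translating application requests into container requests. Running the service is a single call; a sketch, assuming a reachable CouchDB state database and NodeScaler daemons:

from src.Scaler.Scaler import Scaler

scaler = Scaler()
scaler.scale()  # heartbeat, config reload, sanity checks and request processing, forever

# Individual steps can also be driven manually, e.g. purging requests older than
# 60 seconds (the REQUEST_TIMEOUT default):
# fresh_requests = scaler.filter_requests(60)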

Expand source code
class Scaler:
    """
    Scaler class that implements the logic for this microservice.
    """

    def __init__(self):
        self.couchdb_handler = couchdb.CouchDBServer()
        self.db_handler = couchDB.CouchDBServer()
        self.rescaler_http_session = requests.Session()
        self.bdwatchdog_handler = bdwatchdog.OpenTSDBServer()
        self.host_info_cache = dict()
        self.container_info_cache = dict()
        self.apply_request_by_resource = {"cpu": self.apply_cpu_request, "mem": self.apply_mem_request, "disk": self.apply_disk_request, "net": self.apply_net_request}

    #### CHECKS ####
    def fix_container_cpu_mapping(self, container, cpu_used_cores, cpu_used_shares):

        resource_dict = {"cpu": {}}
        resource_dict["cpu"]["cpu_num"] = ",".join(cpu_used_cores)
        resource_dict["cpu"]["cpu_allowance_limit"] = int(cpu_used_shares)
        try:
            # TODO FIX this error should be further diagnosed, in case it affects other modules who use this call too
            set_container_resources(self.rescaler_http_session, container, resource_dict, self.debug)
            return True
        except (Exception, RuntimeError, ValueError, requests.HTTPError) as e:
            log_error("Error when setting container resources: {0}".format(str(e)), self.debug)
            return False

    def check_host_cpu_limits(self):
        errors_detected = False
        for host in self.host_info_cache.values():
            all_accounted_shares = 0
            map = host["resources"]["cpu"]["core_usage_mapping"]
            for core in map.values():
                for container in core:
                    if container != "free":
                        all_accounted_shares += core[container]
            if all_accounted_shares > host["resources"]["cpu"]["max"]:
                log_error("Host {0} has more mapped shares than its maximum".format(host["name"]), self.debug)
                errors_detected = True
        return errors_detected

    def check_host_has_enough_free_resources(self, host_info, needed_resources, resource):
        host_shares = host_info["resources"][resource]["free"]
        if host_shares == 0:
            raise ValueError("No resources available for resource {0} in host {1} ".format(resource, host_info["name"]))
        elif host_shares < needed_resources:
            missing_shares = needed_resources - host_shares
            # raise ValueError("Error in setting {0}, couldn't get the resources needed, missing {1} shares".format(resource, missing_shares))
            log_warning(
                "Beware, there are not enough free shares for resource {0} in the host, there are {1},  missing {2}".format(resource, host_shares, missing_shares),
                self.debug)

    def check_containers_cpu_limits(self, containers):
        errors_detected = False
        for container in containers:
            database_resources = container["resources"]

            if "max" not in database_resources["cpu"]:
                log_error("container {0} has not a maximum value set, check its configuration".format(container["name"]), self.debug)
                errors_detected = True
                continue

            max_cpu_limit = database_resources["cpu"]["max"]
            real_resources = self.container_info_cache[container["name"]]["resources"]
            try:
                current_cpu_limit = self.get_current_resource_value(real_resources, "cpu")
                if current_cpu_limit > max_cpu_limit:
                    log_error("container {0} has, somehow, more shares ({1}) than the maximum ({2}), check the max "
                              "parameter in its configuration".format(container["name"], current_cpu_limit, max_cpu_limit), self.debug)
                    errors_detected = True
            except ValueError as e:
                log_error("Current value of structure {0} is not valid: {1}".format(container["name"], str(e)), self.debug)
                errors_detected = True

        return errors_detected

    def check_container_cpu_mapping(self, container, host_info, cpu_used_cores, cpu_used_shares):
        host_max_cores = int(host_info["resources"]["cpu"]["max"] / 100)
        host_cpu_list = [str(i) for i in range(host_max_cores)]
        core_usage_map = host_info["resources"]["cpu"]["core_usage_mapping"]

        cpu_accounted_shares = 0
        cpu_accounted_cores = list()
        container_name = container["name"]
        for core in core_usage_map:
            if core not in host_cpu_list:
                continue
            if container_name in core_usage_map[core] and core_usage_map[core][container_name] != 0:
                cpu_accounted_shares += core_usage_map[core][container_name]
                cpu_accounted_cores.append(core)

        if sorted(cpu_used_cores) != sorted(cpu_accounted_cores) or cpu_used_shares != cpu_accounted_shares:
            return False, cpu_accounted_cores, cpu_accounted_shares
        else:
            return True, cpu_accounted_cores, cpu_accounted_shares

    def check_container_core_mapping(self, container, real_resources):
        errors_detected = False
        database_resources = container["resources"]

        if container["host"] not in self.host_info_cache:
            log_error("Host info '{0}' for container {1} is missing".format(container["host"], container["name"]), self.debug)
            return True
        elif "max" not in database_resources["cpu"]:
            # This error should have been previously detected
            return True
        else:
            try:
                current_cpu_limit = self.get_current_resource_value(real_resources, "cpu")
            except ValueError as e:
                log_error(e, self.debug)
                return True

        host_info = self.host_info_cache[container["host"]]
        max_cpu_limit = database_resources["cpu"]["max"]
        cpu_list = self.get_cpu_list(real_resources["cpu"]["cpu_num"])
        c_name = container["name"]

        map_host_valid, actual_used_cores, actual_used_shares = self.check_container_cpu_mapping(container, host_info, cpu_list, current_cpu_limit)

        if not map_host_valid:
            log_error(
                "Detected invalid core mapping for container {0}, has {1}-{2}, should be {3}-{4}".format(c_name, cpu_list, current_cpu_limit, actual_used_cores, actual_used_shares),
                self.debug)
            log_error("trying to automatically fix", self.debug)
            success = self.fix_container_cpu_mapping(container, actual_used_cores, actual_used_shares)
            if success:
                log_error("Succeeded fixing {0} container's core mapping".format(container["name"]), self.debug)
                errors_detected = True
            else:
                log_error("Failed in fixing {0} container's core mapping".format(container["name"]), self.debug)
                errors_detected = False
        return errors_detected

    def check_core_mapping(self, containers):
        errors_detected = False
        for container in containers:
            c_name = container["name"]
            log_info("Checking container {0}".format(c_name), self.debug)
            if c_name not in self.container_info_cache or "resources" not in self.container_info_cache[c_name]:
                log_error("Couldn't get container's {0} resources, can't check its sanity".format(c_name), self.debug)
                continue
            real_resources = self.container_info_cache[c_name]["resources"]
            errors = self.check_container_core_mapping(container, real_resources)
            errors_detected = errors_detected or errors
        return errors_detected

    def check_invalid_resource_value(self, database_resources, amount, current, resource):
        max_resource_limit = int(database_resources["resources"][resource]["max"])
        min_resource_limit = int(database_resources["resources"][resource]["min"])
        resource_limit = int(current + amount)

        if resource_limit < 0:
            raise ValueError("Error in setting {0}, it would be lower than 0".format(resource))
        elif resource_limit < min_resource_limit:
            raise ValueError("Error in setting {0}, new value {1} it would be lower than min {2}".format(resource, resource_limit, min_resource_limit))
        elif resource_limit > max_resource_limit:
            raise ValueError("Error in setting {0}, new value {1} it would be higher than max {2}".format(resource, resource_limit, max_resource_limit))

    ######################################################

    #### REQUEST MANAGEMENT ####
    def filter_requests(self, request_timeout):
        fresh_requests, purged_requests, final_requests = list(), list(), list()
        # Remote database operation
        all_requests = self.db_handler.get_requests()
        purged_counter = 0
        duplicated_counter = 0

        # First purge the old requests
        for request in all_requests:
            if request["timestamp"] < time.time() - request_timeout:
                purged_requests.append(request)
                purged_counter += 1
            else:
                fresh_requests.append(request)

        # Then remove repeated requests for the same structure if found
        structure_requests_dict = {}
        for request in fresh_requests:
            structure = request["structure"]  # The structure name (string), acting as an id
            action = request["action"]  # The action name (string)
            if structure not in structure_requests_dict:
                structure_requests_dict[structure] = {}

            if action not in structure_requests_dict[structure]:
                structure_requests_dict[structure][action] = request
            else:
                # A previous request was found for this structure, remove old one and leave the newer one
                stored_request = structure_requests_dict[structure][action]
                if stored_request["timestamp"] > request["timestamp"]:
                    # The stored request is newer, leave it and mark the retrieved one to be removed
                    purged_requests.append(request)
                else:
                    # The stored request is older, mark it to be remove and save the retrieved one
                    purged_requests.append(stored_request)
                    structure_requests_dict[structure][action] = request

                duplicated_counter += 1

        self.db_handler.delete_requests(purged_requests)

        for structure in structure_requests_dict:
            for action in structure_requests_dict[structure]:
                final_requests.append(structure_requests_dict[structure][action])

        log_info("Number of purged/duplicated requests was {0}/{1}".format(purged_counter, duplicated_counter), True)
        return final_requests

    def sort_requests(self, new_requests):
        container_reqs, app_reqs = list(), list()
        for r in new_requests:
            if r["structure_type"] == "container":
                container_reqs.append(r)
            elif r["structure_type"] == "application":
                app_reqs.append(r)
            else:
                pass
        return container_reqs, app_reqs

    ######################################################

    #### RESOURCE REQUEST MANAGEMENT ####
    def process_request(self, request, real_resources, database_resources):
        # Create a 'fake' container structure with only the required info
        container = {"host_rescaler_ip": request["host_rescaler_ip"],
                     "host_rescaler_port": request["host_rescaler_port"],
                     "name": request["structure"]}

        # Apply the request and get the new resources to set
        try:
            new_resources = self.apply_request(request, real_resources, database_resources)
            if new_resources:
                log_info("Request: {0} for container : {1} for new resources : {2}".format(
                    request["action"], request["structure"], json.dumps(new_resources)), self.debug)

                # Apply changes through a REST call
                set_container_resources(self.rescaler_http_session, container, new_resources, self.debug)
        except (ValueError) as e:
            log_error("Error with container {0} in applying the request -> {1}".format(request["structure"], str(e)), self.debug)
            return
        except (HTTPError) as e:
            log_error("Error setting container {0} resources -> {1}".format(request["structure"], str(e)), self.debug)
            return
        except (Exception) as e:
            log_error("Error with container {0} -> {1}".format(request["structure"], str(e)), self.debug)
            return

    def apply_request(self, request, real_resources, database_resources):

        amount = int(request["amount"])

        host_info = self.host_info_cache[request["host"]]
        resource = request["resource"]

        # Get the current resource limit, if unlimited, then max, min or mean
        current_resource_limit = self.get_current_resource_value(real_resources, resource)

        # Check that the resource limit is respected, not lower than min or higher than max
        self.check_invalid_resource_value(database_resources, amount, current_resource_limit, resource)

        if amount > 0:
            # If the request is for scale up, check that the host has enough free resources before proceeding
            self.check_host_has_enough_free_resources(host_info, amount, resource)

        fun = self.apply_request_by_resource[resource]
        result = fun(request, database_resources, real_resources, amount)

        return result

    def apply_cpu_request(self, request, database_resources, real_resources, amount):
        resource = request["resource"]
        structure_name = request["structure"]
        host_info = self.host_info_cache[request["host"]]

        core_usage_map = host_info["resources"][resource]["core_usage_mapping"]

        current_cpu_limit = self.get_current_resource_value(real_resources, resource)
        cpu_list = self.get_cpu_list(real_resources["cpu"]["cpu_num"])

        host_max_cores = int(host_info["resources"]["cpu"]["max"] / 100)
        host_cpu_list = [str(i) for i in range(host_max_cores)]
        for core in host_cpu_list:
            if core not in core_usage_map:
                core_usage_map[core] = dict()
                core_usage_map[core]["free"] = 100
            if structure_name not in core_usage_map[core]:
                core_usage_map[core][structure_name] = 0
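        # core_usage_map now has one entry per host core, e.g. (illustrative):
        # {"0": {"free": 20, "cont0": 80}, "1": {"free": 100, "cont0": 0}}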

        used_cores = list(cpu_list)  # copy

        if amount > 0:
            # Rescale up, so look for free shares to assign and maybe add cores
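            # Three passes follow: (1) top up the cores already assigned to this structure,
            # (2) try to place the remainder on a single extra core with enough free shares,
            # (3) greedily add the cores with the most free shares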
            needed_shares = amount

            # First fill the already used cores so that no additional cores are added unnecessarily
            for core in cpu_list:
                if core_usage_map[core]["free"] > 0:
                    if core_usage_map[core]["free"] > needed_shares:
                        core_usage_map[core]["free"] -= needed_shares
                        core_usage_map[core][structure_name] += needed_shares
                        needed_shares = 0
                        break
                    else:
                        core_usage_map[core][structure_name] += core_usage_map[core]["free"]
                        needed_shares -= core_usage_map[core]["free"]
                        core_usage_map[core]["free"] = 0

            # Next try to satisfy the request by looking for and adding a single core
            if needed_shares > 0:
                for core in host_cpu_list:
                    if core_usage_map[core]["free"] >= needed_shares:
                        core_usage_map[core]["free"] -= needed_shares
                        core_usage_map[core][structure_name] += needed_shares
                        needed_shares = 0
                        used_cores.append(core)
                        break

            # Finally, if unsuccessful, add as many cores as necessary, starting with the ones with the largest free shares to avoid too much spread
            if needed_shares > 0:
                l = list()
                for core in host_cpu_list:
                    l.append((core, core_usage_map[core]["free"]))
                l.sort(key=lambda tup: tup[1], reverse=True)
                less_used_cores = [i[0] for i in l]

                for core in less_used_cores:
                    # If this core has free shares
                    if core_usage_map[core]["free"] > 0 and needed_shares > 0:
                        # If it has more free shares than needed, assign them and finish
                        if core_usage_map[core]["free"] >= needed_shares:
                            core_usage_map[core]["free"] -= needed_shares
                            core_usage_map[core][structure_name] += needed_shares
                            needed_shares = 0
                            used_cores.append(core)
                            break
                        else:
                            # Otherwise, assign as many as possible and continue
                            core_usage_map[core][structure_name] += core_usage_map[core]["free"]
                            needed_shares -= core_usage_map[core]["free"]
                            core_usage_map[core]["free"] = 0
                            used_cores.append(core)

            if needed_shares > 0:
                # raise ValueError("Error in setting cpu, couldn't get the resources needed, missing {0} shares".format(needed_shares))
                log_warning("Structure {0} couldn't get as much CPU shares as intended ({1}), "
                            "instead it got {2}".format(structure_name, amount, amount - needed_shares), self.debug)
                amount = amount - needed_shares
                # FIXME couldn't do rescale up properly as shares to get remain

        elif amount < 0:
            # Rescale down, so free the requested shares and work out how many cores can be released
            shares_to_free = abs(amount)

            # First try to find the cores with the fewest shares allocated to this structure and remove them
            l = list()
            for core in cpu_list:
                l.append((core, core_usage_map[core][structure_name]))
            l.sort(key=lambda tup: tup[1], reverse=False)
            less_allocated_cores = [i[0] for i in l]

            for core in less_allocated_cores:
                # Equal or less allocated shares than amount to be freed, remove this core altogether and if shares remain to be freed, continue
                if core_usage_map[core][structure_name] <= shares_to_free:
                    core_usage_map[core]["free"] += core_usage_map[core][structure_name]
                    shares_to_free -= core_usage_map[core][structure_name]
                    core_usage_map[core][structure_name] = 0
                    used_cores.remove(core)
                    # In the event that the amount to be freed was equal to the allocated one, finish
                    if shares_to_free == 0:
                        break
                # More allocated shares than amount to be freed, reduce allocation and finish
                elif core_usage_map[core][structure_name] > shares_to_free:
                    core_usage_map[core]["free"] += shares_to_free
                    core_usage_map[core][structure_name] -= shares_to_free
                    shares_to_free = 0
                    break

            if shares_to_free > 0:
                raise ValueError("Error in setting cpu, couldn't free the resources properly")

        # No error thrown, so persist the new mapping to the cache
        self.host_info_cache[request["host"]]["resources"]["cpu"]["core_usage_mapping"] = core_usage_map
        self.host_info_cache[request["host"]]["resources"]["cpu"]["free"] -= amount

        resource_dict = {resource: {}}
        resource_dict["cpu"]["cpu_num"] = ",".join(used_cores)
        resource_dict["cpu"]["cpu_allowance_limit"] = int(current_cpu_limit + amount)

        return resource_dict

    def apply_mem_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_mem_limit = self.get_current_resource_value(real_resources, request["resource"])

        # No error thrown, so persist the new mapping to the cache
        self.host_info_cache[request["host"]]["resources"]["mem"]["free"] -= amount

        # Return the dictionary to set the resources
        resource_dict["mem"]["mem_limit"] = str(int(amount + current_mem_limit))

        return resource_dict

    def apply_disk_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_disk_limit = self.get_current_resource_value(real_resources, request["resource"])

        # Return the dictionary to set the resources
        resource_dict["disk"]["disk_read_limit"] = str(int(amount + current_disk_limit))
        resource_dict["disk"]["disk_write_limit"] = str(int(amount + current_disk_limit))

        return resource_dict

    def apply_net_request(self, request, database_resources, real_resources, amount):
        resource_dict = {request["resource"]: {}}
        current_net_limit = self.get_current_resource_value(real_resources, request["resource"])

        # Return the dictionary to set the resources
        resource_dict["net"]["net_limit"] = str(int(amount + current_net_limit))

        return resource_dict

    ######################################################

    ##### CONTAINER SCALING ######
    def rescale_container(self, request, structure):
        try:
            # Needed for the resources reported in the database (the 'max, min' values)
            database_resources = structure

            # Get the resources the container is using from its host NodeScaler (the 'current' value)
            c_name = structure["name"]
            if c_name not in self.container_info_cache or "resources" not in self.container_info_cache[c_name]:
                log_error("Couldn't get container's {0} resources, can't rescale".format(c_name), self.debug)
                return
            real_resources = self.container_info_cache[c_name]["resources"]

            # Process the request
            self.process_request(request, real_resources, database_resources)
        except Exception as e:
            log_error(str(e) + " " + str(traceback.format_exc()), self.debug)

    ######################################################

    ##### APPLICATION SCALING ######
    def sort_containers_by_usage_margin(self, container1, container2, resource):
        """
        Parameters:
            container1: dict -> A container structure
            container2: dict -> A container structure
            resource: str -> the resource to be used for sorting
        Returns:
            The tuple of the containers with the (lowest,highest) margin between resources used and resources set
        """
        c1_current_amount = container1["resources"][resource]["current"]
        c1_usage_amount = container1["resources"][resource]["usage"]
        c2_current_amount = container2["resources"][resource]["current"]
        c2_usage_amount = container2["resources"][resource]["usage"]
        if c1_current_amount - c1_usage_amount < c2_current_amount - c2_usage_amount:
            lowest, highest = container1, container2
        else:
            lowest, highest = container2, container1

        return lowest, highest

    def lowest_current_to_usage_margin(self, container1, container2, resource):
        # Return the container with the lowest margin between resources used and resources set (closest bottleneck)
        lowest, _ = self.sort_containers_by_usage_margin(container1, container2, resource)
        return lowest

    def highest_current_to_usage_margin(self, container1, container2, resource):
        # Return the container with the highest margin between resources used and resources set (lowest use)
        _, highest = self.sort_containers_by_usage_margin(container1, container2, resource)
        return highest

    def generate_requests(self, new_requests, app_label):
        rescaled_containers = list()
        total_amount = 0
        for req in new_requests:
            self.db_handler.add_request(req)
            rescaled_containers.append((req["structure"], req["amount"]))
            total_amount += req["amount"]
        log_info("App {0} rescaled {1} shares by rescaling containers: {2}".format(app_label, total_amount, str(rescaled_containers)), self.debug)

    def single_container_rescale(self, request, app_containers, resource_usage_cache):
        amount, resource_label = request["amount"], request["resource"]
        scalable_containers = list()
        resource_shares = abs(amount)

        # Look for containers that can be rescaled
        for container in app_containers:
            usages = resource_usage_cache[container["name"]]
            container["resources"][resource_label]["usage"] = usages[resource_label]
            current_value = container["resources"][resource_label]["current"]

            # Rescale down
            if amount < 0:
                # Check that the container has enough free resource shares
                # available to be released and that it would be able
                # to be rescaled without dropping under the minimum value
                if current_value < resource_shares:
                    # Container doesn't have enough resources to free
                    # ("Container doesn't have enough resources to free", self.debug)
                    pass
                elif current_value + amount < container["resources"][resource_label]["min"]:
                    # Container can't free that amount without dropping under the minimum
                    # log_error("Container {0} can't free that amount without dropping under the minimum".format(container["name"]), self.debug)
                    pass
                else:
                    scalable_containers.append(container)

            # Rescale up
            else:
                # Check that the container has enough free resource shares available in the host and that it would be able
                # to be rescaled without exceeding the maximum value
                container_host = container["host"]

                if self.host_info_cache[container_host]["resources"][resource_label]["free"] < resource_shares:
                    # Container's host doesn't have enough free resources
                    # log_error("Container's host doesn't have enough free resources", self.debug)
                    pass
                elif current_value + amount > container["resources"][resource_label]["max"]:
                    # Container can't get that amount without exceeding the maximum
                    # log_error("Container can't get that amount without exceeding the maximum", self.debug)
                    pass
                else:
                    scalable_containers.append(container)

        # Look for the best fit container for this resource and launch the rescaling request for it
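        # e.g. (illustrative) when scaling up, a container using 90 of its 100 shares is preferred over one
        # using 20 of its 100, as the former has the smallest margin between its current limit and its usage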
        if scalable_containers:
            best_fit_container = scalable_containers[0]

            for container in scalable_containers[1:]:
                if amount < 0:
                    # If scaling down, look for containers with usages far from the limit (underuse)
                    best_fit_container = self.highest_current_to_usage_margin(container, best_fit_container, resource_label)
                else:
                    # If scaling up, look for containers with usages close to the limit (bottleneck)
                    best_fit_container = self.lowest_current_to_usage_margin(container, best_fit_container, resource_label)

            # Generate the new request
            new_request = Guardian.generate_request(best_fit_container, amount, resource_label)

            return True, best_fit_container, new_request
        else:
            return False, {}, {}

    def rescale_application(self, request, structure):

        # Get container names that this app uses
        app_containers_names = structure["containers"]
        app_containers = list()

        for cont_name in app_containers_names:
            # Get the container
            container = self.db_handler.get_structure(cont_name)
            app_containers.append(container)

            # Retrieve host info and cache it in case other containers or applications need it
            if container["host"] not in self.host_info_cache:
                self.host_info_cache[container["host"]] = self.db_handler.get_structure(container["host"])

        total_amount = request["amount"]

        requests = list()
        remaining_amount = total_amount
        split_amount = APP_SCALING_SPLIT_AMOUNT * (request["amount"] / abs(request["amount"]))  # This sets the sign
        request["amount"] = split_amount

        # Create smaller requests of 'split_amount' size
        while abs(remaining_amount) > 0 and abs(remaining_amount) > abs(split_amount):
            requests.append(dict(request))
            remaining_amount -= split_amount

        # If some remaining amount is left, create the last request
        if abs(remaining_amount) > 0:
            request["amount"] = remaining_amount
            requests.append(dict(request))
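        # e.g. (illustrative) a request for -12 shares with APP_SCALING_SPLIT_AMOUNT = 5 is split into [-5, -5, -2]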

        # Get the request usage for all the containers and cache it
        resource_usage_cache = dict()
        for container in app_containers:
            amount, resource = request["amount"], request["resource"]
            metrics_to_retrieve = BDWATCHDOG_CONTAINER_METRICS[resource]
            resource_usage_cache[container["name"]] = self.bdwatchdog_handler.get_structure_timeseries(
                {"host": container["name"]}, 10, 20,
                metrics_to_retrieve, RESCALER_CONTAINER_METRICS)

        success, iterations = True, 0
        generated_requests = dict()

        while success and len(requests) > 0:
            request = requests.pop(0)
            success, container_to_rescale, generated_request = self.single_container_rescale(request, app_containers, resource_usage_cache)
            if success:
                # If rescaling was successful, update the container's resources as they have been rescaled
                for c in app_containers:
                    container_name = c["name"]
                    if container_name == container_to_rescale["name"]:
                        # Initialize
                        if container_name not in generated_requests:
                            generated_requests[container_name] = list()

                        generated_requests[container_name].append(generated_request)
                        container_to_rescale["resources"][request["resource"]]["current"] += request["amount"]
                        app_containers.remove(c)
                        app_containers.append(container_to_rescale)
                        break
            else:
                break

            iterations += 1

        # Collapse all the requests to generate just 1 per container
        final_requests = list()
        for container in generated_requests:
            # Copy the first request as the base request
            flat_request = dict(generated_requests[container][0])
            flat_request["amount"] = 0
            for request in generated_requests[container]:
                flat_request["amount"] += request["amount"]
            final_requests.append(flat_request)
        self.generate_requests(final_requests, structure["name"])

        if len(requests) > 0:
            # Couldn't completely rescale the application as some split of a major rescaling operation could not be completed
            log_warning("App {0} could not be completely rescaled, only: {1} shares of resource: {2} have been scaled".format(
                request["structure"], str(int(iterations * split_amount)), request["resource"]), self.debug)

    ######################################################

    ### SERVICE METHODS ####
    def invalid_conf(self, config):
        # TODO This code is duplicated on the structures and database snapshoters
        for key, num in [("POLLING_FREQUENCY", config.get_value("POLLING_FREQUENCY")), ("REQUEST_TIMEOUT", config.get_value("REQUEST_TIMEOUT"))]:
            if num < 5:
                return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(key, num)
        return False, ""

    def get_cpu_list(self, cpu_num_string):
        # Translate something like '2-4,7' to [2,3,7]
        cpu_list = list()
        parts = cpu_num_string.split(",")
        for part in parts:
            ranges = part.split("-")
            if len(ranges) == 1:
                cpu_list.append(ranges[0])
            else:
                for n in range(int(ranges[0]), int(ranges[1]) + 1):
                    cpu_list.append(str(n))
        return cpu_list

    def get_current_resource_value(self, real_resources, resource):
        translation_dict = {"cpu": "cpu_allowance_limit", "mem": "mem_limit"}

        if resource not in translation_dict:
            raise ValueError("Resource '{0}' unknown".format(resource))
        else:
            resource_translated = translation_dict[resource]

        if resource not in real_resources:
            raise ValueError("Resource '{0}' info missing from host".format(resource))

        if resource_translated not in real_resources[resource]:
            raise ValueError("Current value for resource '{0}' missing from host resource info".format(resource))

        current_resource_limit = real_resources[resource][resource_translated]
        if current_resource_limit == -1:
            raise ValueError("Resource {0} has not a 'current' value set, that is, it is unlimited".format(resource))
        else:
            try:
                current_resource_limit = int(current_resource_limit)
            except ValueError:
                raise ValueError("Bad current {0} limit value".format(resource))
        return current_resource_limit

    def process_requests(self, reqs):
        for request in reqs:
            structure_name = request["structure"]

            # Retrieve structure info
            try:
                structure = self.db_handler.get_structure(structure_name)
            except (requests.exceptions.HTTPError, ValueError):
                log_error("Error, couldn't find structure {0} in database".format(structure_name), self.debug)
                continue

            # Rescale the structure accordingly, whether it is a container or an application
            if structure_is_container(structure):
                self.rescale_container(request, structure)
            elif structure_is_application(structure):
                self.rescale_application(request, structure)
            else:
                log_error("Unknown type of structure '{0}'".format(structure["subtype"]), self.debug)

            # Remove the request from the database
            self.db_handler.delete_request(request)

    def split_requests(self, all_requests):
        scale_down, scale_up = list(), list()
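        # Requests are classified by the suffix of their action name, e.g. (illustrative) "CpuRescaleDown" vs "CpuRescaleUp"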
        for request in all_requests:
            if "action" not in request or not request["action"]:
                continue
            elif request["action"].endswith("Down"):
                scale_down.append(request)
            elif request["action"].endswith("Up"):
                scale_up.append(request)
        return scale_down, scale_up

    def fill_host_info_cache(self, containers):
        self.host_info_cache = dict()
        for container in containers:
            if container["host"] not in self.host_info_cache:
                self.host_info_cache[container["host"]] = self.db_handler.get_structure(container["host"])
        return

    def persist_new_host_information(self):
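        # One thread per cached host document; each thread writes the updated host information back to the state database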
        def persist_thread(self, host):
            data = self.host_info_cache[host]
            update_structure(data, self.db_handler, self.debug)

        threads = list()
        for host in self.host_info_cache:
            t = Thread(target=persist_thread, args=(self, host,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

    def scale_structures(self, new_requests):
        log_info("Processing requests", self.debug)

        t0 = time.time()

        # Split the requests between scale down and scale up
        scale_down, scale_up = self.split_requests(new_requests)

        # Process first the requests that free resources, then the ones that use them
        self.process_requests(scale_down)
        self.process_requests(scale_up)

        # Persist the new host information
        self.persist_new_host_information()

        t1 = time.time()
        log_info("It took {0} seconds to process requests".format(str("%.2f" % (t1 - t0))), self.debug)

    ######################################################

    def scale(self):
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)

        # Remove previous requests
        log_info("Purging any previous requests", True)
        self.filter_requests(0)
        log_info("----------------------\n", True)

        while True:
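            # Each epoch: heartbeat, read the service config, optionally check CPU limits and core mappings,
            # then gather pending requests and process them (applications first, then containers)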
            # Remote database operation
            service = get_service(self.db_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.db_handler, SERVICE_NAME)

            # CONFIG
            myConfig.set_config(service["config"])
            polling_frequency = myConfig.get_value("POLLING_FREQUENCY")
            request_timeout = myConfig.get_value("REQUEST_TIMEOUT")
            self.debug = myConfig.get_value("self.debug")
            CHECK_CORE_MAP = myConfig.get_value("CHECK_CORE_MAP")
            SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

            log_info("----------------------", self.debug)
            log_info("Starting Epoch", self.debug)
            t0 = time.time()

            ## CHECK INVALID CONFIG ##
            # TODO This code is duplicated on the structures and database snapshoters
            invalid, message = self.invalid_conf(myConfig)
            if invalid:
                log_error(message, self.debug)
                time.sleep(polling_frequency)
                if polling_frequency < 5:
                    log_error("Polling frequency is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]), self.debug)
                    polling_frequency = CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]

                log_info("----------------------\n", self.debug)
                time.sleep(polling_frequency)
                continue

            if SERVICE_IS_ACTIVATED:

                # Get the container structures and their resource information as such data is going to be needed
                containers = get_structures(self.db_handler, self.debug, subtype="container")
                try:
                    self.container_info_cache = get_container_resources_dict()  # Reset the cache
                except (Exception, RuntimeError) as e:
                    log_error("Error getting host document, skipping epoch altogether", self.debug)
                    log_error(str(e), self.debug)
                    time.sleep(polling_frequency)
                    continue

                # Fill the host information cache
                log_info("Getting host and container info", self.debug)
                try:
                    self.fill_host_info_cache(containers)
                except (Exception, RuntimeError) as e:
                    log_error("Error getting host document, skipping epoch altogether", self.debug)
                    log_error(str(e), self.debug)
                    time.sleep(polling_frequency)
                    continue

                # Do the core mapping check-up
                if CHECK_CORE_MAP:
                    log_info("Doing container CPU limits check", self.debug)
                    log_info("First hosts", self.debug)
                    errors_detected = self.check_host_cpu_limits()
                    if errors_detected:
                        log_error("Errors detected during host CPU limits check", self.debug)

                    log_info("Second containers", self.debug)
                    errors_detected = self.check_containers_cpu_limits(containers)
                    if errors_detected:
                        log_error("Errors detected during container CPU limits check", self.debug)

                    log_info("Doing core mapping check", self.debug)
                    errors_detected = self.check_core_mapping(containers)
                    if errors_detected:
                        log_error("Errors detected during container CPU map check", self.debug)
                else:
                    log_warning("Core map check has been disabled", self.debug)

                # Get the requests
                new_requests = self.filter_requests(request_timeout)
                container_reqs, app_reqs = self.sort_requests(new_requests)

                # Process first the application requests, as they generate container ones
                if app_reqs:
                    log_info("Processing applications requests", self.debug)
                    self.scale_structures(app_reqs)
                else:
                    log_info("No applications requests", self.debug)

                # Then process container ones
                if container_reqs:
                    log_info("Processing container requests", self.debug)
                    self.scale_structures(container_reqs)
                else:
                    log_info("No container requests", self.debug)

                t1 = time.time()
                log_info("Epoch processed in {0} seconds".format(str("%.2f" % (t1 - t0))), self.debug)

            else:
                log_warning("Scaler service is not activated", self.debug)

            log_info("----------------------\n", self.debug)
            time.sleep(polling_frequency)
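
How the service is meant to be run (a minimal sketch, assuming a reachable state database, OpenTSDB and per-host NodeScalers; the entry point below is illustrative and not part of the listing above):

    # Hypothetical launcher script
    from src.Scaler.Scaler import Scaler

    if __name__ == "__main__":
        scaler = Scaler()
        scaler.scale()  # loops forever: heartbeat, config read, sanity checks and request processing each epoch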

Methods

def apply_cpu_request(self, request, database_resources, real_resources, amount)
def apply_disk_request(self, request, database_resources, real_resources, amount)
def apply_mem_request(self, request, database_resources, real_resources, amount)
def apply_net_request(self, request, database_resources, real_resources, amount)
def apply_request(self, request, real_resources, database_resources)
def check_container_core_mapping(self, container, real_resources)
def check_container_cpu_mapping(self, container, host_info, cpu_used_cores, cpu_used_shares)
def check_containers_cpu_limits(self, containers)
def check_core_mapping(self, containers)
def check_host_cpu_limits(self)
def check_host_has_enough_free_resources(self, host_info, needed_resources, resource)
def check_invalid_resource_value(self, database_resources, amount, current, resource)
def fill_host_info_cache(self, containers)
def filter_requests(self, request_timeout)
def fix_container_cpu_mapping(self, container, cpu_used_cores, cpu_used_shares)
def generate_requests(self, new_requests, app_label)
def get_cpu_list(self, cpu_num_string)
def get_current_resource_value(self, real_resources, resource)
def highest_current_to_usage_margin(self, container1, container2, resource)
def invalid_conf(self, config)
def lowest_current_to_usage_margin(self, container1, container2, resource)
def persist_new_host_information(self)
def process_request(self, request, real_resources, database_resources)
def process_requests(self, reqs)
def rescale_application(self, request, structure)
def rescale_application(self, request, structure):

    # Get container names that this app uses
    app_containers_names = structure["containers"]
    app_containers = list()

    for cont_name in app_containers_names:
        # Get the container
        container = self.db_handler.get_structure(cont_name)
        app_containers.append(container)

        # Retrieve host info and cache it in case other containers or applications need it
        if container["host"] not in self.host_info_cache:
            self.host_info_cache[container["host"]] = self.db_handler.get_structure(container["host"])

    total_amount = request["amount"]

    requests = list()
    remaining_amount = total_amount
    split_amount = APP_SCALING_SPLIT_AMOUNT * (request["amount"] / abs(request["amount"]))  # This sets the sign
    request["amount"] = split_amount

    # Create smaller requests of 'split_amount' size
    while abs(remaining_amount) > 0 and abs(remaining_amount) > abs(split_amount):
        requests.append(dict(request))
        remaining_amount -= split_amount

    # If some remaining amount is left, create the last request
    if abs(remaining_amount) > 0:
        request["amount"] = remaining_amount
        requests.append(dict(request))

    # Get the resource usage for all the containers and cache it
    resource_usage_cache = dict()
    for container in app_containers:
        amount, resource = request["amount"], request["resource"]
        metrics_to_retrieve = BDWATCHDOG_CONTAINER_METRICS[resource]
        resource_usage_cache[container["name"]] = self.bdwatchdog_handler.get_structure_timeseries(
            {"host": container["name"]}, 10, 20,
            metrics_to_retrieve, RESCALER_CONTAINER_METRICS)

    success, iterations = True, 0
    generated_requests = dict()

    while success and len(requests) > 0:
        request = requests.pop(0)
        success, container_to_rescale, generated_request = self.single_container_rescale(request, app_containers, resource_usage_cache)
        if success:
            # If rescaling was successful, update the container's resources as they have been rescaled
            for c in app_containers:
                container_name = c["name"]
                if container_name == container_to_rescale["name"]:
                    # Initialize
                    if container_name not in generated_requests:
                        generated_requests[container_name] = list()

                    generated_requests[container_name].append(generated_request)
                    container_to_rescale["resources"][request["resource"]]["current"] += request["amount"]
                    app_containers.remove(c)
                    app_containers.append(container_to_rescale)
                    break
        else:
            break

        iterations += 1

    # Collapse all the requests to generate just 1 per container
    final_requests = list()
    for container in generated_requests:
        # Copy the first request as the base request
        flat_request = dict(generated_requests[container][0])
        flat_request["amount"] = 0
        for req in generated_requests[container]:
            flat_request["amount"] += req["amount"]
        final_requests.append(flat_request)
    self.generate_requests(final_requests, structure["name"])

    if len(requests) > 0:
        # Couldn't completely rescale the application as some split of a major rescaling operation could not be completed
        log_warning("App {0} could not be completely rescaled, only: {1} shares of resource: {2} have been scaled".format(
            request["structure"], str(int(iterations * split_amount)), request["resource"]), self.debug)
def rescale_container(self, request, structure)
def rescale_container(self, request, structure):
    try:
        # Needed for the resources reported in the database (the 'max, min' values)
        database_resources = structure

        # Get the resources the container is using from its host NodeScaler (the 'current' value)
        c_name = structure["name"]
        if c_name not in self.container_info_cache or "resources" not in self.container_info_cache[c_name]:
            log_error("Couldn't get container's {0} resources, can't rescale".format(c_name), self.debug)
            return
        real_resources = self.container_info_cache[c_name]["resources"]

        # Process the request
        self.process_request(request, real_resources, database_resources)
    except Exception as e:
        log_error(str(e) + " " + str(traceback.format_exc()), self.debug)
def scale(self)
def scale(self):
    logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

    myConfig = MyConfig(CONFIG_DEFAULT_VALUES)

    # Remove previous requests
    log_info("Purging any previous requests", True)
    self.filter_requests(0)
    log_info("----------------------\n", True)

    while True:
        # Remote database operation
        service = get_service(self.db_handler, SERVICE_NAME)

        # Heartbeat
        beat(self.db_handler, SERVICE_NAME)

        # CONFIG
        myConfig.set_config(service["config"])
        polling_frequency = myConfig.get_value("POLLING_FREQUENCY")
        request_timeout = myConfig.get_value("REQUEST_TIMEOUT")
        self.debug = myConfig.get_value("self.debug")
        CHECK_CORE_MAP = myConfig.get_value("CHECK_CORE_MAP")
        SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

        log_info("----------------------", self.debug)
        log_info("Starting Epoch", self.debug)
        t0 = time.time()

        ## CHECK INVALID CONFIG ##
        # TODO This code is duplicated on the structures and database snapshoters
        invalid, message = self.invalid_conf(myConfig)
        if invalid:
            log_error(message, self.debug)
            time.sleep(polling_frequency)
            if polling_frequency < 5:
                log_error("Polling frequency is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]), self.debug)
                polling_frequency = CONFIG_DEFAULT_VALUES["POLLING_FREQUENCY"]

            log_info("----------------------\n", self.debug)
            time.sleep(polling_frequency)
            continue

        if SERVICE_IS_ACTIVATED:

            # Get the container structures and their resource information as such data is going to be needed
            containers = get_structures(self.db_handler, self.debug, subtype="container")
            try:
                self.container_info_cache = get_container_resources_dict()  # Reset the cache
            except Exception as e:
                log_error("Error getting container resources, skipping epoch altogether", self.debug)
                log_error(str(e), self.debug)
                time.sleep(polling_frequency)
                continue

            # Fill the host information cache
            log_info("Getting host and container info", self.debug)
            try:
                self.fill_host_info_cache(containers)
            except Exception as e:
                log_error("Error getting host document, skipping epoch altogether", self.debug)
                log_error(str(e), self.debug)
                time.sleep(polling_frequency)
                continue

            # Do the core mapping check-up
            if CHECK_CORE_MAP:
                log_info("Doing container CPU limits check", self.debug)
                log_info("First hosts", self.debug)
                errors_detected = self.check_host_cpu_limits()
                if errors_detected:
                    log_error("Errors detected during host CPU limits check", self.debug)

                log_info("Second containers", self.debug)
                errors_detected = self.check_containers_cpu_limits(containers)
                if errors_detected:
                    log_error("Errors detected during container CPU limits check", self.debug)

                log_info("Doing core mapping check", self.debug)
                errors_detected = self.check_core_mapping(containers)
                if errors_detected:
                    log_error("Errors detected during container CPU map check", self.debug)
            else:
                log_warning("Core map check has been disabled", self.debug)

            # Get the requests
            new_requests = self.filter_requests(request_timeout)
            container_reqs, app_reqs = self.sort_requests(new_requests)

            # Process first the application requests, as they generate container ones
            if app_reqs:
                log_info("Processing applications requests", self.debug)
                self.scale_structures(app_reqs)
            else:
                log_info("No applications requests", self.debug)

            # Then process container ones
            if container_reqs:
                log_info("Processing container requests", self.debug)
                self.scale_structures(container_reqs)
            else:
                log_info("No container requests", self.debug)

            t1 = time.time()
            log_info("Epoch processed in {0} seconds".format(str("%.2f" % (t1 - t0))), self.debug)

        else:
            log_warning("Scaler service is not activated", self.debug)

        log_info("----------------------\n", self.debug)
        time.sleep(polling_frequency)
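
For reference, a service configuration document compatible with this loop could look as follows (illustrative values, keys taken from CONFIG_DEFAULT_VALUES):

service = {"config": {"POLLING_FREQUENCY": 10,
                      "REQUEST_TIMEOUT": 60,
                      "self.debug": True,
                      "CHECK_CORE_MAP": True,
                      "ACTIVE": True}}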
def scale_structures(self, new_requests)
def scale_structures(self, new_requests):
    log_info("Processing requests", self.debug)

    t0 = time.time()

    # Split the requests between scale down and scale up
    scale_down, scale_up = self.split_requests(new_requests)

    # Process first the requests that free resources, then the ones that use them
    self.process_requests(scale_down)
    self.process_requests(scale_up)

    # Persist the new host information
    self.persist_new_host_information()

    t1 = time.time()
    log_info("It took {0} seconds to process requests".format(str("%.2f" % (t1 - t0))), self.debug)
def single_container_rescale(self, request, app_containers, resource_usage_cache)
def single_container_rescale(self, request, app_containers, resource_usage_cache):
    amount, resource_label = request["amount"], request["resource"]
    scalable_containers = list()
    resource_shares = abs(amount)

    # Look for containers that can be rescaled
    for container in app_containers:
        usages = resource_usage_cache[container["name"]]
        container["resources"][resource_label]["usage"] = usages[resource_label]
        current_value = container["resources"][resource_label]["current"]

        # Rescale down
        if amount < 0:
            # Check that the container has enough free resource shares
            # available to be released and that it would be able
            # to be rescaled without dropping under the minimum value
            if current_value < resource_shares:
                # Container doesn't have enough resources to free
                # ("Container doesn't have enough resources to free", self.debug)
                pass
            elif current_value + amount < container["resources"][resource_label]["min"]:
                # Container can't free that amount without dropping under the minimum
                # log_error("Container {0} can't free that amount without dropping under the minimum".format(container["name"]), self.debug)
                pass
            else:
                scalable_containers.append(container)

        # Rescale up
        else:
            # Check that the container has enough free resource shares available in the host and that it would be able
            # to be rescaled without exceeding the maximum value
            container_host = container["host"]

            if self.host_info_cache[container_host]["resources"][resource_label]["free"] < resource_shares:
                # Container's host doesn't have enough free resources
                # log_error("Container's host doesn't have enough free resources", self.debug)
                pass
            elif current_value + amount > container["resources"][resource_label]["max"]:
                # Container can't get that amount without exceeding the maximum
                # log_error("Container can't get that amount without exceeding the maximum", self.debug)
                pass
            else:
                scalable_containers.append(container)

    # Look for the best fit container for this resource and launch the rescaling request for it
    if scalable_containers:
        best_fit_container = scalable_containers[0]

        for container in scalable_containers[1:]:
            if amount < 0:
                # If scaling down, look for containers with usages far from the limit (underuse)
                best_fit_container = self.highest_current_to_usage_margin(container, best_fit_container, resource_label)
            else:
                # If scaling up, look for containers with usages close to the limit (bottleneck)
                best_fit_container = self.lowest_current_to_usage_margin(container, best_fit_container, resource_label)

        # Generate the new request
        new_request = Guardian.generate_request(best_fit_container, amount, resource_label)

        return True, best_fit_container, new_request
    else:
        return False, {}, {}
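
To make the scale-up guard conditions concrete, a small numeric sketch with illustrative values:

# Scale-up request of +100 CPU shares for a container currently holding 150 shares
amount, current_value = 100, 150
container_max, host_free = 300, 30
eligible = host_free >= abs(amount) and current_value + amount <= container_max
# eligible is False: the host only has 30 free shares, so this container is skipped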
def sort_containers_by_usage_margin(self, container1, container2, resource)

Parameters

container1: dict -> A container structure
container2: dict -> A container structure
resource: str -> the resource to be used for sorting

Returns

The tuple of the containers with the (lowest,highest) margin between resources used and resources set

def sort_containers_by_usage_margin(self, container1, container2, resource):
    """
    Parameters:
        container1: dict -> A container structure
        container2: dict -> A container structure
        resource: str -> the resource to be used for sorting
    Returns:
        The tuple of the containers with the (lowest,highest) margin between resources used and resources set
    """
    c1_current_amount = container1["resources"][resource]["current"]
    c1_usage_amount = container1["resources"][resource]["usage"]
    c2_current_amount = container2["resources"][resource]["current"]
    c2_usage_amount = container2["resources"][resource]["usage"]
    if c1_current_amount - c1_usage_amount < c2_current_amount - c2_usage_amount:
        lowest, highest = container1, container2
    else:
        lowest, highest = container2, container1

    return lowest, highest
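
A minimal usage sketch with illustrative container structures (only the fields this method reads), assuming a Scaler instance named scaler:

c1 = {"name": "cont0", "resources": {"cpu": {"current": 200, "usage": 180}}}  # margin of 20 shares
c2 = {"name": "cont1", "resources": {"cpu": {"current": 200, "usage": 50}}}   # margin of 150 shares
lowest, highest = scaler.sort_containers_by_usage_margin(c1, c2, "cpu")
# lowest is c1 (closest to its limit), highest is c2 (largest headroom)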
def sort_requests(self, new_requests)
def sort_requests(self, new_requests):
    container_reqs, app_reqs = list(), list()
    for r in new_requests:
        if r["structure_type"] == "container":
            container_reqs.append(r)
        elif r["structure_type"] == "application":
            app_reqs.append(r)
        else:
            pass
    return container_reqs, app_reqs
def split_requests(self, all_requests)
def split_requests(self, all_requests):
    scale_down, scale_up = list(), list()
    for request in all_requests:
        if "action" not in request or not request["action"]:
            continue
        elif request["action"].endswith("Down"):
            scale_down.append(request)
        elif request["action"].endswith("Up"):
            scale_up.append(request)
    return scale_down, scale_up
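
A short sketch, assuming a Scaler instance named scaler and illustrative action names that end in "Up" or "Down":

reqs = [{"action": "CpuRescaleUp"},
        {"action": "MemRescaleDown"},
        {"action": ""}]  # skipped: empty action
scale_down, scale_up = scaler.split_requests(reqs)
# scale_down == [{"action": "MemRescaleDown"}], scale_up == [{"action": "CpuRescaleUp"}]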