Module src.Guardian.Guardian

Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.


from __future__ import print_function

from threading import Thread
import time
import traceback
import logging

from json_logic import jsonLogic
from termcolor import colored

from src.MyUtils.MyUtils import MyConfig, log_error, get_service, beat, log_info, log_warning, \
    get_structures, generate_event_name, generate_request_name, wait_operation_thread, structure_is_container, generate_structure_usage_metric, start_epoch, end_epoch
import src.StateDatabase.couchdb as couchdb
import src.StateDatabase.opentsdb as bdwatchdog

# Metrics to retrieve from the time series database for each resource, split by structure subtype
BDWATCHDOG_CONTAINER_METRICS = {
    "cpu": ['proc.cpu.user', 'proc.cpu.kernel'],
    "mem": ['proc.mem.resident', 'proc.mem.virtual'],
}
BDWATCHDOG_APPLICATION_METRICS = {
    "cpu": ['structure.cpu.usage'],
    "mem": ['structure.mem.usage'],
    "energy": ['structure.energy.usage'],
}
BDWATCHDOG_METRICS = {
    "container": BDWATCHDOG_CONTAINER_METRICS,
    "application": BDWATCHDOG_APPLICATION_METRICS,
}

# Usage metrics to generate, each mapped to the retrieved metrics it is built from, split by structure subtype
GUARDIAN_CONTAINER_METRICS = {
    'structure.cpu.usage': ['proc.cpu.user', 'proc.cpu.kernel'],
    'structure.mem.usage': ['proc.mem.resident'],
}
GUARDIAN_APPLICATION_METRICS = {
    'structure.cpu.usage': ['structure.cpu.usage'],
    'structure.mem.usage': ['structure.mem.usage'],
    'structure.energy.usage': ['structure.energy.usage'],
}
GUARDIAN_METRICS = {
    "container": GUARDIAN_CONTAINER_METRICS,
    "application": GUARDIAN_APPLICATION_METRICS,
}

# Tag name used, together with the structure name, to query the time series of each structure subtype
TAGS = {"container": "host", "application": "structure"}

# Translation from a resource label to the name of its usage metric
translator_dict = {
    "cpu": "structure.cpu.usage",
    "mem": "structure.mem.usage",
    "energy": "structure.energy.usage",
}

# Default configuration values of the service
CONFIG_DEFAULT_VALUES = {
    "WINDOW_TIMELAPSE": 10,
    "WINDOW_DELAY": 10,
    "EVENT_TIMEOUT": 40,
    "DEBUG": True,
    "STRUCTURE_GUARDED": "container",
    "GUARDABLE_RESOURCES": ["cpu"],
    "CPU_SHARES_PER_WATT": 5,
    "ACTIVE": True,
}
SERVICE_NAME = "guardian"

# Placeholder used for values that are missing or can't be read as an integer
NOT_AVAILABLE_STRING = "n/a"

# Resources whose rescale amount is not trimmed against the structure limits
NON_ADJUSTABLE_RESOURCES = ["energy"]


class Guardian:
    """
    Guardian class that implements the logic for this microservice. The Guardian takes care of matching the resource
    time series with a subset of rules to generate Events and then, matches the event against another subset of rules
    to generate scaling Requests.

    For more information you can visit: https://serverlesscontainers.readthedocs.io/en/latest/#architecture-and-microservices
    """

    def __init__(self):
        """Create the handlers used to talk to the remote databases.

        NOTE(review): other attributes read by the methods of this class (e.g., debug, guardable_resources,
        event_timeout, cpu_shares_per_watt) are not set here, presumably they are set elsewhere before use.
        """
        # Handler used to retrieve the usage time series of the structures
        self.opentsdb_handler = bdwatchdog.OpenTSDBServer()
        # Handler used to read and write events, requests and limits
        self.couchdb_handler = couchdb.CouchDBServer()
        # Value reported by the time series handler when a metric has no data
        self.NO_METRIC_DATA_DEFAULT_VALUE = self.opentsdb_handler.NO_METRIC_DATA_DEFAULT_VALUE

    @staticmethod
    def check_unset_values(value, label, resource):
        """Ensure that a value is set, that is, that it is not the N/A placeholder string.

        Args:
            value (integer): The value to be inspected
            label (string): Resource label name (e.g., upper limit), used for the exception string creation
            resource (string): Resource name (e.g., cpu), used for the exception string creation

        Returns:
            None

        Raises:
            ValueError if value is the N/A string
        """
        if value != NOT_AVAILABLE_STRING:
            return
        message = "value for '{0}' in resource '{1}' is not set or is not available.".format(label, resource)
        raise ValueError(message)

    @staticmethod
    def check_invalid_values(value1, label1, value2, label2, resource="n/a"):
        """ Check that two values have properly values set with the policy value1 < value2, otherwise raise ValueError

        Args:
            value1 (integer): First value
            label1 (string): First resource label name (e.g., upper limit), used for the exception string creation
            value2 (integer): Second value
            label2 (string): Second resource label name (e.g., lower limit), used for the exception string creation
            resource (string): Resource name (e.g., cpu), used for the exception string creation

        Returns:
            None

        Raises:
            ValueError if value1 > value2
            RuntimeError if value1 > value2 and value1 is current and value2 is max, that is the current is higher than the max
        """
        if value1 > value2 and label1 == "current" and label2 == "max":
            raise ValueError(
                "somehow this structure has a resource limit applied higher than maximum for {0}".format(resource))
        if value1 > value2:
            raise ValueError("in resources: {0} value for '{1}': {2} is greater than value for '{3}': {4}".format(
                resource, label1, str(value1), label2, str(value2)))

    @staticmethod
    def try_get_value(d, key):
        """Get the value stored in the dictionary or return a N/A string value if:

        * it is not in it
        * it is not an valid integer

        Args:
            d (dict): A dictionary storing values
            key (string): A string key

        Returns:
            (integer/string) int-mapped value stored in dict
        """
        try:
            return int(d[key])
        except (KeyError, ValueError):
            return NOT_AVAILABLE_STRING

    @staticmethod
    def sort_events(structure_events, event_timeout):
        """Sorts the events according to a simple policy regarding the _[now - timeout <----> now]_ time window (TW):

        * The event is **inside** the TW -> valid event
        * The event is **outside** the TW -> invalid event

        The 'now' time reference is taken inside this function

        Args:
            structure_events (list): A list of the events triggered in the past for a specific structure
            event_timeout (integer): A timeout in seconds

        Returns:
            (tuple[list,list]) A tuple of lists of events, first the valid and then the invalid.

        """
        valid, invalid = list(), list()
        for event in structure_events:
            if event["timestamp"] < time.time() - event_timeout:
                invalid.append(event)
            else:
                valid.append(event)
        return valid, invalid

    @staticmethod
    def reduce_structure_events(structure_events):
        """Reduces a list of events that have been generated for a single Structure into one single event. Considering
        that each event is a dictionary with an integer value for either a 'down' or 'up' event, all of the dictionaries
        can be reduced to one that can have two values for either 'up' and 'down' events, considering that the Structure
        resource may have a high hysteresis.

        Args:
            structure_events (list): A list of events for a single Structure

        Returns:
            (dict) A dictionary with the added up events in a signle dictionary

        """
        events_reduced = {"action": {}}
        for event in structure_events:
            resource = event["resource"]
            if resource not in events_reduced["action"]:
                events_reduced["action"][resource] = {"events": {"scale": {"down": 0, "up": 0}}}
            for key in event["action"]["events"]["scale"].keys():
                value = event["action"]["events"]["scale"][key]
                events_reduced["action"][resource]["events"]["scale"][key] += value
        return events_reduced["action"]

    def get_resource_summary(self, resource_label, resources_dict, limits_dict, usages_dict):
        """Builds a colored, comma-separated string that summarizes the state of a resource with the
        following format: _[max, current, upper limit, usage, lower limit, min]_

        Args:
            resource_label (string): The resource name label, used to create the string and to access the dictionaries
            resources_dict (dict): a dictionary with the metrics (e.g., max, min) of the resources
            limits_dict (dict): a dictionary with the limits (e.g., lower, upper) of the resources
            usages_dict (dict): a dictionary with the usages of the resources

        Returns:
            (string) A summary string with all of the values of the resource, N/A being used for missing ones

        """
        metrics = resources_dict[resource_label]
        limits = limits_dict[resource_label]

        # The usage is N/A when no usages are available or when the metric has no data
        usage_text = NOT_AVAILABLE_STRING
        if usages_dict:
            usage = usages_dict[translator_dict[resource_label]]
            if usage != self.NO_METRIC_DATA_DEFAULT_VALUE:
                usage_text = str("%.2f" % usage)

        # (field, dictionary holding it, color), in output order; the usage has no source dictionary
        layout = [
            ("max", metrics, "red"),
            ("current", metrics, "magenta"),
            ("upper", limits, "yellow"),
            ("usage", None, "cyan"),
            ("lower", limits, "green"),
            ("min", metrics, "blue"),
        ]

        pieces = list()
        for field, source, color in layout:
            if source is None:
                text = usage_text
            else:
                text = str(self.try_get_value(source, field))
            pieces.append(colored(text, color))

        return ",".join(pieces)

    @staticmethod
    def adjust_amount(amount, structure_resources, structure_limits):
        """Pre-check and, if needed, adjust the scaled amount with the policy:

        * If lower limit < min value -> Amount to reduce too large, adjust it so that the lower limit is set to the minimum
        * If new applied value > max value -> Amount to increase too large, adjust it so that the current value is set to the maximum

        Args:
            amount (integer): A number representing the amount to reduce or increase from the current value
            structure_resources (dict): Dictionary with the structure resource control values (min,current,max)
            structure_limits (dict): Dictionary with the structure resource limit values (lower,upper)

        Returns:
            (integer) The amount adjusted (trimmed) in case it would exceed any limit
        """
        expected_value = structure_resources["current"] + amount
        lower_limit = structure_limits["lower"] + amount
        max_limit = structure_resources["max"]
        min_limit = structure_resources["min"]

        if lower_limit < min_limit:
            amount += min_limit - lower_limit
        elif expected_value > max_limit:
            amount -= expected_value - max_limit

        return amount

    @staticmethod
    def get_amount_from_fit_reduction(current_resource_limit, boundary, current_resource_usage):
        """Get an amount that will be reduced from the current resource limit using a policy of *fit to the usage*.
        With this policy it is aimed at setting a new current value that gets close to the usage but leaving a boundary
        to avoid causing a severe bottleneck. More specifically, using the boundary configured this policy tries to
        find a scale down amount that makes the usage value stay between the _now new_ lower and upper limits.

        Args:
            current_resource_limit (integer): The current applied limit for this resource
            boundary (integer): The boundary used between limits
            current_resource_usage (integer): The usage value for this resource

        Returns:
            (int) The amount to be reduced using the fit to usage policy.

        """
        upper_to_lower_window = boundary
        current_to_upper_window = boundary

        # Set the limit so that the resource usage is placed in between the upper and lower limits
        # and keeping the boundary between the upper and the real resource limits
        desired_applied_resource_limit = \
            current_resource_usage + int(upper_to_lower_window / 2) + current_to_upper_window

        return -1 * (current_resource_limit - desired_applied_resource_limit)

    def get_amount_from_proportional_energy_rescaling(self, structure, resource):
        """Get an amount that will be reduced from the current resource limit using a policy of *proportional
        energy-based CPU scaling*.
        With this policy it is aimed at setting a new current CPU value that makes the energy consumed by a Structure
        get closer to a limit.

        *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

        Args:
            structure (dict): The dictionary containing all of the structure resource information
            resource (string): The resource name, used for indexing puroposes

        Returns:
            (int) The amount to be reduced using the fit to usage policy.

        """
        max_resource_limit = structure["resources"][resource]["max"]
        current_resource_limit = structure["resources"][resource]["usage"]
        difference = max_resource_limit - current_resource_limit
        energy_amplification = difference * self.cpu_shares_per_watt  # How many cpu shares to rescale per watt
        return int(energy_amplification)

    def get_container_energy_str(self, resources_dict):
        """Get a summary string but for the energy resource, which has a different behavior from others such as CPU or
        Memory.

        *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

        Args:
            resources_dict (dict): A dictionary with all the resources' information, including energy

        Returns:
            (string) A string that summarizes the state of the energy resource

        """
        energy_dict = resources_dict["energy"]
        string = list()
        for field in ["max", "usage", "min"]:
            string.append(str(self.try_get_value(energy_dict, field)))
        return ",".join(string)

    def adjust_container_state(self, resources, limits, resources_to_adjust):
        """Validate the limits of the given resources and, if they are invalid, rewrite them in place so that
        the upper limit is a boundary below the current value and the lower limit a boundary below the upper one.

        Args:
            resources (dict): Dictionary with the resource control values (e.g., max, current, min), by resource
            limits (dict): Dictionary with the limit values (upper, lower, boundary), by resource
            resources_to_adjust (list): The names of the resources to validate and adjust

        Returns:
            (dict) The same limits dictionary received, with the limits possibly corrected

        Raises:
            RuntimeError if a resource lacks the 'boundary' or 'current' value, if the validation itself raises
            a RuntimeError, or if the limits are still invalid after the maximum number of attempts
        """
        max_attempts = 10

        for resource in resources_to_adjust:
            if "boundary" not in limits[resource]:
                raise RuntimeError("Missing boundary value for resource {0}".format(resource))
            if "current" not in resources[resource]:
                raise RuntimeError("Missing current value for resource {0}".format(resource))

            for _ in range(max_attempts):
                try:
                    self.check_invalid_container_state(resources, limits, resource)
                except ValueError:
                    # Correct the chain current > upper > lower, including boundary between current and upper
                    boundary = int(limits[resource]["boundary"])
                    limits[resource]["upper"] = int(resources[resource]["current"] - boundary)
                    limits[resource]["lower"] = int(limits[resource]["upper"] - boundary)
                except RuntimeError as e:
                    log_error(str(e), self.debug)
                    raise
                else:
                    # The state is valid, nothing else to correct for this resource
                    break
            else:
                # Only reached when none of the attempts produced a valid state
                message = "Limits for {0} can't be adjusted, check the configuration (max:{1},current:{2}, boundary:{3}, min:{4})".format(
                    resource, resources[resource]["max"], int(resources[resource]["current"]), limits[resource]["boundary"], resources[resource]["min"])
                # TODO This prevents from checking other resources
                raise RuntimeError(message)
        return limits

    def check_invalid_container_state(self, resources, limits, resource):
        """Check that the values of a resource are set and that they respect the chain
        max >= current >= upper >= lower, with exactly a boundary between the current and upper values.

        All of the values, including the boundary, are read as integers, in the same way that
        adjust_container_state uses them to rewrite the limits.

        Args:
            resources (dict): Dictionary with the resource control values (max, current, min), by resource
            limits (dict): Dictionary with the limit values (upper, lower, boundary), by resource
            resource (string): The name of the resource to check

        Returns:
            None

        Raises:
            RuntimeError if the resource is not present in the resources or in the limits
            KeyError if the limits of the resource have no boundary
            ValueError if any value is not set or if the chain of values is not respected
        """
        if resource not in resources:
            raise RuntimeError("resource values not available for resource {0}".format(resource))
        if resource not in limits:
            raise RuntimeError("limit values not available for resource {0}".format(resource))
        data = {"res": resources, "lim": limits}
        values_tuples = [("max", "res"), ("current", "res"), ("upper", "lim"), ("lower", "lim"), ("min", "res")]
        values = dict()
        for value, vtype in values_tuples:
            values[value] = self.try_get_value(data[vtype][resource], value)

        # Indexing first keeps the KeyError of a missing boundary, the value is then mapped to an integer
        # (or N/A) as any other value so that a float or string boundary can be compared
        data["lim"][resource]["boundary"]
        values["boundary"] = self.try_get_value(data["lim"][resource], "boundary")

        # Check that all of the values are set, from here on none of them can be N/A
        for value in values:
            self.check_unset_values(values[value], value, resource)

        # Check if the first value is greater than the second
        # check the full chain max >= current >= upper >= lower
        self.check_invalid_values(values["current"], "current", values["max"], "max", resource=resource)
        self.check_invalid_values(values["upper"], "upper", values["current"], "current", resource=resource)
        self.check_invalid_values(values["lower"], "lower", values["upper"], "upper", resource=resource)

        # Check that there is exactly a boundary between the current and upper values, so
        # that the limit can be surpassed
        expected_upper = values["current"] - values["boundary"]
        if expected_upper < values["upper"]:
            raise ValueError("value for 'current': {0} is too close (less than {1}) to value for 'upper': {2}".format(
                str(values["current"]), str(values["boundary"]), str(values["upper"])))

        elif expected_upper > values["upper"]:
            raise ValueError("value for 'current': {0} is too far (more than {1}) from value for 'upper': {2}".format(
                str(values["current"]), str(values["boundary"]), str(values["upper"])))

    @staticmethod
    def rule_triggers_event(rule, data, resources):
        if rule["resource"] not in resources:
            return False
        else:
            return rule["active"] and \
                   resources[rule["resource"]]["guard"] and \
                   rule["generates"] == "events" and \
                   jsonLogic(rule["rule"], data)

    def match_usages_and_limits(self, structure_name, rules, usages, limits, resources):

        resources_with_rules = list()
        for rule in rules:
            if rule["resource"] in resources_with_rules:
                pass
            else:
                resources_with_rules.append(rule["resource"])

        useful_resources = list()
        for resource in self.guardable_resources:
            if resource not in resources_with_rules:
                log_warning("Resource {0} has no rules applied to it".format(resource), self.debug)
            else:
                useful_resources.append(resource)

        data = dict()
        for resource in useful_resources:
            if resource in resources:
                data[resource] = {
                    "limits": {resource: limits[resource]},
                    "structure": {resource: resources[resource]}}

        for usage_metric in usages:
            keys = usage_metric.split(".")
            struct_type, usage_resource = keys[0], keys[1]
            # Split the key from the retrieved data, e.g., structure.mem.usages, where mem is the resource
            if usage_resource in useful_resources:
                data[usage_resource][struct_type][usage_resource][keys[2]] = usages[usage_metric]

        events = []
        for rule in rules:
            try:
                # Check that the rule is active, the resource to watch is guarded and that the rule is activated
                if self.rule_triggers_event(rule, data, resources):
                    event_name = generate_event_name(rule["action"]["events"], rule["resource"])
                    event = self.generate_event(event_name, structure_name, rule["resource"], rule["action"])
                    events.append(event)

            except KeyError as e:
                log_warning("rule: {0} is missing a parameter {1} {2}".format(
                    rule["name"], str(e), str(traceback.format_exc())), self.debug)

        return events

    @staticmethod
    def generate_event(event_name, structure_name, resource, action):
        event = dict(
            name=event_name,
            resource=resource,
            type="event",
            structure=structure_name,
            action=action,
            timestamp=int(time.time()))
        return event

    @staticmethod
    def generate_request(structure, amount, resource_label):
        """Create a request document to rescale a resource of a structure by the given amount.

        Args:
            structure (dict): The structure to be rescaled
            amount (integer): The amount to rescale, it is stored as an integer
            resource_label (string): The name of the resource to rescale

        Returns:
            (dict) The request, timestamped with the current time in seconds
        """
        request = {
            "type": "request",
            "resource": resource_label,
            "amount": int(amount),
            "structure": structure["name"],
            "action": generate_request_name(amount, resource_label),
            "timestamp": int(time.time()),
            "structure_type": structure["subtype"],
        }

        # For the moment, energy rescaling is uniquely mapped to cpu rescaling
        if resource_label == "energy":
            request.update({"resource": "cpu", "for_energy": True})

        # If scaling a container, add its host information as it will be needed
        if structure_is_container(structure):
            for host_field in ("host", "host_rescaler_ip", "host_rescaler_port"):
                request[host_field] = structure[host_field]

        return request

    def match_rules_and_events(self, structure, rules, events, limits, usages):
        generated_requests = list()
        events_to_remove = dict()

        for rule in rules:
            # Check that the rule has the required parameters
            rule_invalid = False
            for key in ["active", "resource", "generates", "name", ]:
                if key not in rule:
                    log_warning("Rule: {0} is missing a key parameter '{1}', skipping it".format(rule["name"], key), self.debug)
                    rule_invalid = True
            if rule_invalid:
                continue

            if rule["generates"] == "requests":
                if "rescale_policy" not in rule or "rescale_type" not in rule:
                    log_warning("Rule: {0} is missing the 'rescale_type' or the 'rescale_policy' parameter, skipping it".format(rule["name"]), self.debug)
                    continue

                if rule["rescale_type"] == "up" and "amount" not in rule:
                    log_warning("Rule: {0} is missing a the amount parameter, skipping it".format(rule["name"]), self.debug)
                    continue

            resource_label = rule["resource"]

            rule_activated = rule["active"] and \
                             rule["generates"] == "requests" and \
                             resource_label in events and \
                             jsonLogic(rule["rule"], events[resource_label])

            if not rule_activated:
                continue

            # RULE HAS BEEN ACTIVATED

            # If rescaling a container, check that the current resource value exists, otherwise there is nothing to rescale
            if structure_is_container(structure) and "current" not in structure["resources"][resource_label]:
                log_warning("No current value for container' {0}' and "
                            "resource '{1}', can't rescale".format(structure["name"], resource_label), self.debug)
                continue

            # Get the amount to be applied from the policy set
            if rule["rescale_type"] == "up":
                if rule["rescale_policy"] == "amount":
                    amount = rule["amount"]
                elif rule["rescale_policy"] == "proportional":
                    amount = rule["amount"]
                    current_resource_limit = structure["resources"][resource_label]["current"]
                    upper_limit = limits[resource_label]["upper"]
                    usage = usages[translator_dict[resource_label]]
                    ratio = min((usage - upper_limit) / (current_resource_limit - upper_limit), 1)
                    amount = int(ratio * amount)
                    log_warning("PROP -> cur : {0} | upp : {1} | usa: {2} | ratio {3} | amount {4}".format(
                        current_resource_limit, upper_limit, usage, ratio, amount), self.debug)
                else:
                    log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                    continue
            elif rule["rescale_type"] == "down":
                if rule["rescale_policy"] == "amount":
                    amount = rule["amount"]
                elif rule["rescale_policy"] == "fit_to_usage":
                    current_resource_limit = structure["resources"][resource_label]["current"]
                    boundary = limits[resource_label]["boundary"]
                    usage = usages[translator_dict[resource_label]]
                    amount = self.get_amount_from_fit_reduction(current_resource_limit, boundary, usage)
                elif rule["rescale_policy"] == "proportional" and rule["resource"] == "energy":
                    amount = self.get_amount_from_proportional_energy_rescaling(structure, resource_label)
                else:
                    log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                    continue
            else:
                log_warning("Invalid rescale type '{0} for Rule {1}, skipping it".format(rule["rescale_type"], rule["name"]), self.debug)
                continue

            # Ensure that amount is an integer, either by converting float -> int, or string -> int
            amount = int(amount)

            # If it is 0, because there was a previous floating value between -1 and 1, set it to 0 so that it does not generate any Request
            if amount == 0:
                log_warning("Amount generated for structure {0} with rule {1} is 0".format(structure["name"], rule["name"]), self.debug)

            # If the resource is susceptible to check, ensure that it does not surpass any limit
            new_amount = amount
            if resource_label not in NON_ADJUSTABLE_RESOURCES:
                structure_resources = structure["resources"][resource_label]
                structure_limits = limits[resource_label]
                new_amount = self.adjust_amount(amount, structure_resources, structure_limits)
                if new_amount != amount:
                    log_warning("Amount generated for structure {0} with rule {1} has been trimmed from {2} to {3}".format(
                        structure["name"], rule["name"], amount, new_amount), self.debug)

            # Generate the request and append it
            request = self.generate_request(structure, new_amount, resource_label)
            generated_requests.append(request)

            # Remove the events that triggered the request
            event_name = generate_event_name(events[resource_label]["events"], resource_label)
            if event_name not in events_to_remove:
                events_to_remove[event_name] = 0
            events_to_remove[event_name] += rule["events_to_remove"]

        return generated_requests, events_to_remove

    def print_structure_info(self, container, usages, limits, triggered_events, triggered_requests):
        """Log a one-line summary of a structure: its name, the state of each of its guarded resources and
        the names of the events and requests that have been triggered for it.

        Args:
            container (dict): The structure to summarize
            usages (dict): The usage values of the structure, indexed by metric name
            limits (dict): The limit values of the structure, indexed by resource
            triggered_events (list): The events triggered for the structure
            triggered_requests (list): The requests triggered for the structure

        Returns:
            None
        """
        resources = container["resources"]
        name_part = "@" + container["name"]

        # Only the guardable resources that are set to be guarded are summarized
        resources_part = "| "
        for resource in self.guardable_resources:
            if not container["resources"][resource]["guard"]:
                continue
            summary = self.get_resource_summary(resource, resources, limits, usages)
            resources_part += resource + "({0})".format(summary) + " | "

        event_names = [event["name"] for event in triggered_events]
        request_actions = [request["action"] for request in triggered_requests]
        triggered_part = "#TRIGGERED EVENTS {0} AND TRIGGERED REQUESTS {1}".format(
            str(event_names), str(request_actions))

        log_info(" ".join([name_part, resources_part, triggered_part]), self.debug)

    def process_serverless_structure(self, structure, usages, limits, rules):
        """Run the events and requests pipeline for a structure:

        * match the usages against the rules to generate and store new events
        * retrieve all of the structure's events and delete those that have timed out
        * reduce the remaining events and match them against the rules to generate requests
        * delete the events that generated requests and store such requests

        Args:
            structure (dict): The structure to process
            usages (dict): The usage values of the structure, indexed by metric name
            limits (dict): The limit values of the structure, indexed by resource
            rules (list): The rules to be matched

        Returns:
            None
        """
        database = self.couchdb_handler

        # Match usages and rules to generate events
        triggered_events = self.match_usages_and_limits(
            structure["name"], rules, usages, limits, structure["resources"])
        if triggered_events:
            # Remote database operation
            database.add_events(triggered_events)

        # Remote database operation, then split the events according to their timestamp
        valid_events, timed_out_events = self.sort_events(database.get_events(structure), self.event_timeout)
        if timed_out_events:
            # Remote database operation
            database.delete_events(timed_out_events)

        # Without valid events no requests can be generated
        triggered_requests = list()
        if valid_events:
            # Merge all the event counts
            merged_events = self.reduce_structure_events(valid_events)

            # Match events and rules to generate requests
            triggered_requests, events_to_remove = self.match_rules_and_events(
                structure, rules, merged_events, limits, usages)

            # Remove events that generated the request
            for event_name, num_events in events_to_remove.items():
                # Remote database operation
                database.delete_num_events_by_structure(structure, event_name, num_events)

            if triggered_requests:
                # Remote database operation
                database.add_requests(triggered_requests)

        # DEBUG AND INFO OUTPUT
        if self.debug:
            self.print_structure_info(structure, usages, limits, triggered_events, triggered_requests)

    def serverless(self, structure, rules):
        """Guard a structure: retrieve its usages and limits, adjust and persist the limits and then run the
        events and requests pipeline on it.

        The structure is skipped, with a log message, if it is not guarded, if none of its guardable
        resources is guarded, if its subtype is unknown, if no usage data is available for any metric or
        if it has no limits. Any exception raised while processing the structure is logged as an error
        and not propagated.

        Args:
            structure (dict): The structure to guard
            rules (list): The rules to be matched

        Returns:
            None
        """
        structure_subtype = structure["subtype"]

        # Check if structure is guarded
        if not structure.get("guard", False):
            log_warning("structure: {0} is set to leave alone, skipping".format(structure["name"]), self.debug)
            return

        # Check if the structure has any resource set to guarded
        struct_guarded_resources = [
            res for res in self.guardable_resources
            if res in structure["resources"]
            and "guard" in structure["resources"][res]
            and structure["resources"][res]["guard"]
        ]
        if not struct_guarded_resources:
            log_warning("Structure {0} is set to guarded but has no resource marked to guard".format(structure["name"]), self.debug)
            return

        # Check if structure is being monitored, otherwise, ignore
        subtype_known = all(structure_subtype in table for table in (BDWATCHDOG_METRICS, GUARDIAN_METRICS, TAGS))
        if not subtype_known:
            log_error("Unknown structure subtype '{0}'".format(structure_subtype), self.debug)
            return

        try:
            # Collect the metrics to retrieve and to generate for the guarded resources
            metrics_to_retrieve = list()
            metrics_to_generate = dict()
            for res in struct_guarded_resources:
                metrics_to_retrieve.extend(BDWATCHDOG_METRICS[structure_subtype][res])
                usage_metric = generate_structure_usage_metric(res)
                metrics_to_generate[usage_metric] = GUARDIAN_METRICS[structure_subtype][usage_metric]
            tag = TAGS[structure_subtype]

            # Remote database operation
            usages = self.opentsdb_handler.get_structure_timeseries(
                {tag: structure["name"]}, self.window_difference, self.window_delay,
                metrics_to_retrieve, metrics_to_generate)

            no_data = self.NO_METRIC_DATA_DEFAULT_VALUE
            for metric in usages:
                if usages[metric] == no_data:
                    log_warning("structure: {0} has no usage data for {1}".format(structure["name"], metric), self.debug)

            # Skip this structure if all the usage metrics are unavailable
            if all(usages[metric] == no_data for metric in usages):
                log_warning("structure: {0} has no usage data for any metric, skipping".format(structure["name"]), self.debug)
                return

            resources = structure["resources"]

            # Remote database operation
            limits = self.couchdb_handler.get_limits(structure)
            limits_resources = limits["resources"]

            if not limits_resources:
                log_warning("structure: {0} has no limits".format(structure["name"]), self.debug)
                return

            # Adjust the structure limits according to the current value
            limits["resources"] = self.adjust_container_state(resources, limits_resources, self.guardable_resources)

            # Remote database operation
            self.couchdb_handler.update_limit(limits)

            self.process_serverless_structure(structure, usages, limits_resources, rules)

        except Exception as e:
            log_error("Error with structure {0}: {1}".format(structure["name"], str(e)), self.debug)

    def guard_structures(self, structures):
        """Process every structure in its own thread and wait until all of them have finished.

        Args:
            structures (list): The structure documents to process, each one must have a "name" key

        Returns:
            None
        """
        # Remote database operation, the rules are retrieved once and handed to every thread
        rules = self.couchdb_handler.get_rules()

        workers = list()
        for structure in structures:
            worker_name = "process_structure_{0}".format(structure["name"])
            worker = Thread(name=worker_name, target=self.serverless, args=(structure, rules))
            worker.start()
            workers.append(worker)

        # Block until every structure has been processed
        for worker in workers:
            worker.join()

    def invalid_conf(self):
        """Validate the configuration values currently stored in this instance.

        The guardable resources, the structure type to guard and the three timing values
        (window time lapse, window delay and event timeout) are checked, in that order.

        Returns:
            (tuple[bool,string]) (True, message) for the first invalid item found, (False, "") otherwise
        """
        known_resources = ["cpu", "mem", "disk", "net", "energy"]
        for res in self.guardable_resources:
            if res in known_resources:
                continue
            return True, "Resource to be guarded '{0}' is invalid".format(res)

        known_structures = ["container", "application"]
        if self.structure_guarded not in known_structures:
            return True, "Structure to be guarded '{0}' is invalid".format(self.structure_guarded)

        # Timing values below 5 are considered too short to be a sensible configuration
        timings = [
            ("WINDOW_TIMELAPSE", self.window_difference),
            ("WINDOW_DELAY", self.window_delay),
            ("EVENT_TIMEOUT", self.event_timeout),
        ]
        for config_key, config_value in timings:
            if config_value < 5:
                return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(config_key, config_value)

        return False, ""


    def guard(self):
        """Run the Guardian main loop, which never ends on its own.

        On every epoch the service document is read, a heartbeat is sent and the configuration is refreshed
        and validated. If the configuration is valid and the service is active, the structures of the
        configured subtype are processed in a background thread while this loop sleeps for the configured
        window time lapse.

        Returns:
            None
        """
        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        while True:
            # Get service info
            service = get_service(self.couchdb_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.couchdb_handler, SERVICE_NAME)

            # CONFIG, it is re-read on every epoch so that changes are applied without a restart
            myConfig.set_config(service["config"])
            self.debug = myConfig.get_value("DEBUG")
            debug = self.debug
            self.guardable_resources = myConfig.get_value("GUARDABLE_RESOURCES")
            self.cpu_shares_per_watt = myConfig.get_value("CPU_SHARES_PER_WATT")
            self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
            self.window_delay = myConfig.get_value("WINDOW_DELAY")
            self.structure_guarded = myConfig.get_value("STRUCTURE_GUARDED")
            self.event_timeout = myConfig.get_value("EVENT_TIMEOUT")
            SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

            t0 = start_epoch(self.debug)

            log_info("Config is as follows:", debug)
            log_info(".............................................", debug)
            log_info("Time window lapse -> {0}".format(self.window_difference), debug)
            log_info("Delay -> {0}".format(self.window_delay), debug)
            log_info("Event timeout -> {0}".format(self.event_timeout), debug)
            log_info("Resources guarded are -> {0}".format(self.guardable_resources), debug)
            log_info("Structure type guarded is -> {0}".format(self.structure_guarded), debug)
            log_info(".............................................", debug)

            ## CHECK INVALID CONFIG ##
            invalid, message = self.invalid_conf()
            if invalid:
                log_error(message, debug)
                # Never sleep for a too short window, otherwise this loop would spin on a broken configuration
                if self.window_difference < 5:
                    log_error("Window difference is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]), self.debug)
                    self.window_difference = CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]
                time.sleep(self.window_difference)
                end_epoch(self.debug, self.window_difference, t0)
                continue

            thread = None
            if SERVICE_IS_ACTIVATED:
                # Remote database operation
                structures = get_structures(self.couchdb_handler, debug, subtype=self.structure_guarded)
                if structures:
                    log_info("{0} Structures to process, launching threads".format(len(structures)), debug)
                    thread = Thread(name="guard_structures", target=self.guard_structures, args=(structures,))
                    thread.start()
                else:
                    log_info("No structures to process", debug)
            else:
                log_warning("Guardian is not activated", debug)

            time.sleep(self.window_difference)

            wait_operation_thread(thread, debug)

            # The first argument is the debug flag, as in the invalid configuration branch above
            end_epoch(self.debug, self.window_difference, t0)


def main():
    """Create the Guardian service and run it, logging any exception that escapes together with its traceback."""
    try:
        service = Guardian()
        service.guard()
    except Exception as error:
        trace = traceback.format_exc()
        log_error("{0} {1}".format(str(error), str(trace)), debug=True)


# Start the service only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()

Functions

def main()
Expand source code
def main():
    """Create the Guardian service and run it, logging any exception that escapes together with its traceback."""
    try:
        service = Guardian()
        service.guard()
    except Exception as error:
        trace = traceback.format_exc()
        log_error("{0} {1}".format(str(error), str(trace)), debug=True)

Classes

class Guardian

Guardian class that implements the logic for this microservice. The Guardian takes care of matching the resource time series against a subset of rules to generate Events, and then matches those Events against another subset of rules to generate scaling Requests.

For more information you can visit: https://serverlesscontainers.readthedocs.io/en/latest/#architecture-and-microservices

Expand source code
class Guardian:
    """
    Guardian class that implements the logic for this microservice. The Guardian takes care of matching the resource
    time series with a subset of rules to generate Events and then, matches the event against another subset of rules
    to generate scaling Requests.

    For more information you can visit: https://serverlesscontainers.readthedocs.io/en/latest/#architecture-and-microservices
    """

    def __init__(self):
        """Create the time series and state database handlers used by the service."""
        timeseries_db = bdwatchdog.OpenTSDBServer()
        self.opentsdb_handler = timeseries_db
        self.couchdb_handler = couchdb.CouchDBServer()
        # Keep at hand the value that the time series handler uses to flag a metric without data
        self.NO_METRIC_DATA_DEFAULT_VALUE = timeseries_db.NO_METRIC_DATA_DEFAULT_VALUE

    @staticmethod
    def check_unset_values(value, label, resource):
        """Raise an informative exception if a value equals the N/A placeholder, do nothing otherwise

        Args:
            value (integer/string): The value to be inspected
            label (string): Value label name (e.g., upper limit), used for the exception string creation
            resource (string): Resource name (e.g., cpu), used for the exception string creation

        Returns:
            None

        Raises:
            ValueError if value is N/A
        """
        if value != NOT_AVAILABLE_STRING:
            return
        message = "value for '{0}' in resource '{1}' is not set or is not available.".format(label, resource)
        raise ValueError(message)

    @staticmethod
    def check_invalid_values(value1, label1, value2, label2, resource="n/a"):
        """ Check that two values have properly values set with the policy value1 < value2, otherwise raise ValueError

        Args:
            value1 (integer): First value
            label1 (string): First resource label name (e.g., upper limit), used for the exception string creation
            value2 (integer): Second value
            label2 (string): Second resource label name (e.g., lower limit), used for the exception string creation
            resource (string): Resource name (e.g., cpu), used for the exception string creation

        Returns:
            None

        Raises:
            ValueError if value1 > value2
            RuntimeError if value1 > value2 and value1 is current and value2 is max, that is the current is higher than the max
        """
        if value1 > value2 and label1 == "current" and label2 == "max":
            raise ValueError(
                "somehow this structure has a resource limit applied higher than maximum for {0}".format(resource))
        if value1 > value2:
            raise ValueError("in resources: {0} value for '{1}': {2} is greater than value for '{3}': {4}".format(
                resource, label1, str(value1), label2, str(value2)))

    @staticmethod
    def try_get_value(d, key):
        """Get the value stored in the dictionary or return a N/A string value if:

        * it is not in it
        * it is not an valid integer

        Args:
            d (dict): A dictionary storing values
            key (string): A string key

        Returns:
            (integer/string) int-mapped value stored in dict
        """
        try:
            return int(d[key])
        except (KeyError, ValueError):
            return NOT_AVAILABLE_STRING

    @staticmethod
    def sort_events(structure_events, event_timeout):
        """Sorts the events according to a simple policy regarding the _[now - timeout <----> now]_ time window (TW):

        * The event is **inside** the TW -> valid event
        * The event is **outside** the TW -> invalid event

        The 'now' time reference is taken inside this function

        Args:
            structure_events (list): A list of the events triggered in the past for a specific structure
            event_timeout (integer): A timeout in seconds

        Returns:
            (tuple[list,list]) A tuple of lists of events, first the valid and then the invalid.

        """
        valid, invalid = list(), list()
        for event in structure_events:
            if event["timestamp"] < time.time() - event_timeout:
                invalid.append(event)
            else:
                valid.append(event)
        return valid, invalid

    @staticmethod
    def reduce_structure_events(structure_events):
        """Reduces a list of events that have been generated for a single Structure into one single event. Considering
        that each event is a dictionary with an integer value for either a 'down' or 'up' event, all of the dictionaries
        can be reduced to one that can have two values for either 'up' and 'down' events, considering that the Structure
        resource may have a high hysteresis.

        Args:
            structure_events (list): A list of events for a single Structure

        Returns:
            (dict) A dictionary with the added up events in a signle dictionary

        """
        events_reduced = {"action": {}}
        for event in structure_events:
            resource = event["resource"]
            if resource not in events_reduced["action"]:
                events_reduced["action"][resource] = {"events": {"scale": {"down": 0, "up": 0}}}
            for key in event["action"]["events"]["scale"].keys():
                value = event["action"]["events"]["scale"][key]
                events_reduced["action"][resource]["events"]["scale"][key] += value
        return events_reduced["action"]

    def get_resource_summary(self, resource_label, resources_dict, limits_dict, usages_dict):
        """Builds a colored, comma-separated string that summarizes the state of a resource with the
        following format: _[max, current, upper limit, usage, lower limit, min]_

        Args:
            resource_label (string): The resource name label, used to access the dictionaries
            resources_dict (dict): a dictionary with the metrics (e.g., max, min) of the resources
            limits_dict (dict): a dictionary with the limits (e.g., lower, upper) of the resources
            usages_dict (dict): a dictionary with the usages of the resources

        Returns:
            (string) The summary string for the resource, values that are not available are shown as N/A

        """
        metrics = resources_dict[resource_label]
        limits = limits_dict[resource_label]
        color_map = {"max": "red", "current": "magenta", "upper": "yellow", "usage": "cyan", "lower": "green", "min": "blue"}

        # The usage is shown with two decimals, unless there is no usage data at all for the resource
        if usages_dict and usages_dict[translator_dict[resource_label]] != self.NO_METRIC_DATA_DEFAULT_VALUE:
            usage_text = "%.2f" % usages_dict[translator_dict[resource_label]]
        else:
            usage_text = NOT_AVAILABLE_STRING

        # Fields in display order, the usage has no source dictionary as its text has just been computed
        fields = [("max", metrics), ("current", metrics), ("upper", limits), ("usage", None), ("lower", limits), ("min", metrics)]
        parts = list()
        for key, source in fields:
            if key == "usage":
                text = usage_text
            else:
                text = str(self.try_get_value(source, key))
            parts.append(colored(text, color_map[key]))

        return ",".join(parts)

    @staticmethod
    def adjust_amount(amount, structure_resources, structure_limits):
        """Pre-check and, if needed, adjust the scaled amount with the policy:

        * If lower limit < min value -> Amount to reduce too large, adjust it so that the lower limit is set to the minimum
        * If new applied value > max value -> Amount to increase too large, adjust it so that the current value is set to the maximum

        Args:
            amount (integer): A number representing the amount to reduce or increase from the current value
            structure_resources (dict): Dictionary with the structure resource control values (min,current,max)
            structure_limits (dict): Dictionary with the structure resource limit values (lower,upper)

        Returns:
            (integer) The amount adjusted (trimmed) in case it would exceed any limit
        """
        expected_value = structure_resources["current"] + amount
        lower_limit = structure_limits["lower"] + amount
        max_limit = structure_resources["max"]
        min_limit = structure_resources["min"]

        if lower_limit < min_limit:
            amount += min_limit - lower_limit
        elif expected_value > max_limit:
            amount -= expected_value - max_limit

        return amount

    @staticmethod
    def get_amount_from_fit_reduction(current_resource_limit, boundary, current_resource_usage):
        """Get an amount that will be reduced from the current resource limit using a policy of *fit to the usage*.
        With this policy it is aimed at setting a new current value that gets close to the usage but leaving a boundary
        to avoid causing a severe bottleneck. More specifically, using the boundary configured this policy tries to
        find a scale down amount that makes the usage value stay between the _now new_ lower and upper limits.

        Args:
            current_resource_limit (integer): The current applied limit for this resource
            boundary (integer): The boundary used between limits
            current_resource_usage (integer): The usage value for this resource

        Returns:
            (int) The amount to be reduced using the fit to usage policy.

        """
        upper_to_lower_window = boundary
        current_to_upper_window = boundary

        # Set the limit so that the resource usage is placed in between the upper and lower limits
        # and keeping the boundary between the upper and the real resource limits
        desired_applied_resource_limit = \
            current_resource_usage + int(upper_to_lower_window / 2) + current_to_upper_window

        return -1 * (current_resource_limit - desired_applied_resource_limit)

    def get_amount_from_proportional_energy_rescaling(self, structure, resource):
        """Get an amount that will be reduced from the current resource limit using a policy of *proportional
        energy-based CPU scaling*.
        With this policy it is aimed at setting a new current CPU value that makes the energy consumed by a Structure
        get closer to a limit.

        *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

        Args:
            structure (dict): The dictionary containing all of the structure resource information
            resource (string): The resource name, used for indexing puroposes

        Returns:
            (int) The amount to be reduced using the fit to usage policy.

        """
        max_resource_limit = structure["resources"][resource]["max"]
        current_resource_limit = structure["resources"][resource]["usage"]
        difference = max_resource_limit - current_resource_limit
        energy_amplification = difference * self.cpu_shares_per_watt  # How many cpu shares to rescale per watt
        return int(energy_amplification)

    def get_container_energy_str(self, resources_dict):
        """Get a summary string but for the energy resource, which has a different behavior from others such as CPU or
        Memory.

        *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

        Args:
            resources_dict (dict): A dictionary with all the resources' information, including energy

        Returns:
            (string) A string that summarizes the state of the energy resource

        """
        energy_dict = resources_dict["energy"]
        string = list()
        for field in ["max", "usage", "min"]:
            string.append(str(self.try_get_value(energy_dict, field)))
        return ",".join(string)

    def adjust_container_state(self, resources, limits, resources_to_adjust):
        """Validate the state of each resource and rewrite its 'upper' and 'lower' limits when it is invalid.

        The state is checked with check_invalid_container_state. Every time that the check raises a ValueError
        the limits are recomputed as upper = current - boundary and lower = upper - boundary, and the check
        is repeated.

        Args:
            resources (dict): The resource values of the structure, indexed by resource name
            limits (dict): The limit values of the structure, indexed by resource name, modified in place
            resources_to_adjust (list): The names of the resources to check

        Returns:
            (dict) The same limits dictionary that was passed, with the corrections applied

        Raises:
            RuntimeError if the boundary or the current value is missing, if the state check raises a
            RuntimeError, or once the check has been run 10 times for a resource
        """
        max_attempts = 10
        for resource in resources_to_adjust:
            if "boundary" not in limits[resource]:
                raise RuntimeError("Missing boundary value for resource {0}".format(resource))
            if "current" not in resources[resource]:
                raise RuntimeError("Missing current value for resource {0}".format(resource))

            attempts = 0
            state_is_valid = False
            while not state_is_valid:
                attempts += 1
                try:
                    self.check_invalid_container_state(resources, limits, resource)
                except ValueError:
                    # Correct the chain current > upper > lower, including boundary between current and upper
                    boundary = int(limits[resource]["boundary"])
                    new_upper = int(resources[resource]["current"] - boundary)
                    limits[resource]["upper"] = new_upper
                    limits[resource]["lower"] = int(new_upper - boundary)
                except RuntimeError as e:
                    log_error(str(e), self.debug)
                    raise e
                else:
                    state_is_valid = True

                # TODO Raising here prevents from checking the remaining resources
                if attempts >= max_attempts:
                    message = "Limits for {0} can't be adjusted, check the configuration (max:{1},current:{2}, boundary:{3}, min:{4})".format(
                        resource, resources[resource]["max"], int(resources[resource]["current"]), limits[resource]["boundary"], resources[resource]["min"])
                    raise RuntimeError(message)
        return limits

    def check_invalid_container_state(self, resources, limits, resource):
        """Check that the values and limits of a resource are all set and form a valid state.

        A valid state honors max >= current >= upper >= lower, and keeps the 'upper' limit exactly one
        boundary below the 'current' value.

        Args:
            resources (dict): The resource values of the structure, indexed by resource name
            limits (dict): The limit values of the structure, indexed by resource name
            resource (string): The name of the resource to check

        Returns:
            None

        Raises:
            RuntimeError if the resource is missing from the resources or from the limits
            ValueError if any value is not available, or if the values do not form a valid state
        """
        if resource not in resources:
            raise RuntimeError("resource values not available for resource {0}".format(resource))
        if resource not in limits:
            raise RuntimeError("limit values not available for resource {0}".format(resource))

        resource_values = resources[resource]
        limit_values = limits[resource]

        # The insertion order is kept, it decides which unset value is reported first
        values = dict()
        for label, source in [("max", resource_values), ("current", resource_values), ("upper", limit_values),
                              ("lower", limit_values), ("min", resource_values)]:
            values[label] = self.try_get_value(source, label)
        values["boundary"] = limit_values["boundary"]

        # Check that every value, the boundary included, is set
        for label, value in values.items():
            self.check_unset_values(value, label, resource)

        current_is_set = values["current"] != NOT_AVAILABLE_STRING

        # Check the full chain max >= current >= upper >= lower, each call raises if its
        # first value is greater than its second one
        if current_is_set:
            self.check_invalid_values(values["current"], "current", values["max"], "max", resource=resource)
        self.check_invalid_values(values["upper"], "upper", values["current"], "current", resource=resource)
        self.check_invalid_values(values["lower"], "lower", values["upper"], "upper", resource=resource)

        # Check that the distance between the current value and the upper limit is exactly the boundary
        if current_is_set:
            expected_upper = values["current"] - values["boundary"]
            if expected_upper < values["upper"]:
                raise ValueError("value for 'current': {0} is too close (less than {1}) to value for 'upper': {2}".format(
                    str(values["current"]), str(values["boundary"]), str(values["upper"])))
            elif expected_upper > values["upper"]:
                raise ValueError("value for 'current': {0} is too far (more than {1}) from value for 'upper': {2}".format(
                    str(values["current"]), str(values["boundary"]), str(values["upper"])))

    @staticmethod
    def rule_triggers_event(rule, data, resources):
        if rule["resource"] not in resources:
            return False
        else:
            return rule["active"] and \
                   resources[rule["resource"]]["guard"] and \
                   rule["generates"] == "events" and \
                   jsonLogic(rule["rule"], data)

    def match_usages_and_limits(self, structure_name, rules, usages, limits, resources):
        """Evaluate the rules against the usages, limits and resource values of a structure to generate events.

        Args:
            structure_name (string): The name of the structure, stored in the generated events
            rules (list): The rules to evaluate
            usages (dict): The usage values, indexed by dot-separated metric name (e.g., structure.mem.usage)
            limits (dict): The limit values of the structure, indexed by resource name
            resources (dict): The resource values of the structure, indexed by resource name

        Returns:
            (list) One event for each rule that has been triggered
        """
        # Resources that are referenced by at least one rule
        resources_with_rules = list()
        for rule in rules:
            if rule["resource"] not in resources_with_rules:
                resources_with_rules.append(rule["resource"])

        # Only the guardable resources that have rules are worth preparing data for
        useful_resources = list()
        for resource in self.guardable_resources:
            if resource in resources_with_rules:
                useful_resources.append(resource)
            else:
                log_warning("Resource {0} has no rules applied to it".format(resource), self.debug)

        data = {
            resource: {
                "limits": {resource: limits[resource]},
                "structure": {resource: resources[resource]}}
            for resource in useful_resources if resource in resources
        }

        for usage_metric, usage_value in usages.items():
            # Split the key from the retrieved data, e.g., structure.mem.usages, where mem is the resource
            parts = usage_metric.split(".")
            struct_type = parts[0]
            usage_resource = parts[1]
            if usage_resource in useful_resources:
                data[usage_resource][struct_type][usage_resource][parts[2]] = usage_value

        events = list()
        for rule in rules:
            try:
                # Check that the rule is active, the resource to watch is guarded and that the rule is activated
                if not self.rule_triggers_event(rule, data, resources):
                    continue
                event_name = generate_event_name(rule["action"]["events"], rule["resource"])
                events.append(self.generate_event(event_name, structure_name, rule["resource"], rule["action"]))
            except KeyError as e:
                log_warning("rule: {0} is missing a parameter {1} {2}".format(
                    rule["name"], str(e), str(traceback.format_exc())), self.debug)

        return events

    @staticmethod
    def generate_event(event_name, structure_name, resource, action):
        event = dict(
            name=event_name,
            resource=resource,
            type="event",
            structure=structure_name,
            action=action,
            timestamp=int(time.time()))
        return event

    @staticmethod
    def generate_request(structure, amount, resource_label):
        """Create a scaling request document for a structure.

        Args:
            structure (dict): The structure to rescale, its "name" and "subtype" keys are read, as well as
                the host-related keys if it is a container
            amount (integer): The amount to rescale, stored as an integer
            resource_label (string): The resource to rescale

        Returns:
            (dict) The request document
        """
        request = {
            "type": "request",
            "resource": resource_label,
            "amount": int(amount),
            "structure": structure["name"],
            "action": generate_request_name(amount, resource_label),
            "timestamp": int(time.time()),
            "structure_type": structure["subtype"],
        }

        # For the moment, energy rescaling is uniquely mapped to cpu rescaling
        if resource_label == "energy":
            request.update({"resource": "cpu", "for_energy": True})

        # If scaling a container, add its host information as it will be needed
        if structure_is_container(structure):
            for host_field in ("host", "host_rescaler_ip", "host_rescaler_port"):
                request[host_field] = structure[host_field]

        return request

    def match_rules_and_events(self, structure, rules, events, limits, usages):
        generated_requests = list()
        events_to_remove = dict()

        for rule in rules:
            # Check that the rule has the required parameters
            rule_invalid = False
            for key in ["active", "resource", "generates", "name", ]:
                if key not in rule:
                    log_warning("Rule: {0} is missing a key parameter '{1}', skipping it".format(rule["name"], key), self.debug)
                    rule_invalid = True
            if rule_invalid:
                continue

            if rule["generates"] == "requests":
                if "rescale_policy" not in rule or "rescale_type" not in rule:
                    log_warning("Rule: {0} is missing the 'rescale_type' or the 'rescale_policy' parameter, skipping it".format(rule["name"]), self.debug)
                    continue

                if rule["rescale_type"] == "up" and "amount" not in rule:
                    log_warning("Rule: {0} is missing a the amount parameter, skipping it".format(rule["name"]), self.debug)
                    continue

            resource_label = rule["resource"]

            rule_activated = rule["active"] and \
                             rule["generates"] == "requests" and \
                             resource_label in events and \
                             jsonLogic(rule["rule"], events[resource_label])

            if not rule_activated:
                continue

            # RULE HAS BEEN ACTIVATED

            # If rescaling a container, check that the current resource value exists, otherwise there is nothing to rescale
            if structure_is_container(structure) and "current" not in structure["resources"][resource_label]:
                log_warning("No current value for container' {0}' and "
                            "resource '{1}', can't rescale".format(structure["name"], resource_label), self.debug)
                continue

            # Get the amount to be applied from the policy set
            if rule["rescale_type"] == "up":
                if rule["rescale_policy"] == "amount":
                    amount = rule["amount"]
                elif rule["rescale_policy"] == "proportional":
                    amount = rule["amount"]
                    current_resource_limit = structure["resources"][resource_label]["current"]
                    upper_limit = limits[resource_label]["upper"]
                    usage = usages[translator_dict[resource_label]]
                    ratio = min((usage - upper_limit) / (current_resource_limit - upper_limit), 1)
                    amount = int(ratio * amount)
                    log_warning("PROP -> cur : {0} | upp : {1} | usa: {2} | ratio {3} | amount {4}".format(
                        current_resource_limit, upper_limit, usage, ratio, amount), self.debug)
                else:
                    log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                    continue
            elif rule["rescale_type"] == "down":
                if rule["rescale_policy"] == "amount":
                    amount = rule["amount"]
                elif rule["rescale_policy"] == "fit_to_usage":
                    current_resource_limit = structure["resources"][resource_label]["current"]
                    boundary = limits[resource_label]["boundary"]
                    usage = usages[translator_dict[resource_label]]
                    amount = self.get_amount_from_fit_reduction(current_resource_limit, boundary, usage)
                elif rule["rescale_policy"] == "proportional" and rule["resource"] == "energy":
                    amount = self.get_amount_from_proportional_energy_rescaling(structure, resource_label)
                else:
                    log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                    continue
            else:
                log_warning("Invalid rescale type '{0} for Rule {1}, skipping it".format(rule["rescale_type"], rule["name"]), self.debug)
                continue

            # Ensure that amount is an integer, either by converting float -> int, or string -> int
            amount = int(amount)

            # If it is 0, because there was a previous floating value between -1 and 1, set it to 0 so that it does not generate any Request
            if amount == 0:
                log_warning("Amount generated for structure {0} with rule {1} is 0".format(structure["name"], rule["name"]), self.debug)

            # If the resource is susceptible to check, ensure that it does not surpass any limit
            new_amount = amount
            if resource_label not in NON_ADJUSTABLE_RESOURCES:
                structure_resources = structure["resources"][resource_label]
                structure_limits = limits[resource_label]
                new_amount = self.adjust_amount(amount, structure_resources, structure_limits)
                if new_amount != amount:
                    log_warning("Amount generated for structure {0} with rule {1} has been trimmed from {2} to {3}".format(
                        structure["name"], rule["name"], amount, new_amount), self.debug)

            # Generate the request and append it
            request = self.generate_request(structure, new_amount, resource_label)
            generated_requests.append(request)

            # Remove the events that triggered the request
            event_name = generate_event_name(events[resource_label]["events"], resource_label)
            if event_name not in events_to_remove:
                events_to_remove[event_name] = 0
            events_to_remove[event_name] += rule["events_to_remove"]

        return generated_requests, events_to_remove

    def print_structure_info(self, container, usages, limits, triggered_events, triggered_requests):
        """Log a one-line summary of a structure, its guarded resources and what was triggered for it.

        Args:
            container (dict): The structure document; its "name" and "resources" entries are read
            usages (dict): Usage values, forwarded untouched to get_resource_summary
            limits (dict): Limit values, forwarded untouched to get_resource_summary
            triggered_events (list): Events generated in this pass, only their "name" is printed
            triggered_requests (list): Requests generated in this pass, only their "action" is printed

        Returns:
            None
        """
        resources = container["resources"]
        name_part = "@" + container["name"]

        # One "<resource>(<summary>) | " chunk for every guardable resource flagged with "guard"
        resources_part = "| " + "".join(
            resource + "({0})".format(self.get_resource_summary(resource, resources, limits, usages)) + " | "
            for resource in self.guardable_resources
            if container["resources"][resource]["guard"])

        event_names = [event["name"] for event in triggered_events]
        request_actions = [request["action"] for request in triggered_requests]
        triggered_part = "#TRIGGERED EVENTS {0} AND TRIGGERED REQUESTS {1}".format(str(event_names), str(request_actions))

        log_info(" ".join([name_part, resources_part, triggered_part]), self.debug)

    def process_serverless_structure(self, structure, usages, limits, rules):
        """Run one events -> requests pass for a single structure.

        New events are generated from the usages and persisted, the stored events are split by age
        (the aged ones are deleted) and the remaining ones are matched against the rules to generate
        and persist requests.

        Args:
            structure (dict): The structure document being processed
            usages (dict): The usage values retrieved for the structure
            limits (dict): The per-resource limits of the structure
            rules (list): The rules to match usages and events against

        Returns:
            None
        """
        handler = self.couchdb_handler

        # Usages + rules -> events, persisted only if something was triggered (remote database operation)
        triggered_events = self.match_usages_and_limits(structure["name"], rules, usages, limits, structure["resources"])
        if triggered_events:
            handler.add_events(triggered_events)

        # Retrieve everything stored for this structure (remote database operation) and split it by timestamp
        stored_events = handler.get_events(structure)
        recent_events, expired_events = self.sort_events(stored_events, self.event_timeout)
        if expired_events:
            # Remote database operation
            handler.delete_events(expired_events)

        # Without recent events no request can be generated
        triggered_requests = list()
        if recent_events:
            # Merge all the event counts and match them against the rules
            merged_events = self.reduce_structure_events(recent_events)
            triggered_requests, events_to_remove = self.match_rules_and_events(structure, rules, merged_events, limits, usages)

            # Remove the events that generated the requests (remote database operations)
            for event_name, count in events_to_remove.items():
                handler.delete_num_events_by_structure(structure, event_name, count)

            if triggered_requests:
                # Remote database operation
                handler.add_requests(triggered_requests)

        # DEBUG AND INFO OUTPUT
        if self.debug:
            self.print_structure_info(structure, usages, limits, triggered_events, triggered_requests)

    def serverless(self, structure, rules):
        """Guard a single structure: validate it, retrieve its usages and limits and process it.

        The structure is skipped (with a log message) if it is not flagged to be guarded, if none of
        the guardable resources is flagged in it, if its subtype is unknown, if no usage data is
        available for any metric or if it has no limits. Any exception raised while retrieving or
        processing the data is logged and not propagated.

        Args:
            structure (dict): The structure document to guard
            rules (list): The rules forwarded to process_serverless_structure

        Returns:
            None
        """
        subtype = structure["subtype"]

        # A structure without a truthy "guard" flag is left alone
        if not ("guard" in structure and structure["guard"]):
            log_warning("structure: {0} is set to leave alone, skipping".format(structure["name"]), self.debug)
            return

        # Keep only the guardable resources that this structure explicitly flags to be guarded
        guarded = [res for res in self.guardable_resources
                   if res in structure["resources"]
                   and "guard" in structure["resources"][res]
                   and structure["resources"][res]["guard"]]
        if not guarded:
            log_warning("Structure {0} is set to guarded but has no resource marked to guard".format(structure["name"]), self.debug)
            return

        # The subtype has to be present in every lookup table used below, otherwise it is not monitored
        subtype_is_known = subtype in BDWATCHDOG_METRICS and subtype in GUARDIAN_METRICS and subtype in TAGS
        if not subtype_is_known:
            log_error("Unknown structure subtype '{0}'".format(subtype), self.debug)
            return

        try:
            # Collect the metrics to retrieve and the usage metrics to be generated from them
            metrics_to_retrieve = list()
            metrics_to_generate = dict()
            for res in guarded:
                metrics_to_retrieve.extend(BDWATCHDOG_METRICS[subtype][res])
                usage_metric = generate_structure_usage_metric(res)
                metrics_to_generate[usage_metric] = GUARDIAN_METRICS[subtype][usage_metric]
            tag = TAGS[subtype]

            # Remote database operation
            usages = self.opentsdb_handler.get_structure_timeseries(
                {tag: structure["name"]}, self.window_difference, self.window_delay,
                metrics_to_retrieve, metrics_to_generate)

            # Warn about every metric without data and skip the structure if none of them has any
            missing = [metric for metric in usages if usages[metric] == self.NO_METRIC_DATA_DEFAULT_VALUE]
            for metric in missing:
                log_warning("structure: {0} has no usage data for {1}".format(structure["name"], metric), self.debug)
            if len(missing) == len(usages):
                log_warning("structure: {0} has no usage data for any metric, skipping".format(structure["name"]), self.debug)
                return

            resources = structure["resources"]

            # Remote database operation
            limits = self.couchdb_handler.get_limits(structure)
            limits_resources = limits["resources"]
            if not limits_resources:
                log_warning("structure: {0} has no limits".format(structure["name"]), self.debug)
                return

            # Adjust the structure limits according to the current value and persist them
            limits["resources"] = self.adjust_container_state(resources, limits_resources, self.guardable_resources)

            # Remote database operation
            self.couchdb_handler.update_limit(limits)

            self.process_serverless_structure(structure, usages, limits_resources, rules)

        except Exception as e:
            log_error("Error with structure {0}: {1}".format(structure["name"], str(e)), self.debug)

    def guard_structures(self, structures):
        """Process every structure in its own thread and wait for all of them to finish.

        Args:
            structures (list): The structure documents to be guarded

        Returns:
            None
        """
        # Remote database operation
        rules = self.couchdb_handler.get_rules()

        def launch(structure):
            # Create and immediately start the worker thread for one structure
            worker = Thread(name="process_structure_{0}".format(structure["name"]), target=self.serverless,
                            args=(structure, rules,))
            worker.start()
            return worker

        workers = [launch(structure) for structure in structures]

        for worker in workers:
            worker.join()

    def invalid_conf(self, ):
        """Validate the configuration currently loaded in this instance.

        Checks, in this order, the resources to guard, the structure type to guard and the
        time-related values (window lapse, window delay and event timeout), stopping at the
        first problem found.

        Returns:
            (tuple[bool,string]) (True, message) for the first invalid item found, (False, "") otherwise
        """
        known_resources = ["cpu", "mem", "disk", "net", "energy"]
        for res in self.guardable_resources:
            if res in known_resources:
                continue
            return True, "Resource to be guarded '{0}' is invalid".format(res)

        known_structures = ["container", "application"]
        if self.structure_guarded not in known_structures:
            return True, "Structure to be guarded '{0}' is invalid".format(self.structure_guarded)

        # Time values lower than 5 are considered a likely misconfiguration
        timed_items = [
            ("WINDOW_TIMELAPSE", self.window_difference),
            ("WINDOW_DELAY", self.window_delay),
            ("EVENT_TIMEOUT", self.event_timeout),
        ]
        for key, num in timed_items:
            if not num < 5:
                continue
            return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(key, num)

        return False, ""


    def guard(self, ):
        """Run the Guardian service loop forever.

        On every epoch the service document is read from the state database, a heartbeat is sent
        and the configuration is reloaded into this instance. If the configuration is invalid the
        epoch is skipped after sleeping for the window lapse. Otherwise, and only if the service is
        active, the structures of the configured subtype are retrieved and processed in a
        background thread while this loop sleeps for the window lapse.

        Returns:
            None (the loop has no exit condition)
        """
        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        while True:
            # Get service info
            service = get_service(self.couchdb_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.couchdb_handler, SERVICE_NAME)

            # CONFIG
            myConfig.set_config(service["config"])
            self.debug = myConfig.get_value("DEBUG")
            debug = self.debug
            self.guardable_resources = myConfig.get_value("GUARDABLE_RESOURCES")
            self.cpu_shares_per_watt = myConfig.get_value("CPU_SHARES_PER_WATT")
            self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
            self.window_delay = myConfig.get_value("WINDOW_DELAY")
            self.structure_guarded = myConfig.get_value("STRUCTURE_GUARDED")
            self.event_timeout = myConfig.get_value("EVENT_TIMEOUT")
            SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

            t0 = start_epoch(self.debug)

            log_info("Config is as follows:", debug)
            log_info(".............................................", debug)
            log_info("Time window lapse -> {0}".format(self.window_difference), debug)
            log_info("Delay -> {0}".format(self.window_delay), debug)
            log_info("Event timeout -> {0}".format(self.event_timeout), debug)
            log_info("Resources guarded are -> {0}".format(self.guardable_resources), debug)
            log_info("Structure type guarded is -> {0}".format(self.structure_guarded), debug)
            log_info(".............................................", debug)

            ## CHECK INVALID CONFIG ##
            invalid, message = self.invalid_conf()
            if invalid:
                log_error(message, debug)
                # A too short window would turn this loop into a busy loop, so fall back to the default
                if self.window_difference < 5:
                    log_error("Window difference is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]), self.debug)
                    self.window_difference = CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]
                time.sleep(self.window_difference)
                end_epoch(self.debug, self.window_difference, t0)
                continue

            # The thread stays as None when there is nothing to process in this epoch
            thread = None
            if SERVICE_IS_ACTIVATED:
                # Remote database operation
                structures = get_structures(self.couchdb_handler, debug, subtype=self.structure_guarded)
                if structures:
                    log_info("{0} Structures to process, launching threads".format(len(structures)), debug)
                    thread = Thread(name="guard_structures", target=self.guard_structures, args=(structures,))
                    thread.start()
                else:
                    log_info("No structures to process", debug)
            else:
                log_warning("Guardian is not activated", debug)

            time.sleep(self.window_difference)

            wait_operation_thread(thread, debug)

            # The first argument is the debug flag, as in the invalid-config branch above;
            # t0 was previously passed here by mistake
            end_epoch(self.debug, self.window_difference, t0)

Static methods

def adjust_amount(amount, structure_resources, structure_limits)

Pre-check and, if needed, adjust the scaled amount with the policy:

  • If lower limit < min value -> Amount to reduce too large, adjust it so that the lower limit is set to the minimum
  • If new applied value > max value -> Amount to increase too large, adjust it so that the current value is set to the maximum

Args

amount : integer
A number representing the amount to reduce or increase from the current value
structure_resources : dict
Dictionary with the structure resource control values (min,current,max)
structure_limits : dict
Dictionary with the structure resource limit values (lower,upper)

Returns

(integer) The amount adjusted (trimmed) in case it would exceed any limit

Expand source code
@staticmethod
def adjust_amount(amount, structure_resources, structure_limits):
    """Trim a rescaling amount so that applying it does not push the structure past its bounds.

    * If the lower limit would end up below the minimum, the amount is raised by the difference
      so that the new lower limit is exactly the minimum
    * Otherwise, if the current value would end up above the maximum, the amount is lowered by
      the excess so that the new current value is exactly the maximum

    Args:
        amount (integer): The amount to reduce or increase from the current value
        structure_resources (dict): The resource control values ("min", "current" and "max" are read)
        structure_limits (dict): The resource limit values ("lower" is read)

    Returns:
        (integer) The amount, trimmed in case it would exceed any bound
    """
    projected_current = structure_resources["current"] + amount
    projected_lower = structure_limits["lower"] + amount
    ceiling = structure_resources["max"]
    floor = structure_resources["min"]

    # Scaling down too much: give back what would fall below the minimum
    if projected_lower < floor:
        return amount + (floor - projected_lower)

    # Scaling up too much: take away what would exceed the maximum
    if projected_current > ceiling:
        return amount - (projected_current - ceiling)

    return amount
def check_invalid_values(value1, label1, value2, label2, resource='n/a')

Check that two values are properly set according to the policy value1 <= value2; otherwise raise ValueError

Args

value1 : integer
First value
label1 : string
First resource label name (e.g., upper limit), used for the exception string creation
value2 : integer
Second value
label2 : string
Second resource label name (e.g., lower limit), used for the exception string creation
resource : string
Resource name (e.g., cpu), used for the exception string creation

Returns

None

Raises

ValueError if value1 > value2; RuntimeError if value1 > value2 and value1 is 'current' and value2 is 'max', that is, the current value is higher than the maximum

Expand source code
@staticmethod
def check_invalid_values(value1, label1, value2, label2, resource="n/a"):
    """Check that two values respect the policy value1 <= value2, otherwise raise an exception.

    A 'current' value higher than the 'max' value is reported with a RuntimeError instead of a
    ValueError, because changing the upper and lower limits cannot fix it. Callers that treat
    ValueError as a correctable state (by recomputing the limits) must not retry on it.

    Args:
        value1 (integer): First value
        label1 (string): First resource label name (e.g., upper limit), used for the exception string creation
        value2 (integer): Second value
        label2 (string): Second resource label name (e.g., lower limit), used for the exception string creation
        resource (string): Resource name (e.g., cpu), used for the exception string creation

    Returns:
        None

    Raises:
        RuntimeError if value1 > value2 and value1 is current and value2 is max, that is the current is higher than the max
        ValueError if value1 > value2 in any other case
    """
    if not value1 > value2:
        return

    if label1 == "current" and label2 == "max":
        # Previously raised as ValueError, which contradicted the documented contract
        raise RuntimeError(
            "somehow this structure has a resource limit applied higher than maximum for {0}".format(resource))

    raise ValueError("in resources: {0} value for '{1}': {2} is greater than value for '{3}': {4}".format(
        resource, label1, str(value1), label2, str(value2)))
def check_unset_values(value, label, resource)

Check if a value has the N/A value and, if that is the case, raise an informative exception

Args

value : integer
The value to be inspected
label : string
Resource label name (e.g., upper limit), used for the exception string creation
resource : string
Resource name (e.g., cpu), used for the exception string creation

Returns

None

Raises

ValueError if value is N/A

Expand source code
@staticmethod
def check_unset_values(value, label, resource):
    """Raise an informative exception if a value is the N/A placeholder.

    Args:
        value (integer): The value to be inspected
        label (string): Resource label name (e.g., upper limit), used for the exception string creation
        resource (string): Resource name (e.g., cpu), used for the exception string creation

    Returns:
        None

    Raises:
        ValueError if value equals the N/A placeholder string
    """
    is_unset = value == NOT_AVAILABLE_STRING
    if is_unset:
        message = "value for '{0}' in resource '{1}' is not set or is not available.".format(label, resource)
        raise ValueError(message)
def generate_event(event_name, structure_name, resource, action)
Expand source code
@staticmethod
def generate_event(event_name, structure_name, resource, action):
    """Build an event document, timestamped with the current time in whole seconds.

    Args:
        event_name (string): The name of the event
        structure_name (string): The name of the structure the event belongs to
        resource (string): The resource name (e.g., cpu)
        action: The action stored in the event, kept as received

    Returns:
        (dict) The event document, with its "type" field set to "event"
    """
    return {
        "name": event_name,
        "resource": resource,
        "type": "event",
        "structure": structure_name,
        "action": action,
        "timestamp": int(time.time()),
    }
def generate_request(structure, amount, resource_label)
Expand source code
@staticmethod
def generate_request(structure, amount, resource_label):
    """Build a rescaling request document for a structure.

    Args:
        structure (dict): The structure document; "name" and "subtype" are read, plus the host
            fields if the structure is a container
        amount (integer): The amount to rescale, stored as an integer
        resource_label (string): The resource name (e.g., cpu)

    Returns:
        (dict) The request document, with its "type" field set to "request"
    """
    request = {
        "type": "request",
        "resource": resource_label,
        "amount": None,
        "structure": None,
        "action": generate_request_name(amount, resource_label),
        "timestamp": None,
        "structure_type": None,
    }
    request["amount"] = int(amount)
    request["structure"] = structure["name"]
    request["timestamp"] = int(time.time())
    request["structure_type"] = structure["subtype"]

    # For the moment, energy rescaling is uniquely mapped to cpu rescaling
    if resource_label == "energy":
        request.update(resource="cpu", for_energy=True)

    # If scaling a container, add its host information as it will be needed
    if structure_is_container(structure):
        for field in ("host", "host_rescaler_ip", "host_rescaler_port"):
            request[field] = structure[field]

    return request
def get_amount_from_fit_reduction(current_resource_limit, boundary, current_resource_usage)

Get an amount that will be reduced from the current resource limit using a policy of fit to the usage. With this policy it is aimed at setting a new current value that gets close to the usage but leaving a boundary to avoid causing a severe bottleneck. More specifically, using the boundary configured this policy tries to find a scale down amount that makes the usage value stay between the now new lower and upper limits.

Args

current_resource_limit : integer
The current applied limit for this resource
boundary : integer
The boundary used between limits
current_resource_usage : integer
The usage value for this resource

Returns

(int) The amount to be reduced using the fit to usage policy.

Expand source code
@staticmethod
def get_amount_from_fit_reduction(current_resource_limit, boundary, current_resource_usage):
    """Compute the amount to scale using the *fit to the usage* policy.

    The new limit is placed at usage + half a boundary + one boundary. With the upper limit one
    boundary below the new limit and the lower limit one more boundary below that, the usage
    ends up between the _new_ lower and upper limits.

    Args:
        current_resource_limit (integer): The current applied limit for this resource
        boundary (integer): The boundary used between limits
        current_resource_usage (integer): The usage value for this resource

    Returns:
        (int) The difference between the target limit and the current limit, negative when the
        limit has to be reduced.

    """
    # Half of the upper-to-lower window, plus the whole current-to-upper window
    half_window = int(boundary / 2)
    target_limit = current_resource_usage + half_window + boundary

    return -1 * (current_resource_limit - target_limit)
def reduce_structure_events(structure_events)

Reduces a list of events that have been generated for a single Structure into one single event. Considering that each event is a dictionary with an integer value for either a 'down' or 'up' event, all of the dictionaries can be reduced to one that can have two values for either 'up' and 'down' events, considering that the Structure resource may have a high hysteresis.

Args

structure_events : list
A list of events for a single Structure

Returns

(dict) A dictionary with the added up events in a single dictionary

Expand source code
@staticmethod
def reduce_structure_events(structure_events):
    """Merge the events generated for a single Structure into one counter dictionary per resource.

    Each event carries integer counts for 'down' and/or 'up' scale events. The counts of all the
    events of the same resource are added up, so a resource can end up with counts for both
    directions if it has a high hysteresis.

    Args:
        structure_events (list): A list of events for a single Structure

    Returns:
        (dict) A dictionary, indexed by resource, with the added up events in a single dictionary

    """
    merged = {}
    for event in structure_events:
        # Start every newly seen resource with both counters at zero
        entry = merged.setdefault(event["resource"], {"events": {"scale": {"down": 0, "up": 0}}})
        counters = entry["events"]["scale"]
        for direction, count in event["action"]["events"]["scale"].items():
            counters[direction] += count
    return merged
def rule_triggers_event(rule, data, resources)
Expand source code
@staticmethod
def rule_triggers_event(rule, data, resources):
    """Tell whether a rule generates an event for the given data.

    Args:
        rule (dict): The rule; "resource", "active", "generates" and "rule" are read
        data (dict): The data the jsonLogic expression of the rule is evaluated against
        resources (dict): The resources of the structure, whose "guard" flag is read

    Returns:
        False if the rule's resource is not in resources, otherwise the result of chaining with
        'and' the rule being active, the resource being guarded, the rule generating events and
        the evaluation of the rule's expression.
    """
    resource = rule["resource"]
    if resource in resources:
        # Ordered from cheapest to most expensive, the expression is only evaluated if needed
        return rule["active"] and \
               resources[resource]["guard"] and \
               rule["generates"] == "events" and \
               jsonLogic(rule["rule"], data)
    return False
def sort_events(structure_events, event_timeout)

Sorts the events according to a simple policy regarding the [now - timeout <----> now] time window (TW):

  • The event is inside the TW -> valid event
  • The event is outside the TW -> invalid event

The 'now' time reference is taken inside this function

Args

structure_events : list
A list of the events triggered in the past for a specific structure
event_timeout : integer
A timeout in seconds

Returns

(tuple[list,list]) A tuple of lists of events, first the valid and then the invalid.

Expand source code
@staticmethod
def sort_events(structure_events, event_timeout):
    """Split the events according to the _[now - timeout <----> now]_ time window (TW):

    * An event whose timestamp is **inside** the TW is valid
    * An event whose timestamp is older than the start of the TW is invalid

    The 'now' time reference is taken inside this function, once per event

    Args:
        structure_events (list): A list of the events triggered in the past for a specific structure
        event_timeout (integer): A timeout in seconds

    Returns:
        (tuple[list,list]) A tuple of lists of events, first the valid and then the invalid.

    """
    fresh, expired = [], []
    for event in structure_events:
        is_expired = event["timestamp"] < time.time() - event_timeout
        bucket = expired if is_expired else fresh
        bucket.append(event)
    return fresh, expired
def try_get_value(d, key)

Get the value stored in the dictionary or return a N/A string value if:

  • it is not in it
  • it is not a valid integer

Args

d : dict
A dictionary storing values
key : string
A string key

Returns

(integer/string) int-mapped value stored in dict

Expand source code
@staticmethod
def try_get_value(d, key):
    """Get the value stored in the dictionary mapped to an integer, or the N/A string value if:

    * the key is not in the dictionary
    * the value can't be converted and int() raises a ValueError

    Args:
        d (dict): A dictionary storing values
        key (string): A string key

    Returns:
        (integer/string) int-mapped value stored in dict, or the N/A string
    """
    try:
        converted = int(d[key])
    except (KeyError, ValueError):
        return NOT_AVAILABLE_STRING
    else:
        return converted

Methods

def adjust_container_state(self, resources, limits, resources_to_adjust)
Expand source code
def adjust_container_state(self, resources, limits, resources_to_adjust):
    """Validate the state of each resource and, if it is invalid, recompute its upper and lower limits.

    When the state check raises a ValueError the limits are rewritten in place so that
    upper = current - boundary and lower = upper - boundary, and the state is checked again.

    Args:
        resources (dict): The structure resource values, indexed by resource ("current" is required)
        limits (dict): The structure limit values, indexed by resource ("boundary" is required)
        resources_to_adjust (list): The names of the resources to check and adjust

    Returns:
        (dict) The same limits dictionary received, possibly modified in place

    Raises:
        RuntimeError if the boundary or current values are missing, if the state check raises
        a RuntimeError (logged and re-raised) or if the tenth check of a resource is reached
    """
    for resource in resources_to_adjust:
        if "boundary" not in limits[resource]:
            raise RuntimeError("Missing boundary value for resource {0}".format(resource))
        if "current" not in resources[resource]:
            raise RuntimeError("Missing current value for resource {0}".format(resource))

        attempts = 0
        while True:
            attempts += 1
            try:
                self.check_invalid_container_state(resources, limits, resource)
            except ValueError:
                # Correct the chain current > upper > lower, including boundary between current and upper
                boundary = int(limits[resource]["boundary"])
                new_upper = int(resources[resource]["current"] - boundary)
                limits[resource]["upper"] = new_upper
                limits[resource]["lower"] = int(new_upper - boundary)
                state_is_valid = False
            except RuntimeError as err:
                log_error(str(err), self.debug)
                raise err
            else:
                state_is_valid = True

            # Give up once the attempt budget is spent, whatever the outcome of the last check
            if attempts >= 10:
                message = "Limits for {0} can't be adjusted, check the configuration (max:{1},current:{2}, boundary:{3}, min:{4})".format(
                    resource, resources[resource]["max"], int(resources[resource]["current"]), limits[resource]["boundary"], resources[resource]["min"])
                raise RuntimeError(message)
                # TODO This prevents from checking other resources

            if state_is_valid:
                break
    return limits
def check_invalid_container_state(self, resources, limits, resource)
Expand source code
def check_invalid_container_state(self, resources, limits, resource):
    """Check that the values of a resource are all set and properly ordered, raising otherwise.

    The chain enforced is max >= current >= upper >= lower and, in addition, the upper limit has
    to be exactly one boundary below the current value.

    Args:
        resources (dict): The structure resource values (max, current, min), indexed by resource
        limits (dict): The structure limit values (upper, lower, boundary), indexed by resource
        resource (string): The name of the resource to check

    Returns:
        None

    Raises:
        RuntimeError if the resource is missing from the resources or from the limits
        ValueError if the distance between current and upper is not exactly the boundary
        Whatever check_unset_values and check_invalid_values raise for unset or unordered values
    """
    if resource not in resources:
        raise RuntimeError("resource values not available for resource {0}".format(resource))
    if resource not in limits:
        raise RuntimeError("limit values not available for resource {0}".format(resource))

    # Gather the int-mapped values, each one taken from the dictionary that holds it
    resource_values, limit_values = resources[resource], limits[resource]
    values = dict()
    for name, holder in [("max", resource_values), ("current", resource_values), ("upper", limit_values),
                         ("lower", limit_values), ("min", resource_values)]:
        values[name] = self.try_get_value(holder, name)
    values["boundary"] = limits[resource]["boundary"]

    # Check that every value gathered, boundary included, is set
    for name in values:
        self.check_unset_values(values[name], name, resource)

    current, upper = values["current"], values["upper"]
    current_is_set = current != NOT_AVAILABLE_STRING

    # Check that the first value is not greater than the second,
    # covering the full chain max >= current >= upper >= lower
    if current_is_set:
        self.check_invalid_values(current, "current", values["max"], "max", resource=resource)
    self.check_invalid_values(upper, "upper", current, "current", resource=resource)
    self.check_invalid_values(values["lower"], "lower", upper, "upper", resource=resource)

    # Check that there is exactly one boundary between the current and upper values,
    # so that the limit can be surpassed
    if current_is_set:
        gap_target = current - values["boundary"]
        if gap_target < upper:
            raise ValueError("value for 'current': {0} is too close (less than {1}) to value for 'upper': {2}".format(
                str(values["current"]), str(values["boundary"]), str(values["upper"])))

        elif gap_target > upper:
            raise ValueError("value for 'current': {0} is too far (more than {1}) from value for 'upper': {2}".format(
                str(values["current"]), str(values["boundary"]), str(values["upper"])))
def get_amount_from_proportional_energy_rescaling(self, structure, resource)

Get an amount that will be reduced from the current resource limit using a policy of proportional energy-based CPU scaling. With this policy it is aimed at setting a new current CPU value that makes the energy consumed by a Structure get closer to a limit.

THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO, see: http://bdwatchdog.dec.udc.es/energy/index.html

Args

structure : dict
The dictionary containing all of the structure resource information
resource : string
The resource name, used for indexing purposes

Returns

(int) The amount of CPU shares to rescale using the proportional energy-based policy.

Expand source code
def get_amount_from_proportional_energy_rescaling(self, structure, resource):
    """Get the amount of CPU to rescale using a policy of *proportional energy-based CPU scaling*.

    The difference between the "max" and "usage" values of the resource is multiplied by the
    configured number of cpu shares per watt, so the amount grows with the distance between the
    energy consumed by a Structure and its limit.

    *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

    Args:
        structure (dict): The dictionary containing all of the structure resource information
        resource (string): The resource name, used for indexing purposes

    Returns:
        (int) The amount to rescale, truncated to an integer; negative when usage is above max.

    """
    resource_values = structure["resources"][resource]
    headroom = resource_values["max"] - resource_values["usage"]

    # cpu_shares_per_watt states how many cpu shares to rescale per watt
    return int(headroom * self.cpu_shares_per_watt)
def get_container_energy_str(self, resources_dict)

Get a summary string but for the energy resource, which has a different behavior from others such as CPU or Memory.

THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO, see: http://bdwatchdog.dec.udc.es/energy/index.html

Args

resources_dict : dict
A dictionary with all the resources' information, including energy

Returns

(string) A string that summarizes the state of the energy resource

Expand source code
def get_container_energy_str(self, resources_dict):
    """Get the summary string of the energy resource, which only has the max, usage and min values,
    unlike other resources such as CPU or Memory.

    *THIS FUNCTION IS USED WITH THE ENERGY CAPPING SCENARIO*, see: http://bdwatchdog.dec.udc.es/energy/index.html

    Args:
        resources_dict (dict): A dictionary with all the resources' information, including energy

    Returns:
        (string) The max, usage and min values of the energy resource, joined with commas

    """
    energy_dict = resources_dict["energy"]
    return ",".join(str(self.try_get_value(energy_dict, field)) for field in ("max", "usage", "min"))
def get_resource_summary(self, resource_label, resources_dict, limits_dict, usages_dict)

Produces a string to summarize the current state of a resource with all of its information and the following format: [max, current, upper limit, usage, lower limit, min]

Args

resource_label : string
The resource name label, used to create the string and to access the dictionaries
resources_dict : dict
a dictionary with the metrics (e.g., max, min) of the resources
limits_dict : dict
a dictionary with the limits (e.g., lower, upper) of the resources
usages_dict : dict
a dictionary with the usages of the resources

Returns

(string) A summary string that contains all of the appropriate values for all of the resources

Expand source code
def get_resource_summary(self, resource_label, resources_dict, limits_dict, usages_dict):
    """Produce a colored string that summarizes the current state of a resource with the
    following format: _[max, current, upper limit, usage, lower limit, min]_

    Args:
        resource_label (string): The resource name label, used to create the string and to access the dictionaries
        resources_dict (dict): a dictionary with the metrics (e.g., max, min) of the resources
        limits_dict (dict): a dictionary with the limits (e.g., lower, upper) of the resources
        usages_dict (dict): a dictionary with the usages of the resources

    Returns:
        (string) The six values of the resource, each one colored, joined with commas

    """
    metrics = resources_dict[resource_label]
    limits = limits_dict[resource_label]
    color_map = {"max": "red", "current": "magenta", "upper": "yellow", "usage": "cyan", "lower": "green", "min": "blue"}

    # The usage is shown as N/A if there are no usages at all or no data for this resource
    usage_value_string = NOT_AVAILABLE_STRING
    if usages_dict and not (usages_dict[translator_dict[resource_label]] == self.NO_METRIC_DATA_DEFAULT_VALUE):
        usage_value_string = str("%.2f" % usages_dict[translator_dict[resource_label]])

    def paint(key, holder):
        # Color the int-mapped (or N/A) value stored for 'key' with the color assigned to it
        return colored(str(self.try_get_value(holder, key)), color_map[key])

    pieces = [
        paint("max", metrics),
        paint("current", metrics),
        paint("upper", limits),
        colored(usage_value_string, color_map["usage"]),  # The usage sits between the upper and lower limits
        paint("lower", limits),
        paint("min", metrics),
    ]

    return ",".join(pieces)
def guard(self)
Expand source code
def guard(self, ):
    """Run the Guardian main loop forever, one epoch per iteration.

    On every epoch the service document is read from the state database, a heartbeat is sent,
    the configuration is reloaded into the instance attributes and validated. If the service
    is active and there are structures of the configured subtype, they are processed in a
    background thread (guard_structures) while this loop sleeps for the configured window.

    This method never returns.
    """
    myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
    logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

    while True:
        # Get service info
        service = get_service(self.couchdb_handler, SERVICE_NAME)

        # Heartbeat
        beat(self.couchdb_handler, SERVICE_NAME)

        # CONFIG
        # The configuration is re-read on every epoch so that changes in the database are picked up
        myConfig.set_config(service["config"])
        self.debug = myConfig.get_value("DEBUG")
        debug = self.debug
        self.guardable_resources = myConfig.get_value("GUARDABLE_RESOURCES")
        self.cpu_shares_per_watt = myConfig.get_value("CPU_SHARES_PER_WATT")
        self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
        self.window_delay = myConfig.get_value("WINDOW_DELAY")
        self.structure_guarded = myConfig.get_value("STRUCTURE_GUARDED")
        self.event_timeout = myConfig.get_value("EVENT_TIMEOUT")
        SERVICE_IS_ACTIVATED = myConfig.get_value("ACTIVE")

        t0 = start_epoch(self.debug)

        log_info("Config is as follows:", debug)
        log_info(".............................................", debug)
        log_info("Time window lapse -> {0}".format(self.window_difference), debug)
        log_info("Delay -> {0}".format(self.window_delay), debug)
        log_info("Event timeout -> {0}".format(self.event_timeout), debug)
        log_info("Resources guarded are -> {0}".format(self.guardable_resources), debug)
        log_info("Structure type guarded is -> {0}".format(self.structure_guarded), debug)
        log_info(".............................................", debug)

        ## CHECK INVALID CONFIG ##
        invalid, message = self.invalid_conf()
        if invalid:
            log_error(message, debug)
            # Never sleep for a too short window, otherwise this loop would hammer the database
            if self.window_difference < 5:
                log_error("Window difference is too short, replacing with DEFAULT value '{0}'".format(CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]), self.debug)
                self.window_difference = CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]
            time.sleep(self.window_difference)
            end_epoch(self.debug, self.window_difference, t0)
            continue

        thread = None
        if SERVICE_IS_ACTIVATED:
            # Remote database operation
            structures = get_structures(self.couchdb_handler, debug, subtype=self.structure_guarded)
            if structures:
                log_info("{0} Structures to process, launching threads".format(len(structures)), debug)
                thread = Thread(name="guard_structures", target=self.guard_structures, args=(structures,))
                thread.start()
            else:
                log_info("No structures to process", debug)
        else:
            log_warning("Guardian is not activated", debug)

        # Sleep while the structures are processed in the background
        time.sleep(self.window_difference)

        wait_operation_thread(thread, debug)

        # The first argument is the debug flag (as in the invalid configuration branch), not the epoch start time
        end_epoch(self.debug, self.window_difference, t0)
def guard_structures(self, structures)
Expand source code
def guard_structures(self, structures):
    """Process all of the structures concurrently, one thread per structure, and wait for them to finish.

    Args:
        structures (list): the structures (dictionaries with at least a 'name') to be processed

    """
    # Remote database operation
    rules = self.couchdb_handler.get_rules()

    def launch(structure):
        # Create and immediately start the thread that processes a single structure
        worker = Thread(name="process_structure_{0}".format(structure["name"]), target=self.serverless,
                        args=(structure, rules,))
        worker.start()
        return worker

    workers = [launch(structure) for structure in structures]

    # Do not return until every structure has been processed
    for worker in workers:
        worker.join()
def invalid_conf(self)
Expand source code
def invalid_conf(self):
    """Validate the configuration currently loaded in the instance.

    The guarded resources, the guarded structure type and the timing values
    (window lapse, window delay and event timeout) are checked, in that order.

    Returns:
        (tuple) A (True, message) pair for the first invalid item found, (False, "") otherwise

    """
    known_resources = ("cpu", "mem", "disk", "net", "energy")
    for resource in self.guardable_resources:
        if resource in known_resources:
            continue
        return True, "Resource to be guarded '{0}' is invalid".format(resource)

    if self.structure_guarded not in ("container", "application"):
        return True, "Structure to be guarded '{0}' is invalid".format(self.structure_guarded)

    # Timing values lower than 5 are rejected
    timings = (
        ("WINDOW_TIMELAPSE", self.window_difference),
        ("WINDOW_DELAY", self.window_delay),
        ("EVENT_TIMEOUT", self.event_timeout),
    )
    for item, value in timings:
        if value < 5:
            return True, "Configuration item '{0}' with a value of '{1}' is likely invalid".format(item, value)
    return False, ""
def match_rules_and_events(self, structure, rules, events, limits, usages)
Expand source code
def match_rules_and_events(self, structure, rules, events, limits, usages):
    """Match the rules that generate requests against the reduced events of a structure.

    For every valid and active rule whose JSON-logic expression is satisfied by the events of its
    resource, the amount to rescale is computed according to the rule's rescale type and policy,
    trimmed to the structure limits (unless the resource is not adjustable) and turned into a request.

    Args:
        structure (dict): the structure being processed, with at least 'name' and 'resources'
        rules (list): the rules (dictionaries) to be checked
        events (dict): the reduced events, indexed by resource label
        limits (dict): the limits of the structure, indexed by resource label
        usages (dict): the usages of the structure, indexed by usage metric name

    Returns:
        (tuple) The list of generated requests and a dictionary that maps event names to the
        number of events that have to be removed because they triggered a request

    """
    generated_requests = list()
    events_to_remove = dict()

    for rule in rules:
        # Check that the rule has the required parameters
        rule_invalid = False
        # 'name' itself may be the missing key, so it can't be blindly used to build the message
        rule_name = rule.get("name", "<unnamed>")
        for key in ["active", "resource", "generates", "name", ]:
            if key not in rule:
                log_warning("Rule: {0} is missing a key parameter '{1}', skipping it".format(rule_name, key), self.debug)
                rule_invalid = True
        if rule_invalid:
            continue

        if rule["generates"] == "requests":
            if "rescale_policy" not in rule or "rescale_type" not in rule:
                log_warning("Rule: {0} is missing the 'rescale_type' or the 'rescale_policy' parameter, skipping it".format(rule["name"]), self.debug)
                continue

            # The amount is read for every 'up' rule and for any rule that uses the 'amount' policy
            needs_amount = rule["rescale_type"] == "up" or rule["rescale_policy"] == "amount"
            if needs_amount and "amount" not in rule:
                log_warning("Rule: {0} is missing a the amount parameter, skipping it".format(rule["name"]), self.debug)
                continue

        resource_label = rule["resource"]

        rule_activated = rule["active"] and \
                         rule["generates"] == "requests" and \
                         resource_label in events and \
                         jsonLogic(rule["rule"], events[resource_label])

        if not rule_activated:
            continue

        # RULE HAS BEEN ACTIVATED

        # If rescaling a container, check that the current resource value exists, otherwise there is nothing to rescale
        if structure_is_container(structure) and "current" not in structure["resources"][resource_label]:
            log_warning("No current value for container' {0}' and "
                        "resource '{1}', can't rescale".format(structure["name"], resource_label), self.debug)
            continue

        # Get the amount to be applied from the policy set
        if rule["rescale_type"] == "up":
            if rule["rescale_policy"] == "amount":
                amount = rule["amount"]
            elif rule["rescale_policy"] == "proportional":
                # Scale the configured amount by how far the usage is above the upper limit, capped at 1
                amount = rule["amount"]
                current_resource_limit = structure["resources"][resource_label]["current"]
                upper_limit = limits[resource_label]["upper"]
                usage = usages[translator_dict[resource_label]]
                ratio = min((usage - upper_limit) / (current_resource_limit - upper_limit), 1)
                amount = int(ratio * amount)
                log_warning("PROP -> cur : {0} | upp : {1} | usa: {2} | ratio {3} | amount {4}".format(
                    current_resource_limit, upper_limit, usage, ratio, amount), self.debug)
            else:
                log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                continue
        elif rule["rescale_type"] == "down":
            if rule["rescale_policy"] == "amount":
                amount = rule["amount"]
            elif rule["rescale_policy"] == "fit_to_usage":
                current_resource_limit = structure["resources"][resource_label]["current"]
                boundary = limits[resource_label]["boundary"]
                usage = usages[translator_dict[resource_label]]
                amount = self.get_amount_from_fit_reduction(current_resource_limit, boundary, usage)
            elif rule["rescale_policy"] == "proportional" and rule["resource"] == "energy":
                amount = self.get_amount_from_proportional_energy_rescaling(structure, resource_label)
            else:
                log_warning("Invalid rescale policy '{0} for Rule {1}, skipping it".format(rule["rescale_policy"], rule["name"]), self.debug)
                continue
        else:
            log_warning("Invalid rescale type '{0} for Rule {1}, skipping it".format(rule["rescale_type"], rule["name"]), self.debug)
            continue

        # Ensure that amount is an integer, either by converting float -> int, or string -> int
        amount = int(amount)

        # A previous floating value between -1 and 1 is truncated to 0, this is only warned about,
        # the request is still generated below with that amount
        if amount == 0:
            log_warning("Amount generated for structure {0} with rule {1} is 0".format(structure["name"], rule["name"]), self.debug)

        # If the resource is susceptible to check, ensure that it does not surpass any limit
        new_amount = amount
        if resource_label not in NON_ADJUSTABLE_RESOURCES:
            structure_resources = structure["resources"][resource_label]
            structure_limits = limits[resource_label]
            new_amount = self.adjust_amount(amount, structure_resources, structure_limits)
            if new_amount != amount:
                log_warning("Amount generated for structure {0} with rule {1} has been trimmed from {2} to {3}".format(
                    structure["name"], rule["name"], amount, new_amount), self.debug)

        # Generate the request and append it
        request = self.generate_request(structure, new_amount, resource_label)
        generated_requests.append(request)

        # Remove the events that triggered the request
        event_name = generate_event_name(events[resource_label]["events"], resource_label)
        if event_name not in events_to_remove:
            events_to_remove[event_name] = 0
        events_to_remove[event_name] += rule["events_to_remove"]

    return generated_requests, events_to_remove
def match_usages_and_limits(self, structure_name, rules, usages, limits, resources)
Expand source code
def match_usages_and_limits(self, structure_name, rules, usages, limits, resources):
    """Check the rules against the usages, limits and resources of a structure to generate events.

    Only the guarded resources that appear in at least one rule are considered. The usage values
    are stored inside the dictionaries of 'resources' (which are therefore modified) so that the
    rules can be evaluated against a single data dictionary.

    Args:
        structure_name (string): the name of the structure the events belong to
        rules (list): the rules (dictionaries) to be checked
        usages (dict): the usages, indexed by metric name (e.g., structure.mem.usage)
        limits (dict): the limits, indexed by resource label
        resources (dict): the resources of the structure, indexed by resource label

    Returns:
        (list) The events generated by the rules that were triggered

    """
    # Resources with at least one rule, without repetitions and in order of appearance
    ruled_resources = list()
    for rule in rules:
        if rule["resource"] not in ruled_resources:
            ruled_resources.append(rule["resource"])

    watched_resources = list()
    for resource in self.guardable_resources:
        if resource in ruled_resources:
            watched_resources.append(resource)
            continue
        log_warning("Resource {0} has no rules applied to it".format(resource), self.debug)

    data = {
        resource: {"limits": {resource: limits[resource]}, "structure": {resource: resources[resource]}}
        for resource in watched_resources if resource in resources
    }

    for metric_name in usages:
        # Split the key from the retrieved data, e.g., structure.mem.usages, where mem is the resource
        parts = metric_name.split(".")
        owner, metric_resource = parts[0], parts[1]
        if metric_resource not in watched_resources:
            continue
        data[metric_resource][owner][metric_resource][parts[2]] = usages[metric_name]

    events = []
    for rule in rules:
        try:
            # Check that the rule is active, the resource to watch is guarded and that the rule is activated
            if not self.rule_triggers_event(rule, data, resources):
                continue
            event_name = generate_event_name(rule["action"]["events"], rule["resource"])
            events.append(self.generate_event(event_name, structure_name, rule["resource"], rule["action"]))
        except KeyError as e:
            log_warning("rule: {0} is missing a parameter {1} {2}".format(
                rule["name"], str(e), str(traceback.format_exc())), self.debug)

    return events
def print_structure_info(self, container, usages, limits, triggered_events, triggered_requests)
Expand source code
def print_structure_info(self, container, usages, limits, triggered_events, triggered_requests):
    """Log a single line with the state of a structure: its name, the summary of every guarded
    resource and the names of the events and the actions of the requests that were triggered.

    Args:
        container (dict): the structure, with at least 'name' and 'resources'
        usages (dict): the usages of the structure
        limits (dict): the limits of the structure, indexed by resource label
        triggered_events (list): the events (dictionaries with a 'name') generated for the structure
        triggered_requests (list): the requests (dictionaries with an 'action') generated for the structure

    """
    resources = container["resources"]
    name_part = "@" + container["name"]

    # Only the resources marked as guarded in the structure are summarized
    summaries = [
        resource + "({0})".format(self.get_resource_summary(resource, resources, limits, usages)) + " | "
        for resource in self.guardable_resources
        if container["resources"][resource]["guard"]
    ]
    resources_part = "| " + "".join(summaries)

    event_names = [event["name"] for event in triggered_events]
    request_actions = [request["action"] for request in triggered_requests]
    triggered_part = "#TRIGGERED EVENTS {0} AND TRIGGERED REQUESTS {1}".format(str(event_names), str(request_actions))

    log_info(" ".join([name_part, resources_part, triggered_part]), self.debug)
def process_serverless_structure(self, structure, usages, limits, rules)
Expand source code
def process_serverless_structure(self, structure, usages, limits, rules):
    """Run the events and requests pipeline for a single structure.

    New events are generated from the usages and stored, the stored events are retrieved and split
    by age (the old ones are deleted), and the remaining ones are reduced and matched against the
    rules to generate requests. The events that produced a request are removed from the database.

    Args:
        structure (dict): the structure being processed, with at least 'name' and 'resources'
        usages (dict): the usages of the structure
        limits (dict): the limits of the structure, indexed by resource label
        rules (list): the rules to be applied

    """
    # Match usages and rules to generate events
    triggered_events = self.match_usages_and_limits(structure["name"], rules, usages, limits, structure["resources"])

    database = self.couchdb_handler

    # Remote database operation
    if triggered_events:
        database.add_events(triggered_events)

    # Remote database operation, followed by a filter of the events according to timestamp
    filtered_events, old_events = self.sort_events(database.get_events(structure), self.event_timeout)

    if old_events:
        # Remote database operation
        database.delete_events(old_events)

    # If there are no events, nothing else to do as no requests will be generated
    triggered_requests = list()
    if filtered_events:
        # Merge all the event counts
        reduced_events = self.reduce_structure_events(filtered_events)

        # Match events and rules to generate requests
        triggered_requests, pending_removals = self.match_rules_and_events(structure, rules, reduced_events, limits, usages)

        # Remove events that generated the request
        # Remote database operation
        for event_name, count in pending_removals.items():
            database.delete_num_events_by_structure(structure, event_name, count)

        if triggered_requests:
            # Remote database operation
            database.add_requests(triggered_requests)

    # DEBUG AND INFO OUTPUT
    if self.debug:
        self.print_structure_info(structure, usages, limits, triggered_events, triggered_requests)
def serverless(self, structure, rules)
Expand source code
def serverless(self, structure, rules):
    """Guard a single structure: retrieve its usages and limits and run the events and requests pipeline.

    The structure is skipped (with a warning or error message) if it is not guarded, if none of its
    resources is marked to guard, if its subtype is unknown, if there is no usage data for any
    metric or if it has no limits. Any exception raised while processing is logged, not propagated.

    Args:
        structure (dict): the structure to guard, with at least 'subtype', 'name' and 'resources'
        rules (list): the rules to be applied

    """
    subtype = structure["subtype"]

    # Check if structure is guarded
    if not ("guard" in structure and structure["guard"]):
        log_warning("structure: {0} is set to leave alone, skipping".format(structure["name"]), self.debug)
        return

    # Check if the structure has any resource set to guarded
    guarded_resources = [
        res for res in self.guardable_resources
        if res in structure["resources"] and "guard" in structure["resources"][res] and structure["resources"][res]["guard"]
    ]
    if not guarded_resources:
        log_warning("Structure {0} is set to guarded but has no resource marked to guard".format(structure["name"]), self.debug)
        return

    # Check if structure is being monitored, otherwise, ignore
    is_monitored = subtype in BDWATCHDOG_METRICS and subtype in GUARDIAN_METRICS and subtype in TAGS
    if not is_monitored:
        log_error("Unknown structure subtype '{0}'".format(subtype), self.debug)
        return

    try:
        metrics_to_retrieve = list()
        metrics_to_generate = dict()
        for res in guarded_resources:
            metrics_to_retrieve.extend(BDWATCHDOG_METRICS[subtype][res])
            usage_metric = generate_structure_usage_metric(res)
            metrics_to_generate[usage_metric] = GUARDIAN_METRICS[subtype][usage_metric]
        tag = TAGS[subtype]

        # Remote database operation
        usages = self.opentsdb_handler.get_structure_timeseries({tag: structure["name"]},
                                                                self.window_difference, self.window_delay,
                                                                metrics_to_retrieve, metrics_to_generate)

        # Warn about every metric that has no data
        unavailable = [metric for metric in usages if usages[metric] == self.NO_METRIC_DATA_DEFAULT_VALUE]
        for metric in unavailable:
            log_warning("structure: {0} has no usage data for {1}".format(structure["name"], metric), self.debug)

        # Skip this structure if all the usage metrics are unavailable
        if len(unavailable) == len(usages):
            log_warning("structure: {0} has no usage data for any metric, skipping".format(structure["name"]), self.debug)
            return

        resources = structure["resources"]

        # Remote database operation
        limits = self.couchdb_handler.get_limits(structure)
        limits_resources = limits["resources"]

        if not limits_resources:
            log_warning("structure: {0} has no limits".format(structure["name"]), self.debug)
            return

        # Adjust the structure limits according to the current value
        limits["resources"] = self.adjust_container_state(resources, limits_resources, self.guardable_resources)

        # Remote database operation
        self.couchdb_handler.update_limit(limits)

        self.process_serverless_structure(structure, usages, limits_resources, rules)

    except Exception as e:
        # A failure with one structure is only logged, so that the thread ends cleanly
        log_error("Error with structure {0}: {1}".format(structure["name"], str(e)), self.debug)