Module src.ReBalancer.ContainerReBalancer
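
Redistributes CPU shares among the containers of an application: shares that lightly loaded containers (donors) are not using are handed over to containers that are hitting their current CPU limit (receivers), as long as both run on the same host.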

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.

import time
import traceback

import requests
from json_logic import jsonLogic

from src.MyUtils.MyUtils import log_info, get_config_value, log_error, log_warning, get_structures
from src.ReBalancer.Utils import CONFIG_DEFAULT_VALUES, app_can_be_rebalanced
from src.StateDatabase import opentsdb
from src.StateDatabase import couchdb

# Raw BDWatchdog metrics retrieved from OpenTSDB, and the mapping used to aggregate
# them into the 'structure.cpu.usage' metric that the rebalancing rules evaluate
BDWATCHDOG_CONTAINER_METRICS = ['proc.cpu.user', 'proc.cpu.kernel']
GUARDIAN_CONTAINER_METRICS = {
    'structure.cpu.usage': ['proc.cpu.user', 'proc.cpu.kernel']}


class ContainerRebalancer:
    def __init__(self):
        self.__opentsdb_handler = opentsdb.OpenTSDBServer()
        self.__couchdb_handler = couchdb.CouchDBServer()
        self.__NO_METRIC_DATA_DEFAULT_VALUE = self.__opentsdb_handler.NO_METRIC_DATA_DEFAULT_VALUE
        self.__debug = True
        self.__config = {}

    # @staticmethod
    # def __generate_request(structure_name, amount, resource, action):
    #     request = dict(
    #         type="request",
    #         resource=resource,
    #         amount=int(amount),
    #         structure=structure_name,
    #         action=action,
    #         timestamp=int(time.time()))
    #     return request

    def __get_container_usages(self, container):
        window_difference = get_config_value(self.__config, CONFIG_DEFAULT_VALUES, "WINDOW_TIMELAPSE")
        window_delay = get_config_value(self.__config, CONFIG_DEFAULT_VALUES, "WINDOW_DELAY")

        try:
            # Remote database operation
            usages = self.__opentsdb_handler.get_structure_timeseries({"host": container["name"]},
                                                                      window_difference,
                                                                      window_delay,
                                                                      BDWATCHDOG_CONTAINER_METRICS,
                                                                      GUARDIAN_CONTAINER_METRICS)

            # Skip this structure if all the usage metrics are unavailable
            if all([usages[metric] == self.__NO_METRIC_DATA_DEFAULT_VALUE for metric in usages]):
                log_warning("container: {0} has no usage data".format(container["name"]), self.__debug)
                return None

            return usages
        except Exception as e:
            log_error("error with structure: {0} {1} {2}".format(container["name"], str(e), str(traceback.format_exc())),
                      self.__debug)

            return None

    def __fill_containers_with_usage_info(self, containers):
        # Get the usages
        containers_with_resource_usages = list()
        for container in containers:
            usages = self.__get_container_usages(container)
            if usages:
                for usage_metric in usages:
                    keys = usage_metric.split(".")
                    # The metric name encodes the resource and the key, e.g.,
                    # 'structure.cpu.usage' maps to resources["cpu"]["usage"]
                    container["resources"][keys[1]][keys[2]] = usages[usage_metric]
                containers_with_resource_usages.append(container)
        return containers_with_resource_usages

    def __get_container_donors(self, containers):
        donors = list()
        for container in containers:
            try:
                data = {"cpu": {"structure": {"cpu": {
                    "usage": container["resources"]["cpu"]["usage"],
                    "min": container["resources"]["cpu"]["min"],
                    "max": container["resources"]["cpu"]["max"],
                    "current": container["resources"]["cpu"]["current"]}}}}
            except KeyError:
                continue

            # containers that have low resource usage (donors)
            rule_low_usage = self.__couchdb_handler.get_rule("cpu_usage_low")
            if jsonLogic(rule_low_usage["rule"], data):
                donors.append(container)
        return donors

    def __get_container_receivers(self, containers):
        receivers = list()
        for container in containers:
            try:
                data = {"cpu": {"structure": {"cpu": {
                    "usage": container["resources"]["cpu"]["usage"],
                    "min": container["resources"]["cpu"]["min"],
                    "max": container["resources"]["cpu"]["max"],
                    "current": container["resources"]["cpu"]["current"]}}}}
            except KeyError:
                continue

            # containers that have a bottleneck (receivers)
            rule_high_usage = self.__couchdb_handler.get_rule("cpu_usage_high")
            if jsonLogic(rule_high_usage["rule"], data):
                receivers.append(container)
        return receivers


    def __rebalance_containers_by_pair_swapping(self, containers, app_name):
        # Split the containers into donors and receivers, according to their usage and the rules
        donors = self.__get_container_donors(containers)
        receivers = self.__get_container_receivers(containers)

        log_info("Nodes that will give: {0}".format(str([c["name"] for c in donors])), self.__debug)
        log_info("Nodes that will receive:  {0}".format(str([c["name"] for c in receivers])), self.__debug)

        if not receivers:
            log_info("No containers in need of rebalancing for {0}".format(app_name), self.__debug)
            return
        else:
            # Order the receivers from lowest to highest current CPU limit
            receivers = sorted(receivers, key=lambda c: c["resources"]["cpu"]["current"])

        # Take resources from the low-usage containers (donors) and hand them out as 'slices'
        donor_slices = list()
        id = 0
        for container in donors:
            # Ensure that this request will be successfully processed, otherwise we are 'giving' away extra resources
            current_value = container["resources"]["cpu"]["current"]
            min_value = container["resources"]["cpu"]["min"]
            usage_value = container["resources"]["cpu"]["usage"]
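            # A donor gives away at most half of its unused allocation, i.e., half of the gap
            # between its current limit and the larger of its minimum and its current usage,
            # handed out in fixed slices of 25 CPU shares plus a final slice with the remainder
            # (e.g., current=200, min=50, usage=80 -> 0.5 * (200 - 80) = 60 -> slices of 25, 25, 10)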
            stolen_amount = 0.5 * (current_value - max(min_value,  usage_value))

            slice_amount = 25
            acum = 0
            while acum + slice_amount < stolen_amount:
                donor_slices.append((container, slice_amount, id))
                acum += slice_amount
                id += 1

            # Remaining, avoiding zero-sized slices
            remainder = int(stolen_amount - acum)
            if remainder > 0:
                donor_slices.append((container, remainder, id))
                id += 1

        donor_slices = sorted(donor_slices, key=lambda c: c[1])
        print("Donor slices are")
        for c in donor_slices:
            print(c[0]["name"], c[1])

        # Remove those donors that are of no use (there are no possible receivers for them)
        viable_donors = list()
        for c in donor_slices:
            viable = False
            for r in receivers:
                if r["host"] == c[0]["host"]:
                    viable = True
                    break
            if viable:
                viable_donors.append(c)
        print("VIABLE donor slices are")
        for c in viable_donors:
            print(c[0]["name"], c[1], c[2])
        donor_slices = viable_donors

        # Give the resources to the bottlenecked containers
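        # Shares can only be swapped between containers running on the same host, so each
        # receiver is matched with donor slices coming from its own host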
        requests = dict()
        while donor_slices:
            print("Donor slices are")
            for c in donor_slices:
                print(c[0]["name"], c[1], c[2])

            for receiver in receivers:
                # Look for a donor container on the same host
                amount_to_scale, donor, id = None, None, None
                for c, amount, i in donor_slices:
                    if c["host"] == receiver["host"]:
                        amount_to_scale = amount
                        donor = c
                        id = i
                        break

                if not amount_to_scale:
                    log_info("No more donors on its host, container {0} left out".format(receiver["name"]), self.__debug)
                    continue

                # Remove this slice from the list
                donor_slices = list(filter(lambda x: x[2] != id, donor_slices))

                max_receiver_amount = receiver["resources"]["cpu"]["max"] - receiver["resources"]["cpu"]["current"]
                # If this container can't be scaled anymore, skip
                if max_receiver_amount == 0:
                    continue

                # Trim the amount to scale if needed
                if amount_to_scale > max_receiver_amount:
                    amount_to_scale = max_receiver_amount

                # Create the pair of scaling requests
                # TODO This should use Guardians method to generate requests
                request = dict(
                    type="request",
                    resource="cpu",
                    amount=int(amount_to_scale),
                    structure=receiver["name"],
                    action="CpuRescaleUp",
                    timestamp=int(time.time()),
                    structure_type="container",
                    host=receiver["host"],
                    host_rescaler_ip=receiver["host_rescaler_ip"],
                    host_rescaler_port=receiver["host_rescaler_port"]
                )

                if receiver["name"] not in requests:
                    requests[receiver["name"]] = list()
                requests[receiver["name"]].append(request)

                # TODO This should use Guardians method to generate requests
                request = dict(
                    type="request",
                    resource="cpu",
                    amount=int(-amount_to_scale),
                    structure=donor["name"],
                    action="CpuRescaleDown",
                    timestamp=int(time.time()),
                    structure_type="container",
                    host=donor["host"],
                    host_rescaler_ip=donor["host_rescaler_ip"],
                    host_rescaler_port=donor["host_rescaler_port"]
                )

                if donor["name"] not in requests:
                    requests[donor["name"]] = list()
                requests[donor["name"]].append(request)
                log_info("Resource swap between {0}(donor) and {1}(receiver)".format(donor["name"], receiver["name"]), self.__debug)

        log_info("No more donors", self.__debug)

        # Collapse all the requests of each container into a single request carrying the net amount
        final_requests = list()
        for container in requests:
            # Copy the first request as the base request
            flat_request = dict(requests[container][0])
            flat_request["amount"] = 0
            for request in requests[container]:
                flat_request["amount"] += request["amount"]
            final_requests.append(flat_request)

        log_info("REQUESTS ARE:", self.__debug)
        for c in requests.values():
            for r in c:
                print(r)

        # TODO
        # Adjust requests amounts according to the maximums (trim), otherwise the scaling down will be performed but not the scaling up, and shares will be lost

        log_info("FINAL REQUESTS ARE:", self.__debug)
        for r in final_requests:
            print(r)
            self.__couchdb_handler.add_request(r)


    def __app_containers_can_be_rebalanced(self, application):
        return app_can_be_rebalanced(application, "container", self.__couchdb_handler)

    def rebalance_containers(self, config):
        self.__config = config
        self.__debug = get_config_value(self.__config, CONFIG_DEFAULT_VALUES, "DEBUG")

        log_info("_______________", self.__debug)
        log_info("Performing CONTAINER CPU Balancing", self.__debug)

        # Get the containers and applications
        try:
            applications = get_structures(self.__couchdb_handler, self.__debug, subtype="application")
            containers = get_structures(self.__couchdb_handler, self.__debug, subtype="container")
        except requests.exceptions.HTTPError as e:
            log_error("Couldn't get applications", self.__debug)
            log_error(str(e), self.__debug)
            return

        # Filter out the ones that do not accept rebalancing or that do not need any internal rebalancing
        rebalanceable_apps = list()
        for app in applications:
            # TODO Improve this management
            if "rebalance" not in app or app["rebalance"] == True:
                pass
            else:
                continue
            if len(app["containers"]) <= 1:
                continue

            if self.__app_containers_can_be_rebalanced(app):
                rebalanceable_apps.append(app)

        # Group the containers according to the application they belong to
        app_containers = dict()
        for app in rebalanceable_apps:
            app_name = app["name"]
            app_containers[app_name] = list()
            app_containers_names = app["containers"]
            for container in containers:
                if container["name"] in app_containers_names:
                    app_containers[app_name].append(container)
            # Get the container usages
            app_containers[app_name] = self.__fill_containers_with_usage_info(app_containers[app_name])

        # Rebalance applications
        for app in rebalanceable_apps:
            app_name = app["name"]
            log_info("Going to rebalance {0} now".format(app_name), self.__debug)
            self.__rebalance_containers_by_pair_swapping(app_containers[app_name], app_name)

        log_info("_______________", self.__debug)

Classes

class ContainerRebalancer
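Rebalances the CPU shares of the containers that belong to the same application. Containers with low CPU usage (matched by the 'cpu_usage_low' rule) act as donors and give part of their unused allocation to containers that are close to their current limit (matched by the 'cpu_usage_high' rule), provided both containers run on the same host. The transfer is carried out by writing paired CpuRescaleDown/CpuRescaleUp requests to the state database.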

Methods

def rebalance_containers(self, config)
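Entry point of the rebalancer. Retrieves the applications and containers from the state database, keeps the applications that accept rebalancing and have more than one container, fills their containers with recent CPU usage information, and then rebalances each application by pair swapping.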