Module src.Refeeder.Refeeder

Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.


from __future__ import print_function

from threading import Thread
import requests
import time
import traceback
import logging

from src.MyUtils.MyUtils import MyConfig, log_error, get_service, beat, log_info, log_warning, \
    get_structures, generate_event_name, generate_request_name, wait_operation_thread, structure_is_container, generate_structure_usage_metric, update_structure, \
    end_epoch, start_epoch
import src.StateDatabase.couchdb as couchdb
import src.StateDatabase.opentsdb as bdwatchdog
from src.ReBalancer.Utils import get_user_apps

# Metric names handed to the time-series handler when container usages are retrieved
BDWATCHDOG_METRICS = [
    'proc.cpu.user',
    'proc.cpu.kernel',
    'proc.mem.resident',
    'proc.disk.writes.mb',
    'proc.disk.reads.mb',
    'proc.net.tcp.in.mb',
    'proc.net.tcp.out.mb',
    'sys.cpu.energy',
]

# NOTE(review): the three constants below are not referenced by the code visible in this module
BDWATCHDOG_ENERGY_METRICS = [
    'sys.cpu.user',
    'sys.cpu.kernel',
    'sys.cpu.energy',
]
GUARDIAN_METRICS = {
    'proc.cpu.user': ['proc.cpu.user', 'proc.cpu.kernel'],
    'proc.mem.resident': ['proc.mem.resident'],
    'proc.disk.writes.mb': ['proc.disk.writes.mb'],
    'proc.disk.reads.mb': ['proc.disk.reads.mb'],
    'proc.net.tcp.in.mb': ['proc.net.tcp.in.mb'],
    'proc.net.tcp.out.mb': ['proc.net.tcp.out.mb'],
}
REFEEDER_ENERGY_METRICS = {
    'cpu': ['sys.cpu.user', 'sys.cpu.kernel'],
    'energy': ['sys.cpu.energy'],
}

# Resource name -> metric names, handed to the time-series handler together with BDWATCHDOG_METRICS
REFEEDER_APPLICATION_METRICS = {
    'cpu': ['proc.cpu.user', 'proc.cpu.kernel'],
    'mem': ['proc.mem.resident'],
    # 'disk': ['proc.disk.writes.mb', 'proc.disk.reads.mb'],
    # 'net': ['proc.net.tcp.in.mb', 'proc.net.tcp.out.mb'],
    'energy': ["sys.cpu.energy"],
}

# Default configuration values given to MyConfig when the service starts
CONFIG_DEFAULT_VALUES = {
    "WINDOW_TIMELAPSE": 10,
    "WINDOW_DELAY": 20,
    "GENERATED_METRICS": ["cpu", "mem"],
    "DEBUG": True,
}

host_info_cache = {}

SERVICE_NAME = "refeeder"

class ReFeeder:
    """ ReFeeder class that implements all the logic for this service.

    For every application structure it adds up the usages reported for each of
    the application's containers and stores the totals back in the application
    structure.
    """

    def __init__(self):
        """Create the database handlers and set the initial configuration values."""
        self.opentsdb_handler = bdwatchdog.OpenTSDBServer()
        self.couchdb_handler = couchdb.CouchDBServer()
        self.NO_METRIC_DATA_DEFAULT_VALUE = self.opentsdb_handler.NO_METRIC_DATA_DEFAULT_VALUE
        self.debug = True
        self.config = {}
        # Defined here (with the default values) so that the attributes exist even if a
        # method that reads them runs before refeed() has loaded the service configuration
        self.window_difference = CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]
        self.window_delay = CONFIG_DEFAULT_VALUES["WINDOW_DELAY"]

    def merge(self, output_dict, input_dict):
        """Add the values of input_dict to output_dict, key by key.

        Keys missing from output_dict are created with the value found in
        input_dict. output_dict is modified in place and also returned.
        """
        for key in input_dict:
            if key in output_dict:
                output_dict[key] = output_dict[key] + input_dict[key]
            else:
                output_dict[key] = input_dict[key]
        return output_dict

    def get_container_usages(self, container_name):
        """Retrieve the usages of a container from the time-series handler.

        A warning is logged for every generated metric (as listed in the default
        configuration) whose value equals the handler's "no data" value.

        :param container_name: name used as the "host" tag of the query
        :return: whatever get_structure_timeseries returns; it is indexed here by
            the keys of REFEEDER_APPLICATION_METRICS
        :raises requests.ConnectionError: logged and re-raised
        """
        try:
            container_info = self.opentsdb_handler.get_structure_timeseries({"host": container_name},
                                                                            self.window_difference,
                                                                            self.window_delay,
                                                                            BDWATCHDOG_METRICS,
                                                                            REFEEDER_APPLICATION_METRICS)

            for metric in REFEEDER_APPLICATION_METRICS:
                if metric not in CONFIG_DEFAULT_VALUES["GENERATED_METRICS"]:
                    continue
                if container_info[metric] == self.NO_METRIC_DATA_DEFAULT_VALUE:
                    log_warning("No metric info for {0} in container {1}".format(metric, container_name), debug=True)

        except requests.ConnectionError as e:
            log_error("Connection error: {0} {1}".format(str(e), str(traceback.format_exc())), debug=True)
            raise e
        return container_info

    def generate_application_metrics(self, application):
        """Fill the "usage" field of each application resource with the sum of its containers' usages.

        Resources reported by the containers but missing from
        application["resources"] are skipped with a warning.

        :param application: application structure, modified in place
        :return: the same application structure
        """
        application_info = dict()
        for c in application["containers"]:
            container_info = self.get_container_usages(c)
            application_info = self.merge(application_info, container_info)

        for resource in application_info:
            if resource in application["resources"]:
                application["resources"][resource]["usage"] = application_info[resource]
            else:
                log_warning("No resource {0} info for application {1}".format(resource, application["name"]), debug=True)

        return application

    def refeed_applications(self, applications):
        """Regenerate the usage metrics of every application and persist each one."""
        for application in applications:
            application = self.generate_application_metrics(application)
            update_structure(application, self.couchdb_handler, self.debug)

    def refeed_user_used_energy(self, applications, users, db_handler, debug):
        """Aggregate the cpu and energy figures of each user's applications and store them in the user.

        For every user it writes user["energy"]["used"], user["cpu"]["usage"] and
        user["cpu"]["current"], then calls db_handler.update_user(user).
        Applications lacking a field (or holding a falsy value) are reported with
        log_error and do not contribute to the corresponding total.

        :param applications: application structures, filtered per user with get_user_apps
        :param users: user documents, modified in place
        :param db_handler: object exposing update_user(user)
        :param debug: flag forwarded to the logging helpers
        """
        for user in users:
            user.setdefault("cpu", {})
            user.setdefault("energy", {})
            total_user = {"cpu": 0, "energy": 0}
            total_user_current_cpu = 0
            for app in get_user_apps(applications, user):
                resources = app["resources"]
                for resource in ("energy", "cpu"):
                    # .get() so that a missing resource entry is reported instead of raising KeyError
                    usage = resources.get(resource, {}).get("usage")
                    if usage:
                        total_user[resource] += usage
                    else:
                        log_error("Application {0} of user {1} has no used {2} field or value".format(
                            app["name"], user["name"], resource), debug)

                # The guard has to look at the "current" value itself (it used to test "usage")
                # and the resource is named explicitly instead of relying on the loop variable
                current_cpu = resources.get("cpu", {}).get("current")
                if current_cpu:
                    total_user_current_cpu += current_cpu
                else:
                    log_error("Application {0} of user {1} has no current cpu field or value".format(
                        app["name"], user["name"]), debug)

            user["energy"]["used"] = total_user["energy"]
            user["cpu"]["usage"] = total_user["cpu"]
            user["cpu"]["current"] = total_user_current_cpu
            db_handler.update_user(user)
            log_info("Updated energy consumed by user {0}".format(user["name"]), debug)

    def refeed_thread(self, ):
        """Body of the worker thread: refeed all the application structures, if there are any."""
        applications = get_structures(self.couchdb_handler, self.debug, subtype="application")
        if applications:
            self.refeed_applications(applications)

        # users = db_handler.get_users()
        # if users:
        #     refeed_user_used_energy(applications, users, db_handler, debug)

    def refeed(self, ):
        """Main loop of the service; it never returns.

        On every iteration the service document is read, a heartbeat is sent and
        the configuration is reloaded. If the service is active and there are
        container structures, a thread running refeed_thread is started. The loop
        then sleeps for the configured time lapse and waits for that thread.
        """
        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        while True:
            # Get service info
            service = get_service(self.couchdb_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.couchdb_handler, SERVICE_NAME)

            # CONFIG
            myConfig.set_config(service["config"])
            self.debug = myConfig.get_value("DEBUG")
            debug = self.debug
            self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
            self.window_delay = myConfig.get_value("WINDOW_DELAY")
            service_is_activated = myConfig.get_value("ACTIVE")

            t0 = start_epoch(self.debug)

            log_info("Config is as follows:", debug)
            log_info(".............................................", debug)
            log_info("Time window lapse -> {0}".format(self.window_difference), debug)
            log_info("Delay -> {0}".format(self.window_delay), debug)
            log_info(".............................................", debug)

            thread = None
            if service_is_activated:
                # Remote database operation
                containers = get_structures(self.couchdb_handler, debug, subtype="container")
                if not containers:
                    # As no container info is available, no application information will be able to be generated
                    log_info("No structures to process", debug)
                    time.sleep(self.window_difference)
                    end_epoch(self.debug, self.window_difference, t0)
                    continue

                thread = Thread(target=self.refeed_thread, args=())
                thread.start()
            else:
                log_warning("Refeeder is not activated", debug)

            time.sleep(self.window_difference)

            # thread is still None here when the service is not activated
            wait_operation_thread(thread, debug)
            log_info("Refeed processed", debug)

            end_epoch(self.debug, self.window_difference, t0)


def main():
    """Run the ReFeeder service, logging any exception that escapes from it."""
    try:
        ReFeeder().refeed()
    except Exception as error:
        details = "{0} {1}".format(str(error), str(traceback.format_exc()))
        log_error(details, debug=True)


# Start the service only when the module is executed as a script
if __name__ == "__main__":
    main()

Functions

def main()
Expand source code
def main():
    """Run the ReFeeder service, logging any exception that escapes from it."""
    try:
        ReFeeder().refeed()
    except Exception as error:
        details = "{0} {1}".format(str(error), str(traceback.format_exc()))
        log_error(details, debug=True)

Classes

class ReFeeder

ReFeeder class that implements all the logic for this service

Expand source code
class ReFeeder:
    """ ReFeeder class that implements all the logic for this service.

    For every application structure it adds up the usages reported for each of
    the application's containers and stores the totals back in the application
    structure.
    """

    def __init__(self):
        """Create the database handlers and set the initial configuration values."""
        self.opentsdb_handler = bdwatchdog.OpenTSDBServer()
        self.couchdb_handler = couchdb.CouchDBServer()
        self.NO_METRIC_DATA_DEFAULT_VALUE = self.opentsdb_handler.NO_METRIC_DATA_DEFAULT_VALUE
        self.debug = True
        self.config = {}
        # Defined here (with the default values) so that the attributes exist even if a
        # method that reads them runs before refeed() has loaded the service configuration
        self.window_difference = CONFIG_DEFAULT_VALUES["WINDOW_TIMELAPSE"]
        self.window_delay = CONFIG_DEFAULT_VALUES["WINDOW_DELAY"]

    def merge(self, output_dict, input_dict):
        """Add the values of input_dict to output_dict, key by key.

        Keys missing from output_dict are created with the value found in
        input_dict. output_dict is modified in place and also returned.
        """
        for key in input_dict:
            if key in output_dict:
                output_dict[key] = output_dict[key] + input_dict[key]
            else:
                output_dict[key] = input_dict[key]
        return output_dict

    def get_container_usages(self, container_name):
        """Retrieve the usages of a container from the time-series handler.

        A warning is logged for every generated metric (as listed in the default
        configuration) whose value equals the handler's "no data" value.

        :param container_name: name used as the "host" tag of the query
        :return: whatever get_structure_timeseries returns; it is indexed here by
            the keys of REFEEDER_APPLICATION_METRICS
        :raises requests.ConnectionError: logged and re-raised
        """
        try:
            container_info = self.opentsdb_handler.get_structure_timeseries({"host": container_name},
                                                                            self.window_difference,
                                                                            self.window_delay,
                                                                            BDWATCHDOG_METRICS,
                                                                            REFEEDER_APPLICATION_METRICS)

            for metric in REFEEDER_APPLICATION_METRICS:
                if metric not in CONFIG_DEFAULT_VALUES["GENERATED_METRICS"]:
                    continue
                if container_info[metric] == self.NO_METRIC_DATA_DEFAULT_VALUE:
                    log_warning("No metric info for {0} in container {1}".format(metric, container_name), debug=True)

        except requests.ConnectionError as e:
            log_error("Connection error: {0} {1}".format(str(e), str(traceback.format_exc())), debug=True)
            raise e
        return container_info

    def generate_application_metrics(self, application):
        """Fill the "usage" field of each application resource with the sum of its containers' usages.

        Resources reported by the containers but missing from
        application["resources"] are skipped with a warning.

        :param application: application structure, modified in place
        :return: the same application structure
        """
        application_info = dict()
        for c in application["containers"]:
            container_info = self.get_container_usages(c)
            application_info = self.merge(application_info, container_info)

        for resource in application_info:
            if resource in application["resources"]:
                application["resources"][resource]["usage"] = application_info[resource]
            else:
                log_warning("No resource {0} info for application {1}".format(resource, application["name"]), debug=True)

        return application

    def refeed_applications(self, applications):
        """Regenerate the usage metrics of every application and persist each one."""
        for application in applications:
            application = self.generate_application_metrics(application)
            update_structure(application, self.couchdb_handler, self.debug)

    def refeed_user_used_energy(self, applications, users, db_handler, debug):
        """Aggregate the cpu and energy figures of each user's applications and store them in the user.

        For every user it writes user["energy"]["used"], user["cpu"]["usage"] and
        user["cpu"]["current"], then calls db_handler.update_user(user).
        Applications lacking a field (or holding a falsy value) are reported with
        log_error and do not contribute to the corresponding total.

        :param applications: application structures, filtered per user with get_user_apps
        :param users: user documents, modified in place
        :param db_handler: object exposing update_user(user)
        :param debug: flag forwarded to the logging helpers
        """
        for user in users:
            user.setdefault("cpu", {})
            user.setdefault("energy", {})
            total_user = {"cpu": 0, "energy": 0}
            total_user_current_cpu = 0
            for app in get_user_apps(applications, user):
                resources = app["resources"]
                for resource in ("energy", "cpu"):
                    # .get() so that a missing resource entry is reported instead of raising KeyError
                    usage = resources.get(resource, {}).get("usage")
                    if usage:
                        total_user[resource] += usage
                    else:
                        log_error("Application {0} of user {1} has no used {2} field or value".format(
                            app["name"], user["name"], resource), debug)

                # The guard has to look at the "current" value itself (it used to test "usage")
                # and the resource is named explicitly instead of relying on the loop variable
                current_cpu = resources.get("cpu", {}).get("current")
                if current_cpu:
                    total_user_current_cpu += current_cpu
                else:
                    log_error("Application {0} of user {1} has no current cpu field or value".format(
                        app["name"], user["name"]), debug)

            user["energy"]["used"] = total_user["energy"]
            user["cpu"]["usage"] = total_user["cpu"]
            user["cpu"]["current"] = total_user_current_cpu
            db_handler.update_user(user)
            log_info("Updated energy consumed by user {0}".format(user["name"]), debug)

    def refeed_thread(self, ):
        """Body of the worker thread: refeed all the application structures, if there are any."""
        applications = get_structures(self.couchdb_handler, self.debug, subtype="application")
        if applications:
            self.refeed_applications(applications)

        # users = db_handler.get_users()
        # if users:
        #     refeed_user_used_energy(applications, users, db_handler, debug)

    def refeed(self, ):
        """Main loop of the service; it never returns.

        On every iteration the service document is read, a heartbeat is sent and
        the configuration is reloaded. If the service is active and there are
        container structures, a thread running refeed_thread is started. The loop
        then sleeps for the configured time lapse and waits for that thread.
        """
        myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
        logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

        while True:
            # Get service info
            service = get_service(self.couchdb_handler, SERVICE_NAME)

            # Heartbeat
            beat(self.couchdb_handler, SERVICE_NAME)

            # CONFIG
            myConfig.set_config(service["config"])
            self.debug = myConfig.get_value("DEBUG")
            debug = self.debug
            self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
            self.window_delay = myConfig.get_value("WINDOW_DELAY")
            service_is_activated = myConfig.get_value("ACTIVE")

            t0 = start_epoch(self.debug)

            log_info("Config is as follows:", debug)
            log_info(".............................................", debug)
            log_info("Time window lapse -> {0}".format(self.window_difference), debug)
            log_info("Delay -> {0}".format(self.window_delay), debug)
            log_info(".............................................", debug)

            thread = None
            if service_is_activated:
                # Remote database operation
                containers = get_structures(self.couchdb_handler, debug, subtype="container")
                if not containers:
                    # As no container info is available, no application information will be able to be generated
                    log_info("No structures to process", debug)
                    time.sleep(self.window_difference)
                    end_epoch(self.debug, self.window_difference, t0)
                    continue

                thread = Thread(target=self.refeed_thread, args=())
                thread.start()
            else:
                log_warning("Refeeder is not activated", debug)

            time.sleep(self.window_difference)

            # thread is still None here when the service is not activated
            wait_operation_thread(thread, debug)
            log_info("Refeed processed", debug)

            end_epoch(self.debug, self.window_difference, t0)

Methods

def generate_application_metrics(self, application)
Expand source code
def generate_application_metrics(self, application):
    """Store in each application resource the summed usage of the application's containers.

    The usages of every container listed in application["containers"] are
    accumulated with self.merge. Each accumulated resource that also exists in
    application["resources"] gets its "usage" field overwritten; the others
    only produce a warning.

    :param application: application structure, modified in place
    :return: the same application structure
    """
    totals = {}
    for container_name in application["containers"]:
        totals = self.merge(totals, self.get_container_usages(container_name))

    for resource, usage in totals.items():
        if resource not in application["resources"]:
            log_warning("No resource {0} info for application {1}".format(resource, application["name"]), debug=True)
            continue
        application["resources"][resource]["usage"] = usage

    return application
def get_container_usages(self, container_name)
Expand source code
def get_container_usages(self, container_name):
    """Ask the time-series handler for the usages of one container.

    After the query, every resource of REFEEDER_APPLICATION_METRICS that is also
    listed as a generated metric in the default configuration is compared with
    the handler's "no data" value, and a warning is logged when they match.

    :param container_name: name used as the "host" tag of the query
    :return: the value returned by get_structure_timeseries
    :raises requests.ConnectionError: logged and re-raised
    """
    try:
        usages = self.opentsdb_handler.get_structure_timeseries(
            {"host": container_name},
            self.window_difference,
            self.window_delay,
            BDWATCHDOG_METRICS,
            REFEEDER_APPLICATION_METRICS)

        for metric in REFEEDER_APPLICATION_METRICS:
            is_generated = metric in CONFIG_DEFAULT_VALUES["GENERATED_METRICS"]
            if is_generated and usages[metric] == self.NO_METRIC_DATA_DEFAULT_VALUE:
                log_warning("No metric info for {0} in container {1}".format(metric, container_name), debug=True)
    except requests.ConnectionError as e:
        error_text = "Connection error: {0} {1}".format(str(e), str(traceback.format_exc()))
        log_error(error_text, debug=True)
        raise e
    return usages
def merge(self, output_dict, input_dict)
Expand source code
def merge(self, output_dict, input_dict):
    """Accumulate input_dict into output_dict and return output_dict.

    Values of keys present in both dictionaries are combined with ``+``; keys
    only present in input_dict are copied over. output_dict is modified in place.
    """
    for key, value in input_dict.items():
        output_dict[key] = output_dict[key] + value if key in output_dict else value
    return output_dict
def refeed(self)
Expand source code
def refeed(self, ):
    """Main loop of the service; it never returns.

    On every iteration the service document is read, a heartbeat is sent and
    the configuration is reloaded. If the service is active and there are
    container structures, a thread running refeed_thread is started. The loop
    then sleeps for the configured time lapse and waits for that thread.
    """
    myConfig = MyConfig(CONFIG_DEFAULT_VALUES)
    logging.basicConfig(filename=SERVICE_NAME + '.log', level=logging.INFO)

    while True:
        # Get service info
        service = get_service(self.couchdb_handler, SERVICE_NAME)

        # Heartbeat
        beat(self.couchdb_handler, SERVICE_NAME)

        # CONFIG
        myConfig.set_config(service["config"])
        self.debug = myConfig.get_value("DEBUG")
        debug = self.debug
        self.window_difference = myConfig.get_value("WINDOW_TIMELAPSE")
        self.window_delay = myConfig.get_value("WINDOW_DELAY")
        service_is_activated = myConfig.get_value("ACTIVE")

        t0 = start_epoch(self.debug)

        log_info("Config is as follows:", debug)
        log_info(".............................................", debug)
        log_info("Time window lapse -> {0}".format(self.window_difference), debug)
        log_info("Delay -> {0}".format(self.window_delay), debug)
        log_info(".............................................", debug)

        thread = None
        if service_is_activated:
            # Remote database operation
            containers = get_structures(self.couchdb_handler, debug, subtype="container")
            if not containers:
                # As no container info is available, no application information will be able to be generated
                log_info("No structures to process", debug)
                time.sleep(self.window_difference)
                end_epoch(self.debug, self.window_difference, t0)
                continue

            thread = Thread(target=self.refeed_thread, args=())
            thread.start()
        else:
            log_warning("Refeeder is not activated", debug)

        time.sleep(self.window_difference)

        # thread is still None here when the service is not activated
        wait_operation_thread(thread, debug)
        log_info("Refeed processed", debug)

        end_epoch(self.debug, self.window_difference, t0)
def refeed_applications(self, applications)
Expand source code
def refeed_applications(self, applications):
    """Recompute the usage metrics of each application and persist the result.

    :param applications: iterable of application structures
    """
    for app in applications:
        refreshed = self.generate_application_metrics(app)
        update_structure(refreshed, self.couchdb_handler, self.debug)
def refeed_thread(self)
Expand source code
def refeed_thread(self, ):
    """Fetch the application structures and, if there are any, refeed them."""
    app_structures = get_structures(self.couchdb_handler, self.debug, subtype="application")
    if not app_structures:
        return
    self.refeed_applications(app_structures)

    # Per-user aggregation is currently disabled:
    # users = db_handler.get_users()
    # if users:
    #     refeed_user_used_energy(applications, users, db_handler, debug)
def refeed_user_used_energy(self, applications, users, db_handler, debug)
Expand source code
def refeed_user_used_energy(self, applications, users, db_handler, debug):
    """Aggregate the cpu and energy figures of each user's applications and store them in the user.

    For every user it writes user["energy"]["used"], user["cpu"]["usage"] and
    user["cpu"]["current"], then calls db_handler.update_user(user).
    Applications lacking a field (or holding a falsy value) are reported with
    log_error and do not contribute to the corresponding total.

    :param applications: application structures, filtered per user with get_user_apps
    :param users: user documents, modified in place
    :param db_handler: object exposing update_user(user)
    :param debug: flag forwarded to the logging helpers
    """
    for user in users:
        user.setdefault("cpu", {})
        user.setdefault("energy", {})
        total_user = {"cpu": 0, "energy": 0}
        total_user_current_cpu = 0
        for app in get_user_apps(applications, user):
            resources = app["resources"]
            for resource in ("energy", "cpu"):
                # .get() so that a missing resource entry is reported instead of raising KeyError
                usage = resources.get(resource, {}).get("usage")
                if usage:
                    total_user[resource] += usage
                else:
                    log_error("Application {0} of user {1} has no used {2} field or value".format(
                        app["name"], user["name"], resource), debug)

            # The guard has to look at the "current" value itself (it used to test "usage")
            # and the resource is named explicitly instead of relying on the loop variable
            current_cpu = resources.get("cpu", {}).get("current")
            if current_cpu:
                total_user_current_cpu += current_cpu
            else:
                log_error("Application {0} of user {1} has no current cpu field or value".format(
                    app["name"], user["name"]), debug)

        user["energy"]["used"] = total_user["energy"]
        user["cpu"]["usage"] = total_user["cpu"]
        user["cpu"]["current"] = total_user_current_cpu
        db_handler.update_user(user)
        log_info("Updated energy consumed by user {0}".format(user["name"]), debug)