Module src.NodeRescaler.node_resource_manager

Expand source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the ServerlessContainers framework, from
# now on referred to as ServerlessContainers.
#
# ServerlessContainers is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# ServerlessContainers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ServerlessContainers. If not, see <http://www.gnu.org/licenses/>.


import os
import subprocess

CGROUP_PATH = "/sys/fs/cgroup"


def read_cgroup_file_value(file_path):
    """Read the first line of a cgroup control file.

    cgroup files are 'virtual' kernel files, so only one line is read and its
    trailing newline is dropped.

    :param file_path: path of the file to read.
    :return: ``{"success": True, "data": <first line>}`` on success, otherwise
             ``{"success": False, "error": <message>}``.
    """
    try:
        readable = os.path.isfile(file_path) and os.access(file_path, os.R_OK)
        if not readable:
            return {"success": False, "error": "Couldn't access file: {0}".format(file_path)}
        with open(file_path, 'r') as handle:
            first_line = handle.readline()
        return {"success": True, "data": first_line.rstrip("\n")}
    except IOError as err:
        return {"success": False, "error": str(err)}


def write_cgroup_file_value(file_path, value):
    """Overwrite a cgroup control file with a single value.

    :param file_path: path of the file to write; it must already exist.
    :param value: value to write; it is converted with ``str()``.
    :return: ``{"success": True, "data": value}`` (the value as passed in) on
             success, otherwise ``{"success": False, "error": <message>}``.
    """
    try:
        writable = os.path.isfile(file_path) and os.access(file_path, os.W_OK)
        if not writable:
            return {"success": False, "error": "Couldn't access file: {0}".format(file_path)}
        # The file is closed inside the try so errors raised on close are reported too
        with open(file_path, 'w') as handle:
            handle.write(str(value))
    except IOError as err:
        return {"success": False, "error": str(err)}
    return {"success": True, "data": value}


# CPU #
# Keys of the dictionaries exchanged by get_node_cpus / set_node_cpus
CPU_LIMIT_CPUS_LABEL = "cpu_num"
CPU_LIMIT_ALLOWANCE_LABEL = "cpu_allowance_limit"
CPU_EFFECTIVE_CPUS_LABEL = "effective_num_cpus"
CPU_EFFECTIVE_LIMIT = "effective_cpu_limit"
# Value written to cpu.cfs_period_us by set_node_cpus (microseconds, per the "_us" suffix)
MAX_TICKS_PER_CPU = 100000
# Quota ticks that represent 1% of one CPU for the period above (== 1000)
TICKS_PER_CPU_PERCENTAGE = MAX_TICKS_PER_CPU // 100


def get_node_cpus(container_name):
    """Return the CPU limits currently applied to an LXC container (cgroups v1).

    The CPU allowance is read from ``cpu.cfs_quota_us`` and the pinned cores
    from ``cpuset.cpus`` in the container's ``lxc.payload.<name>`` cgroups.

    :param container_name: name of the LXC container.
    :return: ``(True, dict)`` with the keys
        CPU_LIMIT_CPUS_LABEL -- the raw cpuset string (e.g. "0-3,6"),
        CPU_EFFECTIVE_CPUS_LABEL -- number of cores in that cpuset,
        CPU_EFFECTIVE_LIMIT -- the allowance capped at ``cores * 100``,
        CPU_LIMIT_ALLOWANCE_LABEL -- quota as a CPU percentage, or -1 if no quota;
        or ``(False, error_dict)`` if a cgroup file cannot be read or parsed.
    """
    cgroup_name = "lxc.payload.{0}".format(container_name)

    # Get the CFS quota from the cgroups cpuacct subsystem
    cpu_accounting_path = "/".join([CGROUP_PATH, "cpuacct", cgroup_name, "cpu.cfs_quota_us"])
    op = read_cgroup_file_value(cpu_accounting_path)
    if not op["success"]:
        return False, op
    try:
        cpu_limit = int(op["data"])
    except ValueError as e:
        return False, {"success": False, "error": str(e)}
    if cpu_limit != -1:
        # A quota is set: convert ticks into a CPU percentage
        # (TICKS_PER_CPU_PERCENTAGE ticks per 1%); -1 (no quota) is kept as is
        cpu_limit = cpu_limit / TICKS_PER_CPU_PERCENTAGE

    # Get the pinned cores from the cgroups cpuset subsystem
    cpus_path = "/".join([CGROUP_PATH, "cpuset", cgroup_name, "cpuset.cpus"])
    op = read_cgroup_file_value(cpus_path)
    if not op["success"]:
        return False, op
    cpus = op["data"]

    # Count the active cores of the container:
    # "5-7" -> 3 cores, "0,1,2,4" -> 4 cores, "0-3,6" -> 5 cores.
    # Empty segments are skipped, so an empty cpuset counts as 0 cores.
    effective_cpus = 0
    try:
        for part in cpus.split(","):
            part = part.strip()
            if not part:
                continue
            first, _, last = part.partition("-")
            if last:
                effective_cpus += (int(last) - int(first)) + 1
            else:
                effective_cpus += 1  # No range so only 1 core
    except ValueError as e:
        return False, {"success": False, "error": str(e)}

    # With a quota, the effective limit is the quota capped by the available
    # cores (100% per core); without one it is simply cores * 100.
    if cpu_limit == -1:
        effective_limit = effective_cpus * 100
    else:
        effective_limit = min(cpu_limit, effective_cpus * 100)

    final_dict = dict()
    final_dict[CPU_LIMIT_CPUS_LABEL] = cpus
    final_dict[CPU_EFFECTIVE_CPUS_LABEL] = effective_cpus
    final_dict[CPU_EFFECTIVE_LIMIT] = effective_limit
    final_dict[CPU_LIMIT_ALLOWANCE_LABEL] = cpu_limit

    return True, final_dict


def set_node_cpus(container_name, cpu_resource):
    """Apply CPU limits to an LXC container through cgroups v1.

    :param container_name: name of the LXC container (cgroup ``lxc.payload.<name>``).
    :param cpu_resource: dict that may contain
        CPU_LIMIT_ALLOWANCE_LABEL -- CPU percentage (e.g. 150 for 1.5 cores);
            0 or a negative value (such as -1) removes the quota, and
        CPU_LIMIT_CPUS_LABEL -- cpuset string such as "0-3,6".
    :return: ``(True, applied_changes)`` with the values written (as strings), or
        ``(False, error_dict)`` on the first failure.
    """
    applied_changes = dict()
    cgroup_name = "lxc.payload.{0}".format(container_name)

    if CPU_LIMIT_ALLOWANCE_LABEL in cpu_resource:
        try:
            # int() also accepts the "-1" string sentinel
            cpu_limit = int(cpu_resource[CPU_LIMIT_ALLOWANCE_LABEL])
        except (TypeError, ValueError) as e:
            return False, {"error": str(e)}

        cpu_dir = "/".join([CGROUP_PATH, "cpuacct", cgroup_name])
        period_path = cpu_dir + "/cpu.cfs_period_us"
        quota_path = cpu_dir + "/cpu.cfs_quota_us"

        if cpu_limit <= 0:
            # No limit: -1 in cpu.cfs_quota_us removes the bandwidth restriction
            quota = -1
            applied_limit = "-1"
        else:
            # Every TICKS_PER_CPU_PERCENTAGE period ticks count as 1% of CPU
            quota = TICKS_PER_CPU_PERCENTAGE * cpu_limit
            applied_limit = str(quota / TICKS_PER_CPU_PERCENTAGE)

        # Write the CFS period first
        op = write_cgroup_file_value(period_path, str(MAX_TICKS_PER_CPU))
        if not op["success"]:
            return False, op

        # Write the quota for this container in ticks
        op = write_cgroup_file_value(quota_path, str(quota))
        if not op["success"]:
            return False, op

        # This change was applied successfully
        applied_changes[CPU_LIMIT_ALLOWANCE_LABEL] = applied_limit

    if CPU_LIMIT_CPUS_LABEL in cpu_resource:
        cpu_cpuset_path = "/".join([CGROUP_PATH, "cpuset", cgroup_name, "cpuset.cpus"])

        op = write_cgroup_file_value(cpu_cpuset_path, str(cpu_resource[CPU_LIMIT_CPUS_LABEL]))
        if not op["success"]:
            return False, op

        # This change was applied successfully
        applied_changes[CPU_LIMIT_CPUS_LABEL] = str(cpu_resource[CPU_LIMIT_CPUS_LABEL])

    # Nothing bad happened
    return True, applied_changes


# MEM #
# Key of the memory limit in the dictionaries used by get_node_mem / set_node_mem
MEM_LIMIT_LABEL = "mem_limit"
# Smallest memory limit, in MB, that set_node_mem accepts
LOWER_LIMIT_MEGABYTES = 64


def get_node_mem(container_name):
    """Return the memory limit of an LXC container in megabytes (cgroups v1).

    :param container_name: name of the LXC container.
    :return: ``(True, {"unit": ..., MEM_LIMIT_LABEL: ...})`` where the limit is
        ``memory.limit_in_bytes`` / 1048576, or -1 with unit "unlimited" when it
        exceeds 256 GB; ``(False, error_dict)`` if the file cannot be read or parsed.
    """
    memory_limit_path = "/".join([CGROUP_PATH, "memory", "lxc.payload.{0}".format(container_name), "memory.limit_in_bytes"])

    op = read_cgroup_file_value(memory_limit_path)
    if not op["success"]:
        # Keep the (success, payload) shape so callers can always unpack the result
        return False, op

    try:
        mem_limit_converted = int(op["data"]) / 1048576  # Convert to MB
    except ValueError as e:
        return False, {"success": False, "error": str(e)}

    final_dict = dict()
    if mem_limit_converted > 262144:
        # More than 256G? Probably no limit was set, so report -1 ('unlimited')
        mem_limit_converted = -1
        final_dict["unit"] = "unlimited"
    else:
        final_dict["unit"] = "M"

    final_dict[MEM_LIMIT_LABEL] = mem_limit_converted
    return True, final_dict


def set_node_mem(container_name, mem_resource):
    """Apply a memory limit, in megabytes, to an LXC container (cgroups v1).

    :param container_name: name of the LXC container.
    :param mem_resource: dict that may contain MEM_LIMIT_LABEL, an integer number
        of megabytes, or -1 to remove the limit. Values below
        LOWER_LIMIT_MEGABYTES are rejected.
    :return: ``(True, {MEM_LIMIT_LABEL: value})`` when applied, ``(True, {})``
        when no limit was requested, ``(False, error)`` otherwise.
    """
    if MEM_LIMIT_LABEL not in mem_resource:
        return True, {}

    try:
        value = int(mem_resource[MEM_LIMIT_LABEL])
    except (TypeError, ValueError) as e:
        return False, {"error": str(e)}

    if value == -1:
        value_megabytes = str(value)
    elif value < LOWER_LIMIT_MEGABYTES:
        # Don't allow values lower than an amount like 64 MB as it will probably block the container
        return False, {"error": "Memory limit is too low, less than {0} MB".format(str(LOWER_LIMIT_MEGABYTES))}
    else:
        # Assume an integer number of megabytes and add the 'M' suffix for cgroups
        value_megabytes = str(value) + 'M'

    memory_limit_path = "/".join([CGROUP_PATH, "memory", "lxc.payload.{0}".format(container_name), "memory.limit_in_bytes"])

    # NOTE: swap (memory.memsw.limit_in_bytes) is currently not managed. If it is
    # re-enabled, memory and swap must be written in an order that depends on
    # whether the limit grows or shrinks (compare against get_node_mem first).
    mem_op = write_cgroup_file_value(memory_limit_path, value_megabytes)
    if not mem_op["success"]:
        # Something happened with memory limit
        return False, "Memory error {0}".format(mem_op)

    # Nothing bad happened
    return True, {MEM_LIMIT_LABEL: value}


# DISK #
# Keys of the read/write bandwidth limits inside the disk dictionaries
DISK_READ_LIMIT_LABEL, DISK_WRITE_LIMIT_LABEL = "disk_read_limit", "disk_write_limit"


def get_system_mounted_filesystems():
    """Return the mounted filesystems as "source,target" strings.

    Runs ``df --output=source,target | tr -s '[:blank:]' ','`` so that every
    line becomes comma separated. df's header line is included as well, and
    mount points containing blanks are split into extra fields.

    :return: list of lines (str).
    """
    df = subprocess.Popen(["df", "--output=source,target"], stdout=subprocess.PIPE)
    trim = subprocess.Popen(["tr", "-s", "[:blank:]", ","], stdin=df.stdout, stdout=subprocess.PIPE)
    # Close the parent's copy of df's stdout so df receives SIGPIPE if tr exits early
    df.stdout.close()
    output = trim.communicate()[0]
    # Reap df so it does not linger as a zombie process
    df.wait()
    return output.decode('utf-8').strip().split('\n')


def get_device_path_from_mounted_filesystem(path):
    """Return the source device of the filesystem mounted exactly at *path*.

    :param path: mount point; trailing slashes are ignored, and "/" (or any
        all-slash path) designates the root mount.
    :return: the source column reported by df (e.g. "/dev/sda1"), or None if
        *path* is not a mount point.
    """
    target = path.rstrip("/")
    if not target and path:
        # The path consisted only of slashes: it is the root mount
        target = "/"

    for fs in get_system_mounted_filesystems():
        parts = fs.split(",")
        # Skip blank or malformed lines that have no target column
        if len(parts) > 1 and parts[1] == target:
            return parts[0]
    return None


def get_device_major_minor_from_volumes(device_path):
    """Return ``[major, minor]`` of a device-mapper volume using ``dmsetup info``.

    :param device_path: device-mapper device (name or path) to query.
    :return: list of two decimal strings, or None if dmsetup cannot be run,
        exits with an error, or does not print a single "major,minor" pair.
    """
    try:
        dmsetup = subprocess.Popen(
            ["dmsetup", "info", "-c", "-o", "major,minor", "--separator=,", "--noheadings", device_path],
            stdout=subprocess.PIPE)
    except OSError:
        # dmsetup is not installed / cannot be executed on this host
        return None
    result = dmsetup.communicate()[0]
    if dmsetup.returncode != 0:
        return None

    # TODO check that this still works for volumes
    major_minor = result.decode('utf-8').strip().split(",")
    if len(major_minor) != 2 or not all(part.isdigit() for part in major_minor):
        return None
    return major_minor


def get_device_major_minor_raw_device(device_path):
    """Return the (major, minor) numbers of a device node as decimal strings.

    Reads the same information as ``stat -c %t,%T`` (which does not follow
    symlinks), but uses os.lstat directly instead of spawning a process.
    Files that are not device nodes yield ("0", "0").

    :param device_path: path of the device node.
    :return: tuple of two decimal strings, or None if the path cannot be stat'ed.
    """
    try:
        rdev = os.lstat(device_path).st_rdev
    except OSError:
        return None
    return str(os.major(rdev)), str(os.minor(rdev))


def _read_blkio_throttle_limits(file_path):
    """Parse a blkio throttle file into ``{"major:minor": limit_string}`` ({} if unreadable)."""
    limits = dict()
    if not (os.path.isfile(file_path) and os.access(file_path, os.R_OK)):
        return limits
    with open(file_path, 'r') as limit_file:
        for line in limit_file:
            parts = line.split()
            # Entries are "<major>:<minor> <limit>"; ignore blank or malformed lines
            if len(parts) >= 2:
                limits[parts[0]] = parts[1]
    return limits


def get_node_disk_limits(container_name):
    """Return the blkio read and write throttling limits of an LXC container.

    :param container_name: name of the LXC container (cgroup ``lxc.payload.<name>``).
    :return: ``(read_limits, write_limits)``, two dicts mapping "major:minor"
        to the raw limit string from ``blkio.throttle.{read,write}_bps_device``.
        A missing or unreadable file yields an empty dict.
    """
    blkio_path = "/".join([CGROUP_PATH, "blkio", "lxc.payload.{0}".format(container_name)])
    devices_read_limits = _read_blkio_throttle_limits(blkio_path + "/blkio.throttle.read_bps_device")
    devices_write_limits = _read_blkio_throttle_limits(blkio_path + "/blkio.throttle.write_bps_device")
    return devices_read_limits, devices_write_limits


def set_node_disk(container_name, disk_resource):
    """Apply a disk write-bandwidth limit to a container via ``set_bandwidth.sh``.

    :param container_name: name of the container, passed to the script.
    :param disk_resource: dict with "major" and "minor" device numbers and,
        optionally, DISK_WRITE_LIMIT_LABEL (passed to the script as a string).
        DISK_READ_LIMIT_LABEL is currently ignored.
    :return: ``(True, disk_resource)`` on success, ``(False, {"error": ...})``
        if the script cannot be run or exits with a non-zero code.
    """
    major = disk_resource["major"]
    minor = disk_resource["minor"]

    if DISK_WRITE_LIMIT_LABEL in disk_resource:
        limit_write = disk_resource[DISK_WRITE_LIMIT_LABEL]
        # TODO FIX the path issue: the script is looked up in the current working directory
        set_bandwidth_script_path = os.getcwd()
        try:
            set_disk_bandwidth = subprocess.Popen(
                ["/bin/bash", "{0}/set_bandwidth.sh".format(set_bandwidth_script_path), container_name,
                 "{0}:{1}".format(major, minor),
                 str(limit_write)], stderr=subprocess.PIPE)
            out, err = set_disk_bandwidth.communicate()
        except OSError as e:
            # bash or the script could not be executed at all
            return False, {"error": str(e)}
        if set_disk_bandwidth.returncode != 0:
            return False, {"error": "exit code of set_disk_bandwidth was: {0} with error message: {1}".format(
                str(set_disk_bandwidth.returncode), err.decode('utf-8', 'replace').strip())}

    # NOTE: read limits are not applied. An earlier implementation wrote
    # "<major>:<minor> <limit>" directly to blkio.throttle.{read,write}_bps_device.

    # Nothing bad happened
    return True, disk_resource


def get_device_major_minor(device_path):
    """Resolve the (major, minor) numbers of the block device behind *device_path*.

    /dev/sd* partitions are mapped to their whole disk (e.g. /dev/sdb2 -> /dev/sdb)
    because only whole devices can be limited. Any other path is looked up as a
    device-mapper volume with dmsetup.

    :param device_path: device path as reported by df.
    :return: ``(major, minor)`` strings, or None if the device cannot be resolved.
    """
    # First, try partitions or raw devices
    major_minor = get_device_major_minor_raw_device(device_path)
    if major_minor and major_minor != ("0", "0"):
        # TODO FIX this or find a solution, partitions can't be limited, only devices
        if device_path.startswith("/dev/sd"):
            # Strip the partition number: "/dev/sda1" -> "/dev/sda", "/dev/sdab3" -> "/dev/sdab"
            device_path = device_path.rstrip("0123456789")
            major_minor = get_device_major_minor_raw_device(device_path)
            if major_minor and major_minor != ("0", "0"):
                return major_minor[0], major_minor[1]

    # Then, try volume (device-mapper) devices
    major_minor = get_device_major_minor_from_volumes(device_path)
    if major_minor:
        return major_minor[0], major_minor[1]

    return None


def get_node_disks(container_name, devices):
    """Describe the disks of a container together with their blkio limits.

    :param container_name: name of the LXC container.
    :param devices: mapping of device name -> device config; each config's
        "source" entry is the host mount point of the disk. Some names (see
        SKIP_DISKS) are ignored.
    :return: ``(True, list_of_disk_dicts)``; disks whose mount point or device
        numbers cannot be resolved are left out.
    """
    # TODO FIX, ignored devices should be obtained from a file
    SKIP_DISKS = ["bdev", "development", "production", "root"]
    limits_read, limits_write = get_node_disk_limits(container_name)

    def to_megabytes(limits, key):
        # The throttle value divided by 1048576; -1 means no limit for this device
        if key not in limits:
            return -1
        return int(int(limits[key]) / 1048576)

    retrieved_disks = list()
    for device, device_config in devices.items():
        if device in SKIP_DISKS:
            continue

        mountpoint = device_config["source"]
        device_path = get_device_path_from_mounted_filesystem(mountpoint)
        if not device_path:
            print("Disk {0} not found for container {1}".format(device, container_name))
            continue

        try:
            major, minor = get_device_major_minor(device_path)
        except TypeError:
            # The device numbers could not be resolved (None was returned)
            continue

        key = major + ":" + minor
        read_limit = to_megabytes(limits_read, key)
        write_limit = to_megabytes(limits_write, key)

        # NOTE(review): blkio throttle values are bytes/s, so these are MiB/s,
        # yet the unit is labelled "Mbit" -- confirm what consumers expect.
        retrieved_disks.append({
            "mountpoint": mountpoint,
            "device_path": device_path,
            "major": major,
            "minor": minor,
            "unit": "Mbit",
            DISK_READ_LIMIT_LABEL: read_limit,
            DISK_WRITE_LIMIT_LABEL: write_limit,
        })

    return True, retrieved_disks


# NET #
# Keys of the network dictionaries produced by get_node_networks
NET_DEVICE_NAME_LABEL = "device_name_in_container"  # interface name inside the container
NET_DEVICE_HOST_NAME_LABEL = "device_name_in_host"  # interface name on the host (e.g. a veth)
NET_LIMIT_LABEL = "net_limit"  # rate limit, -1 when unlimited
NET_UNIT_NAME_LABEL = "unit"  # "Mbit" or "unlimited"


def get_interface_limit(interface_name):
    """Return the rate limit, in Mbit/s, of the root qdisc of a host interface.

    Parses the first line of ``tc -d qdisc show dev <interface>``. Only a qdisc
    that prints a ``rate`` (such as the tbf qdisc installed by
    set_interface_limit) counts as a limit.

    :param interface_name: host network interface name.
    :return: rate in Mbit/s (int for Mbit/Gbit rates, float for Kbit/bit rates),
        or -1 if no rate limit is found, it cannot be parsed, or tc cannot run.
    """
    try:
        tc = subprocess.Popen(["tc", "-d", "qdisc", "show", "dev", interface_name], stdout=subprocess.PIPE)
    except OSError:
        # tc is not available, so no tc limit can be inspected on this host
        return -1
    output = tc.communicate()[0].decode('utf-8').strip()
    # Just the first qdisc line is relevant
    parts = output.split("\n")[0].split()

    # Look for the token that follows "rate" instead of a fixed column: default
    # qdiscs (e.g. fq_codel, pfifo_fast) print unrelated values at that position.
    if "rate" not in parts:
        return -1
    rate_index = parts.index("rate") + 1
    if rate_index >= len(parts):
        return -1
    rate = parts[rate_index]

    # tc prints rates with SI prefixes (1 Kbit = 1000 bit)
    try:
        if rate.endswith("Gbit"):
            return int(rate[:-4]) * 1000
        if rate.endswith("Mbit"):
            return int(rate[:-4])
        if rate.endswith("Kbit"):
            return int(rate[:-4]) / 1000.0
        if rate.endswith("bit"):
            return int(rate[:-3]) / 1000000.0
    except ValueError:
        pass
    return -1


def unset_interface_limit(interface_name):
    """Delete the root qdisc of a host interface (``tc qdisc del dev <if> root``).

    tc reports an error when no root qdisc had been configured before.

    :param interface_name: host network interface name.
    :return: ``(True, {})`` on success, ``(False, {"error": ...})`` if tc fails
        or cannot be executed.
    """
    try:
        tc = subprocess.Popen(["tc", "qdisc", "del", "dev", interface_name, "root"], stderr=subprocess.PIPE)
        # communicate() drains stderr; wait() with a PIPE can deadlock on large output
        _, err = tc.communicate()
    except OSError as e:
        return False, {"error": "error trying to execute command: {0}".format(str(e))}
    if tc.returncode == 0:
        return True, {}
    return False, {"error": "exit code of tc was: {0} with error: {1}".format(
        str(tc.returncode), err.decode('utf-8', 'replace').strip())}


# tc qdisc add dev veth6UQ01E root tbf rate 100Mbit burst 1000kb latency 100ms
def set_interface_limit(interface_name, net):
    """Install a tbf root qdisc that rate-limits a host interface.

    Runs ``tc qdisc add dev <if> root tbf rate <N>Mbit burst 1000kb latency 100ms``.

    :param interface_name: host network interface name.
    :param net: dict whose NET_LIMIT_LABEL entry is the rate in Mbit/s.
    :return: ``(True, net)`` on success, ``(False, {"error": ...})`` if tc fails
        or cannot be executed.
    """
    net_limit = str(net[NET_LIMIT_LABEL]) + "Mbit"
    try:
        tc = subprocess.Popen(
            ["tc", "qdisc", "add", "dev", interface_name, "root", "tbf", "rate", net_limit, "burst",
             "1000kb", "latency", "100ms"], stderr=subprocess.PIPE)
        out, err = tc.communicate()
    except OSError as e:
        # tc could not be executed at all
        return False, {"error": str(e)}
    if tc.returncode == 0:
        return True, net
    return False, {"error": "exit code of tc was: {0} with error: {1}".format(
        str(tc.returncode), err.decode('utf-8', 'replace').strip())}


def set_node_net(net):
    """Apply (or remove) the rate limit described by *net* on its host interface.

    :param net: dict that may contain NET_LIMIT_LABEL (rate in Mbit/s; -1, "-1",
        "" or None mean "no limit") and NET_DEVICE_HOST_NAME_LABEL (host
        interface name, required when a limit is present).
    :return: ``(True, net)`` on success, ``(False, {"error": ...})`` otherwise.
    """
    if NET_LIMIT_LABEL not in net:
        # No limit to set, leave untouched
        return True, net

    if NET_DEVICE_HOST_NAME_LABEL not in net:
        # Without the host interface name (e.g. vethWMPX65) nothing can be done
        return False, {"error": "No host network interface name was provided"}
    interface_in_host = net[NET_DEVICE_HOST_NAME_LABEL]

    # Always remove the previous limit first; tc errors out when none was set,
    # which is expected and therefore ignored
    unset_interface_limit(interface_in_host)

    if net[NET_LIMIT_LABEL] in (-1, "-1", "", None):
        # The limit was only meant to be removed
        return True, net
    return set_interface_limit(interface_in_host, net)


def get_node_networks(networks):
    """Report the rate limit of every network interface of a container.

    :param networks: iterable of dicts with "host_interface" and
        "container_interface" keys.
    :return: ``(True, list_of_net_dicts)``; the unit is "unlimited" when the
        limit is -1 and "Mbit" otherwise.
    """
    def describe(net):
        host_name = net["host_interface"]
        container_name = net["container_interface"]
        limit = get_interface_limit(host_name)
        return {
            NET_DEVICE_NAME_LABEL: container_name,
            NET_DEVICE_HOST_NAME_LABEL: host_name,
            NET_LIMIT_LABEL: limit,
            NET_UNIT_NAME_LABEL: "unlimited" if limit == -1 else "Mbit",
        }

    return True, [describe(net) for net in networks]

Functions

def get_device_major_minor(device_path)
Expand source code
def get_device_major_minor(device_path):
    """Resolve the (major, minor) numbers of the block device behind *device_path*.

    /dev/sd* partitions are mapped to their whole disk (e.g. /dev/sdb2 -> /dev/sdb)
    because only whole devices can be limited. Any other path is looked up as a
    device-mapper volume with dmsetup.

    :param device_path: device path as reported by df.
    :return: ``(major, minor)`` strings, or None if the device cannot be resolved.
    """
    # First, try partitions or raw devices
    major_minor = get_device_major_minor_raw_device(device_path)
    if major_minor and major_minor != ("0", "0"):
        # TODO FIX this or find a solution, partitions can't be limited, only devices
        if device_path.startswith("/dev/sd"):
            # Strip the partition number: "/dev/sda1" -> "/dev/sda", "/dev/sdab3" -> "/dev/sdab"
            device_path = device_path.rstrip("0123456789")
            major_minor = get_device_major_minor_raw_device(device_path)
            if major_minor and major_minor != ("0", "0"):
                return major_minor[0], major_minor[1]

    # Then, try volume (device-mapper) devices
    major_minor = get_device_major_minor_from_volumes(device_path)
    if major_minor:
        return major_minor[0], major_minor[1]

    return None
def get_device_major_minor_from_volumes(device_path)
Expand source code
def get_device_major_minor_from_volumes(device_path):
    """Return ``[major, minor]`` of a device-mapper volume using ``dmsetup info``.

    :param device_path: device-mapper device (name or path) to query.
    :return: list of two decimal strings, or None if dmsetup cannot be run,
        exits with an error, or does not print a single "major,minor" pair.
    """
    try:
        dmsetup = subprocess.Popen(
            ["dmsetup", "info", "-c", "-o", "major,minor", "--separator=,", "--noheadings", device_path],
            stdout=subprocess.PIPE)
    except OSError:
        # dmsetup is not installed / cannot be executed on this host
        return None
    result = dmsetup.communicate()[0]
    if dmsetup.returncode != 0:
        return None

    # TODO check that this still works for volumes
    major_minor = result.decode('utf-8').strip().split(",")
    if len(major_minor) != 2 or not all(part.isdigit() for part in major_minor):
        return None
    return major_minor
def get_device_major_minor_raw_device(device_path)
Expand source code
def get_device_major_minor_raw_device(device_path):
    """Return the (major, minor) numbers of a device node as decimal strings.

    Reads the same information as ``stat -c %t,%T`` (which does not follow
    symlinks), but uses os.lstat directly instead of spawning a process.
    Files that are not device nodes yield ("0", "0").

    :param device_path: path of the device node.
    :return: tuple of two decimal strings, or None if the path cannot be stat'ed.
    """
    try:
        rdev = os.lstat(device_path).st_rdev
    except OSError:
        return None
    return str(os.major(rdev)), str(os.minor(rdev))
def get_device_path_from_mounted_filesystem(path)
Expand source code
def get_device_path_from_mounted_filesystem(path):
    """Return the source device of the filesystem mounted exactly at *path*.

    :param path: mount point; trailing slashes are ignored, and "/" (or any
        all-slash path) designates the root mount.
    :return: the source column reported by df (e.g. "/dev/sda1"), or None if
        *path* is not a mount point.
    """
    target = path.rstrip("/")
    if not target and path:
        # The path consisted only of slashes: it is the root mount
        target = "/"

    for fs in get_system_mounted_filesystems():
        parts = fs.split(",")
        # Skip blank or malformed lines that have no target column
        if len(parts) > 1 and parts[1] == target:
            return parts[0]
    return None
def get_interface_limit(interface_name)
Expand source code
def get_interface_limit(interface_name):
    """Return the rate limit, in Mbit/s, of the root qdisc of a host interface.

    Parses the first line of ``tc -d qdisc show dev <interface>``. Only a qdisc
    that prints a ``rate`` (such as the tbf qdisc installed by
    set_interface_limit) counts as a limit.

    :param interface_name: host network interface name.
    :return: rate in Mbit/s (int for Mbit/Gbit rates, float for Kbit/bit rates),
        or -1 if no rate limit is found, it cannot be parsed, or tc cannot run.
    """
    try:
        tc = subprocess.Popen(["tc", "-d", "qdisc", "show", "dev", interface_name], stdout=subprocess.PIPE)
    except OSError:
        # tc is not available, so no tc limit can be inspected on this host
        return -1
    output = tc.communicate()[0].decode('utf-8').strip()
    # Just the first qdisc line is relevant
    parts = output.split("\n")[0].split()

    # Look for the token that follows "rate" instead of a fixed column: default
    # qdiscs (e.g. fq_codel, pfifo_fast) print unrelated values at that position.
    if "rate" not in parts:
        return -1
    rate_index = parts.index("rate") + 1
    if rate_index >= len(parts):
        return -1
    rate = parts[rate_index]

    # tc prints rates with SI prefixes (1 Kbit = 1000 bit)
    try:
        if rate.endswith("Gbit"):
            return int(rate[:-4]) * 1000
        if rate.endswith("Mbit"):
            return int(rate[:-4])
        if rate.endswith("Kbit"):
            return int(rate[:-4]) / 1000.0
        if rate.endswith("bit"):
            return int(rate[:-3]) / 1000000.0
    except ValueError:
        pass
    return -1
def get_node_cpus(container_name)
Expand source code
def get_node_cpus(container_name):
    """Return the CPU limits currently applied to an LXC container (cgroups v1).

    The CPU allowance is read from ``cpu.cfs_quota_us`` and the pinned cores
    from ``cpuset.cpus`` in the container's ``lxc.payload.<name>`` cgroups.

    :param container_name: name of the LXC container.
    :return: ``(True, dict)`` with the keys
        CPU_LIMIT_CPUS_LABEL -- the raw cpuset string (e.g. "0-3,6"),
        CPU_EFFECTIVE_CPUS_LABEL -- number of cores in that cpuset,
        CPU_EFFECTIVE_LIMIT -- the allowance capped at ``cores * 100``,
        CPU_LIMIT_ALLOWANCE_LABEL -- quota as a CPU percentage, or -1 if no quota;
        or ``(False, error_dict)`` if a cgroup file cannot be read or parsed.
    """
    cgroup_name = "lxc.payload.{0}".format(container_name)

    # Get the CFS quota from the cgroups cpuacct subsystem
    cpu_accounting_path = "/".join([CGROUP_PATH, "cpuacct", cgroup_name, "cpu.cfs_quota_us"])
    op = read_cgroup_file_value(cpu_accounting_path)
    if not op["success"]:
        return False, op
    try:
        cpu_limit = int(op["data"])
    except ValueError as e:
        return False, {"success": False, "error": str(e)}
    if cpu_limit != -1:
        # A quota is set: convert ticks into a CPU percentage
        # (TICKS_PER_CPU_PERCENTAGE ticks per 1%); -1 (no quota) is kept as is
        cpu_limit = cpu_limit / TICKS_PER_CPU_PERCENTAGE

    # Get the pinned cores from the cgroups cpuset subsystem
    cpus_path = "/".join([CGROUP_PATH, "cpuset", cgroup_name, "cpuset.cpus"])
    op = read_cgroup_file_value(cpus_path)
    if not op["success"]:
        return False, op
    cpus = op["data"]

    # Count the active cores of the container:
    # "5-7" -> 3 cores, "0,1,2,4" -> 4 cores, "0-3,6" -> 5 cores.
    # Empty segments are skipped, so an empty cpuset counts as 0 cores.
    effective_cpus = 0
    try:
        for part in cpus.split(","):
            part = part.strip()
            if not part:
                continue
            first, _, last = part.partition("-")
            if last:
                effective_cpus += (int(last) - int(first)) + 1
            else:
                effective_cpus += 1  # No range so only 1 core
    except ValueError as e:
        return False, {"success": False, "error": str(e)}

    # With a quota, the effective limit is the quota capped by the available
    # cores (100% per core); without one it is simply cores * 100.
    if cpu_limit == -1:
        effective_limit = effective_cpus * 100
    else:
        effective_limit = min(cpu_limit, effective_cpus * 100)

    final_dict = dict()
    final_dict[CPU_LIMIT_CPUS_LABEL] = cpus
    final_dict[CPU_EFFECTIVE_CPUS_LABEL] = effective_cpus
    final_dict[CPU_EFFECTIVE_LIMIT] = effective_limit
    final_dict[CPU_LIMIT_ALLOWANCE_LABEL] = cpu_limit

    return True, final_dict
def get_node_disk_limits(container_name)
Expand source code
def _read_blkio_throttle_limits(file_path):
    """Parse a blkio throttle file into ``{"major:minor": limit_string}`` ({} if unreadable)."""
    limits = dict()
    if not (os.path.isfile(file_path) and os.access(file_path, os.R_OK)):
        return limits
    with open(file_path, 'r') as limit_file:
        for line in limit_file:
            parts = line.split()
            # Entries are "<major>:<minor> <limit>"; ignore blank or malformed lines
            if len(parts) >= 2:
                limits[parts[0]] = parts[1]
    return limits


def get_node_disk_limits(container_name):
    """Return the blkio read and write throttling limits of an LXC container.

    :param container_name: name of the LXC container (cgroup ``lxc.payload.<name>``).
    :return: ``(read_limits, write_limits)``, two dicts mapping "major:minor"
        to the raw limit string from ``blkio.throttle.{read,write}_bps_device``.
        A missing or unreadable file yields an empty dict.
    """
    blkio_path = "/".join([CGROUP_PATH, "blkio", "lxc.payload.{0}".format(container_name)])
    devices_read_limits = _read_blkio_throttle_limits(blkio_path + "/blkio.throttle.read_bps_device")
    devices_write_limits = _read_blkio_throttle_limits(blkio_path + "/blkio.throttle.write_bps_device")
    return devices_read_limits, devices_write_limits
def get_node_disks(container_name, devices)
Expand source code
def get_node_disks(container_name, devices):
    """Describe the disks of a container together with their blkio limits.

    :param container_name: name of the LXC container.
    :param devices: mapping of device name -> device config; each config's
        "source" entry is the host mount point of the disk. Some names (see
        SKIP_DISKS) are ignored.
    :return: ``(True, list_of_disk_dicts)``; disks whose mount point or device
        numbers cannot be resolved are left out.
    """
    # TODO FIX, ignored devices should be obtained from a file
    SKIP_DISKS = ["bdev", "development", "production", "root"]
    limits_read, limits_write = get_node_disk_limits(container_name)

    def to_megabytes(limits, key):
        # The throttle value divided by 1048576; -1 means no limit for this device
        if key not in limits:
            return -1
        return int(int(limits[key]) / 1048576)

    retrieved_disks = list()
    for device, device_config in devices.items():
        if device in SKIP_DISKS:
            continue

        mountpoint = device_config["source"]
        device_path = get_device_path_from_mounted_filesystem(mountpoint)
        if not device_path:
            print("Disk {0} not found for container {1}".format(device, container_name))
            continue

        try:
            major, minor = get_device_major_minor(device_path)
        except TypeError:
            # The device numbers could not be resolved (None was returned)
            continue

        key = major + ":" + minor
        read_limit = to_megabytes(limits_read, key)
        write_limit = to_megabytes(limits_write, key)

        # NOTE(review): blkio throttle values are bytes/s, so these are MiB/s,
        # yet the unit is labelled "Mbit" -- confirm what consumers expect.
        retrieved_disks.append({
            "mountpoint": mountpoint,
            "device_path": device_path,
            "major": major,
            "minor": minor,
            "unit": "Mbit",
            DISK_READ_LIMIT_LABEL: read_limit,
            DISK_WRITE_LIMIT_LABEL: write_limit,
        })

    return True, retrieved_disks
def get_node_mem(container_name)
Expand source code
def get_node_mem(container_name):
    """Return the memory limit of an LXC container in megabytes (cgroups v1).

    :param container_name: name of the LXC container.
    :return: ``(True, {"unit": ..., MEM_LIMIT_LABEL: ...})`` where the limit is
        ``memory.limit_in_bytes`` / 1048576, or -1 with unit "unlimited" when it
        exceeds 256 GB; ``(False, error_dict)`` if the file cannot be read or parsed.
    """
    memory_limit_path = "/".join([CGROUP_PATH, "memory", "lxc.payload.{0}".format(container_name), "memory.limit_in_bytes"])

    op = read_cgroup_file_value(memory_limit_path)
    if not op["success"]:
        # Keep the (success, payload) shape so callers can always unpack the result
        return False, op

    try:
        mem_limit_converted = int(op["data"]) / 1048576  # Convert to MB
    except ValueError as e:
        return False, {"success": False, "error": str(e)}

    final_dict = dict()
    if mem_limit_converted > 262144:
        # More than 256G? Probably no limit was set, so report -1 ('unlimited')
        mem_limit_converted = -1
        final_dict["unit"] = "unlimited"
    else:
        final_dict["unit"] = "M"

    final_dict[MEM_LIMIT_LABEL] = mem_limit_converted
    return True, final_dict
def get_node_networks(networks)
Expand source code
def get_node_networks(networks):
    """Report the rate limit of every network interface of a container.

    :param networks: iterable of dicts with "host_interface" and
        "container_interface" keys.
    :return: ``(True, list_of_net_dicts)``; the unit is "unlimited" when the
        limit is -1 and "Mbit" otherwise.
    """
    def describe(net):
        host_name = net["host_interface"]
        container_name = net["container_interface"]
        limit = get_interface_limit(host_name)
        return {
            NET_DEVICE_NAME_LABEL: container_name,
            NET_DEVICE_HOST_NAME_LABEL: host_name,
            NET_LIMIT_LABEL: limit,
            NET_UNIT_NAME_LABEL: "unlimited" if limit == -1 else "Mbit",
        }

    return True, [describe(net) for net in networks]
def get_system_mounted_filesystems()
Expand source code
def get_system_mounted_filesystems():
    """Return the mounted filesystems as "source,target" strings.

    Runs ``df --output=source,target | tr -s '[:blank:]' ','`` so that every
    line becomes comma separated. df's header line is included as well, and
    mount points containing blanks are split into extra fields.

    :return: list of lines (str).
    """
    df = subprocess.Popen(["df", "--output=source,target"], stdout=subprocess.PIPE)
    trim = subprocess.Popen(["tr", "-s", "[:blank:]", ","], stdin=df.stdout, stdout=subprocess.PIPE)
    # Close the parent's copy of df's stdout so df receives SIGPIPE if tr exits early
    df.stdout.close()
    output = trim.communicate()[0]
    # Reap df so it does not linger as a zombie process
    df.wait()
    return output.decode('utf-8').strip().split('\n')
def read_cgroup_file_value(file_path)
Expand source code
def read_cgroup_file_value(file_path):
    """Read the first line of a cgroup control file.

    cgroup files are 'virtual' kernel files, so only one line is read and its
    trailing newline is dropped.

    :param file_path: path of the file to read.
    :return: ``{"success": True, "data": <first line>}`` on success, otherwise
             ``{"success": False, "error": <message>}``.
    """
    try:
        readable = os.path.isfile(file_path) and os.access(file_path, os.R_OK)
        if not readable:
            return {"success": False, "error": "Couldn't access file: {0}".format(file_path)}
        with open(file_path, 'r') as handle:
            first_line = handle.readline()
        return {"success": True, "data": first_line.rstrip("\n")}
    except IOError as err:
        return {"success": False, "error": str(err)}
def set_interface_limit(interface_name, net)
Expand source code
def set_interface_limit(interface_name, net):
    """Install a tbf root qdisc that rate-limits a host interface.

    Runs ``tc qdisc add dev <if> root tbf rate <N>Mbit burst 1000kb latency 100ms``.

    :param interface_name: host network interface name.
    :param net: dict whose NET_LIMIT_LABEL entry is the rate in Mbit/s.
    :return: ``(True, net)`` on success, ``(False, {"error": ...})`` if tc fails
        or cannot be executed.
    """
    net_limit = str(net[NET_LIMIT_LABEL]) + "Mbit"
    try:
        tc = subprocess.Popen(
            ["tc", "qdisc", "add", "dev", interface_name, "root", "tbf", "rate", net_limit, "burst",
             "1000kb", "latency", "100ms"], stderr=subprocess.PIPE)
        out, err = tc.communicate()
    except OSError as e:
        # tc could not be executed at all
        return False, {"error": str(e)}
    if tc.returncode == 0:
        return True, net
    return False, {"error": "exit code of tc was: {0} with error: {1}".format(
        str(tc.returncode), err.decode('utf-8', 'replace').strip())}
def set_node_cpus(container_name, cpu_resource)
Expand source code
def set_node_cpus(container_name, cpu_resource):
    """Apply CPU limits to an LXC container by writing its cgroup files.

    Two independent settings may be present in ``cpu_resource``:

    * ``CPU_LIMIT_ALLOWANCE_LABEL``: an integer allowance, or the string "-1". The CFS
      period file is set to ``MAX_TICKS_PER_CPU``, and the quota file to
      ``TICKS_PER_CPU_PERCENTAGE * allowance``. An allowance of 0 writes a quota of -1.
    * ``CPU_LIMIT_CPUS_LABEL``: a cpuset value written as a string to ``cpuset.cpus``.

    :param container_name: container name; cgroup directories are ``lxc.payload.<name>``.
    :param cpu_resource: dict containing any of the labels above.
    :return: ``(True, applied_changes)`` where ``applied_changes`` maps each applied label
             to a string. On the first failure it returns ``(False, error)`` instead, where
             ``error`` is either the failed write result or ``{"error": ...}`` for an
             unparsable allowance.
    """
    payload_dir = "lxc.payload.{0}".format(container_name)
    changes = {}

    if CPU_LIMIT_ALLOWANCE_LABEL in cpu_resource:
        cpuacct_dir = "/".join([CGROUP_PATH, "cpuacct", payload_dir])
        quota_file = cpuacct_dir + "/cpu.cfs_quota_us"
        period_file = cpuacct_dir + "/cpu.cfs_period_us"

        raw_allowance = cpu_resource[CPU_LIMIT_ALLOWANCE_LABEL]
        try:
            allowance = -1 if raw_allowance == "-1" else int(raw_allowance)
        except ValueError as error:
            return False, {"error": str(error)}

        # Presumably TICKS_PER_CPU_PERCENTAGE period ticks equal 1% of a CPU -- confirm the constant.
        # NOTE(review): only an allowance of 0 maps to quota -1. An allowance of -1 yields
        # quota = -TICKS_PER_CPU_PERCENTAGE; confirm this is the intended "unlimited" value.
        quota = -1 if allowance == 0 else TICKS_PER_CPU_PERCENTAGE * allowance

        # Write the period first, then the quota; abort on the first failed write.
        for target_file, content in ((period_file, str(MAX_TICKS_PER_CPU)), (quota_file, str(quota))):
            result = write_cgroup_file_value(target_file, content)
            if not result["success"]:
                return False, result

        changes[CPU_LIMIT_ALLOWANCE_LABEL] = str(quota / TICKS_PER_CPU_PERCENTAGE)

    if CPU_LIMIT_CPUS_LABEL in cpu_resource:
        cpuset_value = str(cpu_resource[CPU_LIMIT_CPUS_LABEL])
        cpuset_file = "/".join([CGROUP_PATH, "cpuset", payload_dir, "cpuset.cpus"])
        result = write_cgroup_file_value(cpuset_file, cpuset_value)
        if not result["success"]:
            return False, result
        changes[CPU_LIMIT_CPUS_LABEL] = cpuset_value

    return True, changes
def set_node_disk(container_name, disk_resource)
Expand source code
def set_node_disk(container_name, disk_resource):
    """Apply a disk write-bandwidth limit to a container via ``set_bandwidth.sh``.

    Only ``DISK_WRITE_LIMIT_LABEL`` is acted upon; no read limit is applied here.
    The script is invoked as
    ``/bin/bash <cwd>/set_bandwidth.sh <container_name> <major>:<minor> <write_limit>``.

    :param container_name: container name, passed as the script's first argument.
    :param disk_resource: dict with ``"major"`` and ``"minor"`` device numbers and,
                          optionally, ``DISK_WRITE_LIMIT_LABEL``.
    :return: ``(True, disk_resource)`` on success or when no write limit is requested;
             ``(False, {"error": ...})`` if the script cannot be run or exits non-zero.
    """
    major = disk_resource["major"]
    minor = disk_resource["minor"]

    if DISK_WRITE_LIMIT_LABEL in disk_resource:
        # Popen arguments must be strings; a numeric limit would otherwise raise TypeError
        limit_write = str(disk_resource[DISK_WRITE_LIMIT_LABEL])
        # TODO FIX the path issue: the script is currently looked up in the working directory
        # set_bandwidth_script_path = "/".join([os.getcwd(), "NodeRescaler"])
        set_bandwidth_script_path = os.getcwd()
        try:
            set_disk_bandwidth = subprocess.Popen(
                ["/bin/bash", "{0}/set_bandwidth.sh".format(set_bandwidth_script_path), container_name,
                 "{0}:{1}".format(major, minor),
                 limit_write], stderr=subprocess.PIPE)
            _, err = set_disk_bandwidth.communicate()
        except (OSError, subprocess.CalledProcessError) as e:
            # Popen raises OSError when the interpreter/script cannot be executed
            return False, {"error": str(e)}

        if set_disk_bandwidth.returncode != 0:
            err_text = err.decode('utf-8', 'replace').strip() if err else ""
            return False, {"error": "exit code of set_disk_bandwidth was: {0} with error message: {1}".format(
                str(set_disk_bandwidth.returncode), err_text)}

    # Nothing bad happened
    return True, disk_resource
def set_node_mem(container_name, mem_resource)
Expand source code
def set_node_mem(container_name, mem_resource):
    """Set a container's memory limit through its cgroup ``memory.limit_in_bytes`` file.

    The value under ``MEM_LIMIT_LABEL`` is parsed as an integer number of megabytes and
    written with an ``M`` suffix. A value of ``-1`` is written as-is. Any other value below
    ``LOWER_LIMIT_MEGABYTES`` is rejected, since it would probably block the container.

    :param container_name: container name; the cgroup dir is ``lxc.payload.<name>``.
    :param mem_resource: dict optionally containing ``MEM_LIMIT_LABEL``.
    :return: one of the following:

             * ``(True, {MEM_LIMIT_LABEL: value})`` when the limit was written.
             * ``(True, {})`` when no limit was requested.
             * ``(False, {"error": ...})`` for an unparsable or too-low value.
             * ``(False, "Memory error ...")`` (a string) when the cgroup write fails.
    """
    if MEM_LIMIT_LABEL not in mem_resource:
        return True, {}

    try:
        value = int(mem_resource[MEM_LIMIT_LABEL])
    except (TypeError, ValueError) as e:
        # Report unparsable limits instead of raising, consistent with set_node_cpus
        return False, {"error": str(e)}

    if value == -1:
        value_megabytes = str(value)
    elif value < LOWER_LIMIT_MEGABYTES:
        # Don't allow values lower than an amount like 64 MB as it will probably block the container
        return False, {"error": "Memory limit is too low, less than {0} MB".format(str(LOWER_LIMIT_MEGABYTES))}
    else:
        value_megabytes = str(value) + 'M'

    memory_limit_path = "/".join([CGROUP_PATH, "memory", "lxc.payload.{0}".format(container_name), "memory.limit_in_bytes"])

    # The swap limit (memory.memsw.limit_in_bytes) is not managed here, so there is
    # no write-ordering concern and the current limit does not need to be read first.
    # If swap handling is re-enabled, the order of the two writes (memory vs. swap)
    # must depend on whether the limit grows or shrinks. The original comments say
    # CentOS does not allow less memory than swap.
    mem_op = write_cgroup_file_value(memory_limit_path, value_megabytes)
    if not mem_op["success"]:
        # Something happened with memory limit
        return False, "Memory error {0}".format(mem_op)

    # Nothing bad happened
    return True, {MEM_LIMIT_LABEL: value}
def set_node_net(net)
Expand source code
def set_node_net(net):
    """Apply or remove a bandwidth limit on a container's host-side network interface.

    :param net: dict that may contain ``NET_LIMIT_LABEL`` and
                ``NET_DEVICE_HOST_NAME_LABEL``. The limit is in Mbit; -1, "-1" or ""
                mean unlimited. The host interface is e.g. vethWMPX65.
    :return: one of the following:

             * ``(True, net)`` when no limit is requested or the limit is removed.
             * The result of ``set_interface_limit`` when a new limit is applied.
             * ``(False, {"error": ...})`` when no host interface name is provided.
    """
    if NET_LIMIT_LABEL not in net:
        # No limit to set, leave untouched
        return True, net

    if NET_DEVICE_HOST_NAME_LABEL not in net:
        # If no host interface name is available, it is not possible to do anything
        return False, {"error": "No host network interface name was provided"}
    interface_in_host = net[NET_DEVICE_HOST_NAME_LABEL]

    # Always remove any previous limit first. The result is deliberately ignored,
    # because deleting a limit that was never set fails harmlessly.
    unset_interface_limit(interface_in_host)

    # Accept the string "-1" as unlimited too, matching how set_node_cpus parses "-1";
    # otherwise tc would be asked for a "-1Mbit" rate and fail.
    if net[NET_LIMIT_LABEL] in (-1, "-1", ""):
        # Nothing bad happened
        return True, net
    return set_interface_limit(interface_in_host, net)
def unset_interface_limit(interface_name)
Expand source code
def unset_interface_limit(interface_name):
    """Remove the root qdisc (and with it any tc bandwidth limit) from an interface.

    :param interface_name: host network interface name.
    :return: ``True`` if ``tc`` exits with code 0; otherwise a
             ``(False, {"error": ...})`` tuple.
             NOTE(review): the success and failure shapes differ, and the failure
             tuple is truthy. This is kept as-is for compatibility with existing callers.
    """
    try:
        tc = subprocess.Popen(["tc", "qdisc", "del", "dev", interface_name, "root"], stderr=subprocess.PIPE)
        # communicate() rather than wait(): wait() with stderr=PIPE can deadlock
        # if tc writes enough to fill the pipe buffer.
        tc.communicate()
    except (OSError, subprocess.CalledProcessError) as e:
        # Popen raises OSError (e.g. FileNotFoundError) when tc is not installed
        return False, {"error": "error trying to execute command: {0}".format(str(e))}

    if tc.returncode == 0:
        return True
    return False, {"error": "exit code of tc was: {0}".format(str(tc.returncode))}
def write_cgroup_file_value(file_path, value)
Expand source code
def write_cgroup_file_value(file_path, value):
    """Write ``str(value)`` to a cgroup pseudo-file in a single write (no newline added).

    These are 'virtual' files, so exactly one value is written per call.

    :param file_path: path of the cgroup file to write.
    :param value: value to write; it is converted with ``str()``.
    :return: ``{"success": True, "data": value}`` (the unconverted value) on success,
             or ``{"success": False, "error": <message>}`` when the path is not a
             writable regular file or the write raises IOError.
    """
    try:
        if not (os.path.isfile(file_path) and os.access(file_path, os.W_OK)):
            return {"success": False, "error": "Couldn't access file: {0}".format(file_path)}
        with open(file_path, 'w') as handle:
            handle.write(str(value))
    except IOError as err:
        return {"success": False, "error": str(err)}
    return {"success": True, "data": value}