Module src.turbostat.turbostat_to_json

Expand source code
# Copyright (c) 2019 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es, jonatan.enes.alvarez@gmail.com)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the BDWatchdog framework, from
# now on referred to as BDWatchdog.
#
# BDWatchdog is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# BDWatchdog is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BDWatchdog. If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function

import socket
import sys
import fileinput
import signal
import time
import traceback

from MetricsFeeder.src.pipelines import csv_to_json
from MetricsFeeder.src.pipelines.csv_to_json import process_line as to_json
from MetricsFeeder.src.pipelines.json_to_TSDB_json import process_line as to_TSDB_json

metrics_dict, tags_dict, template = None, None, None
header_mapping = dict()


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def sigterm_handler(_signo, _stack_frame):
    good_finish()


def good_finish():
    sys.stdout.flush()
    sys.exit(0)


def bad_finish():
    sys.exit(1)


def get_hostname():
    return socket.gethostname()


def get_timestamp():
    return int(time.time())


def create_header_mapping(header):
    mapping = dict()
    fs = header.split(",")
    counter = 0
    for f in fs:
        mapping[f] = counter
        counter += 1
    return mapping


signal.signal(signal.SIGTERM, sigterm_handler)


def turbostat_to_csv(line, header_mapping, fields):
    if not line:
        eprint("Empty line")
        return

    # Skip header by looking for it and avoiding it
    try:
        # Try to test for string values for different versions of turbostat
        # if fields[0] == "Core" or fields[0] == "Package" or fields[0] == "usec":
        #    continue

        # Try to cast the average Megahertz, which has an int value for all the lines to be processed
        int(fields[header_mapping["Avg_MHz"]])

    except ValueError:
        # Line is header, skip
        return
    except IndexError:
        # Line is ?
        eprint("Error with line:" + line)
        return

    try:
        if fields[header_mapping["CPU"]] == "-":
            # line is for entire system
            return ("SYS_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                header_mapping["Core"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                        header_mapping["PkgWatt"]])
        else:
            if len(fields) >= header_mapping["PkgWatt"]:
                # line is for package, because turbostat aggregates and joins first core
                # and package, print also core
                if "Package" in header_mapping:
                    return ("PKG_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                        header_mapping["Package"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                                header_mapping["PkgWatt"]])

                else:
                    # Host may only have one package and turbostat may not report the Package field
                    return ("PKG_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                        header_mapping["Core"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                                header_mapping["PkgWatt"]])

    except IndexError:
        eprint("Line : '" + line.strip() + "' couldn't be parsed")
        return


def process_line(line):
    fields = line.split(",")
    line_in_csv = turbostat_to_csv(line, header_mapping, fields)
    if line_in_csv:
        json_docs = to_json(line_in_csv, metrics_dict, tags_dict, template)
        if json_docs:
            for doc in json_docs:
                line_in_valid_json = to_TSDB_json(doc)
                print(line_in_valid_json)


def process_header(line):
    # required_fields = ["PkgWatt", "CorWatt", "Core", "CPU"]
    required_fields = ["PkgWatt", "Core", "CPU"]
    header_mapping = create_header_mapping(line.strip())
    eprint("HEADER MAPPING to be used is:" + str(header_mapping))
    for field in required_fields:
        if field not in header_mapping:
            eprint("Field " + field + " not present in output, can't continue")
            bad_finish()
    return header_mapping


def behave_like_pipeline():
    global header_mapping

    try:
        header_was_processed = False

        for line in fileinput.input():
            # Process header and adapt pipe, after that header will be skipped
            if not header_was_processed:
                header_mapping = process_header(line)
                header_was_processed = True
            # Process line
            else:
                process_line(line)

    except KeyboardInterrupt:
        good_finish()
    except Exception as error:
        eprint("[TURBOSTAT TO CSV] error : " + str(error) + str(traceback.format_exc()))


def main():
    global metrics_dict, tags_dict, template
    metrics_dict, tags_dict, template = csv_to_json.initialize()
    behave_like_pipeline()


if __name__ == "__main__":
    main()

Functions

def bad_finish()
Expand source code
def bad_finish():
    sys.exit(1)
def behave_like_pipeline()
Expand source code
def behave_like_pipeline():
    global header_mapping

    try:
        header_was_processed = False

        for line in fileinput.input():
            # Process header and adapt pipe, after that header will be skipped
            if not header_was_processed:
                header_mapping = process_header(line)
                header_was_processed = True
            # Process line
            else:
                process_line(line)

    except KeyboardInterrupt:
        good_finish()
    except Exception as error:
        eprint("[TURBOSTAT TO CSV] error : " + str(error) + str(traceback.format_exc()))
def create_header_mapping(header)
Expand source code
def create_header_mapping(header):
    mapping = dict()
    fs = header.split(",")
    counter = 0
    for f in fs:
        mapping[f] = counter
        counter += 1
    return mapping
def eprint(*args, **kwargs)
Expand source code
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
def get_hostname()
Expand source code
def get_hostname():
    return socket.gethostname()
def get_timestamp()
Expand source code
def get_timestamp():
    return int(time.time())
def good_finish()
Expand source code
def good_finish():
    sys.stdout.flush()
    sys.exit(0)
def main()
Expand source code
def main():
    global metrics_dict, tags_dict, template
    metrics_dict, tags_dict, template = csv_to_json.initialize()
    behave_like_pipeline()
def process_header(line)
Expand source code
def process_header(line):
    # required_fields = ["PkgWatt", "CorWatt", "Core", "CPU"]
    required_fields = ["PkgWatt", "Core", "CPU"]
    header_mapping = create_header_mapping(line.strip())
    eprint("HEADER MAPPING to be used is:" + str(header_mapping))
    for field in required_fields:
        if field not in header_mapping:
            eprint("Field " + field + " not present in output, can't continue")
            bad_finish()
    return header_mapping
def process_line(line)
Expand source code
def process_line(line):
    fields = line.split(",")
    line_in_csv = turbostat_to_csv(line, header_mapping, fields)
    if line_in_csv:
        json_docs = to_json(line_in_csv, metrics_dict, tags_dict, template)
        if json_docs:
            for doc in json_docs:
                line_in_valid_json = to_TSDB_json(doc)
                print(line_in_valid_json)
def sigterm_handler(_signo, _stack_frame)
Expand source code
def sigterm_handler(_signo, _stack_frame):
    good_finish()
def turbostat_to_csv(line, header_mapping, fields)
Expand source code
def turbostat_to_csv(line, header_mapping, fields):
    if not line:
        eprint("Empty line")
        return

    # Skip header by looking for it and avoiding it
    try:
        # Try to test for string values for different versions of turbostat
        # if fields[0] == "Core" or fields[0] == "Package" or fields[0] == "usec":
        #    continue

        # Try to cast the average Megahertz, which has an int value for all the lines to be processed
        int(fields[header_mapping["Avg_MHz"]])

    except ValueError:
        # Line is header, skip
        return
    except IndexError:
        # Line is ?
        eprint("Error with line:" + line)
        return

    try:
        if fields[header_mapping["CPU"]] == "-":
            # line is for entire system
            return ("SYS_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                header_mapping["Core"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                        header_mapping["PkgWatt"]])
        else:
            if len(fields) >= header_mapping["PkgWatt"]:
                # line is for package, because turbostat aggregates and joins first core
                # and package, print also core
                if "Package" in header_mapping:
                    return ("PKG_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                        header_mapping["Package"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                                header_mapping["PkgWatt"]])

                else:
                    # Host may only have one package and turbostat may not report the Package field
                    return ("PKG_PWR" + "," + get_hostname() + "," + str(get_timestamp()) + "," + fields[
                        header_mapping["Core"]] + "," + fields[header_mapping["PkgTmp"]] + "," + fields[
                                header_mapping["PkgWatt"]])

    except IndexError:
        eprint("Line : '" + line.strip() + "' couldn't be parsed")
        return