Module src.pipelines.hadoop_java_translator

Expand source code
# Copyright (c) 2019 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es, jonatan.enes.alvarez@gmail.com)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the BDWatchdog framework, from
# now on referred to as BDWatchdog.
#
# BDWatchdog is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# BDWatchdog is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BDWatchdog. If not, see <http://www.gnu.org/licenses/>.


from __future__ import print_function
import sys
import fileinput
import pickle
from time import sleep
import os

# JAVA_MAPPINGS_FOLDER_PATH = "JAVA_MAPPINGS_FOLDER_PATH"
JAVA_TRANSLATOR_MAX_TRIES = "JAVA_TRANSLATOR_MAX_TRIES"
JAVA_TRANSLATOR_WAIT_TIME = "JAVA_TRANSLATOR_WAIT_TIME"
#
# java_mappings_folder_path = os.getenv(JAVA_MAPPINGS_FOLDER_PATH, "./pipelines/java_mappings/")
java_translator_max_tries = int(os.getenv(JAVA_TRANSLATOR_MAX_TRIES, 4))
java_translator_wait_time = int(os.getenv(JAVA_TRANSLATOR_WAIT_TIME, 5))
#
# process_files = ["NameNode", "SecondaryNameNode", "DataNode", "ResourceManager", "NodeManager",
#                  "YarnChild", "MRAppMaster", "CoarseGrainedExecutorBackend", "OTHER"]
from MetricsFeeder.src.java_hadoop_snitch.java_snitch import read_process_pids_from_file, process_files

java_proc_dict = dict()
unresolvable_pids = list()


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def read_all():
    global java_proc_dict
    for proc_file in process_files:
        pid_list = read_process_pids_from_file(proc_file)
        java_proc_dict[proc_file] = pid_list


def process_java_doc(line, pid, number_of_tries):
    global java_proc_dict
    global unresolvable_pids

    if pid in unresolvable_pids:
        return line.strip()

    for process_name in process_files:
        if process_name in java_proc_dict:
            process_pids = java_proc_dict[process_name]
            # print "Process pids for " + process_name + " are: " + str(process_pids) + " and pid is : " + str(pid)
            if pid in process_pids:
                return line.replace("(java)", process_name).strip()
    # return line changed
    # Couldn't resolve this doc, wait, read map files and try again
    # Wait for a max of 60 seconds
    if number_of_tries < java_translator_max_tries:
        sleep(java_translator_wait_time)
        read_all()
        number_of_tries += 1
        return process_java_doc(line, pid, number_of_tries)
    else:
        eprint("[HADOOP JAVA TRANSLATOR PLUGIN] process java with pid " + str(pid) + " was unresolvable")
        unresolvable_pids.append(pid)
        return line.strip()


def process_line(line):
    if line.startswith('P'):
        fields = line.split(",")
        command = fields[5]
        pid_str = fields[4]
        if line.startswith('PRM'):
            command = fields[4]  # Override for this case
            pid_str = fields[3]
        if command == "(java)":
            return process_java_doc(line, int(pid_str), 0)
        else:
            return line.strip()
    elif line.startswith("NETHOGS"):
        fields = line.split(",")
        command = fields[5]
        pid_str = fields[6]
        if command == "(java)":
            return process_java_doc(line, int(pid_str), 0)
        else:
            return line.strip()
    else:
        return line.strip()


def behave_like_pipeline():
    try:
        # for line in fileinput.input():
        while True:
            line = sys.stdin.readline()
            print(process_line(line))
    except (KeyboardInterrupt, IOError):
        # Exit silently
        pass
    except Exception as e:
        eprint("[JSON TO VALID_JSON] error: " + str(e))


def main():
    behave_like_pipeline()


if __name__ == "__main__":
    main()

Functions

def behave_like_pipeline()
Expand source code
def behave_like_pipeline():
    try:
        # for line in fileinput.input():
        while True:
            line = sys.stdin.readline()
            print(process_line(line))
    except (KeyboardInterrupt, IOError):
        # Exit silently
        pass
    except Exception as e:
        eprint("[JSON TO VALID_JSON] error: " + str(e))
def eprint(*args, **kwargs)
Expand source code
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
def main()
Expand source code
def main():
    behave_like_pipeline()
def process_java_doc(line, pid, number_of_tries)
Expand source code
def process_java_doc(line, pid, number_of_tries):
    global java_proc_dict
    global unresolvable_pids

    if pid in unresolvable_pids:
        return line.strip()

    for process_name in process_files:
        if process_name in java_proc_dict:
            process_pids = java_proc_dict[process_name]
            # print "Process pids for " + process_name + " are: " + str(process_pids) + " and pid is : " + str(pid)
            if pid in process_pids:
                return line.replace("(java)", process_name).strip()
    # return line changed
    # Couldn't resolve this doc, wait, read map files and try again
    # Wait for a max of 60 seconds
    if number_of_tries < java_translator_max_tries:
        sleep(java_translator_wait_time)
        read_all()
        number_of_tries += 1
        return process_java_doc(line, pid, number_of_tries)
    else:
        eprint("[HADOOP JAVA TRANSLATOR PLUGIN] process java with pid " + str(pid) + " was unresolvable")
        unresolvable_pids.append(pid)
        return line.strip()
def process_line(line)
Expand source code
def process_line(line):
    if line.startswith('P'):
        fields = line.split(",")
        command = fields[5]
        pid_str = fields[4]
        if line.startswith('PRM'):
            command = fields[4]  # Override for this case
            pid_str = fields[3]
        if command == "(java)":
            return process_java_doc(line, int(pid_str), 0)
        else:
            return line.strip()
    elif line.startswith("NETHOGS"):
        fields = line.split(",")
        command = fields[5]
        pid_str = fields[6]
        if command == "(java)":
            return process_java_doc(line, int(pid_str), 0)
        else:
            return line.strip()
    else:
        return line.strip()
def read_all()
Expand source code
def read_all():
    global java_proc_dict
    for proc_file in process_files:
        pid_list = read_process_pids_from_file(proc_file)
        java_proc_dict[proc_file] = pid_list