Module src.java_hadoop_snitch.java_snitch

This module periodically scans the output of ps and records, in per-host pickle files, the PIDs of the Hadoop and Spark JVM processes found on the machine.
# Copyright (c) 2019 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es, jonatan.enes.alvarez@gmail.com)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the BDWatchdog framework, from
# now on referred to as BDWatchdog.
#
# BDWatchdog is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# BDWatchdog is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BDWatchdog. If not, see <http://www.gnu.org/licenses/>.
import socket
import subprocess
import time
import pickle
import os
import errno

_base_path = os.path.dirname(os.path.abspath(__file__))
VAR_JAVA_MAPPINGS_FOLDER = "JAVA_MAPPINGS_FOLDER_PATH"
DEFAULT_JAVA_MAPPINGS_FOLDER = "java_mappings"
VAR_JAVA_SNITCH_POLLING_SECONDS = "JAVA_SNITCH_POLLING_SECONDS"
DEFAULT_JAVA_SNITCH_POLLING_SECONDS = 3
VAR_JAVA_SNITCH_TIME_TO_DUMP = "JAVA_SNITCH_TIME_TO_DUMP_COUNTER_MAX"
DEFAULT_JAVA_SNITCH_TIME_TO_DUMP = 2

java_mappings_folder_path = os.getenv(VAR_JAVA_MAPPINGS_FOLDER, os.path.join(_base_path, DEFAULT_JAVA_MAPPINGS_FOLDER))
java_snitch_polling_seconds = int(os.getenv(VAR_JAVA_SNITCH_POLLING_SECONDS, DEFAULT_JAVA_SNITCH_POLLING_SECONDS))
time_to_dump_counter_max = int(os.getenv(VAR_JAVA_SNITCH_TIME_TO_DUMP, DEFAULT_JAVA_SNITCH_TIME_TO_DUMP))
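# All three settings can be overridden through the environment variables named
# above; the effective dump interval is the polling period times the counter
# maximum (3 * 2 = 6 seconds with the defaults).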

process_names = ["NameNode", "SecondaryNameNode", "DataNode", "ResourceManager", "NodeManager", "YarnChild",
                 "MRAppMaster", "CoarseGrainedExecutorBackend"]
process_files = ["NameNode", "SecondaryNameNode", "DataNode", "ResourceManager", "NodeManager", "YarnChild",
                 "MRAppMaster", "CoarseGrainedExecutorBackend", "OTHER"]


def get_filepath(process_name):
    return "{0}/{1}.{2}.p".format(java_mappings_folder_path, process_name, socket.gethostname())


def run_command(c):
    """given shell command, returns communication tuple of stdout and stderr"""
    return subprocess.Popen(c,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdin=subprocess.PIPE).communicate()


def check_path_existence_and_create(path):
    try:
        os.makedirs(path)
    except OSError as exception:
        if exception.errno != errno.EEXIST:
            raise


def read_process_pids_from_file(process_name):
    try:
        filename = get_filepath(process_name)
        with open(filename, 'rb') as content_file:
            itemlist = pickle.load(content_file)
        return [int(x) for x in itemlist]
    except (IOError, EOFError):
        return []


def dump_process_pids_to_file(process_name, pidlist):
    try:
        filename = get_filepath(process_name)
        with open(filename, 'wb') as f1:
            pickle.dump([int(x) for x in pidlist], f1)
    except IOError:
        # mapping folder doesn't exist, create it for next time
        check_path_existence_and_create(java_mappings_folder_path)


def read_all():
    proc_dict = dict()
    for proc_file in process_files:
        pids = read_process_pids_from_file(proc_file)
        proc_dict[proc_file] = pids
    return proc_dict


def process_line(line):
    fields = line.strip().split(" ")
    pid = fields[0]
    for field in fields:
        if field.startswith("org.apache.hadoop.") or field.startswith("org.apache.spark."):
            return pid + " " + field.strip().split(".")[-1]


def merge_lists(l1, l2):
    return list(set(l1 + l2))


def merge_dicts(dict1, dict2):
    for key in dict2:
        try:
            dict1[key] = merge_lists(dict1[key], dict2[key])
        except KeyError:
            dict1[key] = dict2[key]
    return dict1


def dump_all(java_dict):
    existing_data = read_all()
    java_dict = merge_dicts(java_dict, existing_data)
    print("Dumping dictionaries " + str(java_dict))
    for key in java_dict:
        dump_process_pids_to_file(key, java_dict[key])


def run_ps():
    lines = list()
    cmd = ["ps", "h", "-eo", "pid,cmd"]
    ps = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    while True:
        line = ps.stdout.readline()
        if line == b'' and ps.poll() is not None:
            break
        lines.append(line.decode('utf-8').strip("\n"))
    ps.communicate()  # reap the process once stdout has been drained
    return lines


def remove_files():
    for process_name in process_files:
        try:
            filename = get_filepath(process_name)
            os.remove(filename)
        except OSError:
            # the file may not exist yet, which is fine
            pass


def main():
    time_to_dump_counter = 0
    print("[HADOOP JAVA SNITCH] Going to monitor hadoop java process every '" + str(
        java_snitch_polling_seconds) + "' seconds dumping every '" + str(
        java_snitch_polling_seconds * time_to_dump_counter_max) + "' seconds")

    java_proc_dict = dict()
    remove_files()
    while True:
        try:
            lines = run_ps()
            for line in lines:
                line = process_line(line)
                if not line:
                    continue
                pid_command = line.split(" ")
                pid, command = pid_command[0], pid_command[1]

                if command.strip() in process_names:
                    proc_dict_key = command
                else:
                    proc_dict_key = "OTHER"

                try:
                    java_proc_dict[proc_dict_key].append(int(pid))
                except KeyError:
                    java_proc_dict[proc_dict_key] = [int(pid)]

            time_to_dump_counter += 1

            time.sleep(java_snitch_polling_seconds)

            if time_to_dump_counter >= time_to_dump_counter_max:
                time_to_dump_counter = 0
                dump_all(java_proc_dict)
                java_proc_dict = dict()

        except KeyboardInterrupt:
            dump_all(java_proc_dict)
            exit(0)


if __name__ == "__main__":
    main()
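
As a minimal sketch of the consumer side, the per-host mapping files can be read back with plain pickle, matching the layout the snitch writes; the relative "java_mappings" path below assumes JAVA_MAPPINGS_FOLDER_PATH pointed the snitch at a folder under the current directory:

import pickle
import socket

# Hypothetical consumer: load the DataNode PIDs the snitch dumped on this host.
path = "java_mappings/DataNode.{0}.p".format(socket.gethostname())
try:
    with open(path, 'rb') as f:
        datanode_pids = [int(x) for x in pickle.load(f)]
except (IOError, EOFError):
    datanode_pids = []  # no dump for this process on this host yet
print(datanode_pids)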

Functions

def check_path_existence_and_create(path)
Create the directory at path if it does not already exist; any other OSError is re-raised.

def dump_all(java_dict)
Merge java_dict with the mappings already on disk, then write one pickle file per process key.

def dump_process_pids_to_file(process_name, pidlist)
Pickle the PID list for process_name into its mapping file; if the mappings folder is missing, create it so the next dump succeeds.

def get_filepath(process_name)
Build the mapping file path for process_name from the mappings folder, the process name and the local hostname.

def main()
Entry point: announce the polling and dump intervals, remove stale mapping files, then loop forever polling ps, classifying each Hadoop/Spark JVM by its main-class name (or "OTHER") and dumping the accumulated PID mappings every time_to_dump_counter_max polls. A KeyboardInterrupt triggers a final dump before exiting.

def merge_dicts(dict1, dict2)
Merge the PID lists of dict2 into dict1 key by key, creating any key that dict1 lacks.

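A quick illustration of the merge semantics, assuming the package is importable (the PID values are made up):

from src.java_hadoop_snitch.java_snitch import merge_dicts

d1 = {"DataNode": [100, 200]}
d2 = {"DataNode": [200, 300], "OTHER": [400]}
# Duplicates collapse; merge_lists goes through a set, so list order is not guaranteed.
print(merge_dicts(d1, d2))  # {'DataNode': [100, 200, 300], 'OTHER': [400]}
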
def merge_lists(l1, l2)
Return the union of two lists with duplicates removed; element order is not preserved.

def process_line(line)
Extract the PID and the short Hadoop/Spark main-class name from a ps output line, or return None when the line contains no such class.

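For instance, applied to a ps line for a DataNode JVM (the command line below is illustrative):

from src.java_hadoop_snitch.java_snitch import process_line

line = "1234 /usr/bin/java -Xmx1g org.apache.hadoop.hdfs.server.datanode.DataNode"
print(process_line(line))         # "1234 DataNode"
print(process_line("5678 bash"))  # None: no Hadoop/Spark class on the line
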
def read_all()
Read the PID list of every known process file into a dictionary keyed by process name.

def read_process_pids_from_file(process_name)
Unpickle and return the PID list for process_name, or an empty list when the file is missing or empty.

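A round-trip sketch, assuming the package is importable; note that when the mappings folder does not exist yet, the first dump only creates the folder and the actual write is skipped until the next call:

from src.java_hadoop_snitch.java_snitch import (
    dump_process_pids_to_file, read_process_pids_from_file)

dump_process_pids_to_file("DataNode", [1234, 5678])
print(read_process_pids_from_file("DataNode"))  # [1234, 5678] once a dump succeeded
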
def remove_files()
Delete any stale mapping files left over from a previous run, ignoring files that do not exist.

def run_command(c)
Given a shell command as an argument list, return the (stdout, stderr) tuple from communicate().

def run_ps()
Run ps h -eo pid,cmd and return its output as a list of decoded lines.
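
Combining run_ps with process_line gives a one-shot listing of the Hadoop/Spark JVMs alive on a Linux host (a sketch, assuming the package is importable):

from src.java_hadoop_snitch.java_snitch import process_line, run_ps

for ps_line in run_ps():
    parsed = process_line(ps_line)
    if parsed:
        print(parsed)  # e.g. "1234 DataNode"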