Module src.daemons.daemon_utils
# Copyright (c) 2019 Universidade da Coruña
# Authors:
# - Jonatan Enes [main](jonatan.enes@udc.es, jonatan.enes.alvarez@gmail.com)
# - Roberto R. Expósito
# - Juan Touriño
#
# This file is part of the BDWatchdog framework, from
# now on referred to as BDWatchdog.
#
# BDWatchdog is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# BDWatchdog is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BDWatchdog. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import abc
import traceback
from threading import Thread
import os
import time
import configparser
import errno
import subprocess
import sys
import logging
import requests
_base_path = os.path.dirname(os.path.abspath(__file__))
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
# Abstract base class for the monitoring daemons; subclasses must implement create_pipeline()
class MonitoringDaemon(metaclass=abc.ABCMeta):
def __init__(self, service_name, environment):
self.SERVICE_NAME = service_name
self.environment = environment
self.stdin_path = '/dev/null'
self.stderr_path = os.path.join(self.environment["BDW_LOG_DIR"], self.SERVICE_NAME + ".err")
self.stdout_path = os.path.join(self.environment["BDW_LOG_DIR"], self.SERVICE_NAME + ".out")
self.log_path = os.path.join(self.environment["BDW_LOG_DIR"], self.SERVICE_NAME + ".log")
self.configure_daemon_log()
self.pidfile_path = os.path.join(self.environment["BDW_PID_DIR"], self.SERVICE_NAME + ".pid")
self.pidfile_timeout = 5
self.pipeline_tries = 0
self.MAX_TRIES = 5
self.processes_list = []
self.dumper_thread = None
self.is_runnable = None
self.not_runnable_message = None
def set_logger(self, logger):
self.logger = logger
def reload_pipeline(self, _signo, _stack_frame):
self.logger.info("Going to reload pipeline")
self.destroy_pipeline()
self.processes_list = list()
self.launch_pipeline()
def launch_pipeline(self):
self.logger.info("Launching pipeline")
self.processes_list += self.create_pipeline()
        # Launch a thread that dumps the output of the last process in the
        # pipeline (the one that sends the data to OpenTSDB)
thread = Thread(target=self.threaded_read_last_process_output, args=(self.processes_list[-1],))
thread.start()
self.dumper_thread = thread
def beat(self):
# Serverless Containers
from src.StateDatabase import couchdb as couchDB
from src.MyUtils import MyUtils as MyUtils
self.logger.info("Starting heartbeat of " + self.SERVICE_NAME)
db_handler = couchDB.CouchDBServer()
while True:
try:
MyUtils.beat(db_handler, self.SERVICE_NAME)
time.sleep(10)
except ValueError:
# Service not found:
# - maybe it doesn't exist at all, register it
# - it may have been deleted while the daemon was running, re-register it
register_service(db_handler, self.SERVICE_NAME)
except requests.ConnectionError:
                # Error connecting to the CouchDB database; ignore it, as it may be temporary
pass
def launch_heartbeat(self):
        # Launch the heartbeat thread, but only if heartbeats are enabled in the environment
        if "HEARTBEAT_ENABLED" in self.environment and self.environment["HEARTBEAT_ENABLED"] == "true":
heartbeat = Thread(target=self.beat, args=())
heartbeat.daemon = True
heartbeat.start()
@staticmethod
def threaded_read_last_process_output(process):
for line in process.stdout:
print(line.strip().decode()) # Dump to stdout of daemon
sys.stdout.flush()
@staticmethod
def good_finish():
sys.exit(0)
@staticmethod
def bad_finish():
sys.exit(1)
@staticmethod
def create_pipe(cmd, environment, pipe_input, pipe_output):
return subprocess.Popen(cmd,
env=environment,
stdin=pipe_input,
stdout=pipe_output
)
    # Terminate all the processes that make up the pipeline
def destroy_pipeline(self):
self.logger.info("Destroying pipeline")
for process in self.processes_list:
try:
process.terminate()
process.wait()
self.logger.info(
"Process " + str(process.pid) + " terminated with exit status " + str(process.returncode))
except OSError:
# Process may have already exited
pass
def poll_for_exited_processes(self):
for process in self.processes_list:
process.poll()
if process.returncode is not None:
return True
return False
def check_if_runnable(self):
if not self.is_runnable(self.environment):
eprint(self.not_runnable_message)
def loop(self):
try:
while True:
exited_processes = self.poll_for_exited_processes()
if exited_processes:
self.logger.info("Error in pipeline")
self.destroy_pipeline()
if self.pipeline_tries < self.MAX_TRIES:
self.pipeline_tries += 1
self.logger.info("The pipeline was destroyed, re-creating and launching a new one")
self.launch_pipeline()
else:
                        self.logger.info(
                            "Pipeline failed too many times (" + str(self.MAX_TRIES) + "), stopping daemon")
self.bad_finish()
time.sleep(5)
        except (SystemExit, KeyboardInterrupt):
self.logger.info("Exception or signal caught, stopping daemon and destroying the pipeline.")
self.destroy_pipeline()
self.good_finish()
def configure_daemon_log(self):
logger = logging.getLogger(self.SERVICE_NAME)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s")
log_path = self.environment["BDW_LOG_DIR"]
check_path_existance_and_create(log_path)
pids_path = self.environment["BDW_PID_DIR"]
check_path_existance_and_create(pids_path)
handler = logging.FileHandler(self.log_path)
handler.setFormatter(formatter)
logger.addHandler(handler)
self.logger = logger
self.handler = handler
def get_handler(self):
return self.handler
@abc.abstractmethod
def create_pipeline(self):
"""Method documentation"""
return []
def command_is_runnable(command_as_list):
try:
        # Discard the output; piping it with check_call could block on very verbose commands
        subprocess.check_call(command_as_list, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception as e:
eprint(str(e) + " " + str(traceback.format_exc()))
return False
return True
def initialize_environment(config_path, config_keys, default_environment_values):
environment = create_environment(read_config(config_path, config_keys),
config_keys,
default_environment_values)
return environment
def create_environment(config_dict, config_keys, default_environment_values):
custom_environment = os.environ.copy()
for key in config_keys:
if key in custom_environment:
# Key has already been configured at the environment level
pass
elif key in config_dict.keys():
# Key has been read from config file
custom_environment[key] = config_dict[key]
else:
# Key was neither present at the environment level nor in the config file
# Take default value
custom_environment[key] = default_environment_values[key]
return custom_environment
def read_config(config_path, config_keys):
config_dict = {}
config = configparser.ConfigParser()
config.read(os.path.join(_base_path, config_path))
for key in config_keys:
try:
config_dict[key] = config['DEFAULT'][key]
except KeyError:
pass # Key is not configured, take the default value
return config_dict
def check_path_existance_and_create(path):
try:
os.makedirs(path)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
def register_service(db_handler, service_name):
    # Imported here, as in beat(), so that the module can load without this dependency
    from src.MyUtils import MyUtils as MyUtils
    service = dict(
name=service_name,
heartbeat="",
heartbeat_human="",
type="service"
)
MyUtils.register_service(db_handler, service)
Functions
def check_path_existance_and_create(path)
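Creates the directory if it does not exist; an already-existing directory is not treated as an error (on Python 3 this is equivalent to os.makedirs(path, exist_ok=True)). A minimal usage sketch (the path is illustrative):
from src.daemons.daemon_utils import check_path_existance_and_create

check_path_existance_and_create("/tmp/bdw/logs")  # creates the directory tree
check_path_existance_and_create("/tmp/bdw/logs")  # second call is a no-op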
def command_is_runnable(command_as_list)
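Runs the command once and returns True if it exits with status 0, printing the exception and traceback to stderr otherwise; note that the command is actually executed, so it should be side-effect free. A usage sketch (the probed command is illustrative):
from src.daemons.daemon_utils import command_is_runnable, eprint

if not command_is_runnable(["atop", "-V"]):
    eprint("atop is not available, the pipeline cannot be started")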
def create_environment(config_dict, config_keys, default_environment_values)
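For each key the resolution order is: a variable already set in the process environment wins, then the value read from the config file, then the hard-coded default. A sketch of that precedence (the POLLING_FREQUENCY key and all values are illustrative):
import os
from src.daemons.daemon_utils import create_environment

keys = ["BDW_LOG_DIR", "POLLING_FREQUENCY"]
defaults = {"BDW_LOG_DIR": "/tmp/bdw/logs", "POLLING_FREQUENCY": "5"}
os.environ["BDW_LOG_DIR"] = "/var/log/bdw"  # set at the environment level
config_file = {"POLLING_FREQUENCY": "10"}   # as if read with read_config
env = create_environment(config_file, keys, defaults)
print(env["BDW_LOG_DIR"])        # /var/log/bdw (environment wins)
print(env["POLLING_FREQUENCY"])  # 10 (config file beats the default)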
def eprint(*args, **kwargs)
def initialize_environment(config_path, config_keys, default_environment_values)
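Convenience entry point used by the daemons: reads the config file (the path is resolved relative to this module's directory) and merges it with the process environment and the defaults, as described for create_environment above. A usage sketch (the file name, keys and defaults are illustrative):
from src.daemons.daemon_utils import initialize_environment

keys = ["BDW_LOG_DIR", "BDW_PID_DIR"]
defaults = {"BDW_LOG_DIR": "/tmp/bdw/logs", "BDW_PID_DIR": "/tmp/bdw/pids"}
environment = initialize_environment("conf/my_daemon.ini", keys, defaults)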
def read_config(config_path, config_keys)
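Only the keys listed in config_keys are looked up, and only in the [DEFAULT] section; missing keys are skipped silently so that the defaults can apply later. For instance, given a hypothetical conf/my_daemon.ini containing:
[DEFAULT]
BDW_LOG_DIR = /var/log/bdw
then read_config("conf/my_daemon.ini", ["BDW_LOG_DIR", "BDW_PID_DIR"]) would return {"BDW_LOG_DIR": "/var/log/bdw"}, with no BDW_PID_DIR entry.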
def register_service(db_handler, service_name)
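Builds a minimal service document with empty heartbeat fields and stores it through MyUtils, as beat() does when the service is missing from the state database. A usage sketch (assumes a reachable CouchDB state database; the service name is illustrative):
from src.StateDatabase import couchdb as couchDB
from src.daemons.daemon_utils import register_service

db_handler = couchDB.CouchDBServer()
register_service(db_handler, "MY_MONITORING_SERVICE")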
Classes
class MonitoringDaemon (service_name, environment)
Static methods
def bad_finish()
def create_pipe(cmd, environment, pipe_input, pipe_output)
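Thin wrapper over subprocess.Popen that fixes the environment and wires up stdin/stdout so that stages can be chained, each stage reading the previous one's output. A standalone sketch chaining two standard tools:
import os
import subprocess
from src.daemons.daemon_utils import MonitoringDaemon

env = os.environ.copy()
p1 = MonitoringDaemon.create_pipe(["seq", "5"], env, None, subprocess.PIPE)
p2 = MonitoringDaemon.create_pipe(["grep", "3"], env, p1.stdout, subprocess.PIPE)
print(p2.stdout.read().decode())  # prints "3"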
def good_finish()
def threaded_read_last_process_output(process)
Methods
def beat(self)
def check_if_runnable(self)
def configure_daemon_log(self)
def create_pipeline(self)
Abstract: create the pipeline processes and return them as a list of Popen objects.
def destroy_pipeline(self)
def get_handler(self)
def launch_heartbeat(self)
def launch_pipeline(self)
def loop(self)
def poll_for_exited_processes(self)
def reload_pipeline(self, _signo, _stack_frame)
def set_logger(self, logger)