Module src.mongodb.mongodb_agent

Expand source code
# Copyright (c) 2019 Universidade da Coruña
# Authors:
#     - Jonatan Enes [main](jonatan.enes@udc.es, jonatan.enes.alvarez@gmail.com)
#     - Roberto R. Expósito
#     - Juan Touriño
#
# This file is part of the BDWatchdog framework, from
# now on referred to as BDWatchdog.
#
# BDWatchdog is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# BDWatchdog is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BDWatchdog. If not, see <http://www.gnu.org/licenses/>.


from __future__ import print_function
import sys
import time
import requests
import json
import ast
import os

from TimestampsSnitch.src.timestamping.utils import eprint, iprint

headers = {'content-type': 'application/json'}

default_mongodb_port = 8000
default_mongodb_ip = "mongodb"
default_tests_database_name = "tests"
default_experiments_database_name = "experiments"

MONGODB_IP_OS_VARNAME = "MONGODB_IP"
MONGODB_PORT_OS_VARNAME = "MONGODB_PORT"
TESTS_POST_ENDPOINT_OS_VARNAME = "TESTS_POST_ENDPOINT"
EXPERIMENTS_POST_ENDPOINT_OS_VARNAME = "EXPERIMENTS_POST_ENDPOINT"
post_doc_buffer_length = 1
MAX_CONNECTION_TRIES = 3


class MongoDBTimestampAgent:

    def __init__(self, os_env=None):
        if not os_env:
            self.tests_post_endpoint = os.getenv(TESTS_POST_ENDPOINT_OS_VARNAME, default_tests_database_name)
            self.experiments_post_endpoint = os.getenv(EXPERIMENTS_POST_ENDPOINT_OS_VARNAME,
                                                       default_experiments_database_name)
            self.mongodb_ip = os.getenv(MONGODB_IP_OS_VARNAME, default_mongodb_ip)
            try:
                self.mongodb_port = str(int(os.getenv(MONGODB_PORT_OS_VARNAME, default_mongodb_port)))
            except ValueError:
                eprint("Invalid port configuration, using default '" + str(default_mongodb_port) + "'")
                self.mongodb_port = str(default_mongodb_port)
        else:
            try:
                self.tests_post_endpoint = os_env[TESTS_POST_ENDPOINT_OS_VARNAME]
            except KeyError:
                self.tests_post_endpoint = default_tests_database_name
            try:
                self.experiments_post_endpoint = os_env[EXPERIMENTS_POST_ENDPOINT_OS_VARNAME]
            except KeyError:
                self.experiments_post_endpoint = default_experiments_database_name
            try:
                self.mongodb_ip = os_env[MONGODB_IP_OS_VARNAME]
            except KeyError:
                self.mongodb_ip = default_mongodb_ip
            try:
                self.mongodb_port = str(int(os_env[MONGODB_PORT_OS_VARNAME]))
            except (ValueError, KeyError):
                self.mongodb_port = str(default_mongodb_port)

        self.tests_full_endpoint = "http://{0}:{1}/{2}".format(self.mongodb_ip, self.mongodb_port,
                                                               self.tests_post_endpoint)

        self.experiments_full_endpoint = "http://{0}:{1}/{2}".format(self.mongodb_ip, self.mongodb_port,
                                                                     self.experiments_post_endpoint)

    def get_experiments_endpoint(self):
        return self.experiments_full_endpoint

    def get_tests_endpoint(self):
        return self.tests_full_endpoint

    @staticmethod
    def merge_data_from_existing_doc(old, new):
        for key in ["start_time", "end_time"]:
            try:
                if key not in new.keys():
                    new[key] = old[key]
            except KeyError:
                pass

        return new

    def post_doc(self, doc, info, endpoint):
        tries = 0
        while tries <= MAX_CONNECTION_TRIES:
            if info == {}:
                # Document doesn't exist, create
                r = requests.post(endpoint, headers=headers, data=json.dumps(doc))
                if r.status_code != 201:
                    eprint("Couldn't properly put document to address {0}".format(endpoint))
                    eprint(r.text)
                    tries += 1
                else:
                    iprint("Document created at: {0}".format(
                        time.strftime("%D %H:%M:%S", time.localtime()) + " timestamp is " + str(time.time())))
                    break
            else:
                # Test exists, update
                etag = info["_etag"]
                doc_id = info["_id"]

                doc = self.merge_data_from_existing_doc(info, doc)

                these_headers = headers
                these_headers["If-Match"] = etag
                r = requests.put(endpoint + "/" + str(doc_id), headers=these_headers,
                                 data=json.dumps(doc))
                if r.status_code != 200:
                    eprint("Couldn't properly put document to address {0}".format(endpoint))
                    eprint(r.text)
                    tries += 1
                else:
                    iprint("Document updated at: " + time.strftime("%D %H:%M:%S", time.localtime()))
                    break
        if tries > MAX_CONNECTION_TRIES:
            error_string = "Information posting for document {0} failed too many times, aborting".format(str(doc))
            eprint(error_string)
            raise ConnectionError(error_string)

    def experiment_exists(self, experiment_id, username):
        info = self.get_experiment(experiment_id, username)
        if info:
            return True
        return False

    def send_experiment_docs(self, documents):
        for experiment in documents:
            info = self.get_experiment(experiment["experiment_id"], experiment["username"])
            self.post_doc(experiment, info, self.experiments_full_endpoint)

    def send_test_docs(self, documents):
        for test in documents:
            if not self.experiment_exists(test["experiment_id"], test["username"]):
                # Experiment doesn't exist
                eprint("Experiment {0} doesn't exist".format(test["experiment_id"]))
                continue
            info = self.get_test(test["experiment_id"], test["test_name"], test["username"])
            self.post_doc(test, info, self.tests_full_endpoint)

    def send_docs(self, documents):
        if len(documents["experiment"]) >= 1:
            self.send_experiment_docs(documents["experiment"])
        if len(documents["test"]) >= 1:
            self.send_test_docs(documents["test"])

    @staticmethod
    def get_legth_docs(documents):
        num_docs = 0
        for key in documents:
            num_docs += len(documents[key])
        return num_docs

    def delete_test(self, experiment_id, test_name, username):
        if self.experiment_exists(experiment_id, username):
            test = self.get_test(experiment_id, test_name, username)
            if not test:
                iprint("Document doesn't {0} exist".format(test_name))
                return
            headers["If-Match"] = test["_etag"]
            tests_full_endpoint = "{0}/{1}".format(self.tests_full_endpoint, test["_id"])
            r = requests.delete(tests_full_endpoint, headers=headers)
            if r.status_code != 204:
                eprint("Couldn't properly delete document to address {0}".format(tests_full_endpoint))
                eprint(r.text)
            else:
                iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
        else:
            return False

    def delete_experiment(self, experiment_id, username):
        if self.experiment_exists(experiment_id, username):
            experiment = self.get_experiment(experiment_id, username)
            headers["If-Match"] = experiment["_etag"]
            experiment_full_endoint = "{0}/{1}".format(self.experiments_full_endpoint, experiment["_id"])
            r = requests.delete(experiment_full_endoint, headers=headers)
            if r.status_code != 204:
                eprint("Couldn't properly delete document in {0}".format(experiment_full_endoint))
                eprint(r.text)
            else:
                iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
        else:
            eprint("Document doesn't {0} exist".format(experiment_id))
            return False

    def get_all_experiments(self, username):
        last_page = False
        all_experiments = list()
        host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
        endpoint = "{0}/{1}".format(host_endpoint, self.experiments_post_endpoint) + \
                   "/?where=" + "{" + '"username":"' + username + '"}'
        while not last_page:
            data = self.get_paginated_docs(endpoint)
            if not data:
                return all_experiments
            num_page = data["_meta"]["page"]
            max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
            num_total_experiments = data["_meta"]["total"]
            if max_num_retrieved_experiments < num_total_experiments:
                endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
            else:
                last_page = True
            all_experiments += data["_items"]
        return all_experiments

    def get_doc(self, endpoint, doc_name):
        tries = 0
        while tries <= MAX_CONNECTION_TRIES:
            status_code = None
            try:
                r = requests.get(endpoint)
                if r.status_code == 200:
                    experiments = r.json()["_items"]
                    if experiments:
                        return r.json()["_items"][0]
                    else:
                        return {}
                elif r.status_code == 404:
                    return {}
                else:
                    status_code = r.status_code
            except requests.ConnectionError as error:
                eprint("Error with request {0}".format(str(error)))
            eprint("Couldn't get document {0}, trying again for the {1} time out of {2}".format(
                doc_name, tries, MAX_CONNECTION_TRIES))
            if status_code:
                eprint("Status code was {0}".format(str(status_code)))
            tries += 1

        if tries > MAX_CONNECTION_TRIES:
            error_string = "Information retrieval for document {0} failed too many times, aborting".format(
                doc_name)
            eprint(error_string)
            raise requests.ConnectionError(error_string)

    def get_experiment(self, experiment_name, username):
        query = '?where={"experiment_id": "' + experiment_name + '","username":"' + username + '"}'
        endpoint = "{0}/{1}".format(self.experiments_full_endpoint, query)
        return self.get_doc(endpoint, experiment_name)

    def get_test(self, experiment_id, test_name, username):
        query = '?where={"experiment_id": "' + experiment_id + '", "test_name":"' + test_name + '","username":"' + username + '"}'
        endpoint = "{0}/{1}".format(self.tests_full_endpoint, query)
        return self.get_doc(endpoint, test_name)

    def get_experiment_tests(self, experiment_id, username):
        last_page = False
        all_tests = list()
        host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
        endpoint = "{0}/{1}".format(host_endpoint, self.tests_post_endpoint) + \
                   '/?where={"experiment_id":"' + experiment_id + '", "username":"' + username + '"}'
        while not last_page:
            data = self.get_paginated_docs(endpoint)
            if not data:
                return all_tests
            num_page = data["_meta"]["page"]
            max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
            num_total_tests = data["_meta"]["total"]
            if max_num_retrieved_experiments < num_total_tests:
                endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
            else:
                last_page = True
            all_tests += data["_items"]
        return all_tests

    @staticmethod
    def get_paginated_docs(endpoint):
        try:
            r = requests.get(endpoint)
            return r.json()
        except requests.ConnectionError as error:
            eprint("Error with request {0}".format(str(error)))
            return None


if __name__ == '__main__':
    abort = False

    docs = dict()
    docs["experiment"] = []
    docs["test"] = []

    agent = MongoDBTimestampAgent()

    eprint("Mongodb agent started, waiting for input to send.")
    # eprint("Endpoints to use will be {0} to experiments and {1} to tests".format(
    #    agent.get_experiments_endpoint(), agent.get_tests_endpoint()
    # ))

    try:
        # UNAFFECTED BY BUFFERING
        while True:
            line = sys.stdin.readline()

            if line == "":
                # Reached EOF
                break

            try:
                new_doc = ast.literal_eval(line)
                docs[new_doc["type"]] = docs[new_doc["type"]] + [new_doc["info"]]
            except ValueError:
                eprint("Error with document " + str(line))
                continue

            length_docs = agent.get_legth_docs(docs)
            if length_docs >= post_doc_buffer_length:
                agent.send_docs(docs)
                docs = dict()
                docs["experiment"] = []
                docs["test"] = []

                if abort:
                    exit(1)
                sys.stdout.flush()
        agent.send_docs(docs)
    except IOError as e:
        eprint("Terminated due to error")
        eprint(e)
    except KeyboardInterrupt:
        eprint("Terminated due to user interrupt")
        agent.send_docs(docs)

Classes

class MongoDBTimestampAgent (os_env=None)
Expand source code
class MongoDBTimestampAgent:

    def __init__(self, os_env=None):
        if not os_env:
            self.tests_post_endpoint = os.getenv(TESTS_POST_ENDPOINT_OS_VARNAME, default_tests_database_name)
            self.experiments_post_endpoint = os.getenv(EXPERIMENTS_POST_ENDPOINT_OS_VARNAME,
                                                       default_experiments_database_name)
            self.mongodb_ip = os.getenv(MONGODB_IP_OS_VARNAME, default_mongodb_ip)
            try:
                self.mongodb_port = str(int(os.getenv(MONGODB_PORT_OS_VARNAME, default_mongodb_port)))
            except ValueError:
                eprint("Invalid port configuration, using default '" + str(default_mongodb_port) + "'")
                self.mongodb_port = str(default_mongodb_port)
        else:
            try:
                self.tests_post_endpoint = os_env[TESTS_POST_ENDPOINT_OS_VARNAME]
            except KeyError:
                self.tests_post_endpoint = default_tests_database_name
            try:
                self.experiments_post_endpoint = os_env[EXPERIMENTS_POST_ENDPOINT_OS_VARNAME]
            except KeyError:
                self.experiments_post_endpoint = default_experiments_database_name
            try:
                self.mongodb_ip = os_env[MONGODB_IP_OS_VARNAME]
            except KeyError:
                self.mongodb_ip = default_mongodb_ip
            try:
                self.mongodb_port = str(int(os_env[MONGODB_PORT_OS_VARNAME]))
            except (ValueError, KeyError):
                self.mongodb_port = str(default_mongodb_port)

        self.tests_full_endpoint = "http://{0}:{1}/{2}".format(self.mongodb_ip, self.mongodb_port,
                                                               self.tests_post_endpoint)

        self.experiments_full_endpoint = "http://{0}:{1}/{2}".format(self.mongodb_ip, self.mongodb_port,
                                                                     self.experiments_post_endpoint)

    def get_experiments_endpoint(self):
        return self.experiments_full_endpoint

    def get_tests_endpoint(self):
        return self.tests_full_endpoint

    @staticmethod
    def merge_data_from_existing_doc(old, new):
        for key in ["start_time", "end_time"]:
            try:
                if key not in new.keys():
                    new[key] = old[key]
            except KeyError:
                pass

        return new

    def post_doc(self, doc, info, endpoint):
        tries = 0
        while tries <= MAX_CONNECTION_TRIES:
            if info == {}:
                # Document doesn't exist, create
                r = requests.post(endpoint, headers=headers, data=json.dumps(doc))
                if r.status_code != 201:
                    eprint("Couldn't properly put document to address {0}".format(endpoint))
                    eprint(r.text)
                    tries += 1
                else:
                    iprint("Document created at: {0}".format(
                        time.strftime("%D %H:%M:%S", time.localtime()) + " timestamp is " + str(time.time())))
                    break
            else:
                # Test exists, update
                etag = info["_etag"]
                doc_id = info["_id"]

                doc = self.merge_data_from_existing_doc(info, doc)

                these_headers = headers
                these_headers["If-Match"] = etag
                r = requests.put(endpoint + "/" + str(doc_id), headers=these_headers,
                                 data=json.dumps(doc))
                if r.status_code != 200:
                    eprint("Couldn't properly put document to address {0}".format(endpoint))
                    eprint(r.text)
                    tries += 1
                else:
                    iprint("Document updated at: " + time.strftime("%D %H:%M:%S", time.localtime()))
                    break
        if tries > MAX_CONNECTION_TRIES:
            error_string = "Information posting for document {0} failed too many times, aborting".format(str(doc))
            eprint(error_string)
            raise ConnectionError(error_string)

    def experiment_exists(self, experiment_id, username):
        info = self.get_experiment(experiment_id, username)
        if info:
            return True
        return False

    def send_experiment_docs(self, documents):
        for experiment in documents:
            info = self.get_experiment(experiment["experiment_id"], experiment["username"])
            self.post_doc(experiment, info, self.experiments_full_endpoint)

    def send_test_docs(self, documents):
        for test in documents:
            if not self.experiment_exists(test["experiment_id"], test["username"]):
                # Experiment doesn't exist
                eprint("Experiment {0} doesn't exist".format(test["experiment_id"]))
                continue
            info = self.get_test(test["experiment_id"], test["test_name"], test["username"])
            self.post_doc(test, info, self.tests_full_endpoint)

    def send_docs(self, documents):
        if len(documents["experiment"]) >= 1:
            self.send_experiment_docs(documents["experiment"])
        if len(documents["test"]) >= 1:
            self.send_test_docs(documents["test"])

    @staticmethod
    def get_legth_docs(documents):
        num_docs = 0
        for key in documents:
            num_docs += len(documents[key])
        return num_docs

    def delete_test(self, experiment_id, test_name, username):
        if self.experiment_exists(experiment_id, username):
            test = self.get_test(experiment_id, test_name, username)
            if not test:
                iprint("Document doesn't {0} exist".format(test_name))
                return
            headers["If-Match"] = test["_etag"]
            tests_full_endpoint = "{0}/{1}".format(self.tests_full_endpoint, test["_id"])
            r = requests.delete(tests_full_endpoint, headers=headers)
            if r.status_code != 204:
                eprint("Couldn't properly delete document to address {0}".format(tests_full_endpoint))
                eprint(r.text)
            else:
                iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
        else:
            return False

    def delete_experiment(self, experiment_id, username):
        if self.experiment_exists(experiment_id, username):
            experiment = self.get_experiment(experiment_id, username)
            headers["If-Match"] = experiment["_etag"]
            experiment_full_endoint = "{0}/{1}".format(self.experiments_full_endpoint, experiment["_id"])
            r = requests.delete(experiment_full_endoint, headers=headers)
            if r.status_code != 204:
                eprint("Couldn't properly delete document in {0}".format(experiment_full_endoint))
                eprint(r.text)
            else:
                iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
        else:
            eprint("Document doesn't {0} exist".format(experiment_id))
            return False

    def get_all_experiments(self, username):
        last_page = False
        all_experiments = list()
        host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
        endpoint = "{0}/{1}".format(host_endpoint, self.experiments_post_endpoint) + \
                   "/?where=" + "{" + '"username":"' + username + '"}'
        while not last_page:
            data = self.get_paginated_docs(endpoint)
            if not data:
                return all_experiments
            num_page = data["_meta"]["page"]
            max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
            num_total_experiments = data["_meta"]["total"]
            if max_num_retrieved_experiments < num_total_experiments:
                endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
            else:
                last_page = True
            all_experiments += data["_items"]
        return all_experiments

    def get_doc(self, endpoint, doc_name):
        tries = 0
        while tries <= MAX_CONNECTION_TRIES:
            status_code = None
            try:
                r = requests.get(endpoint)
                if r.status_code == 200:
                    experiments = r.json()["_items"]
                    if experiments:
                        return r.json()["_items"][0]
                    else:
                        return {}
                elif r.status_code == 404:
                    return {}
                else:
                    status_code = r.status_code
            except requests.ConnectionError as error:
                eprint("Error with request {0}".format(str(error)))
            eprint("Couldn't get document {0}, trying again for the {1} time out of {2}".format(
                doc_name, tries, MAX_CONNECTION_TRIES))
            if status_code:
                eprint("Status code was {0}".format(str(status_code)))
            tries += 1

        if tries > MAX_CONNECTION_TRIES:
            error_string = "Information retrieval for document {0} failed too many times, aborting".format(
                doc_name)
            eprint(error_string)
            raise requests.ConnectionError(error_string)

    def get_experiment(self, experiment_name, username):
        query = '?where={"experiment_id": "' + experiment_name + '","username":"' + username + '"}'
        endpoint = "{0}/{1}".format(self.experiments_full_endpoint, query)
        return self.get_doc(endpoint, experiment_name)

    def get_test(self, experiment_id, test_name, username):
        query = '?where={"experiment_id": "' + experiment_id + '", "test_name":"' + test_name + '","username":"' + username + '"}'
        endpoint = "{0}/{1}".format(self.tests_full_endpoint, query)
        return self.get_doc(endpoint, test_name)

    def get_experiment_tests(self, experiment_id, username):
        last_page = False
        all_tests = list()
        host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
        endpoint = "{0}/{1}".format(host_endpoint, self.tests_post_endpoint) + \
                   '/?where={"experiment_id":"' + experiment_id + '", "username":"' + username + '"}'
        while not last_page:
            data = self.get_paginated_docs(endpoint)
            if not data:
                return all_tests
            num_page = data["_meta"]["page"]
            max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
            num_total_tests = data["_meta"]["total"]
            if max_num_retrieved_experiments < num_total_tests:
                endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
            else:
                last_page = True
            all_tests += data["_items"]
        return all_tests

    @staticmethod
    def get_paginated_docs(endpoint):
        try:
            r = requests.get(endpoint)
            return r.json()
        except requests.ConnectionError as error:
            eprint("Error with request {0}".format(str(error)))
            return None

Static methods

def get_legth_docs(documents)
Expand source code
@staticmethod
def get_legth_docs(documents):
    num_docs = 0
    for key in documents:
        num_docs += len(documents[key])
    return num_docs
def get_paginated_docs(endpoint)
Expand source code
@staticmethod
def get_paginated_docs(endpoint):
    try:
        r = requests.get(endpoint)
        return r.json()
    except requests.ConnectionError as error:
        eprint("Error with request {0}".format(str(error)))
        return None
def merge_data_from_existing_doc(old, new)
Expand source code
@staticmethod
def merge_data_from_existing_doc(old, new):
    for key in ["start_time", "end_time"]:
        try:
            if key not in new.keys():
                new[key] = old[key]
        except KeyError:
            pass

    return new

Methods

def delete_experiment(self, experiment_id, username)
Expand source code
def delete_experiment(self, experiment_id, username):
    if self.experiment_exists(experiment_id, username):
        experiment = self.get_experiment(experiment_id, username)
        headers["If-Match"] = experiment["_etag"]
        experiment_full_endoint = "{0}/{1}".format(self.experiments_full_endpoint, experiment["_id"])
        r = requests.delete(experiment_full_endoint, headers=headers)
        if r.status_code != 204:
            eprint("Couldn't properly delete document in {0}".format(experiment_full_endoint))
            eprint(r.text)
        else:
            iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
    else:
        eprint("Document doesn't {0} exist".format(experiment_id))
        return False
def delete_test(self, experiment_id, test_name, username)
Expand source code
def delete_test(self, experiment_id, test_name, username):
    if self.experiment_exists(experiment_id, username):
        test = self.get_test(experiment_id, test_name, username)
        if not test:
            iprint("Document doesn't {0} exist".format(test_name))
            return
        headers["If-Match"] = test["_etag"]
        tests_full_endpoint = "{0}/{1}".format(self.tests_full_endpoint, test["_id"])
        r = requests.delete(tests_full_endpoint, headers=headers)
        if r.status_code != 204:
            eprint("Couldn't properly delete document to address {0}".format(tests_full_endpoint))
            eprint(r.text)
        else:
            iprint("Document deleted at: " + time.strftime("%D %H:%M:%S", time.localtime()))
    else:
        return False
def experiment_exists(self, experiment_id, username)
Expand source code
def experiment_exists(self, experiment_id, username):
    info = self.get_experiment(experiment_id, username)
    if info:
        return True
    return False
def get_all_experiments(self, username)
Expand source code
def get_all_experiments(self, username):
    last_page = False
    all_experiments = list()
    host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
    endpoint = "{0}/{1}".format(host_endpoint, self.experiments_post_endpoint) + \
               "/?where=" + "{" + '"username":"' + username + '"}'
    while not last_page:
        data = self.get_paginated_docs(endpoint)
        if not data:
            return all_experiments
        num_page = data["_meta"]["page"]
        max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
        num_total_experiments = data["_meta"]["total"]
        if max_num_retrieved_experiments < num_total_experiments:
            endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
        else:
            last_page = True
        all_experiments += data["_items"]
    return all_experiments
def get_doc(self, endpoint, doc_name)
Expand source code
def get_doc(self, endpoint, doc_name):
    tries = 0
    while tries <= MAX_CONNECTION_TRIES:
        status_code = None
        try:
            r = requests.get(endpoint)
            if r.status_code == 200:
                experiments = r.json()["_items"]
                if experiments:
                    return r.json()["_items"][0]
                else:
                    return {}
            elif r.status_code == 404:
                return {}
            else:
                status_code = r.status_code
        except requests.ConnectionError as error:
            eprint("Error with request {0}".format(str(error)))
        eprint("Couldn't get document {0}, trying again for the {1} time out of {2}".format(
            doc_name, tries, MAX_CONNECTION_TRIES))
        if status_code:
            eprint("Status code was {0}".format(str(status_code)))
        tries += 1

    if tries > MAX_CONNECTION_TRIES:
        error_string = "Information retrieval for document {0} failed too many times, aborting".format(
            doc_name)
        eprint(error_string)
        raise requests.ConnectionError(error_string)
def get_experiment(self, experiment_name, username)
Expand source code
def get_experiment(self, experiment_name, username):
    query = '?where={"experiment_id": "' + experiment_name + '","username":"' + username + '"}'
    endpoint = "{0}/{1}".format(self.experiments_full_endpoint, query)
    return self.get_doc(endpoint, experiment_name)
def get_experiment_tests(self, experiment_id, username)
Expand source code
def get_experiment_tests(self, experiment_id, username):
    last_page = False
    all_tests = list()
    host_endpoint = "http://{0}:{1}".format(self.mongodb_ip, self.mongodb_port)
    endpoint = "{0}/{1}".format(host_endpoint, self.tests_post_endpoint) + \
               '/?where={"experiment_id":"' + experiment_id + '", "username":"' + username + '"}'
    while not last_page:
        data = self.get_paginated_docs(endpoint)
        if not data:
            return all_tests
        num_page = data["_meta"]["page"]
        max_num_retrieved_experiments = num_page * data["_meta"]["max_results"]
        num_total_tests = data["_meta"]["total"]
        if max_num_retrieved_experiments < num_total_tests:
            endpoint = "{0}/{1}".format(host_endpoint, data["_links"]["next"]["href"])
        else:
            last_page = True
        all_tests += data["_items"]
    return all_tests
def get_experiments_endpoint(self)
Expand source code
def get_experiments_endpoint(self):
    return self.experiments_full_endpoint
def get_test(self, experiment_id, test_name, username)
Expand source code
def get_test(self, experiment_id, test_name, username):
    query = '?where={"experiment_id": "' + experiment_id + '", "test_name":"' + test_name + '","username":"' + username + '"}'
    endpoint = "{0}/{1}".format(self.tests_full_endpoint, query)
    return self.get_doc(endpoint, test_name)
def get_tests_endpoint(self)
Expand source code
def get_tests_endpoint(self):
    return self.tests_full_endpoint
def post_doc(self, doc, info, endpoint)
Expand source code
def post_doc(self, doc, info, endpoint):
    tries = 0
    while tries <= MAX_CONNECTION_TRIES:
        if info == {}:
            # Document doesn't exist, create
            r = requests.post(endpoint, headers=headers, data=json.dumps(doc))
            if r.status_code != 201:
                eprint("Couldn't properly put document to address {0}".format(endpoint))
                eprint(r.text)
                tries += 1
            else:
                iprint("Document created at: {0}".format(
                    time.strftime("%D %H:%M:%S", time.localtime()) + " timestamp is " + str(time.time())))
                break
        else:
            # Test exists, update
            etag = info["_etag"]
            doc_id = info["_id"]

            doc = self.merge_data_from_existing_doc(info, doc)

            these_headers = headers
            these_headers["If-Match"] = etag
            r = requests.put(endpoint + "/" + str(doc_id), headers=these_headers,
                             data=json.dumps(doc))
            if r.status_code != 200:
                eprint("Couldn't properly put document to address {0}".format(endpoint))
                eprint(r.text)
                tries += 1
            else:
                iprint("Document updated at: " + time.strftime("%D %H:%M:%S", time.localtime()))
                break
    if tries > MAX_CONNECTION_TRIES:
        error_string = "Information posting for document {0} failed too many times, aborting".format(str(doc))
        eprint(error_string)
        raise ConnectionError(error_string)
def send_docs(self, documents)
Expand source code
def send_docs(self, documents):
    if len(documents["experiment"]) >= 1:
        self.send_experiment_docs(documents["experiment"])
    if len(documents["test"]) >= 1:
        self.send_test_docs(documents["test"])
def send_experiment_docs(self, documents)
Expand source code
def send_experiment_docs(self, documents):
    for experiment in documents:
        info = self.get_experiment(experiment["experiment_id"], experiment["username"])
        self.post_doc(experiment, info, self.experiments_full_endpoint)
def send_test_docs(self, documents)
Expand source code
def send_test_docs(self, documents):
    for test in documents:
        if not self.experiment_exists(test["experiment_id"], test["username"]):
            # Experiment doesn't exist
            eprint("Experiment {0} doesn't exist".format(test["experiment_id"]))
            continue
        info = self.get_test(test["experiment_id"], test["test_name"], test["username"])
        self.post_doc(test, info, self.tests_full_endpoint)