Module scripts.jsons-to-mongodb

Expand source code
#!/usr/bin/env python
from __future__ import print_function
import sys
import fileinput
import time
import requests
import json
import ast
import os


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


max_failed_connection_tries = 4
post_doc_buffer_length = 10000


def get_post_endpoint():
    default_mongodb_port = 8000
    default_mongodb_ip = "mongodb"
    default_profiling_database_name = "cpu"
    PROFILING_POST_ENDPOINT = "PROFILING_POST_ENDPOINT"
    profiling_post_endpoint = os.getenv(PROFILING_POST_ENDPOINT, default_profiling_database_name)

    MONGODB_IP = "MONGODB_IP"
    mongodb_ip = os.getenv(MONGODB_IP, default_mongodb_ip)

    MONGODB_PORT = "MONGODB_PORT"
    try:
        mongodb_port = str(int(os.getenv(MONGODB_PORT, default_mongodb_port)))
    except ValueError:
        eprint("Invalid port configuration, using default '" + str(default_mongodb_port) + "'")
        mongodb_port = str(default_mongodb_port)

    post_endpoint = 'http://' + mongodb_ip + ':' + mongodb_port + '/' + profiling_post_endpoint
    return post_endpoint


def send_docs(docs, post_endpoint):
    headers = {'content-type': 'application/json'}

    try:
        r = requests.post(post_endpoint, headers=headers, data=json.dumps(docs))
        if r.status_code != 201:
            eprint("[MONGODB SENDER] couldn't properly post documents to address " + post_endpoint)
            eprint(r.text)
            return False
        else:
            print("Post was done at: " + time.strftime("%D %H:%M:%S", time.localtime()) + " with " + str(
                len(docs)) + " documents , timestamp is " + str(time.time()))
            return True
    except requests.exceptions.ConnectionError as e:
        eprint("[MONGODB SENDER] couldn't properly post documents to address " + post_endpoint)
        eprint(e)
        return False


def main():
    post_endpoint = get_post_endpoint()
    failed_connections = 0
    json_documents = []
    try:
        for line in fileinput.input():
            try:
                new_doc = ast.literal_eval(line)
                json_documents = json_documents + [new_doc]
            except ValueError:
                print("Error with document " + str(line))
                continue

            length_docs = len(json_documents)
            if length_docs >= post_doc_buffer_length:
                if not send_docs(json_documents, post_endpoint):
                    failed_connections += 1
                json_documents = []

                if failed_connections >= max_failed_connection_tries:
                    eprint(
                        "[MONGODB SENDER] couldn't send documents to address " + post_endpoint + " and tried for " + str(
                            failed_connections) + " times, aborting")
                    exit(1)
                sys.stdout.flush()
        send_docs(json_documents, post_endpoint)
    except IOError as e:
        eprint("[MONGODB SENDER] terminated")
        eprint(e)
        pass
    except KeyboardInterrupt:
        eprint("[MONGODB SENDER] terminated")
        pass


if __name__ == "__main__":
    main()

Functions

def eprint(*args, **kwargs)
Expand source code
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
def get_post_endpoint()
Expand source code
def get_post_endpoint():
    default_mongodb_port = 8000
    default_mongodb_ip = "mongodb"
    default_profiling_database_name = "cpu"
    PROFILING_POST_ENDPOINT = "PROFILING_POST_ENDPOINT"
    profiling_post_endpoint = os.getenv(PROFILING_POST_ENDPOINT, default_profiling_database_name)

    MONGODB_IP = "MONGODB_IP"
    mongodb_ip = os.getenv(MONGODB_IP, default_mongodb_ip)

    MONGODB_PORT = "MONGODB_PORT"
    try:
        mongodb_port = str(int(os.getenv(MONGODB_PORT, default_mongodb_port)))
    except ValueError:
        eprint("Invalid port configuration, using default '" + str(default_mongodb_port) + "'")
        mongodb_port = str(default_mongodb_port)

    post_endpoint = 'http://' + mongodb_ip + ':' + mongodb_port + '/' + profiling_post_endpoint
    return post_endpoint
def main()
Expand source code
def main():
    post_endpoint = get_post_endpoint()
    failed_connections = 0
    json_documents = []
    try:
        for line in fileinput.input():
            try:
                new_doc = ast.literal_eval(line)
                json_documents = json_documents + [new_doc]
            except ValueError:
                print("Error with document " + str(line))
                continue

            length_docs = len(json_documents)
            if length_docs >= post_doc_buffer_length:
                if not send_docs(json_documents, post_endpoint):
                    failed_connections += 1
                json_documents = []

                if failed_connections >= max_failed_connection_tries:
                    eprint(
                        "[MONGODB SENDER] couldn't send documents to address " + post_endpoint + " and tried for " + str(
                            failed_connections) + " times, aborting")
                    exit(1)
                sys.stdout.flush()
        send_docs(json_documents, post_endpoint)
    except IOError as e:
        eprint("[MONGODB SENDER] terminated")
        eprint(e)
        pass
    except KeyboardInterrupt:
        eprint("[MONGODB SENDER] terminated")
        pass
def send_docs(docs, post_endpoint)
Expand source code
def send_docs(docs, post_endpoint):
    headers = {'content-type': 'application/json'}

    try:
        r = requests.post(post_endpoint, headers=headers, data=json.dumps(docs))
        if r.status_code != 201:
            eprint("[MONGODB SENDER] couldn't properly post documents to address " + post_endpoint)
            eprint(r.text)
            return False
        else:
            print("Post was done at: " + time.strftime("%D %H:%M:%S", time.localtime()) + " with " + str(
                len(docs)) + " documents , timestamp is " + str(time.time()))
            return True
    except requests.exceptions.ConnectionError as e:
        eprint("[MONGODB SENDER] couldn't properly post documents to address " + post_endpoint)
        eprint(e)
        return False