# Source code for joule.client.base_module

import argparse
import asyncio
import signal

from typing import List, Tuple, Dict, Optional
from aiohttp import web
import socket
import logging
import os
import uvloop

from joule.api import node
from joule import api
from joule.client import helpers
# import directly so it can be mocked easily in unit tests
from joule.errors import ConfigurationError, EmptyPipeError
from joule.models import pipes, stream

# Type alias: Pipe objects keyed by a string, as returned by BaseModule._build_pipes
Pipes = Dict[str, 'pipes.Pipe']
# Shorthand for the asyncio event loop type used in annotations
Loop = asyncio.AbstractEventLoop
# Logger shared by the code in this module
log = logging.getLogger('joule')


class BaseModule:
    """
    This is an abstract class and should not be inherited directly. Instead inherit from one of
    the following children :class:`joule.ReaderModule`, :class:`joule.FilterModule`, or :class:`joule.CompositeModule`
    """

    def __init__(self):
        self.stop_requested = False
        self.pipes: List[pipes.Pipe] = []
        self.STOP_TIMEOUT = 2
        self.runner = None
        self.node = None

    async def run_as_task(self, parsed_args, app: web.Application, loop) -> asyncio.Task:
        assert False, "implement in child class"  # pragma: no cover

    def custom_args(self, parser: argparse.ArgumentParser):
        """

        Override to add custom command line arguments to the module.

        .. code-block:: python

            class ModuleDemo(BaseModule):

                def custom_args(self, parser):
                     parser.description = "**module description**"
                     # add optional help text to the argument
                     parser.add_argument("--arg", help="custom argument")
                     # parse json input
                     parser.add_argument("--json_arg", type=json.loads)
                     # a yes|no argument that resolves to True|False
                     parser.add_argument("--flag_arg", type=joule.yesno)

                #... other module code


        Always use keyword arguments with modules so they can be specified
        in the **[Arguments]** section  of module configuration file

        Use the ``type`` parameter to specify a parser function. The parser
        function should accept a string input and return the appropriate object.

        """
        # parser.add_argument("--custom_flag")
        pass  # pragma: no cover

    def stop(self):
        """
        Override to change the default shutdown strategy which simply sets
        the ``stop_requested`` flag. If a module does not terminate within a few seconds
        of this method being called Joule will forcibly stop the module with SIGKILL.
        """
        # override in client for alternate shutdown strategy
        self.stop_requested = True

    def create_dev_app(self, loop: asyncio.AbstractEventLoop) -> web.Application:
        """
        Build a web application that runs this module alongside its HTTP interface.

        The module's arguments are loaded from the configuration file named by the
        ``JOULE_MODULE_CONFIG`` environment variable. The returned application
        launches :meth:`run_as_task` when it starts up and, on shutdown, calls
        :meth:`stop` and waits for that task to finish.

        Args:
            loop: event loop handed through to :meth:`run_as_task`

        Returns:
            the application with the module's routes and lifecycle hooks registered

        Raises:
            ConfigurationError: ``JOULE_MODULE_CONFIG`` is unset or does not name a file
        """
        arg_parser = argparse.ArgumentParser()
        self._build_args(arg_parser)

        # the configuration file comes from the environment, not the command line
        config_path = os.environ.get('JOULE_MODULE_CONFIG')
        if config_path is None:
            raise ConfigurationError("JOULE_MODULE_CONFIG not set, must specify config file")
        if not os.path.isfile(config_path):
            raise ConfigurationError("JOULE_MODULE_CONFIG is not a module config file")

        parsed_args = arg_parser.parse_args(helpers.load_args_from_file(config_path))
        self.node = api.get_node(parsed_args.node)
        self.stop_requested = False

        dev_app = self._create_app()

        async def launch_module(app):
            # keep the task on the application so the shutdown hook can wait for it
            app['task'] = await self.run_as_task(parsed_args, app, loop)

        async def halt_module(app):
            self.stop()
            await app['task']

        dev_app.on_startup.append(launch_module)
        dev_app.on_shutdown.append(halt_module)
        return dev_app

    def start(self, parsed_args: argparse.Namespace = None):
        """
        Execute the module. Do not override this function. Acquires an event loop,
        starts the optional web interface and runs the task created by
        :meth:`run_as_task` until it finishes or a stop signal (SIGINT/SIGTERM)
        arrives. The interface, pipes, node and event loop are always released
        through :meth:`_cleanup`, even when an unexpected exception propagates.

        Args:
            parsed_args: omit to parse the command line arguments

        .. code-block:: python

            class ModuleDemo(BaseModule):
                # body of module...
                # at a minimum the run coroutine must be implemented

            if __name__ == "__main__":
                my_module = ModuleDemo()
                my_module.start()
        """

        if parsed_args is None:  # pragma: no cover
            parser = argparse.ArgumentParser()
            self._build_args(parser)
            module_args = helpers.module_args()
            parsed_args = parser.parse_args(module_args)

        # Reuse the current event loop when it is usable. _cleanup() closes the
        # loop, so a later start() (or an interpreter without a current loop)
        # needs a fresh one.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        self.stop_requested = False
        if parsed_args.api_socket != "unset":
            # should be set by the joule daemon
            cafile = os.environ.get('JOULE_CA_FILE', "")
            self.node = node.UnixNode("local", parsed_args.api_socket, cafile, loop)
        else:
            self.node = api.get_node(parsed_args.node)

        try:
            self.runner = loop.run_until_complete(self._start_interface(parsed_args))
            # no application is available when the interface was not started
            app = self.runner.app if self.runner is not None else None
            try:
                task = loop.run_until_complete(
                    self.run_as_task(parsed_args, app, loop))
            except ConfigurationError as e:
                log.error("ERROR: %s", e)
                return

            def stop_task():
                # give the task no more than STOP_TIMEOUT seconds to exit
                loop.call_later(self.STOP_TIMEOUT, task.cancel)
                # run custom exit routine
                self.stop()

            loop.add_signal_handler(signal.SIGINT, stop_task)
            loop.add_signal_handler(signal.SIGTERM, stop_task)
            try:
                loop.run_until_complete(task)
            except (asyncio.CancelledError, pipes.EmptyPipe, EmptyPipeError):
                # expected ways for a module task to end
                pass
        finally:
            # release resources on every exit path, including unexpected errors
            self._cleanup(loop)

    def _cleanup(self, loop: Loop):

        if self.runner is not None:
            loop.run_until_complete(self.runner.cleanup())
        for pipe in self.pipes:
            loop.run_until_complete(pipe.close())
        if self.node is not None:
            loop.run_until_complete(self.node.close())
        loop.run_until_complete(asyncio.sleep(0))
        loop.close()

    def _build_args(self, parser):  # pragma: no cover
        grp = parser.add_argument_group('joule',
                                        'control module execution')
        # --pipes: JSON argument set by jouled
        grp.add_argument("--pipes",
                         default="unset",
                         help='RESERVED, managed by jouled')
        # --socket: UNIX socket set by jouled for interface proxy
        grp.add_argument("--socket",
                         default="unset",
                         help='RESERVED, managed by jouled')
        # --api-socket: UNIX socket for API calls back to jouled
        grp.add_argument("--api-socket",
                         default="unset",
                         help='RESERVED, managed by jouled')
        # --port: port to host interface on during isolated execution
        grp.add_argument("--port", type=int,
                         default=8080,
                         help='port for isolated execution')
        # --host: IP address to host interface on during isolated exedcution
        grp.add_argument("--host",
                         default="0.0.0.0",
                         help="IP address for isolated execution")
        # --module_config: set to run module standalone
        grp.add_argument("--module_config",
                         default="unset",
                         help='specify *.conf file for isolated execution')
        # --stream_configs: set to run module standalone
        grp.add_argument("--stream_configs",
                         default="unset",
                         help="specify directory of stream configs " +
                              "for isolated execution")
        # --node: node to connect to for isolated execution
        grp.add_argument("--node", default="",
                         help="joule node for isolated execution")
        # --start_time: historical isolation mode
        grp.add_argument("--start_time", default=None,
                         help="input start time for historic isolation")
        # --end_time: historical isolation mode
        grp.add_argument("--end_time", default=None,
                         help="input end time for historic isolation")
        # --force: do not ask for confirmation
        grp.add_argument("--force", action="store_true",
                         help="do not ask for confirmation before dangerous operations")

        parser.formatter_class = argparse.RawDescriptionHelpFormatter
        self.custom_args(parser)

    async def _build_pipes(self, parsed_args) -> Tuple[Pipes, Pipes]:
        """
        Build the module's input and output pipes and remember them for cleanup.

        When ``--pipes`` was supplied the pipes are built from it with
        ``helpers.build_fd_pipes``. Otherwise the ``Inputs`` and ``Outputs``
        sections of the file given by ``--module_config`` are connected through
        ``build_network_pipes``.

        Args:
            parsed_args: namespace of parsed module arguments

        Returns:
            tuple ``(pipes_in, pipes_out)``

        Raises:
            ConfigurationError: neither ``--pipes`` nor ``--module_config`` was given
        """
        # for unit testing mocks
        from joule.client.helpers.pipes import build_network_pipes

        fd_spec = parsed_args.pipes

        # figure out whether we should run with fd's or network sockets
        if fd_spec != 'unset':
            # run the module within joule
            pipes_in, pipes_out = await helpers.build_fd_pipes(fd_spec, self.node)
        else:
            start, end = helpers.validate_time_bounds(parsed_args.start_time,
                                                      parsed_args.end_time)
            if parsed_args.module_config == 'unset':
                raise ConfigurationError('must specify --module_config')
            module_config = helpers.read_module_config(parsed_args.module_config)
            # both sections are optional
            inputs = dict(module_config['Inputs'].items()) if 'Inputs' in module_config else {}
            outputs = dict(module_config['Outputs'].items()) if 'Outputs' in module_config else {}

            if parsed_args.stream_configs == "unset":
                configured_streams = {}
            else:
                configured_streams = helpers.read_stream_configs(parsed_args.stream_configs)
            pipes_in, pipes_out = await build_network_pipes(
                inputs, outputs, configured_streams, self.node, start, end, parsed_args.force)

        # keep track of the pipes so they can be closed
        self.pipes = [*pipes_in.values(), *pipes_out.values()]
        return pipes_in, pipes_out

    def routes(self):
        """
        Override to register HTTP handlers for the module. Return
        an array of handlers. This creates a visualization interface.

        .. code-block:: python

            class ModuleDemo(BaseModule):

                def routes(self):
                    return [
                        web.get('/', self.index),
                        # other handlers ...
                    ]

                async def index(self, request):
                    return web.Response(text="Hello World")

                #... other module code

        """
        return []  # override in child to implement a web interface

    def _create_app(self) -> web.Application:
        """Create a web application and register the handlers returned by :meth:`routes`."""
        handlers = self.routes()
        application = web.Application()
        application.add_routes(handlers)
        return application

    async def _start_interface(self, args) -> Optional[web.AppRunner]:
        routes = self.routes()
        # socket config is 'unset' when run as a standalone process
        if args.socket == 'unset':
            if len(routes) == 0:
                return None  # no routes requested, nothing to do
            # create a local server
            app = web.Application()
            app.add_routes(routes)
            runner = web.AppRunner(app)
            await runner.setup()
            site = web.TCPSite(runner, port=args.port, host=args.host)
            await site.start()
            print("starting web server at %s:%d" % (args.host, args.port))
            return runner
        # socket config is 'none' when joule does not connect a socket
        if args.socket == 'none':
            if len(routes) > 0:
                logging.error("No socket available for the interface, check module configuration")
            return None
        # otherwise start a UNIX runner on the socket
        if os.path.exists(args.socket):
            log.error("Socket address [%s] is already in use, cannot start interface" % socket)
            return None
        app = web.Application()
        app.add_routes(routes)
        runner = web.AppRunner(app)
        await runner.setup()
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(args.socket)
        site = web.SockSite(runner, sock)
        await site.start()
        print("starting web server at [%s]" % args.socket)
        return runner