Modules

Modules are executable programs that process data streams (see DataStream Configuration). They are connected to each other by Pipes. Joule runs each module as a separate process, which enforces isolation and improves resiliency: a malfunctioning module does not affect other parts of the pipeline and can be restarted without interrupting the data flow. There are three basic types: Reader Modules, Filter Modules, and Composite Modules.

Examples in the documentation below are available at https://github.com/wattsworth/example-modules.git. This repository provides several examples of each module type and can be used as a template to design your own installable Joule modules.

Command Line:
$> git clone https://github.com/wattsworth/example-modules.git
# To install modules system-wide:
$> python3 setup.py install
# To run unittests:
$> python3 setup.py tests

The layout of the repository is shown below.

example_modules/
├── jouleexamples
│   ├── example_composite.py
│   ├── example_filter.py
│   ├── example_reader.py
│   └── ... other modules
├── module_configs
│   ├── example_composite.conf
│   ├── example_filter.conf
│   ├── example_reader.conf
│   └── ... other module configs
├── README.rst
└── stream_configs
    └── ... stream config examples

Reader Modules

Reader modules are designed to read data into the Joule Framework. Data can come from sensors, system logs, HTTP APIs, or any other timeseries data source. Reader modules should extend the base class joule.ReaderModule illustrated below.

_images/reader_module.png

Examples

Basic Reader

Source: example_modules/jouleexamples/example_reader.py
 1#!/usr/bin/env python3
 2
 3from joule import ReaderModule
 4from joule.utilities import time_now
 5import asyncio
 6import numpy as np
 7
 8
 9class ExampleReader(ReaderModule):
10    "Example reader: generates random values"
11
12    async def run(self, parsed_args, output):
13        while True:
14            value = np.random.rand()  # data from sensor
15            await output.write(np.array([[time_now(), value]]))
16            await asyncio.sleep(1)
17
18
19def main():
20    r = ExampleReader()
21    r.start()
22
23
24if __name__ == "__main__":
25    main()

Reader modules should extend the base joule.ReaderModule class. The child class must implement the joule.ReaderModule.run() coroutine which should perform the following in a loop:

  1. Read data from the input

  2. Timestamp data with Unix microseconds

  3. Insert data into the output stream

  4. Sleep to create the data rate

Line 14 reads data from the input (a random number function). Line 15 timestamps the data and inserts it into the output stream. Line 16 sleeps for one second, creating a 1Hz sample rate. Note that the asyncio.sleep coroutine is used instead of the time.sleep function, which would block the event loop.
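The reason for preferring asyncio.sleep can be seen in a small sketch that uses only the standard library. The reader coroutine and the log list are hypothetical stand-ins, not Joule APIs. Two coroutines share one event loop, and because each one awaits asyncio.sleep the loop is free to run the other while it waits.

```python
import asyncio


async def reader(name, period, count, log):
    """Hypothetical reader loop: record a sample, then sleep."""
    for i in range(count):
        log.append((name, i))
        # awaiting hands control back to the event loop while waiting
        await asyncio.sleep(period)


async def main():
    log = []
    # both readers run concurrently in the same event loop
    await asyncio.gather(
        reader("a", 0.01, 3, log),
        reader("b", 0.01, 3, log),
    )
    return log


log = asyncio.run(main())
print(log)
```

Replacing the await with a call to time.sleep would stall every coroutine in the process for the duration of the sleep.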

Note

The loop structure shown above should only be used for low bandwidth data sources. For higher bandwidth data, either enable pipe caching or write the data in chunks as shown below. Write frequency should be 1Hz or lower to reduce inter-process communication and network overhead.

High Bandwidth Reader

Source: example_modules/jouleexamples/high_bandwidth_reader.py
#!/usr/bin/env python3

from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np


class HighBandwidthReader(ReaderModule):
    """ Produce a 1Hz ramp sampled at [rate] Hz """

    def custom_args(self, parser):
        grp = parser.add_argument_group("module",
                                        "module specific arguments")
        grp.add_argument("--rate", type=float,
                         required=True,
                         help="sample rate in Hz")

    async def run(self, parsed_args, output):
        start_ts = time_now()
        # write one chunk per period (in seconds)
        period = 1
        # np.linspace requires an integer number of samples
        samples_per_period = int(np.round(parsed_args.rate * period))
        while True:
            end_ts = start_ts + period * 1e6
            ts = np.linspace(start_ts, end_ts,
                             samples_per_period, endpoint=False)
            vals = np.linspace(0, 33, samples_per_period)
            start_ts = end_ts
            chunk = np.hstack((ts[:, None], vals[:, None]))
            await output.write(chunk)
            await asyncio.sleep(period)


def main():
    r = HighBandwidthReader()
    r.start()


if __name__ == "__main__":
    main()

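The custom_args method adds a required --rate argument to the module's command line parser. The parsed value is available in run() as parsed_args.rate and determines how many samples are written in each one-second chunk.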
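The argument handling used by the reader above can be tried in isolation with the standard argparse module. This is a minimal sketch: build_parser is a hypothetical helper, and the argument list passed to parse_args stands in for the real command line.

```python
import argparse


def build_parser():
    """Hypothetical helper mirroring the reader's custom_args method."""
    parser = argparse.ArgumentParser("reader")
    grp = parser.add_argument_group("module", "module specific arguments")
    grp.add_argument("--rate", type=float, required=True,
                     help="sample rate in Hz")
    return parser


# parse an explicit argument list instead of sys.argv
args = build_parser().parse_args(["--rate", "100"])

period = 1  # seconds between writes
# the number of samples in a chunk must be an integer
samples_per_period = int(round(args.rate * period))
print(args.rate, samples_per_period)
```

Because --rate is declared with required=True, omitting it makes the parser exit with a usage message before run() is ever called.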

Intermittent Reader

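This example shows how to handle sensor errors: when a read fails, the module closes the current interval so that the interruption is recorded as a boundary in the output stream.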

Source: example_modules/jouleexamples/intermittent_reader.py
#!/usr/bin/env python3

from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np
import logging

ERROR_PROBABILITY = 0.25


class IntermittentReader(ReaderModule):
    """ Like HighBandwidth reader with random data interruptions """

    def custom_args(self, parser):
        grp = parser.add_argument_group("module",
                                        "module specific arguments")
        grp.add_argument("--rate", type=float,
                         required=True,
                         help="sample rate in Hz")

    async def run(self, parsed_args, output):
        start_ts = time_now()
        period = 1
        # np.linspace requires an integer number of samples
        samples_per_period = int(np.round(parsed_args.rate * period))
        while True:
            try:
                end_ts = start_ts + period * 1e6
                ts = np.linspace(start_ts, end_ts,
                                 samples_per_period, endpoint=False)
                vals = np.linspace(0, 33, samples_per_period)
                start_ts = end_ts
                chunk = np.hstack((ts[:, None], vals[:, None]))
                # simulate an error
                if np.random.rand() < ERROR_PROBABILITY:
                    raise ValueError
                await output.write(chunk)
            except ValueError:
                logging.error("simulated data interruption")
                # mark the break in the data by closing the interval
                await output.close_interval()
            await asyncio.sleep(period)


def main():
    r = IntermittentReader()
    r.start()


if __name__ == "__main__":
    main()

Development

During development it is often helpful to run the reader module as a standalone process in order to use debuggers such as pdb or visualization tools like matplotlib.pyplot. When a reader module is executed from the command line the output pipe is connected to stdout:

Command Line:
$>./demo_reader.py
1485188853650944 0.32359053067687582 0.70028608966895545
1485188853750944 0.72139550945715136 0.39218791387411422
1485188853850944 0.40728044378612194 0.26446072057019654
1485188853950944 0.61021957330250398 0.27359526775709841
# hit ctrl-c to stop

If the --module_config argument is specified, the output pipe is instead connected to the stream specified in the configuration file. The stream will be created if it does not exist. By default the module connects to the local joule server; use the --url option to connect to a specific joule server. Any arguments in the configuration file will be parsed as if they were specified on the command line.
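One way to picture "parsed as if they were specified on the command line" is sketched below. This is not Joule's actual loader. It assumes a configuration file with an [Arguments] section of key/value pairs, and the keys rate and label are hypothetical. Each pair is turned into a --key=value token and handed to an ordinary ArgumentParser.

```python
import argparse
import configparser

# hypothetical module configuration; only [Arguments] is used here
CONFIG_TEXT = """
[Arguments]
rate = 10
label = demo sensor
"""

config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)

# turn each key/value pair into a --key=value command line token
argv = [f"--{key}={value}" for key, value in config["Arguments"].items()]

parser = argparse.ArgumentParser("demo")
parser.add_argument("--rate", type=float, required=True)
parser.add_argument("--label", default="")
args = parser.parse_args(argv)
print(argv, args)
```

The type conversion still happens in the parser, so values in the file are written as plain strings.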

Command Line:
$>./demo_reader.py --module_config=module.conf
Contacting joule server at http://localhost:8080
# hit ctrl-c to stop

Testing

This section refers to test_reader.py in the example_modules repository. Joule unittests are written using asynctest, a library built on top of the standard unittest module that reduces the boilerplate of writing tests for async coroutines.

Each unittest file should contain a single asynctest.TestCase class. The test runner will automatically run any functions starting with test_. Each test should have a docstring explaining the input and desired output. Tests should have three main sections as shown in the test_reader function below:

class TestReader(asynctest.TestCase):

    def test_reader(self):
        " with a rate=0.1, reader should generate 10 values in 1 second "
        # 1. build test objects
        # 2. run reader in an event loop
        # 3. check the results

Build test objects

# build test objects
my_reader = ReaderDemo()
pipe = LocalNumpyPipe("output", layout="float32_1")
args = argparse.Namespace(rate=0.1, pipes="unset")
  1. Create an instance of the reader module. Properly designed readers should not require any initialization parameters.

  2. Create an output pipe to receive data from the module. LocalNumpyPipe takes two arguments, a pipe name which should be a helpful string, and a layout. The layout should match the stream configuration file associated with your module. See the NumpyPipe documentation for details on local pipes and the layout parameter.

  3. Create an args object that contains values for any custom arguments your module requires. It should also initialize the pipes argument to "unset". In production, modules generate pipes automatically from their command line parameters. In testing we disable the pipe building routine by using the keyword "unset", and instead pass our own pipe to the module's run function, below.

Run event loop

loop = asyncio.get_event_loop()
my_task = asyncio.ensure_future(my_reader.run(args, pipe))
loop.call_later(1, my_task.cancel)
try:
    loop.run_until_complete(my_task)
except asyncio.CancelledError:
    pass
loop.close()

Modules are asynchronous coroutines that run in an event loop. The asynctest framework provides a new event loop for each test so we can safely use the global loop returned by asyncio.get_event_loop. This code is common boilerplate for all reader modules and in general it should not require any customization. The code does the following:

  1. Get a reference to the global event loop

  2. Set up the reader to run as a Task using the arguments and pipe created earlier

  3. Schedule the reader task to be cancelled after one second

  4. Run the event loop until the reader task stops

  5. When the reader task is cancelled it generates a CancelledError which can be safely ignored

  6. Close the event loop so the test exits cleanly
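The six steps above can be exercised without Joule installed. In this sketch ListPipe, run, and run_for are hypothetical stand-ins for the pipe, the reader's run coroutine, and the test boilerplate. It also creates a fresh loop with asyncio.new_event_loop rather than relying on a test framework to provide one.

```python
import asyncio


class ListPipe:
    """Hypothetical stand-in for a pipe: collects written values."""

    def __init__(self):
        self.rows = []

    async def write(self, row):
        self.rows.append(row)


async def run(pipe, period):
    """Hypothetical reader: write 0, 1, 2, ... until cancelled."""
    value = 0
    while True:
        await pipe.write(value)
        value += 1
        await asyncio.sleep(period)


def run_for(coro, seconds):
    loop = asyncio.new_event_loop()        # 1. get an event loop
    task = loop.create_task(coro)          # 2. wrap the reader in a Task
    loop.call_later(seconds, task.cancel)  # 3. schedule the cancellation
    try:
        loop.run_until_complete(task)      # 4. run until the task stops
    except asyncio.CancelledError:
        pass                               # 5. expected, safe to ignore
    finally:
        loop.close()                       # 6. clean up


pipe = ListPipe()
run_for(run(pipe, 0.01), 0.1)
print(len(pipe.rows))
```

The number of values written depends on timing, so a test built this way should check the content of the first few rows rather than an exact count.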

Check results

result = pipe.read_nowait()
# data should be 0,1,2,...,9
np.testing.assert_array_equal(result['data'],
                              np.arange(10))
# timestamps should be about 0.1s apart
np.testing.assert_array_almost_equal(np.diff(result['timestamp'])/1e6,
                                     np.ones(9)*0.1, decimal=2)

This is the most important part of the test and it will vary greatly from module to module. There are two steps:

  1. Retrieve data from the pipe using pipe.read_nowait(). This is the synchronous version of the read command and should only be used in testing. Modules should always use the await pipe.read() syntax. By default read_nowait returns a structured array with a data field and timestamp field. If you want timestamps in column 0 and elements in columns 1-N, use read_nowait(flatten=True)

  2. Use the numpy.testing library to compare the data to an expected dataset you create manually. Note that the assert_array_almost_equal is the preferred testing function. Floating point arithmetic is inexact so directly comparing data using == can generate spurious errors.
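The spurious failures mentioned above are easy to reproduce with plain Python floats. The sketch below uses math.isclose as a scalar analogue of the numpy.testing helpers, and the tolerance is an arbitrary choice for illustration.

```python
import math

expected = 0.3
actual = 0.1 + 0.2  # accumulates a tiny rounding error

exact_match = (actual == expected)
close_match = math.isclose(actual, expected, abs_tol=1e-6)
print(exact_match, close_match)
```

The exact comparison fails even though the two values agree to many decimal places, which is why a tolerance-based assertion is the safer default for computed data.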

Reference

class joule.ReaderModule

Inherit from this class and implement a run() coroutine to create a Joule reader module. Other methods documented below may be implemented as desired.

custom_args(parser: ArgumentParser)

Override to add custom command line arguments to the module.

class ModuleDemo(BaseModule):

    def custom_args(self, parser):
         parser.description = "**module description**"
         # add optional help text to the argument
         parser.add_argument("--arg", help="custom argument")
         # parse json input
         parser.add_argument("--json_arg", type=json.loads)
         # a yes|no argument that resolves to True|False
         parser.add_argument("--flag_arg", type=joule.yesno)

    #... other module code

Always use keyword arguments with modules so they can be specified in the [Arguments] section of the module configuration file.

Use the type parameter to specify a parser function. The parser function should accept a string input and return the appropriate object.
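A parser function is any callable that takes the raw string and returns the converted value. The sketch below uses json.loads from the standard library together with a locally defined yesno function. That function is a hypothetical stand-in written for this example and may differ from joule.yesno.

```python
import argparse
import json


def yesno(value):
    """Hypothetical stand-in for joule.yesno: map yes/no to True/False."""
    text = value.strip().lower()
    if text == "yes":
        return True
    if text == "no":
        return False
    raise argparse.ArgumentTypeError(f"expected yes or no, got {value!r}")


parser = argparse.ArgumentParser("demo")
parser.add_argument("--json_arg", type=json.loads)
parser.add_argument("--flag_arg", type=yesno)

args = parser.parse_args(["--json_arg", "[1, 2, 3]", "--flag_arg", "yes"])
print(args.json_arg, args.flag_arg)
```

Raising ArgumentTypeError from the parser function lets argparse report a readable error instead of a traceback.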

routes()

Override to register HTTP handlers for the module. Return an array of handlers. This creates a visualization interface.

class ModuleDemo(BaseModule):

    def routes(self):
        return [
            web.get('/', self.index),
            # other handlers ...
        ]

    async def index(self, request):
        return web.Response(text="Hello World")

    #... other module code

async run(parsed_args: Namespace, output: Pipe)

This method must be implemented. It should run in a loop; if it returns, the module stops.

Parameters:
  • parsed_args -- command line arguments, configure with custom_args()

  • output -- pipe connection to the output data stream

class ModuleDemo(ReaderModule):

    async def run(self, parsed_args, output):
        while not self.stop_requested:
            data = self.read_sensor()
            await output.write(data)

    def read_sensor(self) -> np.ndarray:
        # custom logic specific to the reader
        ...

    #... other module code

async setup(parsed_args, app, output)

Configure the module. Executes before run().

Parameters:
  • parsed_args --

  • app --

  • output --

start(parsed_args: Namespace | None = None)

Execute the module. Do not override this function. Creates an event loop and executes the run() coroutine.

Parameters:

parsed_args -- omit to parse the command line arguments

class ModuleDemo(BaseModule):
    # body of module...
    # at a minimum the run coroutine must be implemented

if __name__ == "__main__":
    my_module = ModuleDemo()
    my_module.start()

stop()

Override to change the default shutdown strategy, which simply sets the stop_requested flag. If a module does not terminate within a few seconds of this method being called, Joule will forcibly stop the module with SIGKILL.
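The cooperative shutdown described above can be sketched with a plain class. DemoModule is a hypothetical skeleton, not the Joule base class, and the call_later timer stands in for a shutdown request from the server.

```python
import asyncio


class DemoModule:
    """Hypothetical module skeleton, not the Joule base class."""

    def __init__(self):
        self.stop_requested = False
        self.samples = 0

    def stop(self):
        # default strategy described above: only set the flag
        self.stop_requested = True

    async def run(self):
        # the loop checks the flag on every pass so it can exit promptly
        while not self.stop_requested:
            self.samples += 1
            await asyncio.sleep(0.01)


async def main():
    module = DemoModule()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, module.stop)  # simulate a shutdown request
    await asyncio.wait_for(module.run(), timeout=2)
    return module


module = asyncio.run(main())
print(module.stop_requested, module.samples)
```

A run loop that never checks the flag, or that blocks for long periods between checks, would not exit in time and would be the kind of module that ends up being killed.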

Filter Modules

Filter modules process data. They may have one or more input streams and one or more output streams. Filter modules should extend the base class joule.FilterModule illustrated below.

_images/filter_module.png

Examples

Basic Filter

Source: example_modules/jouleexamples/example_filter.py
 1#!/usr/bin/env python3
 2
 3from joule import FilterModule, EmptyPipe
 4from scipy.signal import medfilt
 5import asyncio
 6
 7
 8class ExampleFilter(FilterModule):
 9    """Apply linear scaling to input"""
10
11    async def run(self, parsed_args, inputs, outputs):
12        # data pipes (specified in configuration file)
13        raw = inputs["raw"]
14        scaled = outputs["scaled"]
15
16        # linear scaling: y=mx+b
17        m = 2.0
18        b = 1.5
19
20        while True:
21            # read new data
22            vals = await raw.read()
23            # apply linear scaling y=mx+b
24            vals["data"] = vals["data"] * m + b
25            # write output
26            await scaled.write(vals)
27            # remove read data from the buffer
28            raw.consume(len(vals))
29            # propagate interval boundaries
30            if raw.end_of_interval:
31                await scaled.close_interval()
32            # limit execution to 1Hz chunks
33            await asyncio.sleep(1)
34
35
36def main():
37    r = ExampleFilter()
38    r.start()
39
40
41if __name__ == "__main__":
42    main()

Filter modules should extend the base FilterModule class. The child class must implement the joule.FilterModule.run() coroutine which should perform the following in a loop:

  1. Read from input pipe(s)

  2. Perform data processing

  3. Write to output pipe(s)

  4. Consume input data

Lines 13-14 retrieve the module's joule.Pipe connections to the input and output streams. Line 22 reads new data from the "raw" stream into a NumPy structured array. Line 24 applies the linear scaling to the data in place. The data is then written to the output pipe on line 26 and the input data is removed from the buffer on line 28. Lines 30-31 close the output interval whenever the input interval ends, so gaps in the input are preserved in the output. The sleep statement on line 33 ensures that data is processed in large chunks regardless of the rate at which it arrives, which keeps the system efficient by reducing the frequency of context switches and inter-process communication.

Offset Filter

Source: example_modules/jouleexamples/offset_filter.py
 1#!/usr/bin/env python3
 2
 3from joule import FilterModule, EmptyPipe
 4
 5class OffsetFilter(FilterModule):
 6    "Add offset to data "
 7    
 8    def custom_args(self, parser):
 9        grp = parser.add_argument_group("module","module specific arguments")
10        grp.add_argument("--offset",
11                         type=int,
12                         required=True,
13                         help="apply an offset")
14        
15    async def run(self, parsed_args, inputs, outputs):
16        stream_in = inputs["input"]
17        stream_out = outputs["output"]
18        while(True):
19            try:
20                sarray = await stream_in.read()
21                sarray["data"] += parsed_args.offset
22                await stream_out.write(sarray)
23                stream_in.consume(len(sarray))
24            except EmptyPipe:
25                break
26
27
28def main():
29    r = OffsetFilter()
30    r.start()
31
32if __name__ == "__main__":
33    main()

The loop adds a constant offset to every sample. Line 20 reads new data from the "input" stream into a structured array. Line 21 adds the --offset argument to the data in place, line 22 writes the result to the "output" stream, and line 23 removes the processed samples from the input buffer. The loop exits when the input pipe raises EmptyPipe (lines 24-25).

Many filtering algorithms, such as a WINDOW size median filter, require data before and after a sample to compute the output. Modules process data in chunks, which produces artifacts at the beginning and end of each chunk where there is insufficient data to compute the output. For such a filter the first and last EDGE samples of the chunk are invalid, so they should be omitted from the output. Consuming 2 × EDGE fewer samples than were read leaves those samples in the buffer, where they are prepended to the next input chunk to compensate for the boundary artifacts. This execution sequence produces exactly the same result as a median filter run over the entire dataset at once.
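The boundary handling for windowed filters can be checked with a standard library sketch. Everything here is hypothetical: WINDOW, EDGE, and both functions are illustration names, and a plain list stands in for the pipe buffer. The chunked version keeps the last 2 × EDGE samples after each pass, which mirrors consuming all but that many samples.

```python
from statistics import median

WINDOW = 5
EDGE = WINDOW // 2


def median_valid(samples):
    """Median filter that returns only outputs with a full window."""
    return [median(samples[i - EDGE:i + EDGE + 1])
            for i in range(EDGE, len(samples) - EDGE)]


def chunked_median(chunks):
    """Filter chunk by chunk, carrying 2 * EDGE samples between chunks."""
    buffer, out = [], []
    for chunk in chunks:
        buffer = buffer + list(chunk)
        if len(buffer) < WINDOW:
            continue  # not enough data yet, wait for the next chunk
        out.extend(median_valid(buffer))
        # keep the tail, like consuming len(buffer) - 2 * EDGE samples
        buffer = buffer[len(buffer) - 2 * EDGE:]
    return out


data = [3, 9, 1, 7, 4, 8, 2, 6, 5, 0, 9, 3]
chunks = [data[0:4], data[4:9], data[9:12]]
same = chunked_median(chunks) == median_valid(data)
print(same)
```

The comparison at the end confirms that, for this input, splitting the data into uneven chunks does not change the filtered output.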

Development

During development it is often helpful to run modules as standalone processes in order to use debuggers such as pdb or visualization tools like matplotlib.pyplot. Filter (and Composite) modules may be executed outside of the Joule environment in either live or historic mode. When executed independently the module configuration file must be provided so that the module can request the appropriate stream connections from Joule.

Note

The joule service must be running in order to run filters as standalone processes

Live Isolation: Connect filter inputs to live streams produced by the current joule pipeline. Specify the module configuration file and a directory with configurations for each output stream.

Command Line:
# [module.conf] is a module configuration file
$>./demo_filter.py --module_config=module.conf
Requesting live stream connections from jouled... [OK]
#...stdout/stderr output from filter
# hit ctrl-c to stop

Historic Isolation: Connect filter inputs to a range of stream data saved in NilmDB.

Specify historic execution by including a time range with --start and --end arguments. The time range may be a date string or a Unix microseconds timestamp. Common phrases are also supported such as "2 hours ago" or "today".
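For reference, a Unix microseconds timestamp is the number of microseconds since 1970-01-01 00:00:00 UTC. The helpers below are a hypothetical sketch of the conversion using only the standard library. They are not part of Joule.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_us(moment):
    """Convert a timezone-aware datetime to integer Unix microseconds."""
    return (moment - EPOCH) // timedelta(microseconds=1)


def from_unix_us(timestamp):
    """Convert integer Unix microseconds back to an aware datetime."""
    return EPOCH + timedelta(microseconds=timestamp)


start = datetime(2017, 1, 23, 16, 27, 33, 650944, tzinfo=timezone.utc)
start_us = to_unix_us(start)
print(start_us, from_unix_us(start_us).isoformat())
```

Integer division by a one-microsecond timedelta avoids the rounding that can occur when multiplying a floating point number of seconds by 1e6.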

Warning

Running a filter in historic isolation mode will overwrite existing output stream data

Command Line:
# [module.conf] is a module configuration file
$>./demo_filter.py --module_config=module.conf \
    --start="yesterday" --end="1 hour ago"
Requesting historic stream connections from jouled... [OK]
#...stdout/stderr output from filter
# program exits after time range is processed

Testing

TODO

Reference

class joule.FilterModule

custom_args(parser: ArgumentParser)

Override to add custom command line arguments to the module.

class ModuleDemo(BaseModule):

    def custom_args(self, parser):
         parser.description = "**module description**"
         # add optional help text to the argument
         parser.add_argument("--arg", help="custom argument")
         # parse json input
         parser.add_argument("--json_arg", type=json.loads)
         # a yes|no argument that resolves to True|False
         parser.add_argument("--flag_arg", type=joule.yesno)

    #... other module code

Always use keyword arguments with modules so they can be specified in the [Arguments] section of the module configuration file.

Use the type parameter to specify a parser function. The parser function should accept a string input and return the appropriate object.

routes()

Override to register HTTP handlers for the module. Return an array of handlers. This creates a visualization interface.

class ModuleDemo(BaseModule):

    def routes(self):
        return [
            web.get('/', self.index),
            # other handlers ...
        ]

    async def index(self, request):
        return web.Response(text="Hello World")

    #... other module code

async run(parsed_args, inputs, outputs)

This method must be implemented. It should run in a loop; if it returns, the module stops.

Parameters:
  • parsed_args -- parsed command line arguments, configure with joule.BaseModule.custom_args()

  • inputs -- pipe connections to input streams indexed by name (specified in the module configuration file).

  • outputs -- pipe connections to output streams indexed by name (specified in the module configuration file).

class ModuleDemo(FilterModule):

    async def run(self, parsed_args, inputs, outputs):
        raw = inputs["raw"]
        filtered = outputs["filtered"]
        # this filter just passes the input through to the output
        while not self.stop_requested:
            data = await raw.read()
            await filtered.write(data)
            raw.consume(len(data))

    #... other module code

async setup(parsed_args, app, inputs, outputs)

Configure the module. Executes before run().

Parameters:
  • parsed_args --

  • app --

  • inputs --

  • outputs --

Returns:

start(parsed_args: Namespace | None = None)

Execute the module. Do not override this function. Creates an event loop and executes the run() coroutine.

Parameters:

parsed_args -- omit to parse the command line arguments

class ModuleDemo(BaseModule):
    # body of module...
    # at a minimum the run coroutine must be implemented

if __name__ == "__main__":
    my_module = ModuleDemo()
    my_module.start()

stop()

Override to change the default shutdown strategy, which simply sets the stop_requested flag. If a module does not terminate within a few seconds of this method being called, Joule will forcibly stop the module with SIGKILL.

Composite Modules

Composite modules aggregate multiple modules into a single process. They may have one or more input streams and one or more output streams. Composite modules should extend the base class joule.CompositeModule illustrated below.

_images/composite_module.png

Examples

Source: example_modules/jouleexamples/example_composite.py
#!/usr/bin/python3

import argparse
from joule import CompositeModule, LocalNumpyPipe

from high_bandwidth_reader import HighBandwidthReader
from example_filter import ExampleFilter


class ExampleComposite(CompositeModule):
    """ Merge reader and filter into a single module:
                [reader -> filter]->
    """

    async def setup(self, parsed_args,
                    inputs, outputs):
        # 1.) create nested modules
        my_reader = HighBandwidthReader()
        my_filter = ExampleFilter()

        # 2.) create local pipes for interior streams
        pipe = LocalNumpyPipe(name="raw", layout="float32_1")

        # 3.) convert modules into tasks
        #  output is an interior stream (write-end)
        parsed_args = argparse.Namespace(rate=100)
        task1 = my_reader.run(parsed_args, pipe)
        #  raw is an interior stream (read-end)
        #  filtered is an exterior stream
        parsed_args = argparse.Namespace()
        #  ExampleFilter writes to outputs["scaled"], so map
        #  the exterior "filtered" pipe to that name
        task2 = my_filter.run(parsed_args,
                              {"raw": pipe},
                              {"scaled": outputs["filtered"]})

        # 4.) tasks are executed in the main event loop
        return [task1, task2]


def main():
    r = ExampleComposite()
    r.start()


if __name__ == "__main__":
    main()

The child class must implement the joule.CompositeModule.setup() coroutine which should perform the following:

  1. Create modules

  2. Create local pipes for interior streams

  3. Start modules by calling joule.BaseModule.run() with the appropriate parameters

  4. Return module tasks for execution in the main event loop

This example contains a High Bandwidth Reader connected to the Basic Filter (ExampleFilter). The modules are connected with a local pipe (LocalNumpyPipe in the code above), and the output of the filter is connected to a joule.OutputPipe named filtered.

Creating Module Arguments

In the example above, both modules receive the parsed_args parameter directly. In more complex scenarios it is often necessary to construct an argparse.Namespace object for each module with the particular arguments it requires. Make sure all arguments are specified and match the expected data types. The code snippet below constructs an appropriate Namespace object for the ArgumentParser configuration.

import json
import argparse

# example ArgumentParser

args = argparse.ArgumentParser("demo")
args.add_argument("--arg1", required=True)  # modules should use keyword arguments
args.add_argument("--arg2", type=int, required=True)
args.add_argument("--arg3", type=json.loads, required=True)

# to produce these arguments manually:

module_args = argparse.Namespace(**{
"arg1": "a string",  # type not specified
"arg2": 100,         # type=int
"arg3": [100,10,4]   # type=json
})
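A quick way to confirm that a hand-built Namespace matches what the parser would produce is to parse equivalent command line strings and compare the two objects. The sketch below repeats the parser from the example so that it runs on its own. The variable names parsed, manual, and matches are chosen for illustration.

```python
import argparse
import json

parser = argparse.ArgumentParser("demo")
parser.add_argument("--arg1", required=True)
parser.add_argument("--arg2", type=int, required=True)
parser.add_argument("--arg3", type=json.loads, required=True)

# what the parser produces from command line strings
parsed = parser.parse_args(
    ["--arg1", "a string", "--arg2", "100", "--arg3", "[100, 10, 4]"])

# the hand-built equivalent
manual = argparse.Namespace(arg1="a string", arg2=100, arg3=[100, 10, 4])

# Namespace objects compare equal when their attributes match
matches = (parsed == manual)
print(matches)
```

A check like this catches type mismatches, such as passing "100" where the nested module expects the integer 100.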

Development

See Filter Development.

Testing

See Filter Testing.

Reference

class joule.CompositeModule

custom_args(parser: ArgumentParser)

Override to add custom command line arguments to the module.

class ModuleDemo(BaseModule):

    def custom_args(self, parser):
         parser.description = "**module description**"
         # add optional help text to the argument
         parser.add_argument("--arg", help="custom argument")
         # parse json input
         parser.add_argument("--json_arg", type=json.loads)
         # a yes|no argument that resolves to True|False
         parser.add_argument("--flag_arg", type=joule.yesno)

    #... other module code

Always use keyword arguments with modules so they can be specified in the [Arguments] section of the module configuration file.

Use the type parameter to specify a parser function. The parser function should accept a string input and return the appropriate object.

routes()

Override to register HTTP handlers for the module. Return an array of handlers. This creates a visualization interface.

class ModuleDemo(BaseModule):

    def routes(self):
        return [
            web.get('/', self.index),
            # other handlers ...
        ]

    async def index(self, request):
        return web.Response(text="Hello World")

    #... other module code

async setup(parsed_args: Namespace, inputs: Dict[str, Pipe], outputs: Dict[str, Pipe])

This method must be implemented

Parameters:
  • parsed_args -- parsed command line arguments

  • inputs -- pipe connections to input streams. Keys are the names specified in the module configuration file

  • outputs -- pipe connections to output streams. Keys are the names specified in the module configuration file

  • loop -- the current event loop

Returns:

array of coroutine objects

start(parsed_args: Namespace | None = None)

Execute the module. Do not override this function. Creates an event loop and executes the run() coroutine.

Parameters:

parsed_args -- omit to parse the command line arguments

class ModuleDemo(BaseModule):
    # body of module...
    # at a minimum the run coroutine must be implemented

if __name__ == "__main__":
    my_module = ModuleDemo()
    my_module.start()

stop()

Override to change the default shutdown strategy, which simply sets the stop_requested flag. If a module does not terminate within a few seconds of this method being called, Joule will forcibly stop the module with SIGKILL.

User Interfaces

Modules can provide web-based user interfaces. When a Joule node is connected to a Lumen server, the user authentication and authorization is handled by Lumen and the interface is presented on a common dashboard with other modules the user is authorized to use.

To add an interface to a module, implement the joule.BaseModule.routes() function and register handlers for any routes your module implements. Then enable the interface by changing the is_app attribute to yes in the Module Configuration file.

Examples

Basic Interface

Source: example_modules/jouleexamples/example_interface.py
#!/usr/bin/env python3

import asyncio
from aiohttp import web
from joule import ReaderModule


class ExampleInterface(ReaderModule):

    async def run(self, parsed_args, output):
        # data processing...
        while True:
            await asyncio.sleep(1)

    def routes(self):
        return [web.get('/', self.index)]

    async def index(self, request):
        return web.Response(text="hello world!")


def main():
    r = ExampleInterface()
    r.start()


if __name__ == "__main__":
    main()

Bootstrap Interface

Typical web interfaces require more complex HTML, cascading style sheets (CSS), and JavaScript. The example below provides a complete module implementation using the Bootstrap CSS framework and Jinja HTML templates.

Source: example_modules/jouleexamples/bootstrap_interface.py
#!/usr/bin/env python3

import asyncio
from aiohttp import web
import aiohttp_jinja2
import jinja2
import os
from random import randint

from joule.client.reader_module import ReaderModule

CSS_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'css')
JS_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'js')
TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'templates')


class BootstrapInterface(ReaderModule):

    async def setup(self, parsed_args, app, output):
        loader = jinja2.FileSystemLoader(TEMPLATES_DIR)
        aiohttp_jinja2.setup(app, loader=loader)

    async def run(self, parsed_args, output):
        # data processing...
        while True:
            await asyncio.sleep(1)

    def routes(self):
        return [
            web.get('/', self.index),
            web.get('/data.json', self.data),
            web.static('/assets/css', CSS_DIR),
            web.static('/assets/js', JS_DIR)
        ]

    @aiohttp_jinja2.template('index.jinja2')
    async def index(self, request):
        return {'message': "hello world"}

    # json end point for AJAX requests
    async def data(self, request):
        # return summary statistics, etc.
        return web.json_response(data={'random_value': randint(0, 10)})


def main():
    r = BootstrapInterface()
    r.start()


if __name__ == "__main__":
    main()

In addition to the module code itself this interface requires several additional files located in the assets directory as shown:

file layout for BootstrapInterface assets
├── bootstrap_interface.py
└── assets
    ├── css
    │   └── index.css # and other css files
    ├── js
    │   └── index.js # other js files
    └── templates
        ├── layout.jinja2
        └── index.jinja2

The HTML templates are stored in assets/templates. layout.jinja2 is common to all views and provides hooks to customize the content and inject additional stylesheet and script tags. The module home page renders index.jinja2, which is shown below:

Source: example_modules/jouleexamples/assets/templates/index.jinja2
{% extends "layout.jinja2" %}

{% block title %} Bootstrap Interface {% endblock %}

{% block css %}
    <link rel="stylesheet" href="assets/css/index.css">
{% endblock %}

{% block js %}
    <script src="assets/js/index.js"></script>
{% endblock %}

{% block content %}
    <div class="jumbotron jumbotron-fluid">
        <div class="container">
            <h1 class="display-4">Bootstrap Interface</h1>
            <p class="lead">
                This module uses CSS and Javascript to create an interactive UI
            </p>
            <hr class="my-4">
            <ul class="list-group">
                <!-- example of jinja2 data -->
                <li class="list-group-item">Static value injected with jinja2:
                    <span class="message">{{ message }}</span>
                </li>
                <!-- example of AJAX data -->
                <li class="list-group-item">Dynamic data loaded with AJAX:
                    <!-- see assets/index.js for AJAX code -->
                    <span id="data" class="message">&mdash;</span>
                </li>
            </ul>
            <p class="lead mt-4">
                <button class="btn btn-primary btn-lg" onclick="change_bkgd()">
                    Change Background
                </button>
            </p>
        </div>
    </div>
{% endblock %}

Notice that the additional CSS and JavaScript assets are injected into the appropriate blocks in the layout template. Bootstrap classes provide a simple and powerful mechanism for creating a basic page, but in some cases it may be necessary to add custom CSS to fine-tune an element's appearance.

Source: example_modules/jouleexamples/assets/css/index.css
.message{
    background: #c2daf3;
    margin: 4px;
    padding: 8px;
    border-radius: 0.4rem;
    border-color: #007bff;
    border-style: solid;
    border-width: 2px;
    margin-left: 10px;
    font-weight: bold;
}

Javascript makes websites interactive. This file makes repeated calls to the server for new data. Using AJAX requests rather than reloading the entire page improves the user's experience and reduces network traffic.

Source: example_modules/jouleexamples/assets/js/index.js
$(function () {
    setInterval(loadData, 2000);
    $("#update-interval").val(loadData);
});

function loadData(){
    $.get("data.json", function (data) {
        $("#data").text(data['random_value']);
    });
}

function change_bkgd(){
    let color = parseInt(Math.random()*0xFFFFFF).toString(16);
    $(".jumbotron").css("background", "#"+color);
}

Development

When running as a standalone process, modules that provide a web interface will start a local webserver on port 8000 (by default). This is accessible from a browser at http://localhost:8000.