API Reference

Modules

class joule.BaseModule[source]

This is an abstract class and should not be inherited directly. Instead inherit from one of the following children joule.ReaderModule, joule.FilterModule, or joule.CompositeModule

custom_args(parser: argparse.ArgumentParser)[source]

Override to add custom command line arguments to the module.

class ModuleDemo(BaseModule):

    def custom_args(self, parser):
         parser.description = "**module description**"
         # add optional help text to the argument
         parser.add_argument("--arg", help="custom argument")
         # parse json input
         parser.add_argument("--json_arg", type=json.loads)
         # a yes|no argument that resolves to True|False
         parser.add_argument("--flag_arg", type=joule.yesno)

    #... other module code

Always use keyword arguments with modules so they can be specified in the [Arguments] section of module configuration file

Use the type parameter to specify a parser function. The parser function should accept a string input and return the appropriate object.

routes()[source]

Override to register HTTP handlers for the module. Return an array of handlers. This creates a visualization interface.

class ModuleDemo(BaseModule):

    def routes(self):
        return [
            web.get('/', self.index),
            # other handlers ...
        ]

    async def index(self, request):
        return web.Response(text="Hello World")

    #... other module code
start(parsed_args: argparse.Namespace = None)[source]

Execute the module. Do not override this function. Creates an event loop and executes the run() coroutine.

Parameters:parsed_args -- omit to parse the command line arguments
class ModuleDemo(BaseModule):
    # body of module...
    # at a minimum the run coroutine must be implemented

if __name__ == "__main__":
    my_module = ModuleDemo()
    my_module.start()
stop()[source]

Override to change the default shutdown strategy which simply sets the stop_requested flag. If a module does not terminate within a few seconds of this method being called Joule will forcibly stop the module with SIGKILL.

class joule.ReaderModule[source]

Inherit from this class and implement a run() coroutine to create a Joule reader module. Other methods documented below may be implemented as desired.

run(parsed_args: argparse.Namespace, output: joule.models.pipes.pipe.Pipe)[source]

This method must be implemented. It should run in a loop, if it returns the module stops.

Parameters:
  • parsed_args -- command line arguments, configure with custom_args()
  • output -- pipe connection to the output data stream
class ModuleDemo(ReaderModule):

    def run(self, parsed_args, output):
         while(not self.stop_requested):
            data = self.read_sensor()
            await output.write(data)

    def self.read_sensor(self) -> np.ndarray:
        # custom logic specific to the reader

    #... other module code
setup(parsed_args, app, output)[source]

Configure the module, executes before run()

Parameters:
  • parsed_args --
  • app --
  • output --
class joule.FilterModule[source]
run(parsed_args, inputs, outputs)[source]

This method must be implemented. It should run in a loop, if it returns the module stops.

Parameters:
  • parsed_args -- parsed command line arguments, configure with joule.BaseModule.custom_args()
  • inputs -- pipe connections to input streams indexed by name (specified in the module configuration file).
  • outputs -- pipe connections to output streams indexed by name (specified in the module configuration file).
class ModuleDemo(FilterModule):

    def run(self, parsed_args, inputs, outputs):
        raw = inputs["raw"]
        filtered = outputs["filtered"]
        # this filter just passes the input through to the output
         while(not self.stop_requested):
            data = await raw.read()
            await filtered.write(data)
            raw.consume(len(data))

    #... other module code
setup(parsed_args, app, inputs, outputs)[source]

Configure the module, executes before run()

Parameters:
  • parsed_args --
  • app --
  • inputs --
  • outputs --

Returns:

class joule.CompositeModule[source]
setup(parsed_args: argparse.Namespace, inputs: typing.Dict[str, joule.models.pipes.pipe.Pipe], outputs: typing.Dict[str, joule.models.pipes.pipe.Pipe], loop: asyncio.events.AbstractEventLoop)[source]

This method must be implemented

Parameters:
  • parsed_args -- parsed command line arguments
  • inputs -- pipe connections to input streams. Keys are the names specified in the module configuration file
  • outputs -- pipe connections ot output streams. Keys are the names specified in the module configuration
  • loop -- the current event loop
Returns:

array of coroutine objects

Pipes

class joule.Pipe(name=None, direction=None, module=None, stream=None, layout=None)[source]

This encapsulates streams and connects to modules. Some more infos2

Note

There are many different kinds of pipes

class DIRECTION[source]

An enumeration.

close()[source]

Close the pipe. This also closes any subscribers. If close_cb is defined it will be executed before the subscribers are closed.

close_interval()[source]

Signal a break in the data stream. This should be used to indicate missing data. Data returned from read() will be chunked by interval boundaries.

consume(num_rows)[source]

Flush data from the read buffer. The next call to read() will return any unflushed data followed by new incoming data.

Parameters:num_rows -- number of rows to flush from the read buffer
enable_cache(lines: int)[source]

Turn on caching for pipe writes. Data is only transmitted once the cache is full. This improves system performance especially if write() is called rapidly with short arrays. Once enabled, caching cannot be disabled.

Parameters:lines -- cache size
flush_cache()[source]

Force a pipe flush even if the cache is not full. Raises an error if caching is not enabled.

read(flatten=False)[source]

Read stream data. By default this method returns a structured array with timestamp and data fields. This method is a coroutine.

Parameters:flatten -- return an unstructured array (flat 2D matrix) with timestamps in the first column
Returns:numpy.ndarray
>>> data = await pipe.read()
[1, 2, 3]
>>> data = await pipe.read(flatten=True)
# the same data is returned again
[1,2,3]
>>> pipe.consume(len(data))
# next call to read will return only new data
write(data)[source]

Write timestamped data to the pipe. Timestamps must be monotonically increasing and should not overlap with existing stream data in the database. This method is a coroutine.

Parameters:
  • data (numpy.ndarray) -- May be a structured array with timestamp and data fields
  • an unstructured array with timestamps in the first column. (or) --
>>> await pipe.write([[1000, 2, 3],[1001, 3, 4]])
class joule.InputPipe(name=None, stream=None, layout=None, reader=None, reader_factory=None, close_cb=None, buffer_size=10000)[source]
BUFFER_SIZE = None

Note -- The StreamReader.read coroutine hangs even if the write side of the pipe is closed so the call is wrapped in a wait_for

class joule.OutputPipe(name=None, stream=None, layout=None, close_cb=None, writer: asyncio.streams.StreamWriter = None, writer_factory=None)[source]
class joule.LocalPipe(layout: str, loop: asyncio.events.AbstractEventLoop = None, name: str = None, close_cb=None, debug: bool = False, stream=None)[source]

Pipe for intra-module communication.

Parameters:

layout -- datatype_width, for example float32_3 for a three element stream must. See Stream.layout

Keyword Arguments:
 
  • loop -- specify a an event loop, otherwise the default one is used
  • name -- useful for debugging with multiple pipes
  • close_cb -- callback coroutine executed when pipe closes
  • debug -- enable to log pipe usage events
close_interval_nowait()[source]

Same as close_interval but this is not a coroutine. This should only be used for unit testing

close_nowait()[source]

Same as close but this is not a coroutine. This should only be used for unit testing

read_nowait(flatten=False)[source]

Same as read but this is not a coroutine. This should only be used for unit testing.

Parameters:flatten --
Returns:numpy.ndarray
>>> data = pipe.read_nowait()
[1, 2, 3]

Streams

class joule.Stream(**kwargs)[source]
name

str -- stream name

description

str -- stream description

datatype

Stream.DATATYPE -- data representation on disk see Stream Configuration

decimate

bool -- whether to store decimated data for stream visualization

folder

joule.Folder -- parent Folder

elements

List[joule.Element] -- array of stream elements

keep_us

int -- microseconds of data to keep (KEEP_NONE=0, KEEP_ALL=-1).

class DATATYPE[source]

An enumeration.

active

bool -- true if the stream is part of the data pipeline

is_remote

bool -- true if the stream resides on a remote system

layout

str -- formatted string specifying the datatype and number of elements

locked

bool -- true if the stream has a configuration file or is an active part of the data pipeline. Attributes of locked streams cannot be changed.

remote_path

str -- path on remote host, blank if the stream is local

remote_url

str -- URL of remote host, blank if the stream is local

set_remote(url: str, path: str)[source]

Associate the stream with a remote system :param url: remote URL :param path: stream path on remote system

to_json(info: typing.Dict[int, joule.models.data_store.data_store.StreamInfo] = None) → typing.Dict[source]
Args:
info: optional content added to data_info field

Returns: Dictionary of Stream attributes

class joule.Element(**kwargs)[source]
name

str -- element name

index

int -- order of element in the stream

units

str -- data unit (eg Watts, Volts, etc)

plottable

bool -- whether the data can be visualized as a time series

offset

float -- linear scaling y=mx+b, only applied to Lumen visualizations

scale_factor

float -- linear scaling only applied to Lumen visualizations

default_min

float -- fix lower limit on autoscaling in Lumen visualizations

default_max

float -- fix upper limit on autoscaling in Lumen visualizations

display_type

Element.DISPLAYTYPE -- visualization type

class DISPLAYTYPE[source]

An enumeration.

to_json()[source]

Returns: Dictionary of Element attributes

Errors

class joule.errors.SubscriptionError[source]

Error subscribing to a stream that is not available

class joule.errors.ConfigurationError[source]

Error setting up an object due to incorrect configuration

Utilities

joule.utilities.time_now()[source]
Returns:current time in UNIX microseconds
joule.utilities.timestamp_to_human(timestamp: int) → str[source]

Convert a timestamp (integer microseconds since epoch) to a human-readable string, using the local timezone for display (e.g. from the TZ env var).

joule.utilities.yesno(val: str)[source]

Convert a "yes" or "no" argument into a boolean value. Returns true if val is "yes" and false if val is "no". Raises ValueError otherwise. This is function can be used as the type parameter for to handle module arguments that are "yes|no" flags.