API Reference

Note

The Joule Application Programming Interface (API) uses asynchronous coroutines and must be run inside an event loop. To run the examples in this documentation interactively, use the asyncio REPL:

$> python3 -m asyncio
asyncio REPL 3.8.10
...
>>> import asyncio
>>>
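
Outside the REPL, the same coroutines can be driven from an ordinary script. The following is a minimal sketch using only the standard library; fetch_info is a hypothetical stand-in for an API coroutine such as node.info(), so it runs without a Joule node.

```python
import asyncio


async def fetch_info():
    # Hypothetical stand-in for an API coroutine such as `await node.info()`.
    await asyncio.sleep(0)
    return {"name": "example node"}


async def main():
    # API calls must be awaited from inside a coroutine like this one.
    info = await fetch_info()
    return info["name"]


# asyncio.run creates an event loop, runs main() to completion, and closes the loop.
name = asyncio.run(main())
print(name)
```

In a real script the body of main() would obtain a node and await its methods in place of the stand-in.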

A Jupyter Notebook that provides an overview of the API functionality is available here. To run this notebook follow the commands below to install Jupyter and matplotlib, retrieve the notebook file, and start the Jupyter server:

$> pip install jupyterlab matplotlib # prefix with sudo for system-wide install
$> wget https://raw.githubusercontent.com/wattsworth/joule/master/API_demo.ipynb
$> jupyter lab --ip=0.0.0.0 # add the --ip flag to allow external connections

Node

class joule.api.Node(url: str, loop: asyncio.AbstractEventLoop)
Parameters:
url:

Joule node (default: http://localhost:8088)

loop:

Event Loop (default: current event loop)

A node represents a Joule instance and is the only means to access API methods. Each Joule module has a node instance created automatically that refers to the node where the module is running.

""" Inside a joule.ReaderModule class """
async def run(self, parsed_args, output):
    node_info = await self.node.info()
    # other run code

Standalone scripts may connect to any authorized node using the api.get_node function:

>>> import joule
>>> my_node = joule.api.get_node()

The rest of this section describes class methods separated by category.

Folder Actions

Node.folder_root() joule.Folder

Retrieve the node's root folder. This returns the entire database structure as shown in the example below.

Example:
>>> root = await node.folder_root()
>>> root
<joule.api.Folder id=2728 name='root' description=None locked=True>
>>> root.children
[<joule.api.Folder id=3123 name='tmp' description=None locked=False>,
 <joule.api.Folder id=2729 name='archive' description=None locked=True>,
 <joule.api.Folder id=2730 name='live' description=None locked=True>]
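
Because the root folder carries the whole tree, a short recursive helper can flatten it into stream paths. This is a sketch, not part of the Joule API: stream_paths is a hypothetical helper, it assumes the root folder corresponds to the path "/", and the stand-in objects mimic only the name, children, and data_streams attributes.

```python
from types import SimpleNamespace


def stream_paths(folder, prefix=""):
    """Return the path of every data stream at or below `folder`."""
    paths = [f"{prefix}/{stream.name}" for stream in folder.data_streams]
    for child in folder.children:
        paths.extend(stream_paths(child, f"{prefix}/{child.name}"))
    return paths


# Stand-in objects; with a real node, `root` would come from
# `await node.folder_root()`.
live = SimpleNamespace(name="live", children=[],
                       data_streams=[SimpleNamespace(name="stream")])
tmp = SimpleNamespace(name="tmp", children=[], data_streams=[])
root = SimpleNamespace(name="root", children=[tmp, live], data_streams=[])

print(stream_paths(root))
```
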
Node.folder_get(folder: joule.Folder | str | int) joule.Folder

Retrieve the specified folder. Folder may be specified by a joule.api.Folder object, a path, or numeric ID. Raises joule.errors.ApiError if folder specification is invalid.

Examples:
>>> await node.folder_get("/archive")  # query by path
<joule.api.Folder id=2729 name='archive' description=None locked=True>
>>> await node.folder_get(2730) # query by ID
<joule.api.Folder id=2730 name='live' description=None locked=True>
>>> await node.folder_get("/does/not/exist") # raises ApiError
joule.errors.ApiError: folder does not exist [404]
Node.folder_move(source: Folder | str | int, destination: Folder | str | int) None

Move the source folder into the destination folder. The source and destination may be specified by joule.Folder objects, paths, or numeric IDs. The source folder's name must be unique in the destination, and the source folder must not be locked (actively in use by a module or statically configured). The destination is created automatically if it does not exist. Raises joule.errors.ApiError if the folder specifications are invalid or the requested move cannot be performed.

Examples:
>>> await node.folder_move("/parent1/my_folder","/parent2")
>>> parent2 = await node.folder_get("/parent2")
>>> parent2.children
[<joule.api.Folder id=3321 name='my_folder' description=None locked=False>]
>>> await node.folder_move("/parent1/missing_folder","/parent2") # raises ApiError
joule.errors.ApiError: folder does not exist [404]
Node.folder_update(folder: joule.Folder) None

Update folder attributes. The name and description are the only writable attributes. Raises joule.errors.ApiError if the folder is locked or the specification is invalid.

Example:
>>> folder = await node.folder_get("/parent/my_folder")
<joule.api.Folder id=3329 name='my_folder' description=None locked=False>
>>> folder.name="new name"
>>> await node.folder_update(folder) # save the change
>>> parent = await node.folder_get("/parent")
>>> parent.children # folder 3329 has a new name
[<joule.api.Folder id=3329 name='new name' description=None locked=False>]
Node.folder_delete(folder: Folder | str | int, recursive: bool) None

Delete the specified folder. If recursive is True, delete any child folders as well. Raises joule.errors.ApiError if the folder specification is invalid or if the folder has children and recursive is False.

Example:
>>> folder = await node.folder_get("/parent")
>>> folder.children
[<joule.api.Folder id=3331 name='my_folder' description=None locked=False>]
>>> await node.folder_delete(folder, False) # raises ApiError because of child folder
joule.errors.ApiError: specify [recursive] or remove child folders first [400]
>>> await node.folder_delete(folder, True) # successfully removes /parent

DataStream Actions

Node.data_stream_get(stream: DataStream | str | int) DataStream:

Retrieve the specified stream. DataStream may be specified by a joule.api.DataStream object, a path, or numeric ID. Raises joule.errors.ApiError if stream specification is invalid.

Examples:
>>> stream = await node.data_stream_get("/parent/my_folder/stream")
<joule.api.DataStream id=2627 name='stream' description='' datatype='int32'
 is_configured=False is_source=False is_destination=False locked=False decimate=True>
>>> stream = await node.data_stream_get(2627)
<joule.api.DataStream id=2627 name='stream' description='' datatype='int32'
 is_configured=False is_source=False is_destination=False locked=False decimate=True>
>>> await node.data_stream_get("/does/not/exist") # raises ApiError
joule.errors.ApiError: stream does not exist [404]
Node.data_stream_move(stream: DataStream | str | int, folder: Folder | str | int) None

Move a stream into a different folder. The stream and folder may be specified by objects, paths, or numeric IDs. The stream name must be unique in the destination, and the stream must not be locked (actively in use by a module or statically configured). The destination is created automatically if it does not exist. Raises joule.errors.ApiError if the stream or folder specifications are invalid or the requested move cannot be performed.

Examples:
>>> await node.data_stream_move("/parent1/my_folder/stream","/parent2")
>>> parent2 = await node.folder_get("/parent2")
>>> parent2.data_streams
[<joule.api.DataStream id=2627 name='stream' description='' datatype='int32'
 is_configured=False is_source=False is_destination=False locked=False decimate=True>]
>>> await node.data_stream_move("/does/not/exist","/parent2") # raises ApiError
joule.errors.ApiError: stream does not exist [404]
Node.data_stream_update(stream: DataStream) None:

Update stream and element attributes. Raises joule.errors.ApiError if stream is locked or the specification is invalid. The datatype and number of elements may not be changed.

Example:
>>> stream = await node.data_stream_get("/parent/my_folder/stream")
>>> stream.elements # Element name is "Element1"
[<joule.api.Element id=3192 index=0, name='Element1' units=None
 plottable=True display_type='CONTINUOUS'>]
>>> stream.elements[0].name="New Name"
>>> await node.data_stream_update(stream) # send updated values to Joule
>>> updated_stream = await node.data_stream_get(stream) # refresh local copy
>>> updated_stream.elements # Element name is now "New Name"
[<joule.api.Element id=3192 index=0, name='New Name' units=None
 plottable=True display_type='CONTINUOUS'>]
Node.data_stream_delete(stream: DataStream | str | int) None:

Delete a stream. DataStream may be specified by a joule.api.DataStream object, a path, or numeric ID. Raises joule.errors.ApiError if the stream specification is invalid or if the stream is locked. To remove data within a stream see Node.data_delete().

Example:
>>> folder = await node.folder_get("/parent/my_folder")
>>> folder.data_streams # my_folder has one stream
[<joule.api.DataStream id=2627 name='stream' description='' datatype='int32'
 is_configured=False is_source=False is_destination=False locked=False decimate=True>]
>>> await node.data_stream_delete("/parent/my_folder/stream") # delete the stream
>>> folder = await node.folder_get("/parent/my_folder")
>>> folder.data_streams # my_folder is now empty
[]
Node.data_stream_create(stream: DataStream, folder: Folder | str | int) DataStream:

Create a stream and place in the specified folder. Folder may be specified by object, path or numeric ID. See joule.api.DataStream for details on creating DataStream objects. Raises joule.errors.ApiError if the stream or folder specification is invalid. If the folder is specified by path it will be created if it does not exist.

Example:
>>> new_stream = joule.api.DataStream(name="New Stream")
>>> new_stream.elements = [joule.api.Element(name="Element1")]
>>> await node.data_stream_create(new_stream,"/parent/new_folder")
<joule.api.DataStream id=2628 name='New Stream' description='' datatype='float32'
 is_configured=False is_source=False is_destination=False locked=False decimate=True>
Node.data_stream_info(stream: DataStream | str | int) DataStreamInfo:

Get information about a stream as a joule.api.DataStreamInfo object. DataStream may be specified by a joule.api.DataStream object, a path, or numeric ID. Raises joule.errors.ApiError if the stream specification is invalid.

Example:
>>> await node.data_stream_info("/parent/my_folder/stream")
<joule.api.DataStreamInfo start=1551730769556442 end=1551751402742424
 rows=61440, total_time=20633185982>
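
The fields of the info object can be combined into more readable figures. The sketch below copies the values from the example above into plain variables and uses only the standard library. The average row rate is only indicative, since row counts are approximate.

```python
from datetime import timedelta

# Values copied from the DataStreamInfo shown above.
start, end, rows = 1551730769556442, 1551751402742424, 61440

total_time = end - start                       # microseconds
duration = timedelta(microseconds=total_time)  # exact integer conversion
rows_per_second = rows / duration.total_seconds()

print(duration)                   # 5:43:53.185982
print(round(rows_per_second, 2))
```
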
Node.data_stream_annotation_delete(stream: DataStream | str | int, start: int | None = None, end: int | None = None) None

Remove annotations from this stream. If start and/or end is specified only remove annotations within this range. If a bound is not specified it defaults to the extreme (min/max) timestamp in the stream.

Data Actions

Node.data_read(stream: DataStream | str | int, start: int | None = None, end: int | None = None, max_rows: int | None = None) Pipe

Read historic data from a stream. Specify timestamp bounds for a particular range or omit to read all historic data. This method returns a pipe which should be used to read the data. The pipe must be closed after use. See Pipes for details on Joule Pipes.

Parameters:
stream:

DataStream specification, may be a joule.api.DataStream object, path, or numeric ID

start:

Timestamp in UNIX microseconds. Omit to read from beginning of the stream.

end:

Timestamp in UNIX microseconds. Omit to read until the end of the stream.

max_rows:

Return a decimated view of the data with at most this many rows. Decimations are provided in powers of 4.

Returns:
pipe:

A joule.models.Pipe connection to the specified stream.
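
To get a feel for how max_rows interacts with power-of-4 decimation, the arithmetic can be modelled locally. This is only a sketch: it assumes the node returns the least-decimated level that fits within max_rows, which this section does not state, and decimation_factor is a hypothetical helper rather than a Joule function.

```python
def decimation_factor(rows, max_rows):
    """Smallest power of 4 that brings `rows` down to at most `max_rows`."""
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    factor = 1
    while rows / factor > max_rows:
        factor *= 4  # each decimation level covers 4x more raw rows
    return factor


# 61440 raw rows limited to 1000: 61440 / 64 = 960 rows
print(decimation_factor(61440, 1000))  # 64
```
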

Example:
>>> pipe = await node.data_read("/parent/my_folder/stream")
>>> data = await pipe.read() # for large data run in a loop
array([(1551730769556442,      0), (1551730769657062,     10),
       (1551730769757882,     20), ..., (1551751850688250, 661380),
       (1551751850795677, 661390), (1551751850896756, 661400)],
      dtype=[('timestamp', '<i8'), ('data', '<i4')])
>>> pipe.consume(len(data)) # flush the pipe
>>> await pipe.close() # close the data connection
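
Since the pipe must be closed after use, wrapping the read in try/finally keeps the connection from leaking when an error occurs. The sketch below uses only the data_read, read, consume, and close calls shown above. read_chunk, FakePipe, and FakeNode are hypothetical names, and the two fake classes exist only so the sketch runs without a Joule node.

```python
import asyncio


async def read_chunk(node, stream, start=None, end=None):
    """Read one chunk from a stream and always close the pipe afterwards."""
    pipe = await node.data_read(stream, start=start, end=end)
    try:
        data = await pipe.read()
        pipe.consume(len(data))  # mark the rows as processed
        return data
    finally:
        await pipe.close()  # runs even if read() raises


# Minimal stand-ins that mimic only the calls used above.
class FakePipe:
    def __init__(self, rows):
        self.rows, self.closed = rows, False

    async def read(self):
        return self.rows

    def consume(self, count):
        self.rows = self.rows[count:]

    async def close(self):
        self.closed = True


class FakeNode:
    def __init__(self):
        self.pipe = FakePipe([(1551730769556442, 0), (1551730769657062, 10)])

    async def data_read(self, stream, start=None, end=None):
        return self.pipe


fake_node = FakeNode()
chunk = asyncio.run(read_chunk(fake_node, "/parent/my_folder/stream"))
print(len(chunk), fake_node.pipe.closed)
```
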
Node.data_subscribe(stream: DataStream | str | int) Pipe

Read live data from a stream. The stream must be actively produced by a module. This method returns a pipe which should be used to read the data. The pipe must be closed after use. See Pipes for details on Joule Pipes.

Example:
>>> pipe = await node.data_subscribe("/live/stream")
>>> data = await pipe.read() # run in a loop for continuous updates
array([(1551730769556442,      0), (1551730769657062,     10),
       (1551730769757882,     20), ..., (1551751850688250, 661380),
       (1551751850795677, 661390), (1551751850896756, 661400)],
      dtype=[('timestamp', '<i8'), ('data', '<i4')])
>>> pipe.consume(len(data)) # flush the pipe
>>> await pipe.close() # close the data connection
Node.data_write(stream: DataStream | str | int, start: int | None = None, end: int | None = None)

Write data to a stream. The stream must not be an active destination from any other source. Optionally specify start and end timestamps to remove existing data over the interval you plan to write. This is required when writing to a NilmDB backend as intervals are write-once. Writing into an existing interval with the default TimescaleDB backend will merge the new data with the existing data although this is not recommended. This method returns a pipe which should be used to write the data. The pipe must be closed after use. See Pipes for details on Joule Pipes.

Example:
>>> import numpy as np
>>> pipe_out = await node.data_write("/parent/my_folder/stream")
>>> for i in range(4):
...   await pipe_out.write(np.array([[joule.utilities.time_now(), i]]))
>>> await pipe_out.close()
>>> pipe_in = await node.data_read("/parent/my_folder/stream") # read the data back
>>> await pipe_in.read()
array([(1551758297114942, 0.), (1551758297115062, 1.),
       (1551758297115090, 2.), (1551758297115111, 3.)],
      dtype=[('timestamp', '<i8'), ('data', '<f4')])
Node.data_delete(stream: DataStream | str | int, start: int | None = None, end: int | None = None) None

Delete data from a stream. Specify timestamp bounds to delete a particular range or omit to delete all data. Deleting a range of data creates an interval break in the stream as shown in the example below.

Example:
>>> await node.data_intervals("/parent/my_folder/stream")
[[1551759387204004, 1551863787204004]]
>>> left=1551791787204004  # Tue, 05 Mar 2019 08:16:27
>>> right=1551831387204004 # Tue, 05 Mar 2019 19:16:27
>>> await node.data_delete("/parent/my_folder/stream", start=left, end=right)
>>> await node.data_intervals("/parent/my_folder/stream")
[[1551759387204004, 1551791787204004],
 [1551831387204004, 1551863787204004]]
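
The effect of a ranged delete on the interval list can be modelled with a few lines of plain Python. This is a local sketch, not how the node computes intervals: delete_range is a hypothetical helper, and a real node may report bounds that follow the timestamps of the stored rows rather than the exact start and end passed in.

```python
def delete_range(intervals, start, end):
    """Return the intervals that remain after removing [start, end]."""
    remaining = []
    for left, right in intervals:
        if right <= start or left >= end:
            remaining.append([left, right])  # no overlap, keep unchanged
            continue
        if left < start:
            remaining.append([left, start])  # piece before the deleted range
        if right > end:
            remaining.append([end, right])   # piece after the deleted range
    return remaining


before = [[1551759387204004, 1551863787204004]]
after = delete_range(before, 1551791787204004, 1551831387204004)
print(after)
```
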
Node.data_intervals(stream: DataStream | str | int, start: int | None = None, end: int | None = None) List

Retrieve list of data intervals. See Intervals for details on data intervals. Specify timestamp bounds to list intervals over a particular range or omit to list all intervals.

Examples:
>>> await node.data_intervals("/parent/my_folder/stream") # unbroken data stream
[[1551759387204004, 1551863787204004]]
>>> await node.data_intervals("/parent/my_folder/stream") # one segment of missing data
[[1551730769556442, 1551757125630201],
 [1551758221956119, 1551811071052711]]
>>> await node.data_intervals("/parent/my_folder/stream") # no stream data
[]
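
The returned list makes it easy to locate missing data: each gap lies between the end of one interval and the start of the next. A minimal sketch, where gaps is a hypothetical helper and the interval values are copied from the second example above:

```python
def gaps(intervals):
    """Return (gap_start, gap_end) pairs between consecutive intervals."""
    return [(prev[1], nxt[0]) for prev, nxt in zip(intervals, intervals[1:])]


intervals = [[1551730769556442, 1551757125630201],
             [1551758221956119, 1551811071052711]]

for gap_start, gap_end in gaps(intervals):
    # Timestamps are in microseconds, so divide by 1e6 for seconds.
    print((gap_end - gap_start) / 1e6, "seconds missing")
```

An unbroken stream yields an empty list of gaps, as does a stream with no data.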

Annotation Actions

Node.annotation_create(annotation: Annotation, stream: int | str | DataStream) Annotation

Add an annotation. Create a new joule.api.Annotation object locally and associate it with a data stream. The stream may be specified by path, ID, or a joule.api.DataStream object.

Example:
>>> from joule import utilities
>>> from joule.api import Annotation
>>> event_note = Annotation(title='Event Annotation',
                       start=utilities.human_to_timestamp('now'))
>>> await node.annotation_create(event_note, '/path/to/stream')
>>> range_note = Annotation(title='Range Annotation',
                       start=utilities.human_to_timestamp('1 minute ago'),
                       end=utilities.human_to_timestamp('now'))
>>> await node.annotation_create(range_note, '/path/to/stream')
Node.annotation_delete(annotation: int | Annotation) None

Remove an annotation. The annotation may be specified by ID or joule.api.Annotation object.

Example:
>>> await node.annotation_delete(5) # assuming 5 is a valid annotation ID
Node.annotation_update(annotation: Annotation) Annotation

Update an annotation with a new title or content. The time range (start, end) may not be changed.

Example:
>>> await node.annotation_update(annotation) # assuming annotation already exists on the stream
Node.annotation_get(stream: DataStream | str | int, start: int | None, end: int | None) List[Annotation]

Retrieve annotations for a particular stream. Specify timestamps to only retrieve annotations over a particular interval. The stream may be specified by path, ID, or a joule.api.DataStream object.

Example:
>>> annotations = await node.annotation_get("/path/to/stream")
>>> print(annotations[0].title)
Demo Annotation

Event Stream Actions

Node.event_stream_get(stream: EventStream | str | int) EventStream

Retrieve the specified stream. EventStream may be specified by a joule.api.EventStream object, a path, or a numeric ID. Raises joule.errors.ApiError if stream specification is invalid.

Examples:

Notes

Node.event_stream_move(stream: EventStream | str | int, folder: Folder | str | int) None

Move a stream into a different folder. The stream and folder may be specified by objects, paths, or numeric IDs. The stream name must be unique in the destination. The destination is created automatically if it does not exist. Raises joule.errors.ApiError if the stream or folder specifications are invalid or the requested move cannot be performed.

Examples:
>>> await node.event_stream_move("/parent1/my_folder/stream","/parent2")
>>> parent2 = await node.folder_get("/parent2")
>>> parent2.event_streams
[<joule.api.EventStream id=39 name='stream' description='example'>]
>>> await node.event_stream_move("/does/not/exist","/parent2") # raises ApiError
joule.errors.ApiError: stream does not exist [404]
Node.event_stream_update(stream: EventStream) None

Update the event stream name and/or description. Raises joule.errors.ApiError if the specification is invalid.

Example:
>>> stream = await node.event_stream_get("/parent/my_folder/stream")
>>> stream.name = "new name"
>>> await node.event_stream_update(stream) # send updated values to Joule
>>> updated_stream = await node.event_stream_get(stream) # refresh local copy
>>> updated_stream.name # stream name is now "new name"
'new name'
Node.event_stream_delete(stream: EventStream | str | int) None

Delete a stream. EventStream may be specified by a joule.api.EventStream object, a path, or numeric ID. Raises joule.errors.ApiError if the stream specification is invalid or if the stream is locked. To remove data within a stream see Node.event_stream_remove().

Example:
>>> folder = await node.folder_get("/parent/my_folder")
>>> folder.event_streams # my_folder has one stream
[<joule.api.EventStream id=39 name='stream' description='example'>]
>>> await node.event_stream_delete("/parent/my_folder/stream") # delete the stream
>>> folder = await node.folder_get("/parent/my_folder")
>>> folder.event_streams # my_folder is now empty
[]
Node.event_stream_create(stream: EventStream, folder: Folder | str | int) EventStream

Create an event stream and place in the specified folder. Folder may be specified by object, path or numeric ID. See joule.api.EventStream for details on creating EventStream objects. Raises joule.errors.ApiError if the stream or folder specification is invalid. If the folder is specified by path it will be created if it does not exist.

Example:
>>> new_stream = joule.api.EventStream(name="New Stream")
>>> await node.event_stream_create(new_stream,"/parent/new_folder")
<joule.api.EventStream id=38 name='New Stream' description=''>
Node.event_stream_info(stream: EventStream | str | int) EventStreamInfo

Get information about a stream as a joule.api.EventStreamInfo object. EventStream may be specified by a joule.api.EventStream object, a path, or numeric ID. Raises joule.errors.ApiError if the stream specification is invalid.

Example:
>>> await node.event_stream_info("/parent/my_folder/stream")
<joule.api.EventStreamInfo start=1643858892000000 end=1643891982000000
    events=10, total_time=33090000000>
Node.event_stream_write(stream: EventStream | str | int, events: List[Event]) None

Add events to an existing stream. EventStream may be specified by a joule.api.EventStream object, a path, or numeric ID. Events is a list of joule.api.Event objects.

Example:
>>> from joule.api import Event
>>> from joule.utilities import time_now
>>> e1 = Event(start_time=time_now()-int(1e6), end_time=time_now(), content={'name': 'event1'})
>>> await asyncio.sleep(2)
>>> e2 = Event(start_time=time_now()-int(1e6), end_time=time_now(), content={'name': 'event2'})
>>> await node.event_stream_write("/plugs/events",[e1,e2])
Node.event_stream_read(stream: EventStream | str | int, start: int | None = None, end: int | None = None, limit: int | None = None, json_filter: str | None = None) List[Event]

Read events from an existing stream. Returns a list of joule.api.Event objects.

Parameters:
stream:

Event stream, path, numeric ID, or joule.api.EventStream object

start:

UNIX timestamp (microseconds). Omit to read from start of stream.

end:

UNIX timestamp (microseconds). Omit to read to end of stream.

limit:

Limit query to the first N results.

json_filter:

Select events based on content, see below.

JSON Filter Format:

Specify a list of lists where each item is a tuple of [key,comparison,value]. Outer lists are OR conditions, inner lists are AND conditions. Comparison must be one of the following:

  • String Comparison: is, not, like, unlike

  • Numeric Comparison: gt, gte, lt, lte, eq, neq

Example:

Retrieve the first 100 events where name is LIKE sample OR value is between 0 AND 10:

>>> import json
>>>  #       ((-------- test ----------)  OR (------test------ AND ------test-------))
>>> filter = [[['name','like','sample']]   , [['value','gt',0],    ['value','lt',10]]]
>>> await node.event_stream_read(stream,limit=100,json_filter=json.dumps(filter))
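
The OR-of-ANDs structure can be checked locally before sending a filter to the node. The sketch below is a hypothetical local model, not the node's implementation. It covers only the comparisons whose meaning is unambiguous and leaves out like and unlike because their matching rules are not described in this section. It also treats a missing key as a failed test, which is an assumption.

```python
import json
import operator

# Hypothetical local mapping of comparison names to Python operators.
COMPARISONS = {
    "is": operator.eq, "not": operator.ne,
    "gt": operator.gt, "gte": operator.ge,
    "lt": operator.lt, "lte": operator.le,
    "eq": operator.eq, "neq": operator.ne,
}


def matches(content, json_filter):
    """Return True if any inner list has all of its tests satisfied."""
    return any(
        all(key in content and COMPARISONS[comparison](content[key], value)
            for key, comparison, value in group)
        for group in json_filter
    )


# (name is "sample") OR (value > 0 AND value < 10)
event_filter = [[["name", "is", "sample"]],
                [["value", "gt", 0], ["value", "lt", 10]]]

print(matches({"name": "sample", "value": 50}, event_filter))  # True
print(matches({"name": "other", "value": 5}, event_filter))    # True
print(matches({"name": "other", "value": 50}, event_filter))   # False
print(json.dumps(event_filter))  # string form passed as json_filter
```
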
Node.event_stream_remove(stream: EventStream | str | int, start: int | None = None, end: int | None = None, json_filter: str | None = None)

Remove events from an existing stream. EventStream may be specified by a joule.api.EventStream object, a path, or numeric ID. Specify start and/or end timestamps to remove events over a specific time range. Use json_filter to remove events based on content.

Module Actions

Node.module_list(statistics: bool = False) List[Module]

Retrieve a list of the current Joule modules as joule.api.Module objects. If statistics is True retrieve CPU and memory statistics for each module. Collecting statistics takes additional time because the CPU usage is averaged over a short time interval.

Example:
>>> await node.module_list()
[<joule.api.Module id=0 name='plus1' description='adds 1 to the input' is_app=False>,
 <joule.api.Module id=1 name='counter' description='counts up by 10s' is_app=False>]
Node.module_get(module: Module | str | int, statistics: bool = False) Module

Retrieve a specific module as a joule.api.Module object. If statistics is True, retrieve CPU and memory statistics for the module. Collecting statistics takes additional time because the CPU usage is averaged over a short time interval. Module may be specified by object, name, or numeric ID.

Example:
>>> my_module = await node.module_get("my module", statistics=True)
<joule.api.Module id=0 name='my module' description='adds 1 to the input' is_app=False>
>>> my_module.statistics
<joule.api.ModuleStatistics pid=1460 create_time=1551805343.4 cpu_percent=5.60 memory_percent=6.23>
Node.module_logs(module: Module | str | int) List[str]

Retrieve a list of module logs. Logs are the stdout and stderr streams from the module. The easiest way to generate logs is by adding print statements to a module. The maximum number of lines is controlled by the MaxLogLines parameter in the main configuration file (see System Configuration). Logs are automatically rolled if a module produces more than the maximum number of lines. The module may be specified by object, name, or numeric ID.

Example:
>>> await node.module_logs("my module")
['[2019-03-04T22:57:01.049266]: ---starting module---',
 '[2019-03-04T22:59:02.089463]: serial input restarted',
 '[2019-03-04T23:04:36.948160]: WARNING: temperature > 98.6']

Proxy Actions

Node.proxy_list() List[Proxy]

Retrieve a list of proxied URLs as joule.api.Proxy objects.

Example:
>>> await node.proxy_list()
[<joule.api.Proxy id=0 name='flask_app' proxied_url='http://localhost:8088/interface/p0'
  target_url='http://localhost:5000'>,
 <joule.api.Proxy id=1 name='intranet_host' proxied_url='http://localhost:8088/interface/p1'
  target_url='http://internal.domain.com'>]
Node.proxy_get(module: Proxy | str | int) Proxy

Retrieve a specific proxy as a joule.api.Proxy object. Proxy may be specified by object, name, or numeric ID.

Example:
>>> await node.proxy_get("flask_app")
<joule.api.Proxy id=0 name='flask_app' proxied_url='http://localhost:8088/interface/p0'
 target_url='http://localhost:5000'>

Master Actions

Node.master_add(master_type: str, identifier: str, lumen_parameters: Dict | None = None) Master

Grant API access to a user or node. It is recommended to use the CLI to add masters to Joule.

Parameters:
master_type:

one of [user|joule_node|lumen_node]

identifier:

master name, either username, URL, or domain name

lumen_parameters:

authentication credentials required for a lumen master

Example:
>>> todo
Node.master_list() List[Master]

Retrieve a list of masters that can control this node as joule.api.Master objects.

Example:
>>> todo
Node.master_remove(master: Master | str)

Remove the specified master and revoke its API access privileges.

Follower Actions

Node.follower_list() List[BaseNode]

Retrieve a list of nodes that can be controlled by this node as joule.api.BaseNode objects.

Example:
>>> todo
Node.follower_remove(follower: BaseNode | str)

Remove the specified follower. This does not invalidate the associated API key.

Database Actions

Node.db_connect() sqlalchemy.engine.Engine

Create a connection to the node database. Note that the node's pg_hba.conf must allow remote connections to the database.

Node.db_connection_info() joule.utilities.ConnectionInfo

Connection information necessary to connect to the node database. This is useful if the IP address or domain name must be changed before connecting to the database. Returns a joule.utilities.ConnectionInfo object.

Models

class joule.api.Folder(name='')

API Folder model. See Folder Actions for details on using the API to manipulate folders. Folders are locked if they contain locked streams (streams that are active or statically configured). Folder objects should not be created directly. They should be received via API calls.

Parameters:
  • name (str) -- folder name, must be unique in the parent

  • description (str) -- optional field

  • locked (bool) -- folder may not be moved, deleted, or changed

  • data_streams (List[DataStream]) -- data streams in the folder

  • event_streams (List[EventStream]) -- event streams in the folder

  • children (List[Folder]) -- subfolders in the folder

class joule.api.DataStream(name: str = '', description: str = '', datatype: str = 'float32', keep_us: int = -1, elements: List[Element] | None = None)

API DataStream model. See DataStream Actions for details on using the API to manipulate data streams. Streams are locked if they are active or statically configured. When creating a stream manually, omit the ID and status attributes (is_*, active, and locked); these are set by the Joule server.

Parameters:
  • name (str) -- stream name, must be unique in the parent

  • description (str) -- optional field

  • datatype (str) -- element datatype

  • keep_us (int) -- store the last N microseconds of data (-1 to keep all and 0 to keep none)

  • is_configured -- is the stream statically configured with a *.conf file

  • is_source -- is the stream an active data source

  • is_destination -- is the stream an active data destination

  • active (bool) -- is the stream a source or destination

  • locked (bool) -- is the stream active or configured

  • decimate (bool) -- is the stream data decimated for visualization

  • elements (List[Element]) -- list of the stream elements
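
Because keep_us is expressed in microseconds, retention periods are easier to write as durations and convert. A minimal standard-library sketch follows; keep_us_for is a hypothetical helper, not part of Joule.

```python
from datetime import timedelta


def keep_us_for(duration):
    """Convert a timedelta into a whole number of microseconds."""
    return duration // timedelta(microseconds=1)


one_week = keep_us_for(timedelta(weeks=1))
print(one_week)  # 604800000000
```

The result can be passed as keep_us when constructing a stream. The special values -1 (keep all) and 0 (keep none) are given directly instead.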

class joule.api.DataStreamInfo(start: int | None, end: int | None, rows: int, total_time: int = 0, bytes: int = 0)

API DataStreamInfo model. Received from Node.data_stream_info() and should not be created directly.

Warning

Rows and Bytes values are approximate

Parameters:
  • start (int) -- timestamp in UNIX microseconds of the first data element

  • end (int) -- timestamp in UNIX microseconds of the last data element

  • rows (int) -- approximate rows of data in the stream

  • bytes (int) -- approximate size of the data on disk

  • total_time (int) -- data duration in microseconds (end-start)

class joule.api.Element(name: str = '', units: str = '', plottable: bool = True, display_type: str = 'continuous')

API Element model. Streams have one or more elements. See DataStream Configuration for details on the stream data model.

Parameters:
  • id (int) -- unique numeric ID assigned by Joule

  • index (int) -- column position in the data array (0 = first element)

  • name (str) -- element name

  • units (str) -- unit of measurement, may be any string

  • plottable (bool) -- should the element be visible in the Lumen plotting interface

  • display_type (str) -- plot type, one of [continuous|discrete|event], defaults to continuous

  • offset (float) -- offset data visualization by y=(x-offset)*scale_factor

  • scale_factor (float) -- scale data visualization with above equation

  • default_max (float) -- fix auto scale max (set to None to fit plotted data)

  • default_min (float) -- fix auto scale min (set to None to fit plotted data)
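
The visualization transform defined by offset and scale_factor is simple enough to check by hand. A minimal sketch: display_value is a hypothetical helper, and its defaults of 0.0 and 1.0 are assumptions chosen so that the transform is the identity, not documented Joule defaults.

```python
def display_value(x, offset=0.0, scale_factor=1.0):
    """Apply the visualization transform y = (x - offset) * scale_factor."""
    return (x - offset) * scale_factor


# Hypothetical sensor: zero point at 512 raw counts, 0.5 units per count.
print(display_value(612, offset=512, scale_factor=0.5))  # 50.0
```
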

class joule.api.Module(id: int, name: str, description: str, is_app: bool, inputs: Dict[str, DataStream], outputs: Dict[str, DataStream], statistics: ModuleStatistics | None = None)

API Module model. See Module Actions for details on using the API to query modules. See Modules for details on writing new modules. See Module Configuration for details on adding modules to Joule. Use Node.data_stream_get() to retrieve the associated stream from the path string in the inputs and outputs dictionaries.

Parameters:
  • id (int) -- unique numeric ID assigned by Joule server

  • name (str) -- module name, must be unique

  • description (str) -- optional field

  • is_app (bool) -- whether the module provides a web interface

  • inputs (Dict) -- Mapping of [pipe_name] = stream_path for module inputs

  • outputs (Dict) -- Mapping of [pipe_name] = stream_path for output connections

  • statistics (ModuleStatistics) -- execution statistics, may be None depending on API call parameters

class joule.api.ModuleStatistics(pid: int, create_time: int, cpu_percent: float, memory_percent: float)

API ModuleStatistics model. Received by setting statistics to True when calling Node.module_get() or Node.module_list() and should not be created directly.

Parameters:
  • pid (int) -- process ID

  • create_time (int) -- process creation time in UNIX seconds

  • cpu_percent (float) -- approximate CPU usage

  • memory_percent (float) -- approximate memory usage

class joule.api.Annotation(title: str, start: int, end: int | None = None, content: str = '')

API Annotation model. See Annotation Actions for details on using the API to manipulate annotations. Annotations are associated with streams and may cover either a range of data or a single event. If end is None the annotation marks an event, otherwise it marks a range.

Parameters:
  • title (string) -- annotation title

  • content (string) -- additional description (optional)

  • start (int) -- Unix microsecond timestamp

  • end (int) -- specify for range annotation, omit for event annotation

Errors

class joule.errors.ApiError

Error generated by an API call. Catches all API related errors.

class joule.errors.StreamNotFound

Type of ApiError. The requested stream does not exist.

class joule.errors.EmptyPipeError

Type of ApiError. Raised on an attempt to read from an empty pipe.

Note

The following errors are not expected in typical usage:

class joule.errors.SubscriptionError

Error subscribing to a stream that is not available

class joule.errors.ConfigurationError

Error setting up an object due to incorrect configuration

class joule.errors.DataError

Error generated by backend storage (nilmdb or timescale)

class joule.errors.DecimationError

Requested data is not sufficiently decimated

Utilities

class joule.utilities.ConnectionInfo(username: str, password: str, port: int, database: str, host: str, nilmdb_url)

Encapsulates database connection information

to_dsn() str

Convert connection parameters to a DSN string used by SQLAlchemy and other database engines.

to_json() Dict

Convert connection parameters to a dictionary of attributes appropriate for transmission as JSON

joule.utilities.human_to_timestamp(time: str) int

Convert a time specification into a UNIX microsecond timestamp. The time specification may be one of a wide variety of date formats, a relative interval such as "one minute ago", or a numeric timestamp. Raises ValueError for an invalid time specification.

joule.utilities.time_now() int
Returns:

current time in UNIX microseconds

joule.utilities.timestamp_to_human(timestamp: int) str

Convert a timestamp (integer microseconds since epoch) to a human-readable string, using the local timezone for display (e.g. from the TZ env var).
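
For cases where a datetime object is needed rather than a string, the microsecond timestamps used throughout the API map directly onto the standard library. This sketch is a stand-in that does not call joule.utilities: to_datetime and to_timestamp are hypothetical helpers, and they work in UTC rather than the local timezone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_datetime(timestamp_us):
    """UNIX microseconds -> timezone-aware UTC datetime (exact integer math)."""
    return EPOCH + timedelta(microseconds=timestamp_us)


def to_timestamp(moment):
    """Timezone-aware datetime -> UNIX microseconds."""
    return (moment - EPOCH) // timedelta(microseconds=1)


ts = 1551791787204004
print(to_datetime(ts).isoformat())  # 2019-03-05T13:16:27.204004+00:00
```

Integer arithmetic on timedelta avoids the rounding that can occur when a microsecond timestamp is first divided into floating-point seconds.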

joule.utilities.yesno(val: str)

Convert a "yes" or "no" argument into a boolean value. Returns True if val is "yes" and False if val is "no". Raises ValueError otherwise. This function can be used as the type parameter to handle module arguments that are "yes|no" flags.

class joule.utilities.ConnectionInfo

Returned by Node.db_connection_info()

Parameters:
username (str):

database username

password (str):

database password

port (int):

database port

database (str):

database name

host (str):

hostname

Methods: