PostTroll
PostTroll is a message system for pytroll.
A typical use is for event-driven production chains, using messages for notifications.
To get the software, take a look on github.
Use Example
The main use of this library is the posttroll.message.Message
,
posttroll.subscriber.Subscribe
and
posttroll.publisher.Publish
classes, but the nameserver script is
also necessary. The nameserver scripts allows to register data publishers and
then for the subscribers to find them. Here is the usage of the nameserver
script:
usage: nameserver [-h] [-d {start,stop,status,restart}] [-l LOG] [-v]
optional arguments:
-h, --help show this help message and exit
-d {start,stop,status,restart}, --daemon {start,stop,status,restart}
Run as a daemon
-l LOG, --log LOG File to log to (defaults to stdout)
-v, --verbose print debug messages too
--no-multicast disable address broadcasting via multicasting
So, after starting the nameserver, making two processes communicate is fairly easy. Here is an example of publishing code:
from posttroll.publisher import Publish
from posttroll.message import Message
import time
try:
with Publish("a_service", 9000) as pub:
counter = 0
while True:
counter += 1
message = Message("/counter", "info", str(counter))
print "publishing", message
pub.send(str(message))
time.sleep(3)
except KeyboardInterrupt:
print "terminating publisher..."
And the subscribing code:
from posttroll.subscriber import Subscribe
with Subscribe("a_service", "counter",) as sub:
for msg in sub.recv():
print msg
There is also a threaded container for the listener that can be used eg. inside a class for continuously monitoring incoming messages:
from posttroll.publisher import NoisyPublisher
from posttroll.listener import ListenerContainer
from posttroll.message import Message
import time
pub = NoisyPublisher("test")
pub.start()
sub = ListenerContainer(topics=["/counter"])
# Wait that both sub and pub to register to nameserver
time.sleep(3)
for counter in range(5):
msg_out = Message("/counter", "info", str(counter))
pub.send(str(msg_out))
print "published", str(msg_out)
msg_in = sub.output_queue.get(True, 1)
print "received", str(msg_in), ""
pub.stop()
sub.stop()
If you do not want to broadcast addresses via multicasting to nameservers in your network, you can start the nameserver with the argument –no-multicast. Doing that, you have to specify the nameserver(s) explicitly in the publishing code:
from posttroll.publisher import Publish
from posttroll.message import Message
import time
try:
with Publish("a_service", 9000, nameservers=['localhost']) as pub:
counter = 0
while True:
counter += 1
message = Message("/counter", "info", str(counter))
print "publishing", message
pub.send(str(message))
time.sleep(3)
except KeyboardInterrupt:
print "terminating publisher..."
Configuration parameters
Global configuration variables that are available through a Donfig configuration object: - tcp_keepalive - tcp_keepalive_cnt - tcp_keepalive_idle - tcp_keepalive_intvl - multicast_interface - mc_group
Setting TCP keep-alive
If the network connection between a publisher and a subscriber seem to be dropping, it is possible to set TCP keep-alive settings via environment variables. Below are some rudimentary example values:
import os
os.environ["POSTTROLL_TCP_KEEPALIVE"] = "1"
os.environ["POSTTROLL_TCP_KEEPALIVE_CNT"] = "10"
os.environ["POSTTROLL_TCP_KEEPALIVE_IDLE"] = "1"
os.environ["POSTTROLL_TCP_KEEPALIVE_INTVL"] = "1"
These values need to be set before any subscriber/publisher are
created to have them take any effect. Another option is to set these
in the shell initialization, like $HOME/.bashrc
.
For further information on the 0MQ TCP keep-alive, see zmq_setsockopts for relevant socket options.
Using secure ZeroMQ backend
To use securely authenticated sockets with posttroll (uses ZMQ’s curve authentication), the backend needs to be defined through posttroll config system, for example using an environment variable:
POSTTROLL_BACKEND=secure_zmq
On the server side (for example a publisher), we need to define the server’s secret key and the directory where the accepted client keys are provided:
POSTTROLL_SERVER_SECRET_KEY_FILE=/path/to/server.key_secret
POSTTROLL_CLIENTS_PUBLIC_KEYS_DIRECTORY=/path/to/client_public_keys/
On the client side (for example a subscriber), we need to define the server’s public key file and the client’s secret key file:
POSTTROLL_CLIENT_SECRET_KEY_FILE=/path/to/client.key_secret
POSTTROLL_SERVER_PUBLIC_KEY_FILE=/path/to/server.key
These settings can also be set using the posttroll config object, for example:
>>> from posttroll import config
>>> with config.set(backend="secure_zmq", server_public_key_file="..."):
...
The posttroll configuration uses donfig, for more information, check https://donfig.readthedocs.io/en/latest/.
Generating the public and secret key pairs
In order for the secure ZMQ backend to work, public/secret key pairs need to be generated, one for the client side and one for the server side. A command-line script is provided for this purpose:
> posttroll-generate-keys -h
usage: posttroll-generate-keys [-h] [-d DIRECTORY] name
Create a public/secret key pair for the secure zmq backend. This will create two files (in the current directory if not otherwise specified) with the suffixes '.key' and '.key_secret'. The name of the files will be the one provided.
positional arguments:
name Name of the file.
options:
-h, --help show this help message and exit
-d DIRECTORY, --directory DIRECTORY
Directory to place the keys in.
Converting from older posttroll versions
Migrating from older versions of posttroll (pre v0.2), so some adaptations have to be made. Instead of data types, the services now have aliases. So, for the publishing, the following call:
with Publish("a_service", ["data_type1", "data_type2"], 9000) as pub:
would translate into:
with Publish("a_service", 9000, ["data_type1", "data_type2"]) as pub:
On the subscriber side, the following:
with Subscribe("data_type1") as sub:
would have to be changed to:
with Subscribe("a_service") as sub:
Note that the behaviour is changed: all the messages comming from the publisher
a_service would be iterated over, including messages that have another data
type than the one you want. This is why there is now the possibility to add a
subject filter directly inside the posttroll.subscriber.Subscribe
call:
with Subscribe("a_service", "data_type1") as sub:
This means that the subjects of the messages you are interested in should start with “data_type1” though…
Handling timezone-aware datetime objects
Timezone-aware datetime object were historically unsupported in posttroll, such as encoding or decoding them was leading to problems. Recent versions of posttroll were fixed to address the problem, however message sent with these versions are not backwards compatible. To ensure backwards compatibility, it is possible to configure posttroll to send message that drop the timezone information on encoding. This can be done with an environment variable:
POSTTROLL_MESSAGE_VERSION=v1.01
or within python code:
>>> from posttroll import config
>>> with config.set(message_version="v1.01"):
...
API
The publisher module gives high-level tools to publish messages on a port.
- class posttroll.publisher.NoisyPublisher(name, port=0, aliases=None, broadcast_interval=2, nameservers=None, min_port=None, max_port=None)
Same as a Publisher, but with broadcasting of its own name and address.
Setting the name to a meaningful value is import since it will be searchable in the nameserver. The port is to be provided as an int, and setting to 0 means it will be set to a random free port. aliases is a list of alternative names for the process. broadcast_interval, in seconds (2 by default) says how often the current name and address should be broadcasted. If nameservers is non-empty, multicasting will be deactivated and the publisher registers on these nameservers only
- close()
Alias for stop.
- heartbeat(min_interval=0)
Send a heartbeat … but only if min_interval seconds has passed since last beat.
- property port_number
Get the port number.
- send(msg)
Send a msg.
- start()
Start the publisher.
- stop()
Stop the publisher.
- class posttroll.publisher.Publish(name, port=0, aliases=None, broadcast_interval=2, nameservers=None, min_port=None, max_port=None)
The publishing context.
See
Publisher
andNoisyPublisher
for more information on the arguments.The publisher is selected based on the arguments, see
create_publisher_from_dict_config()
for information how the selection is done.Example on how to use the
Publish
context:from posttroll.publisher import Publish from posttroll.message import Message import time try: with Publish("my_service", port=9000) as pub: counter = 0 while True: counter += 1 message = Message("/counter", "info", str(counter)) print("publishing", message) pub.send(message.encode()) time.sleep(3) except KeyboardInterrupt: print("terminating publisher...")
- class posttroll.publisher.Publisher(address, name='', min_port=None, max_port=None)
The publisher class.
address is the current address of the Publisher, e.g.:
tcp://localhost:1234
Setting the port to 0 means that a random free port will be chosen for you. It is still possible to limit the range from which the port is selected by either setting environment variables POSTTROLL_PUB_MIN_PORT and POSTTROLL_PUB_MAX_PORT, or passing the values, as integers, using arguments min_port and max_port when creating the Publisher.
name is simply the name of the publisher.
An example on how to use the
Publisher
:from posttroll.publisher import Publisher, get_own_ip from posttroll.message import Message import time pub_address = "tcp://" + str(get_own_ip()) + ":9000" pub = Publisher(pub_address) try: counter = 0 while True: counter += 1 message = Message("/counter", "info", str(counter)) pub.send_string(message.encode()) time.sleep(3) except KeyboardInterrupt: print("terminating publisher...") pub.stop()
- close()
Alias for stop.
- heartbeat(min_interval=0)
Send a heartbeat … but only if min_interval seconds has passed since last beat.
- property name
Get the name of the publisher.
- property port_number
Get the port number from the actual publisher.
- send(msg)
Send the given message.
- start()
Start the publisher.
- stop()
Stop the publisher.
- posttroll.publisher.create_publisher_from_dict_config(settings)
Create a publisher based on dictionary of configuration items.
The publisher is created based on the given options in the following way:
setting settings[‘port’] to non-zero integer AND settings[‘nameservers’] to False will disable nameserver connections and address broadcasting, and publish the messages only on the localhost on the given port
setting settings[‘nameservers’] to a list of hostnames will connect to nameservers running on those servers, and in addition publish the messages on a random port on the localhost
setting settings[‘port’] to zero and settings[‘nameservers’] to None will broadcast the publisher address and port with multicast, and publish the messages on a random port.
The last two cases will require settings[‘name’] to be set. Additional options are described in the docstrings of the respective classes, namely
Publisher
andNoisyPublisher
.
- posttroll.publisher.get_own_ip()
Get the host’s ip number.
Simple library to subscribe to messages.
- class posttroll.subscriber.NSSubscriber(services='', topics='pytroll:/', addr_listener=False, addresses=None, timeout=10, translate=False, nameserver='localhost')
Automatically subscribe to services.
Automatic subscriptions are done by requesting addresses from the nameserver. If topics are specified, filter the messages through the beginning of the subject. addr_listener allows to add new services on the fly as they appear on the network. Additional addresses to subscribe to can be specified, and address translation can be performed if translate is set to True (False by default). The timeout here is specified in seconds. The nameserver tells which host should be used for nameserver requests, defaulting to “localhost”.
Note: ‘services = None’, means no services, and ‘services =””’ means all services. Default is to listen to all services.
- close()
Alias for stop.
- start()
Start the subscriber.
- stop()
Stop the subscriber.
- class posttroll.subscriber.Subscribe(services='', topics='pytroll:/', addr_listener=False, addresses=None, timeout=10, translate=False, nameserver='localhost', message_filter=None)
Subscriber context.
See
NSSubscriber
andSubscriber
for initialization parameters.The subscriber is selected based on the arguments, see
create_subscriber_from_dict_config()
for information how the selection is done.- Example::
del tmp
from posttroll.subscriber import Subscribe
- with Subscribe(“a_service”, “my_topic”,) as sub:
- for msg in sub.recv():
print(msg)
- class posttroll.subscriber.Subscriber(addresses, topics='', message_filter=None, translate=False)
Class for subscribing to message streams.
Subscribes to addresses for topics, and perform address translation of translate is true. The function message_filter can be used to discriminate some messages on the subscriber side. topics on the other hand performs filtering on the publishing side (from zeromq 3).
Example:
from posttroll.subscriber import Subscriber, get_pub_address addr = get_pub_address(service, timeout=2) sub = Subscriber([addr], 'my_topic') try: for msg in sub(timeout=2): print("Consumer got", msg) except KeyboardInterrupt: print("terminating consumer...") sub.close()
- add(address, topics=None)
Add address to the subscribing list for topics.
It topics is None we will subscribe to already specified topics.
- add_hook_pull(address, callback)
Specify a PULL callback in the same stream (thread) as the main receive loop.
The callback will be called with the received messages from the specified subscription. Good for pushed ‘inproc’ messages from another thread.
- add_hook_sub(address, topics, callback)
Specify a SUB callback in the same stream (thread) as the main receive loop.
The callback will be called with the received messages from the specified subscription.
Good for operations, which is required to be done in the same thread as the main recieve loop (e.q operations on the underlying sockets).
- property addresses
Get the addresses.
- close()
Close the subscriber: stop it and close the local subscribers.
- recv(timeout=None)
Receive, optionally with timeout in seconds.
- remove(address)
Remove address from the subscribing list for topics.
- property running
Check if suscriber is running.
- stop()
Stop the subscriber.
- property subscribers
Get the subscribers.
- update(addresses)
Update with a set of addresses.
- posttroll.subscriber.create_subscriber_from_dict_config(settings)
Get a subscriber class instance defined by a dictionary of configuration options.
The subscriber is created based on the given options in the following way:
setting settings[‘addresses’] to a non-empty list AND settings[‘nameserver’] to False will disable nameserver connections and connect only the listed addresses
setting settings[‘nameserver’] to a string will connect to nameserver running on the indicated server
if settings[‘nameserver’] is missing or None, the nameserver on localhost is assumed.
Additional options are described in the docstrings of the respective classes, namely
Subscriber
andNSSubscriber
.
Module for Pytroll messages.
A Message is formatted as: <subject> <type> <sender> <timestamp> <version> [mime-type data]
Message('/DC/juhu', 'info', 'jhuuuu !!!')
will be encoded as (at the right time and by the right user at the right host):
pytroll://DC/juhu info henry@prodsat 2010-12-01T12:21:11.123456 v1.01 application/json "jhuuuu !!!"
Note: the Message class is not optimized for BIG messages.
- class posttroll.message.Message(subject: str = '', atype: str = '', data='', binary: bool = False, rawstr: str | None = None, version: str = 'v1.2')
A Message.
Has to be initialized with a rawstr (encoded message to decode) OR
Has to be initialized with a subject, type and optionally data, in which case:
It will add a few extra attributes.
It will make a Message pickleable.
- static decode(rawstr)
Decode a raw string into a Message.
- encode()
Encode a Message to a raw string.
- property head
Return header of a message (a message without the data part).
- property host
Try to return a host from a sender.
- property user
Try to return a user from a sender.
- exception posttroll.message.MessageError
This modules exceptions.
- posttroll.message.create_datetime_encoder_for_version(version='v1.2')
Create a datetime encoder depending on the message protocol version.
- posttroll.message.create_datetime_json_encoder_for_version(version='v1.2')
Create a datetime json encoder depending on the message protocol version.
- posttroll.message.datetime_decoder(dct)
Decode datetimes to python objects.
- posttroll.message.datetime_encoder(obj, encoder)
Encode datetimes into iso format.
- posttroll.message.is_valid_data(obj, version='v1.2')
Check if data is JSON serializable.
- posttroll.message.is_valid_sender(obj)
Check that the sender is valid.
Currently we only check for empty strings.
- posttroll.message.is_valid_subject(obj)
Check that the message subject is valid.
Currently we only check for empty strings.
- posttroll.message.is_valid_type(obj)
Check that the message type is valid.
Currently we only check for empty strings.
Receive broadcasted addresses in a standard pytroll Message.
It will look like: /<server-name>/address info … host:port
- class posttroll.address_receiver.AddressReceiver(max_age=datetime.timedelta(seconds=600), port=None, do_heartbeat=True, multicast_enabled=True, restrict_to_localhost=False)
General thread to receive broadcast addresses.
- get(name='')
Get the address(es).
- is_running()
Check if the receiver is alive.
- process_address_message(data, pub)
Process a new address message.
- set_up_address_receiver(port)
Set up the address receiver depending on if it is multicast or not.
- start()
Start the receiver.
- stop()
Stop the receiver.
- posttroll.address_receiver.getaddress
alias of
AddressReceiver
Manage other’s subscriptions.
Default port is 5557, if $POSTTROLL_NAMESERVER_PORT is not defined.
- class posttroll.ns.NameServer(max_age=None, multicast_enabled=True, restrict_to_localhost=False)
The name server.
- run(address_receiver=None, nameserver_address=None)
Run the listener and answer to requests.
- stop()
Stop the nameserver.
- posttroll.ns.get_active_address(name, arec, message_version='v1.2')
Get the addresses of the active modules for a given publisher name.
- posttroll.ns.get_configured_nameserver_port() int
Get the configured nameserver port.
- posttroll.ns.get_pub_address(name: str, timeout: float | int = 10, nameserver: str = 'localhost')
Get the address of the named publisher.
- Parameters:
name – name of the publishers
timeout – how long to wait for an address, in seconds.
nameserver – nameserver address to query the publishers from (default: localhost).
- posttroll.ns.get_pub_addresses(names: list[str] | None = None, timeout: float = 10, nameserver: str = 'localhost')
Get the addresses of the publishers.
Kwargs: - names: names of the publishers - nameserver: nameserver address to query the publishers from (default: localhost).
- posttroll.ns.main()
Run the nameserver script.
- posttroll.ns.run(ns, logger)
Run a nameserver process.
- posttroll.ns.setup_logging(opts)
Set up logging.
Message broadcast module.
- class posttroll.message_broadcaster.AddressBroadcaster(name, address, interval, nameservers)
Class to broadcast addresses.
- class posttroll.message_broadcaster.MessageBroadcaster(msg, port, interval, designated_receivers=None)
Class to broadcast stuff.
If interval is 0 or negative, no broadcasting is done.
- is_running()
Are we running.
- start()
Start the broadcasting.
- stop()
Stop the broadcasting.
- posttroll.message_broadcaster.sendaddress
alias of
AddressBroadcaster
Send/receive UDP multicast packets.
Requires that your OS kernel supports IP multicast.
This is based on python-examples Demo/sockets/mcast.py
- class posttroll.bbmcast.MulticastReceiver(port, mcgroup=None)
Multicast receiver on port for an mcgroup.
- BUFSIZE = 1024
- close()
Close the receiver.
- settimeout(tout=None)
Set timeout.
A timeout will throw a ‘socket.timeout’.
- class posttroll.bbmcast.MulticastSender(port, mcgroup=None)
Multicast sender on port and mcgroup.
- close()
Close the sender.
- posttroll.bbmcast.SocketTimeout
alias of
TimeoutError
- posttroll.bbmcast.mcast_receiver(port, mcgroup=None)
Open a UDP socket, bind it to a port and select a multicast group.
- posttroll.bbmcast.mcast_sender(mcgroup=None)
Non-object interface for sending multicast messages.
Posttroll packages.
- posttroll.get_context()
Provide the context to use.
This function takes care of creating new contexts in case of forks.