code/hello-world/ $ tomodachi run service/app.py

AWS SNS+SQS messaging

Publish / Subscribe

See the example service after the decorator descriptions for a reference implementation of how the invoked functions may be used.

Invoker functions for AWS SNS+SQS events – pub/sub

@tomodachi.aws_sns_sqs(
    topic=None, 
    competing=True, 
    queue_name=None, 
    filter_policy=None, 
    **kwargs
)

This sets up an AWS SQS queue (queue_name) that subscribes to messages on the AWS SNS topic given by topic (if a topic is specified), after which the service starts consuming messages from the queue.

The competing value is used when the same queue name should be shared by several services of the same type, which then "compete" for who consumes each message. Since tomodachi version 0.19.x the default value is True, as this is the most likely use case for pub/sub in distributed architectures.

Unless queue_name is specified, an auto-generated queue name will be used. Additional prefixes for both topic and queue_name can be assigned by setting the options.aws_sns_sqs.topic_prefix and options.aws_sns_sqs.queue_name_prefix parameters on the service class.
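As an illustration of the prefix options, the sketch below shows how a service's options might carry the two prefixes and which names would result. It assumes that a prefix is simply prepended to the given name. The apply_prefix helper is hypothetical and not part of tomodachi, and the "dev-" values are made up for the example.

```python
from typing import Optional

# Option keys as named above, written in the same dotted style as the
# example service's options dict. The "dev-" values are illustrative only.
options = {
    "aws_sns_sqs.topic_prefix": "dev-",
    "aws_sns_sqs.queue_name_prefix": "dev-",
}


def apply_prefix(name: str, prefix: Optional[str]) -> str:
    """Hypothetical helper: prepend the prefix, if any, to a topic or queue name.

    Assumption: the prefix is plain string concatenation in front of the name.
    """
    return "{}{}".format(prefix or "", name)


topic = apply_prefix("example-topic", options["aws_sns_sqs.topic_prefix"])
queue = apply_prefix("example-queue", options["aws_sns_sqs.queue_name_prefix"])
print(topic)  # dev-example-topic
print(queue)  # dev-example-queue
```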

SNS topic subscription with filter policy (applied to match message attributes)

The filter_policy value, specified as a keyword argument, will be applied on the SNS subscription (for the specified topic and queue) as the "FilterPolicy" attribute. This filters SNS messages using the chosen "message attributes" and/or their values specified in the filter. Note that the filter policy dict structure differs somewhat from the actual message attributes, as the values of the keys in the filter policy must be a dict (object) or list (array). Example: A filter policy value of {"event": ["order_paid"], "currency": ["EUR", "USD"]} would set up the SNS subscription to receive messages on the topic only where the message attribute "event" is "order_paid" and the "currency" value is either "EUR" or "USD".

If filter_policy is not specified as an argument (the default), the queue keeps whatever filter policy an existing subscription already has, or receives all messages on the topic if a new subscription is set up. Changing the filter_policy on an existing subscription may take several minutes to propagate. Read more about the filter policy format on AWS.

Related to the above mentioned filter policy, the aws_sns_sqs_publish function (which is used for publishing messages) can specify "message attributes" using the optional message_attributes keyword argument. Values should be specified as a simple dict with keys and values. Example: {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}.
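To make the relationship between a filter policy and message attributes concrete, here is a minimal sketch of the matching rule described above. It is a simplified model, not tomodachi or AWS code: it only handles policies whose values are lists of exact values, while the real SNS filter policy format supports further operators. The function name matches_filter_policy is hypothetical.

```python
from typing import Any, Dict, List


def matches_filter_policy(
    filter_policy: Dict[str, List[Any]],
    message_attributes: Dict[str, Any],
) -> bool:
    """Simplified model of exact-value matching.

    Every key in the policy must be present in the message attributes,
    and its value must be one of the values listed in the policy.
    Attributes that the policy does not mention are ignored.
    """
    for key, allowed_values in filter_policy.items():
        if key not in message_attributes:
            return False
        if message_attributes[key] not in allowed_values:
            return False
    return True


# The filter policy from the example above (values are lists).
filter_policy = {"event": ["order_paid"], "currency": ["EUR", "USD"]}

# Message attributes as passed when publishing (a simple dict of values).
paid_in_eur = {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}
paid_in_sek = {"event": "order_paid", "paid_amount": 100, "currency": "SEK"}
missing_currency = {"event": "order_paid"}

print(matches_filter_policy(filter_policy, paid_in_eur))       # True
print(matches_filter_policy(filter_policy, paid_in_sek))       # False
print(matches_filter_policy(filter_policy, missing_currency))  # False
```

Note how "paid_amount" does not affect the result, since the policy only constrains "event" and "currency".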

Message enveloping functionality / custom enveloping / raw messaging

Depending on whether the service's message_envelope attribute (previously named message_protocol) is used, parts of the enveloped data are included as keyword arguments to the decorated function. It's usually safe to just use data as an argument. You can also pass a specific message_envelope value as a keyword argument to the decorator to use a custom enveloping method instead of the global one set for the service.

If you import ProtobufBase (from tomodachi.envelope import ProtobufBase) and use it as the service's message_envelope, you may also pass the keyword argument proto_class into the decorator, describing the protobuf (Protocol Buffers) generated Python class to use for decoding incoming messages. Likewise, additional keyword arguments set on the decorator will be passed into the envelope class's decoder function parse_message. Custom enveloping classes can be built to fit your existing architecture or for even more control of tracing and shared metadata between services.
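A custom enveloping class could look roughly like the sketch below. Only the parse_message name comes from the text above. The build_message counterpart, the method signatures and the shape of the returned values are assumptions made for illustration, so check tomodachi's bundled envelope classes for the exact interface before relying on it. The class name MinimalJsonEnvelope is hypothetical.

```python
import asyncio
import json
import time
import uuid
from typing import Any, Dict, Tuple


class MinimalJsonEnvelope:
    """Hypothetical envelope that wraps data in JSON with a little metadata."""

    @classmethod
    async def build_message(cls, service: Any, topic: str, data: Any, **kwargs: Any) -> str:
        # Assumed signature: called when publishing, returns the string to send.
        envelope = {
            "service": getattr(service, "name", None),
            "metadata": {
                "message_uuid": str(uuid.uuid4()),
                "timestamp": time.time(),
                "topic": topic,
            },
            "data": data,
        }
        return json.dumps(envelope)

    @classmethod
    async def parse_message(cls, payload: str, **kwargs: Any) -> Tuple[Dict[str, Any], str, float]:
        # Assumed return shape: the parsed parts plus message id and timestamp.
        envelope = json.loads(payload)
        metadata = envelope["metadata"]
        parsed = {
            "service": envelope["service"],
            "metadata": metadata,
            "data": envelope["data"],
        }
        return parsed, metadata["message_uuid"], metadata["timestamp"]


class FakeService:
    # Stand-in for a service instance; only the name attribute is used here.
    name = "aws-example"


payload = asyncio.run(
    MinimalJsonEnvelope.build_message(FakeService(), "example-topic", {"order_id": 1})
)
parsed, message_uuid, timestamp = asyncio.run(MinimalJsonEnvelope.parse_message(payload))
print(parsed["data"])  # {'order_id': 1}
```

Under these assumptions the class would be set as the service's message_envelope attribute, or passed as the message_envelope keyword argument to the decorator.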

Encryption – options related to SNS + SQS encryption at rest using AWS KMS

Encryption at rest for AWS SNS and/or AWS SQS can optionally be configured by specifying the KMS key alias or KMS key id as a tomodachi service option: options.aws_sns_sqs.sns_kms_master_key_id (to configure encryption at rest on the SNS topics for which the tomodachi service handles the SNS -> SQS subscriptions) and/or options.aws_sns_sqs.sqs_kms_master_key_id (to configure encryption at rest for the SQS queues which the service is consuming).

Note that an option value set to an empty string ("") or False will unset the KMS master key id and thus disable encryption at rest. (The AWS APIs for SNS and SQS use an empty string value for the KmsMasterKeyId attribute to disable encryption with KMS if it was previously enabled.)

If instead an option is completely unset or set to None, no changes will be made to the KMS related attributes on an existing topic or queue.
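The three cases above (set a key, disable encryption, leave untouched) can be summarised in a small sketch. This is not tomodachi's implementation: the kms_attribute_update helper is hypothetical, and the "alias/example-key" value is made up. It only models how an option value would translate into an attribute change.

```python
from typing import Dict, Optional, Union


def kms_attribute_update(option_value: Union[str, bool, None]) -> Optional[Dict[str, str]]:
    """Hypothetical helper: map an option value to the implied attribute change.

    Returns None when nothing should be changed on the topic or queue.
    """
    if option_value is None:
        # Unset or None: leave existing KMS attributes as they are.
        return None
    if option_value == "" or option_value is False:
        # Empty string or False: unset the key id, disabling encryption at rest.
        return {"KmsMasterKeyId": ""}
    if not isinstance(option_value, str):
        raise TypeError("expected a KMS key alias or key id as a string")
    # Any other string: use it as the KMS key alias or key id.
    return {"KmsMasterKeyId": option_value}


print(kms_attribute_update(None))                 # None
print(kms_attribute_update(""))                   # {'KmsMasterKeyId': ''}
print(kms_attribute_update(False))                # {'KmsMasterKeyId': ''}
print(kms_attribute_update("alias/example-key"))  # {'KmsMasterKeyId': 'alias/example-key'}
```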

If it's expected that the services themselves, via their IAM credentials or assumed role, are responsible for creating queues and topics, these options could be used to provide encryption at rest without additional manual intervention.

Make sure that the key policy / role policy is set to allow KMS use and the SNS -> SQS functionality. See https://aws.amazon.com/premiumsupport/knowledge-center/sns-topic-sqs-queue-sse-cmk-policy/

Do not use these options if you are instead using IaC tooling to handle the topics, queues and subscriptions, or if they are, for example, created / updated as part of deployments. To keep the service from updating any attributes, leave the options unset or set them to None.

Read more at


Example implementation (AWS SNS+SQS)

import tomodachi


class Service(tomodachi.Service):
    name = "aws-example"

    options = {
        "aws_sns_sqs.region_name": None,  # AWS region (example: 'eu-west-1')
        "aws_sns_sqs.aws_access_key_id": None,  # AWS access key id
        "aws_sns_sqs.aws_secret_access_key": None,  # AWS secret key
    }

    # The "message_envelope" attribute can be set on the service class to 
    # build / parse data.

    # message_envelope = ...

    # Using the @tomodachi.aws_sns_sqs decorator to make the service
    # create an AWS SNS topic, an AWS SQS queue and a subscription from
    # the topic to the queue. The queue will be polled to receive
    # messages using the SQS.ReceiveMessage API.
    @tomodachi.aws_sns_sqs("example-topic", queue_name="example-queue")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on 
        # another topic.
        await tomodachi.aws_sns_sqs_publish(
            self, 
            message, 
            topic="another-example-topic"
        )