Example code
The examples below can also be combined within the same class. A common use case is a service that listens on HTTP to provide an API or health checks while also performing asynchronous pub/sub tasks.
Basic HTTP based service
Code for a pretty basic service that serves data over HTTP.
import tomodachi


class Service(tomodachi.Service):
    name = "http-example"

    # Request paths are specified as regex for full flexibility
    @tomodachi.http("GET", r"/resource/(?P<id>[^/]+?)/?")
    async def resource(self, request, id):
        # Returning a string value normally means 200 OK
        return f"id = {id}"

    @tomodachi.http("GET", r"/health")
    async def health_check(self, request):
        # Return can also be a tuple, dict or even an aiohttp.web.Response
        # object for more complex responses - for example if you need to
        # send byte data, set your own status code or define your own headers
        return {
            "body": "Healthy",
            "status": 200,
        }

    # Specify custom 404 catch-all response
    @tomodachi.http_error(status_code=404)
    async def error_404(self, request):
        return "error 404"
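The route pattern used above can be tried out on its own with Python's `re` module. This is a minimal sketch, assuming the pattern is matched against the whole request path; `match_route` is a hypothetical helper for illustration and not part of tomodachi.

```python
import re
from typing import Dict, Optional

# The same pattern as in the @tomodachi.http decorator above.
ROUTE_PATTERN = r"/resource/(?P<id>[^/]+?)/?"


def match_route(path: str) -> Optional[Dict[str, str]]:
    """Return the named groups if the whole path matches, else None.

    Hypothetical helper: full-path matching is an assumption here.
    """
    match = re.fullmatch(ROUTE_PATTERN, path)
    return match.groupdict() if match else None


print(match_route("/resource/1234"))        # with no trailing slash
print(match_route("/resource/1234/"))       # the trailing slash is optional
print(match_route("/resource/1234/extra"))  # extra path segments do not match
```

The named group `id` is what ends up as the `id` argument of the handler, and `[^/]+?` keeps the value from spanning more than one path segment.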
RabbitMQ / AMQP event based messaging service
Example of a service that calls a function when messages are published on an AMQP topic exchange.
import tomodachi


class Service(tomodachi.Service):
    name = "amqp-example"

    # The "message_envelope" attribute can be set on the service class to
    # build / parse data.
    # message_envelope = ...

    # A route / topic that the service subscribes to via RabbitMQ / AMQP.
    @tomodachi.amqp("example.topic")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on
        # another route / topic.
        await tomodachi.amqp_publish(
            self,
            message,
            routing_key="example.response"
        )
AWS SNS+SQS event based messaging service
Example of a service using AWS SNS+SQS managed pub/sub messaging. AWS SNS and AWS SQS together bring managed message queues to microservices, distributed systems, and serverless applications hosted on AWS. tomodachi services can customize their enveloping functionality to unwrap incoming messages, to produce enveloped messages for published events, or both. Pub/sub patterns scale well in distributed architectures, for example when services are hosted in Docker containers on Kubernetes.
import tomodachi


class Service(tomodachi.Service):
    name = "aws-example"

    # The "message_envelope" attribute can be set on the service class to
    # build / parse data.
    # message_envelope = ...

    # Using the @tomodachi.aws_sns_sqs decorator to make the service
    # create an AWS SNS topic, an AWS SQS queue and to subscribe the
    # queue to the topic. The queue will be polled to receive messages
    # using the SQS ReceiveMessage API.
    @tomodachi.aws_sns_sqs("example-topic", queue_name="example-queue")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on
        # another topic.
        await tomodachi.aws_sns_sqs_publish(
            self,
            message,
            topic="another-example-topic"
        )
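To make the enveloping idea more concrete, here is a rough sketch of what a custom envelope could look like. It does not import tomodachi. The class name `ExampleJsonEnvelope`, the `build_message` / `parse_message` method names, their signatures and the returned tuple are all assumptions made for illustration, so check the tomodachi envelope documentation for the actual interface before assigning anything to `message_envelope`.

```python
import asyncio
import json
import time
import uuid
from typing import Any, Dict, Tuple


class ExampleJsonEnvelope:
    """Hypothetical envelope that wraps payloads in JSON with metadata."""

    @classmethod
    async def build_message(cls, service: Any, topic: str, data: Any, **kwargs: Any) -> str:
        # Wrap outgoing data together with metadata about the sender.
        return json.dumps({
            "service": getattr(service, "name", None),
            "metadata": {
                "message_uuid": str(uuid.uuid4()),
                "timestamp": time.time(),
                "topic": topic,
            },
            "data": data,
        })

    @classmethod
    async def parse_message(cls, payload: str, **kwargs: Any) -> Tuple[Dict[str, Any], str, float]:
        # Unwrap an incoming message; return it with its id and timestamp.
        message = json.loads(payload)
        metadata = message["metadata"]
        return message, metadata["message_uuid"], metadata["timestamp"]


class FakeService:
    # Stand-in for a service instance; only the name attribute is used.
    name = "aws-example"


async def roundtrip() -> Dict[str, Any]:
    # Build an enveloped message and parse it back again.
    payload = await ExampleJsonEnvelope.build_message(
        FakeService(), "example-topic", {"id": 42}
    )
    message, _message_uuid, _timestamp = await ExampleJsonEnvelope.parse_message(payload)
    return message


message = asyncio.run(roundtrip())
print(message["data"])
```

Keeping building and parsing in one class means publishers and subscribers that share the envelope agree on the wire format without each handler having to deal with it.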
Scheduled function calls / cron scheduling
Examples of how functions can be invoked on a fixed interval, on a cron-style schedule, or at a specific time of day.
import tomodachi


class Service(tomodachi.Service):
    name = "scheduler-example"

    @tomodachi.hourly
    async def every_hour(self) -> None:
        print("Called every hour")

    # Using cron notation
    @tomodachi.schedule(interval="1/2 8-18 * * mon-fri")
    async def work_hours(self) -> None:
        print("Every odd minute between 8-18 on weekdays")

    # The last Tuesday of January and March at 05:30 AM
    @tomodachi.schedule(interval="30 5 * jan,mar Ltue")
    async def advanced_cron_notation_scheduling(self) -> None:
        print("The last Tuesday of January and March at 05:30 AM")

    # Executed at 22:15:30 every day (server timezone)
    @tomodachi.schedule(timestamp="22:15:30")
    async def as_timestamp(self) -> None:
        print("Local server time is now 22:15:30")

    # Midnight in Sweden ("Europe/Stockholm" timezone)
    @tomodachi.schedule(timestamp="00:00:00", timezone="Europe/Stockholm")
    async def midnight_in_sweden(self) -> None:
        print("It's midnight in Sweden")

    # Called every (interval=20) seconds
    @tomodachi.schedule(interval=20)
    async def every_twenty_seconds(self) -> None:
        print("Every 20 seconds")

    # Called every (interval=20) seconds and directly on service start
    @tomodachi.schedule(interval=20, immediately=True)
    async def every_twenty_seconds_and_immediately(self) -> None:
        print("Every 20 seconds and immediately")
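As a way to read the cron notation in the examples above, the sketch below expands a single numeric cron field into the values it matches. `expand_cron_field` is a hypothetical helper written for this page, not tomodachi's parser. It assumes that `N/step` means "every `step` values starting at `N`", and it does not handle names such as `mon-fri` or modifiers such as `Ltue`.

```python
from typing import List


def expand_cron_field(field: str, low: int, high: int) -> List[int]:
    """Expand one numeric cron field into the sorted values it matches."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # Assumption: "N/step" runs from N to the top of the range,
            # while a plain "N" matches only N.
            end = high if step_text else start
        values.update(range(start, end + 1, step))
    return sorted(values)


# Minute and hour fields from interval="1/2 8-18 * * mon-fri"
minutes = expand_cron_field("1/2", 0, 59)
hours = expand_cron_field("8-18", 0, 23)
print(minutes[:5])
print(hours)
```

Under these assumptions the minute field covers the odd minutes and the hour field covers hours 8 through 18 inclusive, which is consistent with the "every odd minute between 8-18" comment in the example.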
More example code
More examples are available, including code for pub/sub communication between services over AMQP or AWS SNS+SQS (as shown above) and examples that use message attributes and filter policies.
- GitHub repository: https://github.com/kalaspuff/tomodachi/tree/master/examples