Async and Sync Python Pub/Sub with Redis

Aug. 1, 2022

Usually, Redis is used for caching layer, but also can be leveraged for indexing, communication, data storage, and time series patterns.

Redis pub/sub feature

In this communication pattern, senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Rather, published messages send into channels, without knowledge of what subscribers there may be.

In Redis, these messages are fire-and-forget, in that if a message is published and no subscribers exists, the message evaporates and cannot be recovered, so no queue.

For more reliable message delivery using a queue, the command RPOPLPUSH can be utilized. But all logic moved into the client side, responsible to do message queue management, e.g: https://github.com/adjust/rmq

Example pub/sub client, listen to command channel using the command subscribe, using command-line:

$ redis-cli
127.0.0.1:6379> subscribe command
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "command"
3) (integer) 1

then try sending a message using the publish command

$ redis-cli publish command whoami
(integer) 1

The publish command returns an integer reply: the number of clients that received the message.

Note that in a Redis Cluster, only clients that are connected to the same node as the publishing client is included in the count.

On client console will get:

1) "message"
2) "command"
3) "whoami"

Python async

Since Python 3.4, async concurrent programming added to the standard library using asyncio module, before in Python ecosystem to use lightweight thread (goroutine in Golang, task in Rust Tokio) Python program/app need to set up Stackless, greenlet, or using epoll (Tornado, Twisted).

Using async makes the Python program more scalable and handles better concurrency, since Python use GIL only one thread can be run in one interpreter process.

For comparison, let's use httpx library both support sync and async to do HTTP get request to my website:

> GET /test/1 HTTP/2
> Host: saktidwicahyono.name
> accept: */*

< HTTP/2 200 
< content-type: text/html
< vary: Accept-Encoding
< date: Sat, 30 Jul 2022 13:51:42 GMT
< content-length: 633
< server: deno/asia-southeast1-a

The program will fetch GET request 100x times in /test/{i} path and assert response status code should be 200. Here is the sync code:

import httpx

def main():
    for i in range(1, 101):
        r = httpx.get(f"https://saktidwicahyono.name/test/{i}")
        assert r.status_code == 200

if __name__ == "__main__":
    main()

Async version:

import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        for i in range(1, 101):
            r = await client.get(f"https://saktidwicahyono.name/test/{i}")
            assert r.status_code == 200


if __name__ == "__main__":
    asyncio.run(main())

To do benchmarking hyperfine is used, and close others program to remove noise and keep resource usage not affected by other programs (network connection also should be conditioned e.g: using cabled and no other network activity, but I use shared internet and WiFi from my laptop)

$ hyperfine 'python request_async.py' 'python request_sync.py'
Benchmark 1: python request_async.py
  Time (mean ± σ):      3.432 s ±  0.189 s    [User: 0.777 s, System: 0.145 s]
  Range (min … max):    3.179 s …  3.785 s    10 runs

Benchmark 2: python request_sync.py
  Time (mean ± σ):     12.569 s ±  0.257 s    [User: 2.835 s, System: 0.301 s]
  Range (min … max):   12.334 s … 13.091 s    10 runs

Summary
  'python request_async.py' ran
    3.66 ± 0.22 times faster than 'python request_sync.py'

In this use case, async run 3.66x faster than sync since sync will do HTTP requests sequentially but async will do interleave between requests.

Note: since the comparison program is IO-bound, using async gives significant performance improvement, but if the program is CPU-bound better use thread-pool with work-stealing scheduler (in real multi-thread). In Python use multiprocessing module.

redis-py

redis-py is one of many Redis clients available in Python, and listed by Redis documentation, also this lib used by Django if you are using Django cache framework using Redis which officially supported in Django v4.

This library support pub/sub feature both in sync and async mode. For this post scenario, there is a use case to communicate between web app using sync mode and bot app using async mode. The web app act as the publisher and the bot app act as the subscriber, web app want to send notification as a chat message using functionality available in bot app, so these apps need to communicate with each others.

Let's define a schema, assume the bot app needs only user id and notification content and encoded it using JSON. Since the purpose is for notification, simply name the channel as notification.

{
  "user_id": 42,
  "message": "Your order is on its way!"
}

web app publisher

import json
import redis

CHANNEL_NAME = "notification"
r = redis.Redis()


def send_notification(user_id: int, message: str):
    """
    Send a notification to a user.
    note: this maybe located in services.py
    """
    ...
    r.publish(CHANNEL_NAME, json.dumps({"user_id": user_id, "message": message}))


def main():
    # imagine this in view / API endpoint or signals
    send_notification(42, "Your order is on its way!")


if __name__ == "__main__":
    main()

bot app subscriber

import asyncio
import async_timeout
import json
import logging
import redis.asyncio as redis

CHANNEL_NAME = "notification"

logging.basicConfig(level=logging.DEBUG)


async def send_message(user_id: int, message: str):
    # write real code here
    logging.debug(f"Sending message to user {user_id}: {message}")
    await asyncio.sleep(0.2)


async def handle_notification():
    r = redis.Redis()
    pubsub = r.pubsub()
    await pubsub.subscribe(CHANNEL_NAME)
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await pubsub.get_message()
                if message and message["type"] == "message":
                    payload = json.loads(message["data"])
                    # TODO: do validation on payload
                    await send_message(payload["user_id"], payload["message"])
        except (asyncio.TimeoutError, json.decoder.JSONDecodeError) as e:
            logging.error(e)


if __name__ == "__main__":
    asyncio.run(handle_notification())

Summary

With the simple pub/sub feature provided by Redis, two apps can communicate to do some use cases. As stated above in Redis, the messages are fire-and-forget so not as reliable as using dedicated message broker, distributed commit log or dedicated task queue.

But, depends on the requirements if your app only needs simple functionality maybe Redis pub/sub solution is enough, and don't bother with additional setup/operational costs dedicated to those services.


         

古池や 蛙飛び込む 水の音

Return to blog

footer