Replicating orderbooks from Websocket stream with Python and Asyncio

Introduction

We are currently working on a time-series database solution for collecting high-frequency crypto-exchange data, namely tick data and one-minute orderbook snapshots. We’ve developed REST API collector bots which continuously fetch data from numerous REST API endpoints and save it to the database. This solution would work in a perfect world, but that’s not where we live. During traffic peaks the endpoints are often unreachable and connections to them time out regularly, which in turn means missing valuable data. The first upgrade to the collector infrastructure was to deploy collector bots on two hosts and synchronize both databases. Did it work? Well, it did, but we can do better. We are deploying a third collector node which scrapes not via REST API but via Websocket (WS).


There are quite significant differences between a collector that fetches via REST and one that fetches via WS. Let’s compare both APIs with respect to high-frequency data.

Orderbook data

When fetched via REST API, the process is very straightforward. You usually make an HTTP GET request with orderbook depth and pair parameters and get a JSON-formatted response in return. Saving the data into the database is just a matter of parsing it properly. Parameter options may vary from exchange to exchange, but orderbook depth and pair are considered standard.
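
As a rough sketch of the REST flow, the snippet below builds the request URL and parses a response. The base URL, the pair and depth parameter names and the response layout are all hypothetical, since every exchange names them differently. The HTTP call itself is left out so the example runs offline.

```python
import json
from urllib.parse import urlencode

# Hypothetical endpoint; real exchanges use their own paths and parameter names.
BASE_URL = "https://api.example-exchange.com/orderbook"


def build_orderbook_url(pair, depth):
    """Return the GET URL for an orderbook snapshot of `pair` with `depth` levels."""
    return BASE_URL + "?" + urlencode({"pair": pair, "depth": depth})


def parse_orderbook(body):
    """Turn an (assumed) JSON body into rows ready to be saved to a database."""
    book = json.loads(body)
    rows = []
    for side in ("bids", "asks"):
        for price, amount in book[side]:
            rows.append((side, float(price), float(amount)))
    return rows


# Sample body standing in for the HTTP response to the URL above.
sample_body = '{"bids": [["1765.1", "0.5"]], "asks": [["1765.3", "1.2"]]}'
url = build_orderbook_url("BTCUSD", 25)
rows = parse_orderbook(sample_body)
```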

When fetched via Websocket, the whole process gets a little more twisted. First we open a WS connection to the server provided by the exchange and subscribe to the proper channel by sending a subscription message, which usually contains settings for the data stream. From then on we are able to receive messages. What do these messages look like for orderbook data? The convention is to send a complete orderbook snapshot in the first message. Every subsequent message carries only the data that changed, and we have to incorporate these update messages into the orderbook instance in the client program to keep it up to date.
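
The snapshot-then-updates convention can be sketched in a few lines. The message shapes here are assumptions made for illustration: a snapshot is a list of [price, size] levels and an update is a single [price, size] pair where size 0 means the level is gone. Real exchanges differ in the details; the Bitfinex format is handled in the implementation further down.

```python
def apply_snapshot(levels):
    """Build a {price: size} book from a full snapshot message."""
    return {price: size for price, size in levels}


def apply_update(book, update):
    """Merge one incremental update message into the book in place."""
    price, size = update
    if size == 0:
        book.pop(price, None)   # the level disappeared
    else:
        book[price] = size      # new level or changed size


book = apply_snapshot([[100.0, 2.0], [99.5, 1.0]])
apply_update(book, [99.5, 3.0])   # size changed
apply_update(book, [100.0, 0])    # level removed
apply_update(book, [99.0, 0.5])   # new level
```

Keying the book by price is what makes each update a single dictionary operation.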

Tick data

When fetched via REST API, we generally make a GET request with pair and since parameters. We can simply remember the last saved tick (by trade id or timestamp) and fetch only the new data. Unfortunately, these parameters are not available on all exchanges; some try to be original and introduce new exotic parameters or their own since technique.
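
A minimal sketch of the "remember the last saved tick" idea follows. The fetch_ticks() function is a hypothetical stand-in for the GET request, and the trade layout (an id and a price) is assumed for illustration only.

```python
# Trades "on the exchange"; in reality these live behind the REST endpoint.
EXCHANGE_TRADES = [
    {"id": 1, "price": 1765.0},
    {"id": 2, "price": 1765.5},
]


def fetch_ticks(since):
    """Stand-in for a GET request with pair and since parameters."""
    return [t for t in EXCHANGE_TRADES if t["id"] > since]


def collect(saved, last_id):
    """Fetch only trades newer than last_id, save them, return the new last id."""
    fresh = fetch_ticks(since=last_id)
    saved.extend(fresh)
    return fresh[-1]["id"] if fresh else last_id


saved = []
last_id = collect(saved, last_id=0)                  # first poll: trades 1 and 2
EXCHANGE_TRADES.append({"id": 3, "price": 1764.0})   # a new trade happens
last_id = collect(saved, last_id)                    # second poll: only trade 3
last_id = collect(saved, last_id)                    # nothing new, nothing duplicated
```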

The WS tick data stream is simpler than its orderbook brother. Every new message we get from the WS server is a new trade which has occurred on the exchange.

Pros & Cons

REST API calls are subject to HTTP request limits. If you hit the limit, your IP usually gets blocked for a few minutes. On the other hand, if the exchange REST API server goes down, we are still able to retrieve historical ticks afterwards thanks to the since parameter, if available. In the case of the orderbook we are out of luck, because historical orderbook snapshots are impossible to get directly from exchanges.

WS messages flow in a stream, hence there are no per-request API limits and you can receive messages at close to real-time speed. If the exchange WS server goes down and your stream is interrupted, you are unable to retrieve the ticks you missed. What was once broadcast is never broadcast again. Sometimes exchanges broadcast messages with useless data, which can cause serious network traffic overhead. WS is slightly trickier to implement because you work with streams and not with discrete data packages.


Asynchronous programming – quick intro

In a nutshell: imagine you need to make 20 HTTP requests to different hosts. Let’s imagine that each of these requests is wrapped in a function, a coroutine. Now we have 20 coroutines which together constitute a task. If you went the classical synchronous way, you would execute the coroutines one after another, which would result in a total processing time of 20x request/response waiting time + 20x request/response CPU processing time on your machine. This is a huge waste of time.

If you handle this task asynchronously, you start a coroutine and as soon as the program finds out it is waiting (and not processing) it immediately starts processing the next coroutine, and so on. When the request in one of the coroutines completes, the program goes back into it and continues processing the code after the “waiting” point. Note that the program returns to the “waiting” points in whatever order the responses arrive, so requests are not handled sequentially as in synchronous programming. On the other hand, the “waiting” points must be declared explicitly in the code (in Python, with the await keyword). Such leaping behaviour is called context switching.
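
The difference is easy to see with a small experiment. This is only a sketch: fake_request() is a made-up coroutine that simulates the network wait with asyncio.sleep() instead of talking to a real host.

```python
import asyncio
import time


async def fake_request(host, delay=0.1):
    """Simulates one HTTP request; the await is the declared "waiting" point."""
    await asyncio.sleep(delay)
    return host


async def fetch_all(hosts):
    """Runs all requests concurrently and returns results in input order."""
    return await asyncio.gather(*(fake_request(h) for h in hosts))


hosts = ["host-{}".format(i) for i in range(20)]
start = time.perf_counter()
results = asyncio.run(fetch_all(hosts))
elapsed = time.perf_counter() - start
print("20 requests finished in {:.2f}s".format(elapsed))
```

Run one after another, the 20 simulated requests would take about 2 seconds; run concurrently they take roughly as long as a single request, because the waiting periods overlap.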

We say that tasks (or particular coroutines) which make the calling program wait (and not process) are blocking, or I/O bound. Network communication can be considered blocking, as can read/write operations. It’s now clear that communication over the WS protocol is also a blocking task and that asynchronous programming makes sense for it.


Implementation

This code rebuilds multiple orderbooks using asyncio coroutines. The program never sits idle on blocking tasks (waiting for messages after a receive() call). I’m using the Bitfinex WS API here. The program continuously updates the orderbooks of the selected pairs and keeps them in the global orderbooks instance. From there they are printed out every 10 seconds in a pretty table, structured as you are used to seeing on exchange platforms. But the real point is that we have the orderbooks available in real time in the orderbooks global variable!

I built the following solution on Python’s asyncio library. If you are not familiar with it, I recommend this tutorial, which is in my humble opinion the shortest and most accessible one you can find on the internet.

import aiohttp
import asyncio
import ujson
from tabulate import tabulate
from copy import deepcopy

# Pairs to generate orderbooks for.
PAIRS = [
        'BTCUSD',
        'ETCBTC',
        # 'ETCUSD',
        # 'ETHBTC',
        # 'ETHUSD',
        # 'XMRBTC',
        # 'XMRUSD',
        # 'ZECBTC',
        # 'ZECUSD'
    ]

# If there are n pairs we need to subscribe to n websocket channels.
# This is the subscription message template.
# For details about settings refer to https://bitfinex.readme.io/v2/reference#ws-public-order-books.
SUB_MESG = {
        'event': 'subscribe',
        'channel': 'book',
        'freq': 'F1',
        'len': '25',
        'prec': 'P0'
        # 'pair': <pair>
    }


def build_book(res, pair):
    """ Updates orderbook.
    :param res: Orderbook update message.
    :param pair: Updated pair.
    
    """

    global orderbooks

    # Filter out subscription status messages and non-text frames.
    if isinstance(res.data, str) and res.data.startswith('['):

        # String to json
        data = ujson.loads(res.data)[1]

        # Build orderbook
        # Observe the structure of orderbook. The prices are keys for corresponding count and amount.
        # Structuring data in this way significantly simplifies orderbook updates.
        if len(data) > 10:
            bids = {
                       str(level[0]): [str(level[1]), str(level[2])]
                       for level in data if level[2] > 0
            }

            asks = {
                       str(level[0]): [str(level[1]), str(level[2])[1:]]
                       for level in data if level[2] < 0
            }

            orderbooks[pair]['bids'] = bids
            orderbooks[pair]['asks'] = asks

        # Update orderbook and filter out heartbeat messages.
        elif data[0] != 'h':

            # Example update message structure [1765.2, 0, 1] where we have [price, count, amount].
            # Update algorithm pseudocode from Bitfinex documentation:
            # 1. - When count > 0 then you have to add or update the price level.
            #   1.1- If amount > 0 then add/update bids.
            #   1.2- If amount < 0 then add/update asks.
            # 2. - When count = 0 then you have to delete the price level.
            #   2.1- If amount = 1 then remove from bids
            #   2.2- If amount = -1 then remove from asks

            data = [str(data[0]), str(data[1]), str(data[2])]
            if int(data[1]) > 0:  # 1.

                if float(data[2]) > 0:  # 1.1
                    orderbooks[pair]['bids'].update({data[0]: [data[1], data[2]]})

                elif float(data[2]) < 0:  # 1.2
                    orderbooks[pair]['asks'].update({data[0]: [data[1], str(data[2])[1:]]})

            elif data[1] == '0':  # 2.

                if data[2] == '1':  # 2.1
                    if orderbooks[pair]['bids'].get(data[0]):
                        del orderbooks[pair]['bids'][data[0]]

                elif data[2] == '-1':  # 2.2
                    if orderbooks[pair]['asks'].get(data[0]):
                        del orderbooks[pair]['asks'][data[0]]

async def print_books():
    """ Prints orderbooks snapshots for all pairs every 10 seconds. """
    global orderbooks
    while 1:
        await asyncio.sleep(10)
        for pair in PAIRS:
            bids = [[v[1], v[0], k] for k, v in orderbooks[pair]['bids'].items()]
            asks = [[k, v[0], v[1]] for k, v in orderbooks[pair]['asks'].items()]
            bids.sort(key=lambda x: float(x[2]), reverse=True)
            asks.sort(key=lambda x: float(x[0]))
            table = [[*bid, *ask] for (bid, ask) in zip(bids, asks)]
            headers = ['bid:amount', 'bid:count', 'bid:price', 'ask:price', 'ask:count', 'ask:amount']
            print('orderbook for {}'.format(pair))
            print(tabulate(table, headers=headers))


async def get_book(pair, session):
    """ Subscribes for orderbook updates and fetches updates. """
    print('enter get_book, pair: {}'.format(pair))
    pair_dict = deepcopy(SUB_MESG)
    pair_dict.update({'pair': pair})
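    async with session.ws_connect('wss://api.bitfinex.com/ws/2') as ws:
        # send_json() is a coroutine in aiohttp 3.x and has to be awaited.
        await ws.send_json(pair_dict)
        while 1:
            res = await ws.receive()
            # print(pair_dict['pair'], res.data)  # debug
            # Stop reading once the connection is closed or has failed.
            if res.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING,
                            aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                break
            build_book(res, pair)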

async def main():
    """ Driver coroutine. """
    async with aiohttp.ClientSession() as session:
        coros = [
            get_book(pair, session)
            for pair in PAIRS
        ]
        # Append coroutine for printing orderbook snapshots every 10s.
        coros.append(print_books())

        # gather() accepts bare coroutines; asyncio.wait() rejects them on Python 3.11+.
        await asyncio.gather(*coros)

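# Shared state: one book per pair, with empty sides so that print_books()
# can read it even before the first snapshot arrives.
orderbooks = {
    pair: {'bids': {}, 'asks': {}}
    for pair in PAIRS
}
# asyncio.run() creates the event loop, runs main() and closes the loop.
asyncio.run(main())

Code analysis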

Let’s start at the bottom of the script. We declare the orderbooks dictionary with an entry for every pair; it serves as the shared variable between the orderbook-updating and orderbook-reading coroutines. The last lines of the script start the asyncio event loop, which in turn executes the main() coroutine. Let’s move to main(). The main() coro initializes the aiohttp session which we use for all WS connections in the script. It then creates a list of get_book() coroutines, already equipped with arguments. The rest of main() just appends the printing coro to the coros list and hands the content of coros over to the event loop. As soon as execution reaches the final await in main(), control is switched back to the event loop, because main() cannot continue until the awaited coroutines finish.

Now we have as many get_book() coros running as there are PAIRS uncommented before code execution, plus one print_books() coro for printing.

Let’s step into the execution of one of the get_book() coros. The first run of every get_book() coro prepares the subscription message, opens the websocket connection and sends the subscription message. Every get_book() coro has its own websocket connection and fetches orderbook messages for one currency pair.

Now comes the important part. When control reaches res = await ws.receive() in the while loop and no message is ready yet, it immediately returns to the event loop, so another get_book() coro can be executed, and so on. Now imagine that all of our get_book() coros have been started and all of them are waiting at ws.receive() for a message[1]. As soon as res = await ws.receive() is unblocked (receives a message) in any get_book() instance, the waiting event loop grasps the opportunity and immediately continues execution after the unblocked res = await ws.receive() statement, and so on.
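
The same hand-over can be reproduced without a network. In the sketch below, asyncio.Queue objects stand in for the websocket connections and listener() mimics the receive loop of get_book(); both names and the None stop sentinel are made up for this illustration.

```python
import asyncio


async def listener(name, queue, log):
    """Mimics get_book(): wait for a message, process it, wait again."""
    while True:
        msg = await queue.get()   # control returns to the event loop here
        if msg is None:           # sentinel: stop listening
            break
        log.append((name, msg))


async def demo():
    log = []
    queues = {"BTCUSD": asyncio.Queue(), "ETCBTC": asyncio.Queue()}
    tasks = [asyncio.create_task(listener(n, q, log)) for n, q in queues.items()]
    # Messages arrive in an arbitrary order across the two "connections".
    for name, msg in [("ETCBTC", 1), ("BTCUSD", 2), ("ETCBTC", 3)]:
        await queues[name].put(msg)
        await asyncio.sleep(0)    # give the woken listener a chance to run
    for q in queues.values():
        await q.put(None)
    await asyncio.gather(*tasks)
    return log


log = asyncio.run(demo())
print(log)
```

Whichever listener has a message ready is the one that gets resumed, regardless of the order in which the listeners were started.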

When control reaches the build_book() function call, the program simply continues sequentially until build_book() is complete. Then the next iteration of the get_book() while loop starts, with the same blocking mechanism as described above. The build_book() function is a matter of parsing the websocket message. It creates the orderbook from the initial message with the complete orderbook snapshot and thereafter incorporates each subsequent update message into the global orderbook instance. That way we have an ever-updating orderbook instance available and can use it for whatever we want.
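
As one example of "whatever we want", here is a small sketch that reads the best bid, best ask and spread from a book shaped like the ones build_book() maintains, that is {price: [count, amount]} with string values. The best_bid_ask() helper and the sample numbers are made up for illustration.

```python
def best_bid_ask(book):
    """Return (best_bid, best_ask, spread) as floats for one pair's book."""
    best_bid = max(float(price) for price in book['bids'])
    best_ask = min(float(price) for price in book['asks'])
    return best_bid, best_ask, best_ask - best_bid


# Sample data in the same shape as orderbooks[pair].
sample = {
    'bids': {'1765.0': ['2', '0.5'], '1764.5': ['1', '1.0']},
    'asks': {'1765.5': ['1', '0.3'], '1766.0': ['3', '2.0']},
}
bid, ask, spread = best_bid_ask(sample)
print(bid, ask, spread)
```

Because build_book() runs to completion without yielding, any coroutine that calls such a helper on the global orderbooks always sees a book between two updates, never a half-applied one.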

For demonstration purposes we registered the print_books() coroutine on the event loop; it prints orderbook snapshots from the global orderbooks variable. Note that the code in its while loop is suspended for 10 seconds at a time by await asyncio.sleep(). In other words, the body of the while loop becomes available for execution on the event loop every 10 seconds.
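
The periodic-reader pattern can be tried in isolation with much shorter intervals. In this sketch writer() plays the role of the updating coroutines and reporter() the role of print_books(); the names, the intervals and the state dictionary are assumptions chosen for the demonstration.

```python
import asyncio

state = {"updates": 0}   # shared state, like the global orderbooks
snapshots = []           # what the reporter saw on each wake-up


async def writer():
    """Keeps changing the shared state, like build_book() does."""
    for _ in range(50):
        state["updates"] += 1
        await asyncio.sleep(0.001)


async def reporter(interval):
    """Wakes up every `interval` seconds and records the current state."""
    while True:
        await asyncio.sleep(interval)
        snapshots.append(state["updates"])


async def run_demo():
    task = asyncio.create_task(reporter(0.01))
    await writer()
    task.cancel()   # stop the endless reporter once the writer is done
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(run_demo())
print(snapshots)
```

While the reporter sleeps, the event loop is free to run the writer, which is why the recorded values keep growing between wake-ups.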



1. This situation actually never occurs, or the waiting time is negligible, simply because the ws.receive() call in one of the get_book() coros unblocks before the subscription setup at the top of get_book() has finished in all the other get_book() coros.
