SBE BBO Integration
Overview
Bybit provides real-time Level 1 Orderbook data in SBE (Simple Binary Encoding) format, following the FIX Protocol SBE 1.0 specification.
- The current version only supports SBE binary format.
- For linear / inverse / spot Level 1 data: if 3 seconds have elapsed without a change in the orderbook, a snapshot message will be pushed again, and the field
uwill be the same as that in the previous message. - Under extreme market conditions, there are merge and packet-drop strategies on both the generation side and the push side, so
ucontinuity is not guaranteed.
Connection
Testnet
wss://stream-testnet.bybits.org/v5/public-sbe/spotwss://stream-testnet.bybits.org/v5/public-sbe/linearwss://stream-testnet.bybits.org/v5/public-sbe/inverse
Mainnet
xxxx.bybit-aws.com/v5/public-sbe/spotxxxx.bybit-aws.com/v5/public-sbe/linearxxxx.bybit-aws.com/v5/public-sbe/inverse
Please use your dedicated MMWS host for mainnet SBE connections.
Data Frame Type
SBE messages are transmitted as binary WebSocket frames (opcode = 2).
Control Messages
Control messages (subscription, unsubscription, ping/pong, etc.) follow the Bybit V5 WebSocket JSON standard.
Subscription Flow
Send subscription request
{
"op": "subscribe",
"args": ["ob.rpi.1.sbe.BTCUSDT"]
}
- Topic format:
ob.rpi.1.sbe.<symbol> - Example symbols:
BTCUSDT,ETHUSDT, etc.
Subscription confirmation
{
"success": true,
"ret_msg": "",
"conn_id": "d30fdpbboasp1pjbe7r0",
"req_id": "xxx",
"op": "subscribe"
}
Receive data
b"R\x00 N\x01\x00\x00\x00\xdb\x84\xd0k\x00\x00\x00\x00f\xb7\x003\x99\x01\x00\x00\x02\x06\xa1\xcb\xa1\x00\x00\x00\x00\x00\xe7\xda\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\xc8\xa1\x00\x00\x00\x00\x00 N\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008\x01\x00\x00\x00\x00\x00\x00v\xba\x003\x99\x01\x00\x00\x07BTCUSDT"
Decode example
{
"header": {
"block_length": 82,
"template_id": 20000,
"schema_id": 1,
"version": 0
},
"seq": 1808827611,
"cts": 1757497309030,
"price_exponent": 2,
"size_exponent": 6,
"ask_price": 1060342500,
"ask_normal_size": 776935000000,
"ask_rpi_size": 0,
"bid_price": 1060250000,
"bid_normal_size": 20000000000,
"bid_rpi_size": 0,
"u": 312,
"ts": 1757497309814,
"symbol": "BTCUSDT",
"parsed_length": 98
}
SBE Message Structure
SBE XML Schema
- The
templateId = 20000identifies the message type. - Validate that
templateId = 20000to confirm it is a Level 1 Orderbook event.
<?xml version="1.0" encoding="UTF-8"?>
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
xmlns:mbx="https://bybit-exchange.github.io/docs/v5/intro"
package="quote.sbe"
id="1"
version="0"
semanticVersion="1.0.0"
description="Bybit market data streams SBE message schema"
byteOrder="littleEndian"
headerType="messageHeader">
<types>
<composite name="messageHeader" description="Template ID and length of message root">
<type name="blockLength" primitiveType="uint16"/>
<type name="templateId" primitiveType="uint16"/>
<type name="schemaId" primitiveType="uint16"/>
<type name="version" primitiveType="uint16"/>
</composite>
<composite name="varString8" description="Variable length UTF-8 string.">
<type name="length" primitiveType="uint8"/>
<type name="varData"
length="0"
primitiveType="uint8"
semanticType="String"
characterEncoding="UTF-8"/>
</composite>
</types>
<!-- Stream event for "ob.rpi.1.sbe.<symbol>" channel -->
<sbe:message name="BestOBRpiEvent" id="20000">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="askNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask normal price"/>
<field id="6" name="askNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask normal size"/>
<field id="7" name="askRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask rpi price"/>
<field id="8" name="askRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask rpi size"/>
<field id="9" name="bidNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid normal price"/>
<field id="10" name="bidNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid normal size"/>
<field id="11" name="bidRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid rpi price"/>
<field id="12" name="bidRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid rpi size"/>
<field id="13" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="14" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<data id="55" name="symbol" type="varString8"/>
</sbe:message>
</sbe:messageSchema>
Message Structure Details
Message Header (8 bytes)
| Field | Type | Size (bytes) | Description |
|---|---|---|---|
| blockLength | uint16 | 2 | Message body length |
| templateId | uint16 | 2 | Fixed = 20000 |
| schemaId | uint16 | 2 | Fixed = 1 |
| version | uint16 | 2 | Fixed = 0 |
Message Body (BestOBRpiEvent)
| ID | Field | Type | Description |
|---|---|---|---|
| 1 | ts | int64 | Snapshot timestamp (µs) |
| 2 | seq | int64 | Unique message sequence number |
| 3 | cts | int64 | Trade timestamp (µs) |
| 4 | u | int64 | Update ID |
| 5 | askNormalPrice | int64 | Best ask price mantissa |
| 6 | askNormalSize | int64 | Best ask size (normal) mantissa |
| 7 | askRpiPrice | int64 | Best RPI ask price mantissa |
| 8 | askRpiSize | int64 | Best RPI ask size mantissa |
| 9 | bidNormalPrice | int64 | Best bid price mantissa |
| 10 | bidNormalSize | int64 | Best bid size (normal) mantissa |
| 11 | bidRpiPrice | int64 | Best bid price (RPI) mantissa |
| 12 | bidRpiSize | int64 | Best bid size (RPI) mantissa |
| 13 | priceExponent | int8 | Price exponent |
| 14 | sizeExponent | int8 | Size exponent |
| 55 | symbol | varStr | Trading pair (e.g., BTCUSDT) |
Variable-Length String (varString8)
uint8 length(1 byte) – number of following characters- UTF-8 string – trading symbol
Example: "BTCUSDT" encoded as:
0x07 0x42 0x54 0x43 0x55 0x53 0x44 0x54
^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
len "BTCUSDT"
Optimisation
Optimisation 1
ts and cts will be displayed in microseconds for BBO SBE.
Optimisation 2
Adjust the field order.
Optimisation 3
New field definitions (sell side as example; buy side is analogous):
| Field | Definition |
|---|---|
| askNormalPrice | No RPI order best ask price |
| askNormalSize | No RPI order best ask size |
| askRpiPrice | RPI order best ask price |
| askRpiSize | RPI order best ask size |
The current logic might result in:
| Price | normalQty | rpiQty |
|---|---|---|
| 1000 | 0 | 100 |
This means that users without RPI permissions won't know at what price they can take their orders, making this message useless. To address this, we adjust the message content as follows.
Case 1: askNormalSize != 0 && askRpiSize != 0
This is a normal response based on the actual situation. This case conveys the same meaning as the original message.
Example:
| Field | Definition | Example |
|---|---|---|
| askNormalPrice | No RPI order best ask price | 1000 |
| askNormalSize | No RPI order best ask size | 200 |
| askRpiPrice | RPI order best ask price | 1000 |
| askRpiSize | RPI order best ask size | 300 |
Case 2: askNormalSize != 0 && askRpiSize == 0
askRpiPrice is assigned the value askNormalPrice, and askRpiSize = 0.
In this case, the askRpiPrice value will not be searched further.
Example:
| Field | Definition | Example | Note |
|---|---|---|---|
| askNormalPrice | No RPI order best ask price | 1000 | |
| askNormalSize | No RPI order best ask size | 200 | |
| askRpiPrice | RPI order best ask price | 1000 | This itself has no meaning. The price field is assigned a non-RPI sell price. |
| askRpiSize | RPI order best ask size | 0 |
Case 3: askNormalSize == 0 && askRpiSize != 0
askNormalPrice = the actual non-RPI asking price.
In this case, askNormalPrice is retrieved and returned.
Example:
| Field | Definition | Example |
|---|---|---|
| askNormalPrice | No RPI order best ask price | 1200 |
| askNormalSize | No RPI order best ask size | 100 |
| askRpiPrice | RPI order best ask price | 1000 |
| askRpiSize | RPI order best ask size | 20 |
Case 4
When the market is so bad that there is no liquidity, no message is pushed.
Integration Example
import json
import logging
import struct
import threading
import time
from datetime import datetime
from typing import Dict, Any
import websocket
logging.basicConfig(
filename="logfile_wrapper.log",
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
# Change symbol/topic as you wish
TOPIC = "ob.rpi.1.sbe.BTCUSDT"
WS_URL = "wss://stream-testnet.bybits.org/v5/public-sbe/spot"
class SBEBestOBRpiParser:
"""
Parser for BestOBRpiEvent (template_id = 20000) per XML schema:
ts(int64), seq(int64), cts(int64), u(int64),
askNormalPrice(int64), askNormalSize(int64),
askRpiPrice(int64), askRpiSize(int64),
bidNormalPrice(int64), bidNormalSize(int64),
bidRpiPrice(int64), bidRpiSize(int64),
priceExponent(int8), sizeExponent(int8),
symbol(varString8)
All values are little-endian.
"""
def __init__(self) -> None:
# Header: blockLength, templateId, schemaId, version
self.header_fmt = "<HHHH"
self.header_sz = struct.calcsize(self.header_fmt)
# 12 x int64 + 2 x int8:
# ts, seq, cts, u,
# askNormalPrice, askNormalSize, askRpiPrice, askRpiSize,
# bidNormalPrice, bidNormalSize, bidRpiPrice, bidRpiSize,
# priceExponent, sizeExponent
self.body_fmt = "<" + ("q" * 12) + "bb"
self.body_sz = struct.calcsize(self.body_fmt)
self.target_template_id = 20000
def _parse_header(self, data: bytes) -> Dict[str, Any]:
if len(data) < self.header_sz:
raise ValueError("insufficient data for SBE header")
block_length, template_id, schema_id, version = struct.unpack_from(
self.header_fmt, data, 0
)
return {
"block_length": block_length,
"template_id": template_id,
"schema_id": schema_id,
"version": version,
}
@staticmethod
def _parse_varstring8(data: bytes, offset: int) -> tuple[str, int]:
if offset + 1 > len(data):
raise ValueError("insufficient data for varString8 length")
(length,) = struct.unpack_from("<B", data, offset)
offset += 1
if offset + length > len(data):
raise ValueError("insufficient data for varString8 bytes")
s = data[offset : offset + length].decode("utf-8")
offset += length
return s, offset
@staticmethod
def _apply_exponent(value: int, exponent: int) -> float:
# Exponent is for decimal point positioning.
# If exponent = 2 and value=1060342500 -> 10603425.00
return value / (10 ** exponent) if exponent >= 0 else value * (
10 ** (-exponent)
)
def parse(self, data: bytes) -> Dict[str, Any]:
hdr = self._parse_header(data)
if hdr["template_id"] != self.target_template_id:
raise NotImplementedError(
f"unsupported template_id={hdr['template_id']}"
)
if len(data) < self.header_sz + self.body_sz:
raise ValueError("insufficient data for BestOBRpiEvent body")
fields = struct.unpack_from(self.body_fmt, data, self.header_sz)
(
ts,
seq,
cts,
u,
ask_np_m,
ask_ns_m,
ask_rp_m,
ask_rs_m,
bid_np_m,
bid_ns_m,
bid_rp_m,
bid_rs_m,
price_exp,
size_exp,
) = fields
offset = self.header_sz + self.body_sz
symbol, offset = self._parse_varstring8(data, offset)
# Apply exponents
ask_np = self._apply_exponent(ask_np_m, price_exp)
ask_ns = self._apply_exponent(ask_ns_m, size_exp)
ask_rp = self._apply_exponent(ask_rp_m, price_exp)
ask_rs = self._apply_exponent(ask_rs_m, size_exp)
bid_np = self._apply_exponent(bid_np_m, price_exp)
bid_ns = self._apply_exponent(bid_ns_m, size_exp)
bid_rp = self._apply_exponent(bid_rp_m, price_exp)
bid_rs = self._apply_exponent(bid_rs_m, size_exp)
return {
"header": hdr,
"ts": ts,
"seq": seq,
"cts": cts,
"u": u,
"price_exponent": price_exp,
"size_exponent": size_exp,
"symbol": symbol,
# Normal book (best)
"ask_normal_price": ask_np,
"ask_normal_size": ask_ns,
"bid_normal_price": bid_np,
"bid_normal_size": bid_ns,
# RPI book (best)
"ask_rpi_price": ask_rp,
"ask_rpi_size": ask_rs,
"bid_rpi_price": bid_rp,
"bid_rpi_size": bid_rs,
"parsed_length": offset,
}
parser = SBEBestOBRpiParser()
# --------------------------- WebSocket handlers ---------------------------
def on_message(ws, message):
try:
# Binary SBE frames; text frames for control/acks/errors
if isinstance(message, (bytes, bytearray)):
decoded = parser.parse(message)
logging.info(
"SBE %s seq=%s u=%s "
"NORM bid=%.8f@%.8f ask=%.8f@%.8f "
"RPI bid=%.8f@%.8f ask=%.8f@%.8f ts=%s",
decoded["symbol"],
decoded["seq"],
decoded["u"],
decoded["bid_normal_price"],
decoded["bid_normal_size"],
decoded["ask_normal_price"],
decoded["ask_normal_size"],
decoded["bid_rpi_price"],
decoded["bid_rpi_size"],
decoded["ask_rpi_price"],
decoded["ask_rpi_size"],
decoded["ts"],
)
print(
f"{decoded['symbol']} u={decoded['u']} "
f"NORM: {decoded['bid_normal_price']:.8f} x {decoded['bid_normal_size']:.8f} "
f"| {decoded['ask_normal_price']:.8f} x {decoded['ask_normal_size']:.8f} "
f"RPI: {decoded['bid_rpi_price']:.8f} x {decoded['bid_rpi_size']:.8f} "
f"| {decoded['ask_rpi_price']:.8f} x {decoded['ask_rpi_size']:.8f} "
f"(seq={decoded['seq']} ts={decoded['ts']})"
)
else:
try:
obj = json.loads(message)
logging.info("TEXT %s", obj)
print(obj)
except json.JSONDecodeError:
logging.warning("non-JSON text frame: %r", message)
except Exception as e:
logging.exception("decode error: %s", e)
print("decode error:", e)
def on_error(ws, error):
print("WS error:", error)
logging.error("WS error: %s", error)
def on_close(ws, *_):
print("### connection closed ###")
logging.info("connection closed")
def on_open(ws):
print("opened")
sub = {"op": "subscribe", "args": [TOPIC]}
ws.send(json.dumps(sub))
print("subscribed:", TOPIC)
threading.Thread(target=ping_per, args=(ws,), daemon=True).start()
threading.Thread(target=manage_subscription, args=(ws,), daemon=True).start()
def manage_subscription(ws):
# demo: unsubscribe/resubscribe once
time.sleep(20)
ws.send(json.dumps({"op": "unsubscribe", "args": [TOPIC]}))
print("unsubscribed:", TOPIC)
time.sleep(5)
ws.send(json.dumps({"op": "subscribe", "args": [TOPIC]}))
print("resubscribed:", TOPIC)
def ping_per(ws):
while True:
try:
ws.send(json.dumps({"op": "ping"}))
except Exception:
return
time.sleep(10)
def on_pong(ws, *_):
print("pong received")
def on_ping(ws, *_):
print("ping received @", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def connWS():
ws = websocket.WebSocketApp(
WS_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_ping=on_ping,
on_pong=on_pong,
)
ws.run_forever(ping_interval=20, ping_timeout=10)
if __name__ == "__main__":
websocket.enableTrace(False)
connWS()