SBE Level 50 Integration
Overview
- Channel: private MMWS only (not available on public WS).
- Topic:
ob.50.sbe.{symbol}(snapshot or delta, every 20 ms). - Format: SBE binary frames (
opcode = 2), little-endian. - Depth: 50 levels per side (no RPI in this stream).
- Units: timestamps in microseconds (µs); price/size are mantissas with exponents.
SBE Connection Limit
- Spot: 1500 connections limit per dedicated MMWS host.
- Futures (linear + inverse): 3000 connections limit per dedicated MMWS host.
- Once you breach the connection limit, new connections return HTTP 429.
Flow
Ping / Pong (JSON control frames)
Send Ping
{"req_id": "100001", "op": "ping"}
Receive Pong
{"success": true,"ret_msg": "pong","conn_id": "xxxxx-xx","req_id": "","op": "ping"}
Subscribe
- Topic format:
ob.50.sbe.<symbol>
Subscribe request
{"op": "subscribe", "args": ["ob.50.sbe.BTCUSDT"]}
Subscription confirmation
{"success": true,"ret_msg": "","conn_id": "d30fdpbboasp1pjbe7r0","req_id": "xxx","op": "subscribe"}
SBE XML Template (L50 OB)
<?xml version="1.0" encoding="UTF-8"?>
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
xmlns:mbx="https://bybit-exchange.github.io/docs/v5/intro"
package="quote.sbe"
id="1"
version="0"
semanticVersion="1.0.0"
description="Bybit market data streams SBE message schema"
byteOrder="littleEndian"
headerType="messageHeader">
<types>
<composite name="messageHeader" description="Template ID and length of message root">
<type name="blockLength" primitiveType="uint16"/>
<type name="templateId" primitiveType="uint16"/>
<type name="schemaId" primitiveType="uint16"/>
<type name="version" primitiveType="uint16"/>
</composite>
<composite name="varString8" description="Variable length UTF-8 string.">
<type name="length" primitiveType="uint8"/>
<type name="varData" length="0" primitiveType="uint8" semanticType="String" characterEncoding="UTF-8"/>
</composite>
<composite name="groupSize16Encoding" description="Repeating group dimensions.">
<type name="blockLength" primitiveType="uint16"/>
<type name="numInGroup" primitiveType="uint16"/>
</composite>
<!-- NEW: package type enum -->
<enum name="pkgTypeEnum" encodingType="uint8">
<validValue name="SNAPSHOT">0</validValue>
<validValue name="DELTA">1</validValue>
</enum>
</types>
<sbe:message name="BestOBRpiEvent" id="20000">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="askNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask normal price"/>
<field id="6" name="askNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask normal size"/>
<field id="7" name="askRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask rpi price"/>
<field id="8" name="askRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask rpi size"/>
<field id="9" name="bidNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid normal price"/>
<field id="10" name="bidNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid normal size"/>
<field id="11" name="bidRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid rpi price"/>
<field id="12" name="bidRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid rpi size"/>
<field id="13" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="14" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<data id="55" name="symbol" type="varString8"/>
</sbe:message>
<!-- Stream event for "ob.50.sbe.<symbol>" channel -->
<sbe:message name="OBL50Event" id="20001">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="6" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<!-- NEW: package type -->
<field id="7" name="pkgType" type="pkgTypeEnum" description="Package type: 0 = SNAPSHOT (full book), 1 = DELTA (partial update)"/>
<group id="40" name="asks" dimensionType="groupSize16Encoding" description="Sell side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>
<group id="41" name="bids" dimensionType="groupSize16Encoding" description="Buy side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>
<data id="55" name="symbol" type="varString8"/>
</sbe:message>
</sbe:messageSchema>
Field Reference
Message: OBL50Event (id = 20001)
| Field Name | ID | SBE Type | Unit / Format | Notes |
|---|---|---|---|---|
| ts | 1 | int64 | µs | System generation time at push side (dispatcher). |
| seq | 2 | int64 | integer | Cross-sequence id (monotonic per feed; not guaranteed continuous). |
| cts | 3 | int64 | µs | Matching-engine creation time of this OB snapshot or delta; used for latency measurements. |
| u | 4 | int64 | integer | Update id (monotonic per symbol). Useful to check continuity. |
| priceExponent | 5 | int8 | exponent | Decimal places for price. Display price = mantissa × 10^priceExponent. |
| sizeExponent | 6 | int8 | exponent | Decimal places for size. Display size = mantissa × 10^sizeExponent. |
| pkgType | 7 | uint8 (pkgTypeEnum) | integer | Package type (0 = snapshot, 1 = delta). |
| asks | 40 | group(groupSize16Encoding) | — | Sell side updates (up to 50 levels). |
| bids | 41 | group(groupSize16Encoding) | — | Buy side updates (up to 50 levels). |
| symbol | 55 | varString8 | UTF-8 | 1-byte length + bytes, e.g., 0x07 "BTCUSDT". |
Asks Group
| Field (id) | Type | Description |
|---|---|---|
| price (1) | int64 | Ask price mantissa. Display ask price = price × 10^priceExponent. |
| size (2) | int64 | Ask size mantissa. Display ask size = size × 10^sizeExponent. |
Bids Group
| Field (id) | Type | Description |
|---|---|---|
| price (1) | int64 | Bid price mantissa. Display bid price = price × 10^priceExponent. |
| size (2) | int64 | Bid size mantissa. Display bid size = size × 10^sizeExponent. |
Order Book Update Logic
Rules for the u (Update ID) Field
Behavior of u
- Field
uincreases monotonically for all snapshots and deltas. - Field
udoes not reset, unless there is a system reset or precision change. - Field
u = 1always indicates a snapshot, and continuity checks must stop.
Continuity Validation
Continuity must be checked only when u != 1.
| Condition | Action |
|---|---|
u != 1 | Validate continuity: next u should follow previous u + 1. |
u == 1 | Special snapshot (service restart / precision change). Do not perform continuity checks. |
Rules for Order Book Maintenance
First Message of connection and reconnection
After subscribing, the first message is always a snapshot, clients must initialize the local book with it.
Snapshot Handling
A snapshot must always replace the entire local order book
Snapshots may appear:
- after initial subscription
- when the number of changed levels > 100 (extreme market condition auto-fallback)
- after internal service restart
- after exponent / precision changes
Delta Handling
A delta applies incrementally:
- Insert/update levels with
size > 0,remove levels whensize == 0,continue continuity checks using theufield.
Extreme Market Condition Handling
- When a delta contains more than 100 combined bid+ask updates (buy + sell), the system automatically sends a full snapshot instead of a delta.
- Ensures client books resync cleanly.
- Prevents explosion of delta packets during high churn.
- Keeps snapshot size fixed length for predictable decoding.
Example Push Update
Below is a real case where the connection stays healthy and messages arrive in order:
| u | Type | Notes |
|---|---|---|
| 10000 | snapshot | First message after subscription. |
| 10001 | delta | Incremental updates. Must apply changes to the existing book. |
| 10002 | delta | Normal incremental update. |
| 10003 | snapshot | Large market move (> 100 level changes). Use snapshot to replace local book. |
| 10004 | delta | Continue delta from the new snapshot. |
| 1 | snapshot | Service restarted / precision changed — reset u to 1. |
| 2 | delta | New continuity sequence. |
| 3 | delta | — |
| 4 | delta | — |
Integration Script
Python
import json
import logging
import struct
import threading
import time
from datetime import datetime
from typing import Dict, Any, List, Tuple
import websocket
logging.basicConfig(
filename='logfile_ob50.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s'
)
# -------------------------------------------------------------------
# Config
# -------------------------------------------------------------------
# L50 SBE order book topic
TOPIC = "ob.50.sbe.BTCUSDT"
# Adjust URL for spot / contract environment as needed:
WS_URL = "wss://stream-testnet.bybits.org/v5/public-sbe/spot"
# -------------------------------------------------------------------
# SBE Parser for OBL50Event (template_id = 20001)
#
# XML schema:
# ts(int64), seq(int64), cts(int64), u(int64),
# priceExponent(int8), sizeExponent(int8),
# pkgType(uint8) # 0 = SNAPSHOT, 1 = DELTA
# group asks: blockLen(uint16), numInGroup(uint16),
# then numInGroup * [ price(int64), size(int64) ]
# group bids: same as asks
# symbol(varString8)
# -------------------------------------------------------------------
class SBEOBL50Parser:
def __init__(self):
# message header: blockLength, templateId, schemaId, version
self.header_fmt = "<HHHH"
self.header_sz = struct.calcsize(self.header_fmt)
# fixed body fields:
# ts, seq, cts, u -> 4 x int64
# priceExponent, sizeExponent -> 2 x int8
# pkgType -> uint8
self.body_fmt = "<qqqqbbB" # 4*q + 2*b + B
self.body_sz = struct.calcsize(self.body_fmt)
# group header for repeating groups: blockLength(uint16), numInGroup(uint16)
self.group_hdr_fmt = "<HH"
self.group_hdr_sz = struct.calcsize(self.group_hdr_fmt)
# each group entry: price(int64), size(int64)
self.level_fmt = "<qq"
self.level_sz = struct.calcsize(self.level_fmt)
self.target_template_id = 20001
# ---------------- core small helpers ----------------
def _parse_header(self, data: bytes) -> Dict[str, Any]:
if len(data) < self.header_sz:
raise ValueError("insufficient data for SBE header")
block_length, template_id, schema_id, version = struct.unpack_from(
self.header_fmt, data, 0
)
return {
"block_length": block_length,
"template_id": template_id,
"schema_id": schema_id,
"version": version,
}
@staticmethod
def _parse_varstring8(data: bytes, offset: int) -> Tuple[str, int]:
if offset + 1 > len(data):
raise ValueError("insufficient data for varString8 length")
(length,) = struct.unpack_from("<B", data, offset)
offset += 1
if length == 0:
return "", offset
if offset + length > len(data):
raise ValueError("insufficient data for varString8 bytes")
s = data[offset: offset + length].decode("utf-8")
offset += length
return s, offset
@staticmethod
def _apply_exponent(value: int, exponent: int) -> float:
return value / (10 ** exponent) if exponent >= 0 else value * (10 ** (-exponent))
def _parse_levels(self, data: bytes, offset: int) -> Tuple[List[Dict[str, float]], int]:
"""
Parse one repeating group (asks or bids).
Layout:
uint16 blockLength
uint16 numInGroup
numInGroup * [ price(int64), size(int64) ] (within blockLength)
"""
if offset + self.group_hdr_sz > len(data):
raise ValueError("insufficient data for group header")
block_len, num_in_group = struct.unpack_from(self.group_hdr_fmt, data, offset)
offset += self.group_hdr_sz
if block_len < self.level_sz:
raise ValueError(f"blockLength({block_len}) < level_sz({self.level_sz})")
levels = []
for _ in range(num_in_group):
if offset + block_len > len(data):
raise ValueError("insufficient data for group entry")
# we only care about first 16 bytes (price, size)
price_m, size_m = struct.unpack_from(self.level_fmt, data, offset)
offset += block_len # skip the whole block (safe if future adds extra fields)
levels.append({
"price_m": price_m,
"size_m": size_m,
})
return levels, offset
# ---------------- public parse ----------------
def parse(self, data: bytes) -> Dict[str, Any]:
hdr = self._parse_header(data)
if hdr["template_id"] != self.target_template_id:
raise NotImplementedError(f"unsupported template_id={hdr['template_id']}")
if len(data) < self.header_sz + self.body_sz:
raise ValueError("insufficient data for OBL50Event body")
# parse fixed body
(ts, seq, cts, u,
price_exp, size_exp, pkg_type) = struct.unpack_from(
self.body_fmt, data, self.header_sz
)
offset = self.header_sz + self.body_sz
# asks group
asks_raw, offset = self._parse_levels(data, offset)
# bids group
bids_raw, offset = self._parse_levels(data, offset)
# symbol
symbol, offset = self._parse_varstring8(data, offset)
# apply exponents
asks = [
{
"price": self._apply_exponent(l["price_m"], price_exp),
"size": self._apply_exponent(l["size_m"], size_exp),
}
for l in asks_raw
]
bids = [
{
"price": self._apply_exponent(l["price_m"], price_exp),
"size": self._apply_exponent(l["size_m"], size_exp),
}
for l in bids_raw
]
return {
"header": hdr,
"ts": ts,
"seq": seq,
"cts": cts,
"u": u,
"price_exponent": price_exp,
"size_exponent": size_exp,
"pkg_type": pkg_type, # 0 = SNAPSHOT, 1 = DELTA
"symbol": symbol,
"asks": asks,
"bids": bids,
"parsed_length": offset,
}
parser = SBEOBL50Parser()
# -------------------------------------------------------------------
# WebSocket handlers
# -------------------------------------------------------------------
def on_message(ws, message):
try:
if isinstance(message, (bytes, bytearray)):
decoded = parser.parse(message)
pkg_type = decoded["pkg_type"]
pkg_str = "SNAPSHOT" if pkg_type == 0 else "DELTA" if pkg_type == 1 else f"UNKNOWN({pkg_type})"
asks = decoded["asks"]
bids = decoded["bids"]
best_ask = asks[0] if asks else {"price": 0.0, "size": 0.0}
best_bid = bids[0] if bids else {"price": 0.0, "size": 0.0}
logging.info(
"SBE %s u=%s seq=%s type=%s asks=%d bids=%d "
"BEST bid=%.8f@%.8f ask=%.8f@%.8f ts=%s",
decoded["symbol"], decoded["u"], decoded["seq"], pkg_str,
len(asks), len(bids),
best_bid["price"], best_bid["size"],
best_ask["price"], best_ask["size"],
decoded["ts"],
)
print(
f"{decoded['symbol']} u={decoded['u']} seq={decoded['seq']} {pkg_str} "
f"levels: asks={len(asks)} bids={len(bids)} "
f"BEST: bid {best_bid['price']:.8f} x {best_bid['size']:.8f} | "
f"ask {best_ask['price']:.8f} x {best_ask['size']:.8f}"
)
else:
# text frame: control / errors / ping-pong
try:
obj = json.loads(message)
logging.info("TEXT %s", obj)
print("TEXT:", obj)
except json.JSONDecodeError:
logging.warning("non-JSON text frame: %r", message)
print("TEXT(non-json):", message)
except Exception as e:
logging.exception("decode error: %s", e)
print("decode error:", e)
def on_error(ws, error):
print("WS error:", error)
logging.error("WS error: %s", error)
def on_close(ws, *_):
print("### connection closed ###")
logging.info("connection closed")
def ping_per(ws):
while True:
try:
ws.send(json.dumps({"op": "ping"}))
except Exception:
return
time.sleep(10)
def on_open(ws):
print("opened")
sub = {"op": "subscribe", "args": [TOPIC]}
ws.send(json.dumps(sub))
print("subscribed:", TOPIC)
# background ping thread
threading.Thread(target=ping_per, args=(ws,), daemon=True).start()
def on_pong(ws, *_):
print("pong received")
def on_ping(ws, *_):
print("ping received @", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def connWS():
ws = websocket.WebSocketApp(
WS_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_ping=on_ping,
on_pong=on_pong,
)
ws.run_forever(ping_interval=20, ping_timeout=10)
if __name__ == "__main__":
websocket.enableTrace(False)
connWS()
Golang
// sbe_ob50_client.go
package main
import (
"bytes"
"compress/flate"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"math"
"time"
"github.com/gorilla/websocket"
"yourmodule/quote" // generated SBE package
)
const (
WSURL = "wss://stream.bybit.com/v5/market/sbe"
CHANNEL = "ob.50.sbe.BTCUSDT"
)
func toReal(mantissa int64, exponent int8) float64 {
return float64(mantissa) * math.Pow10(int(exponent))
}
func decodeOBL50(buf []byte) (*quote.OBL50Event, error) {
var hdr quote.MessageHeader
reader := bytes.NewReader(buf)
// decode messageHeader (little endian)
if err := binary.Read(reader, binary.LittleEndian, &hdr); err != nil {
return nil, fmt.Errorf("read header: %w", err)
}
if hdr.TemplateId != 20001 {
return nil, fmt.Errorf("unexpected templateId: %d", hdr.TemplateId)
}
var msg quote.OBL50Event
// many generators provide WrapForDecode; here assume we can read the fixed block then groups
if err := msg.Decode(reader, int(hdr.BlockLength), int(hdr.Version)); err != nil {
return nil, fmt.Errorf("decode OBL50: %w", err)
}
return &msg, nil
}
func main() {
book := NewOrderBook()
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
EnableCompression: false,
}
conn, _, err := dialer.Dial(WSURL, nil)
if err != nil {
log.Fatalf("dial: %v", err)
}
defer conn.Close()
// subscribe
sub := map[string]interface{}{
"op": "subscribe",
"args": []string{CHANNEL},
}
if err := conn.WriteJSON(sub); err != nil {
log.Fatalf("subscribe: %v", err)
}
for {
mt, data, err := conn.ReadMessage()
if err != nil {
log.Fatalf("read: %v", err)
}
if mt == websocket.TextMessage {
// control JSON or pong etc
var m map[string]interface{}
_ = json.Unmarshal(data, &m)
continue
}
// if server wraps SBE in per-message deflate, you may need to decompress:
if isDeflatedFrame(data) {
data, err = inflate(data)
if err != nil {
log.Printf("inflate error: %v", err)
continue
}
}
msg, err := decodeOBL50(data)
if err != nil {
log.Printf("decode error: %v", err)
continue
}
u := msg.U
pkgType := msg.PkgType // 0 snapshot, 1 delta
pxExp := msg.PriceExponent
szExp := msg.SizeExponent
// extract levels
var asks, bids [][2]float64
for _, a := range msg.Asks {
p := toReal(a.Price, pxExp)
sz := toReal(a.Size, szExp)
asks = append(asks, [2]float64{p, sz})
}
for _, b := range msg.Bids {
p := toReal(b.Price, pxExp)
sz := toReal(b.Size, szExp)
bids = append(bids, [2]float64{p, sz})
}
// continuity logic:
if u == 1 {
// service restart / precision change snapshot
book.Asks.SnapshotFrom(asks)
book.Bids.SnapshotFrom(bids)
book.LastU = 1
fmt.Printf("[RESET SNAPSHOT] u=%d seq=%d symbol=%s\n", u, msg.Seq, msg.Symbol)
continue
}
if book.LastU != 0 && u != book.LastU+1 {
log.Printf("[WARN] u jump: lastU=%d newU=%d – consider resync", book.LastU, u)
}
if pkgType == quote.PkgTypeEnum_SNAPSHOT {
book.Asks.SnapshotFrom(asks)
book.Bids.SnapshotFrom(bids)
} else {
for _, lv := range asks {
book.Asks.Apply(lv[0], lv[1])
}
for _, lv := range bids {
book.Bids.Apply(lv[0], lv[1])
}
}
book.LastU = u
bestBid := book.Bids.BestBid()
bestAsk := book.Asks.BestAsk()
fmt.Printf("u=%d pkgType=%d bestBid=%.5f bestAsk=%.5f\n", u, pkgType, bestBid, bestAsk)
}
}
// helpers (optional, depending on ws framing)
func isDeflatedFrame(data []byte) bool {
// placeholder: detect by protocol; many setups know from WS sub-protocol
return false
}
func inflate(data []byte) ([]byte, error) {
r := flate.NewReader(bytes.NewReader(data))
defer r.Close()
var out bytes.Buffer
if _, err := out.ReadFrom(r); err != nil {
return nil, err
}
return out.Bytes(), nil
}