Skip to main content

SBE Level 50 Integration

Overview

  • Channel: private MMWS only (not available on public WS).
  • Topic: ob.50.sbe.{symbol} (snapshot or delta, every 20 ms).
  • Format: SBE binary frames (opcode = 2), little-endian.
  • Depth: 50 levels per side (no RPI in this stream).
  • Units: timestamps in microseconds (µs); price/size are mantissas with exponents.

SBE Connection Limit

  • Spot: 1500 connections limit per dedicated MMWS host.
  • Futures (linear + inverse): 3000 connections limit per dedicated MMWS host.
  • Once you breach the connection limit, new connections return HTTP 429.

Flow

Ping / Pong (JSON control frames)

Send Ping

{"req_id": "100001", "op": "ping"}

Receive Pong

{"success": true,"ret_msg": "pong","conn_id": "xxxxx-xx","req_id": "","op": "ping"}

Subscribe

  • Topic format: ob.50.sbe.<symbol>

Subscribe request

{"op": "subscribe", "args": ["ob.50.sbe.BTCUSDT"]}

Subscription confirmation

{"success": true,"ret_msg": "","conn_id": "d30fdpbboasp1pjbe7r0","req_id": "xxx","op": "subscribe"}

SBE XML Template (L50 OB)

<?xml version="1.0" encoding="UTF-8"?>
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
xmlns:mbx="https://bybit-exchange.github.io/docs/v5/intro"
package="quote.sbe"
id="1"
version="0"
semanticVersion="1.0.0"
description="Bybit market data streams SBE message schema"
byteOrder="littleEndian"
headerType="messageHeader">
<types>
<!-- Frame header that precedes every message: four uint16 values (8 bytes).
     blockLength gives the size of the message's fixed root block; templateId
     selects the message definition (20000 or 20001 in this schema). -->
<composite name="messageHeader" description="Template ID and length of message root">
<type name="blockLength" primitiveType="uint16"/>
<type name="templateId" primitiveType="uint16"/>
<type name="schemaId" primitiveType="uint16"/>
<type name="version" primitiveType="uint16"/>
</composite>

<!-- Variable-length string: a 1-byte length prefix followed by that many
     UTF-8 bytes (so at most 255 bytes of payload). -->
<composite name="varString8" description="Variable length UTF-8 string.">
<type name="length" primitiveType="uint8"/>
<type name="varData" length="0" primitiveType="uint8" semanticType="String" characterEncoding="UTF-8"/>
</composite>

<!-- Header written before each repeating group: the byte size of one entry
     (blockLength) and the number of entries (numInGroup), both uint16. -->
<composite name="groupSize16Encoding" description="Repeating group dimensions.">
<type name="blockLength" primitiveType="uint16"/>
<type name="numInGroup" primitiveType="uint16"/>
</composite>

<!-- Package type carried in OBL50Event.pkgType: one byte, 0 = SNAPSHOT, 1 = DELTA. -->
<enum name="pkgTypeEnum" encodingType="uint8">
<validValue name="SNAPSHOT">0</validValue>
<validValue name="DELTA">1</validValue>
</enum>
</types>

<!-- Best bid/ask event (template id 20000) with separate "normal" and "rpi"
     price/size mantissas per side. It shares this schema file but is not the
     payload of the "ob.50.sbe.<symbol>" topic, which uses OBL50Event (id 20001). -->
<sbe:message name="BestOBRpiEvent" id="20000">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="askNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask normal price"/>
<field id="6" name="askNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask normal size"/>
<field id="7" name="askRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask rpi price"/>
<field id="8" name="askRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask rpi size"/>
<field id="9" name="bidNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid normal price"/>
<field id="10" name="bidNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid normal size"/>
<field id="11" name="bidRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid rpi price"/>
<field id="12" name="bidRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid rpi size"/>
<!-- The two exponents below scale every price / size mantissa in this message. -->
<field id="13" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="14" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<data id="55" name="symbol" type="varString8"/>
</sbe:message>

<!-- Stream event for "ob.50.sbe.<symbol>" channel.
     Wire order after the 8-byte messageHeader:
       1. fixed root block: ts, seq, cts, u (4 x int64), priceExponent,
          sizeExponent (2 x int8), pkgType (uint8) = 35 bytes as declared here;
       2. "asks" group, then "bids" group, each a groupSize16Encoding header
          followed by numInGroup entries of price + size (2 x int64 = 16 bytes);
       3. "symbol" as varString8. -->
<sbe:message name="OBL50Event" id="20001">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="6" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<!-- Tells the client whether to replace the local book (SNAPSHOT) or patch it (DELTA). -->
<field id="7" name="pkgType" type="pkgTypeEnum" description="Package type: 0 = SNAPSHOT (full book), 1 = DELTA (partial update)"/>

<!-- Ask levels; price and size are mantissas scaled by priceExponent / sizeExponent. -->
<group id="40" name="asks" dimensionType="groupSize16Encoding" description="Sell side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>

<!-- Bid levels; same entry layout as the asks group. -->
<group id="41" name="bids" dimensionType="groupSize16Encoding" description="Buy side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>

<data id="55" name="symbol" type="varString8"/>
</sbe:message>
</sbe:messageSchema>

Field Reference

Message: OBL50Event (id = 20001)

| Field Name | ID | SBE Type | Unit / Format | Notes |
|---|---|---|---|---|
| ts | 1 | int64 | µs | System generation time at push side (dispatcher). |
| seq | 2 | int64 | integer | Cross-sequence id (monotonic per feed; not guaranteed continuous). |
| cts | 3 | int64 | µs | Matching-engine creation time of this OB snapshot or delta; used for latency measurements. |
| u | 4 | int64 | integer | Update id (monotonic per symbol). Useful to check continuity. |
| priceExponent | 5 | int8 | exponent | Decimal places for price. Display price = mantissa × 10^priceExponent. |
| sizeExponent | 6 | int8 | exponent | Decimal places for size. Display size = mantissa × 10^sizeExponent. |
| pkgType | 7 | uint8 (pkgTypeEnum) | integer | Package type (0 = snapshot, 1 = delta). |
| asks | 40 | group | (groupSize16Encoding) | Sell side updates (up to 50 levels). |
| bids | 41 | group | (groupSize16Encoding) | Buy side updates (up to 50 levels). |
| symbol | 55 | varString8 | UTF-8 | 1-byte length + bytes, e.g., 0x07 "BTCUSDT". |

Asks Group

| Field (id) | Type | Description |
|---|---|---|
| price (1) | int64 | Ask price mantissa. Display ask price = price × 10^priceExponent. |
| size (2) | int64 | Ask size mantissa. Display ask size = size × 10^sizeExponent. |

Bids Group

| Field (id) | Type | Description |
|---|---|---|
| price (1) | int64 | Bid price mantissa. Display bid price = price × 10^priceExponent. |
| size (2) | int64 | Bid size mantissa. Display bid size = size × 10^sizeExponent. |

Order Book Update Logic

Rules for the u (Update ID) Field

Behavior of u

  • Field u increases monotonically for all snapshots and deltas.
  • Field u does not reset, unless there is a system reset or precision change.
  • Field u = 1 always indicates a snapshot; skip the continuity check for that message and restart continuity tracking from it.

Continuity Validation

Continuity must be checked only when u != 1.

| Condition | Action |
|---|---|
| u != 1 | Validate continuity: the next u should equal the previous u + 1. |
| u == 1 | Special snapshot (service restart / precision change). Do not perform continuity checks. |

Rules for Order Book Maintenance

First Message of connection and reconnection

After subscribing, the first message is always a snapshot; clients must initialize the local book with it.

Snapshot Handling

A snapshot must always replace the entire local order book.

Snapshots may appear:

  • after initial subscription
  • when the number of changed levels > 100 (extreme market condition auto-fallback)
  • after internal service restart
  • after exponent / precision changes

Delta Handling

A delta applies incrementally:

  • Insert or update levels with size > 0.
  • Remove levels when size == 0.
  • Continue continuity checks using the u field.

Extreme Market Condition Handling

  • When a delta contains more than 100 combined bid+ask updates (buy + sell), the system automatically sends a full snapshot instead of a delta.
  • Ensures client books resync cleanly.
  • Prevents explosion of delta packets during high churn.
  • Keeps snapshot size fixed length for predictable decoding.

Example Push Update

Below is a real case where the connection stays healthy and messages arrive in order:

| u | Type | Notes |
|---|---|---|
| 10000 | snapshot | First message after subscription. |
| 10001 | delta | Incremental updates. Must apply changes to the existing book. |
| 10002 | delta | Normal incremental update. |
| 10003 | snapshot | Large market move (> 100 level changes). Use snapshot to replace local book. |
| 10004 | delta | Continue delta from the new snapshot. |
| 1 | snapshot | Service restarted / precision changed — reset u to 1. |
| 2 | delta | New continuity sequence. |
| 3 | delta | |
| 4 | delta | |

Integration Script

Python

import json
import logging
import struct
import threading
import time
from datetime import datetime
from typing import Dict, Any, List, Tuple

import websocket

# Route the root logger to a file: records at INFO and above are appended to
# 'logfile_ob50.log' (a relative path) as "<time> <level> <message>" lines.
logging.basicConfig(
filename='logfile_ob50.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s'
)

# -------------------------------------------------------------------
# Config
# -------------------------------------------------------------------

# L50 SBE order book topic, in the form "ob.50.sbe.<symbol>"; sent as the single
# entry of the subscribe request's "args" list.
TOPIC = "ob.50.sbe.BTCUSDT"

# WebSocket endpoint passed to WebSocketApp. Adjust URL for spot / contract
# environment as needed.
# NOTE(review): the overview says this topic is served on the private MMWS
# channel only, while this path reads "public-sbe" — confirm the host and path
# for your environment before use.
WS_URL = "wss://stream-testnet.bybits.org/v5/public-sbe/spot"

# -------------------------------------------------------------------
# SBE Parser for OBL50Event (template_id = 20001)
#
# XML schema:
# ts(int64), seq(int64), cts(int64), u(int64),
# priceExponent(int8), sizeExponent(int8),
# pkgType(uint8) # 0 = SNAPSHOT, 1 = DELTA
# group asks: blockLen(uint16), numInGroup(uint16),
# then numInGroup * [ price(int64), size(int64) ]
# group bids: same as asks
# symbol(varString8)
# -------------------------------------------------------------------

class SBEOBL50Parser:
    """Decoder for SBE ``OBL50Event`` frames (template id 20001).

    Wire layout, little-endian with no padding:

    * message header: blockLength, templateId, schemaId, version (4 x uint16)
    * root block: ts, seq, cts, u (4 x int64), priceExponent, sizeExponent
      (2 x int8), pkgType (uint8) -- 35 bytes in the current schema
    * ``asks`` group, then ``bids`` group: a (blockLength, numInGroup) uint16
      pair followed by numInGroup entries of price(int64), size(int64)
    * symbol: 1-byte length followed by that many UTF-8 bytes

    Both the root block and the group entries are skipped by the length the
    encoder declares, so trailing fields appended by a later schema version
    are ignored instead of misaligning the decode.
    """

    def __init__(self):
        # message header: blockLength, templateId, schemaId, version
        self.header_fmt = "<HHHH"
        self.header_sz = struct.calcsize(self.header_fmt)

        # fixed root block: ts, seq, cts, u (int64) + two int8 exponents + uint8 pkgType
        self.body_fmt = "<qqqqbbB"
        self.body_sz = struct.calcsize(self.body_fmt)

        # repeating-group header: blockLength(uint16), numInGroup(uint16)
        self.group_hdr_fmt = "<HH"
        self.group_hdr_sz = struct.calcsize(self.group_hdr_fmt)

        # one group entry: price(int64), size(int64)
        self.level_fmt = "<qq"
        self.level_sz = struct.calcsize(self.level_fmt)

        # the only template this parser accepts
        self.target_template_id = 20001

    # ---------------- core small helpers ----------------

    def _parse_header(self, data: bytes) -> Dict[str, Any]:
        """Decode the 8-byte message header found at the start of ``data``.

        Returns a dict with ``block_length``, ``template_id``, ``schema_id``
        and ``version``. Raises ``ValueError`` when ``data`` is shorter than
        the header.
        """
        if len(data) < self.header_sz:
            raise ValueError("insufficient data for SBE header")
        block_length, template_id, schema_id, version = struct.unpack_from(
            self.header_fmt, data, 0
        )
        return {
            "block_length": block_length,
            "template_id": template_id,
            "schema_id": schema_id,
            "version": version,
        }

    @staticmethod
    def _parse_varstring8(data: bytes, offset: int) -> Tuple[str, int]:
        """Decode a varString8 (uint8 length + UTF-8 bytes) at ``offset``.

        Returns ``(text, offset_after_string)``. Raises ``ValueError`` when the
        length byte or the announced payload runs past the end of ``data``.
        """
        if offset + 1 > len(data):
            raise ValueError("insufficient data for varString8 length")
        (length,) = struct.unpack_from("<B", data, offset)
        offset += 1
        if length == 0:
            return "", offset
        if offset + length > len(data):
            raise ValueError("insufficient data for varString8 bytes")
        s = data[offset: offset + length].decode("utf-8")
        offset += length
        return s, offset

    @staticmethod
    def _apply_exponent(value: int, exponent: int) -> float:
        """Scale a mantissa to a float as ``value / 10**exponent``.

        A negative exponent is applied as an integer multiplication by
        ``10**(-exponent)`` so the scaling itself stays exact; the result is
        converted to ``float`` so both branches return the annotated type.

        NOTE(review): the field reference in this document states
        "mantissa x 10^exponent", i.e. the opposite sign convention -- confirm
        against live data which one the feed uses.
        """
        if exponent >= 0:
            return value / (10 ** exponent)
        return float(value * (10 ** (-exponent)))

    def _parse_levels(self, data: bytes, offset: int) -> Tuple[List[Dict[str, float]], int]:
        """
        Parse one repeating group (asks or bids) starting at ``offset``.
        Layout:
            uint16 blockLength
            uint16 numInGroup
            numInGroup * [ price(int64), size(int64) ]  (within blockLength)

        Returns ``(levels, offset_after_group)`` where each level is a dict
        holding the raw mantissas under ``price_m`` and ``size_m``.
        Raises ``ValueError`` on truncated data or, for a non-empty group, a
        blockLength too small to hold one price/size pair.
        """
        if offset + self.group_hdr_sz > len(data):
            raise ValueError("insufficient data for group header")
        block_len, num_in_group = struct.unpack_from(self.group_hdr_fmt, data, offset)
        offset += self.group_hdr_sz

        # The entry size only matters when there is an entry to read, so an
        # empty group is accepted whatever blockLength the encoder wrote.
        if num_in_group and block_len < self.level_sz:
            raise ValueError(f"blockLength({block_len}) < level_sz({self.level_sz})")

        levels = []
        for _ in range(num_in_group):
            if offset + block_len > len(data):
                raise ValueError("insufficient data for group entry")
            # only the first 16 bytes (price, size) of each entry are decoded
            price_m, size_m = struct.unpack_from(self.level_fmt, data, offset)
            # advance by the declared entry size so appended fields are skipped
            offset += block_len

            levels.append({
                "price_m": price_m,
                "size_m": size_m,
            })
        return levels, offset

    # ---------------- public parse ----------------

    def parse(self, data: bytes) -> Dict[str, Any]:
        """Decode one complete ``OBL50Event`` frame.

        Returns a dict with the header, ``ts``, ``seq``, ``cts``, ``u``, both
        exponents, ``pkg_type`` (0 = SNAPSHOT, 1 = DELTA), ``symbol``, the
        ``asks`` / ``bids`` lists of ``{"price", "size"}`` floats, and
        ``parsed_length`` (number of bytes consumed).

        Raises ``NotImplementedError`` for any other template id and
        ``ValueError`` when the frame is truncated or malformed.
        """
        hdr = self._parse_header(data)
        if hdr["template_id"] != self.target_template_id:
            raise NotImplementedError(f"unsupported template_id={hdr['template_id']}")

        if len(data) < self.header_sz + self.body_sz:
            raise ValueError("insufficient data for OBL50Event body")

        # parse fixed root block
        (ts, seq, cts, u,
         price_exp, size_exp, pkg_type) = struct.unpack_from(
            self.body_fmt, data, self.header_sz
        )

        # The first group begins blockLength bytes after the header. Honour a
        # larger declared length (root fields appended by a newer schema) but
        # never skip less than the fields that were just read.
        root_len = max(hdr["block_length"], self.body_sz)
        offset = self.header_sz + root_len

        # asks group, bids group, then the trailing symbol
        asks_raw, offset = self._parse_levels(data, offset)
        bids_raw, offset = self._parse_levels(data, offset)
        symbol, offset = self._parse_varstring8(data, offset)

        # apply exponents
        asks = [
            {
                "price": self._apply_exponent(l["price_m"], price_exp),
                "size": self._apply_exponent(l["size_m"], size_exp),
            }
            for l in asks_raw
        ]
        bids = [
            {
                "price": self._apply_exponent(l["price_m"], price_exp),
                "size": self._apply_exponent(l["size_m"], size_exp),
            }
            for l in bids_raw
        ]

        return {
            "header": hdr,
            "ts": ts,
            "seq": seq,
            "cts": cts,
            "u": u,
            "price_exponent": price_exp,
            "size_exponent": size_exp,
            "pkg_type": pkg_type,  # 0 = SNAPSHOT, 1 = DELTA
            "symbol": symbol,
            "asks": asks,
            "bids": bids,
            "parsed_length": offset,
        }


# Module-level decoder instance; on_message() calls parser.parse() for every binary frame.
parser = SBEOBL50Parser()

# -------------------------------------------------------------------
# WebSocket handlers
# -------------------------------------------------------------------

def on_message(ws, message):
    """Handle one incoming WebSocket frame.

    Binary frames are decoded with the module-level ``parser`` and summarised
    (symbol, u, seq, package type, level counts and the first ask / bid entry)
    to both the log file and stdout. Text frames are treated as JSON control
    messages and echoed. Any exception is logged with its traceback and
    printed; nothing is re-raised to the WebSocket loop.
    """
    try:
        if not isinstance(message, (bytes, bytearray)):
            # text frame: control / errors / ping-pong
            try:
                payload = json.loads(message)
            except json.JSONDecodeError:
                logging.warning("non-JSON text frame: %r", message)
                print("TEXT(non-json):", message)
            else:
                logging.info("TEXT %s", payload)
                print("TEXT:", payload)
            return

        event = parser.parse(message)

        kind = event["pkg_type"]
        labels = {0: "SNAPSHOT", 1: "DELTA"}
        pkg_str = labels[kind] if kind in labels else f"UNKNOWN({kind})"

        ask_levels = event["asks"]
        bid_levels = event["bids"]

        # The first entry of each side is reported as "best"; an empty side
        # is shown as a zero price and size.
        placeholder = {"price": 0.0, "size": 0.0}
        top_ask = ask_levels[0] if ask_levels else placeholder
        top_bid = bid_levels[0] if bid_levels else placeholder

        logging.info(
            "SBE %s u=%s seq=%s type=%s asks=%d bids=%d "
            "BEST bid=%.8f@%.8f ask=%.8f@%.8f ts=%s",
            event["symbol"], event["u"], event["seq"], pkg_str,
            len(ask_levels), len(bid_levels),
            top_bid["price"], top_bid["size"],
            top_ask["price"], top_ask["size"],
            event["ts"],
        )

        print(
            f"{event['symbol']} u={event['u']} seq={event['seq']} {pkg_str} "
            f"levels: asks={len(ask_levels)} bids={len(bid_levels)} "
            f"BEST: bid {top_bid['price']:.8f} x {top_bid['size']:.8f} | "
            f"ask {top_ask['price']:.8f} x {top_ask['size']:.8f}"
        )
    except Exception as e:
        logging.exception("decode error: %s", e)
        print("decode error:", e)


def on_error(ws, error):
    """Report a WebSocket error on stdout and in the log file."""
    print("WS error:", error)
    logging.error("WS error: %s", error)


def on_close(ws, *_):
    """Announce that the connection closed; extra callback arguments are ignored."""
    print("### connection closed ###")
    logging.info("connection closed")


def ping_per(ws):
    """Send the JSON heartbeat ``{"op": "ping"}`` now and then every 10 seconds.

    Runs until ``ws.send`` raises, at which point the loop ends quietly.
    Intended to run on a background thread.
    """
    heartbeat = json.dumps({"op": "ping"})
    while True:
        try:
            ws.send(heartbeat)
        except Exception:
            # a failed send ends the heartbeat loop without reporting
            break
        time.sleep(10)


def on_open(ws):
    """Subscribe to ``TOPIC`` once the socket is open and start the heartbeat.

    The heartbeat runs ``ping_per`` on a daemon thread, so it does not keep
    the process alive on its own.
    """
    print("opened")
    request = json.dumps({"op": "subscribe", "args": [TOPIC]})
    ws.send(request)
    print("subscribed:", TOPIC)

    # background ping thread
    pinger = threading.Thread(target=ping_per, args=(ws,), daemon=True)
    pinger.start()


def on_pong(ws, *_):
    """Note on stdout that a pong arrived; extra callback arguments are ignored."""
    print("pong received")


def on_ping(ws, *_):
    """Print the local time at which a ping arrived; extra arguments are ignored."""
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print("ping received @", stamp)


def connWS():
    """Connect to ``WS_URL`` with this module's callbacks and block in the receive loop.

    ``run_forever`` is called with a 20-second ping interval and a 10-second
    ping timeout.
    """
    callbacks = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
        "on_ping": on_ping,
        "on_pong": on_pong,
    }
    app = websocket.WebSocketApp(WS_URL, **callbacks)
    app.run_forever(ping_interval=20, ping_timeout=10)


if __name__ == "__main__":
    # Script entry point: turn websocket tracing off, then connect and block.
    websocket.enableTrace(False)
    connWS()

Golang

// sbe_ob50_client.go
package main

import (
"bytes"
"compress/flate"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"math"
"time"

"github.com/gorilla/websocket"
"yourmodule/quote" // generated SBE package
)

const (
WSURL = "wss://stream.bybit.com/v5/market/sbe"
CHANNEL = "ob.50.sbe.BTCUSDT"
)

// toReal converts an SBE mantissa/exponent pair into a float64 by computing
// mantissa × 10^exponent.
//
// NOTE(review): the Python sample in this document divides by 10^exponent
// instead, i.e. it uses the opposite sign convention. Confirm against live
// data which convention the feed uses.
func toReal(mantissa int64, exponent int8) float64 {
	scale := math.Pow10(int(exponent))
	return float64(mantissa) * scale
}

// decodeOBL50 decodes one binary frame into a quote.OBL50Event.
//
// It reads the little-endian message header from buf, rejects any template id
// other than 20001, and hands the remaining bytes to the generated decoder
// together with the block length and schema version taken from the header.
// The exact signature of Decode depends on the SBE code generator in use.
func decodeOBL50(buf []byte) (*quote.OBL50Event, error) {
	// template id of OBL50Event in the schema
	const obl50TemplateID = 20001

	r := bytes.NewReader(buf)

	// decode messageHeader (little endian)
	var header quote.MessageHeader
	if err := binary.Read(r, binary.LittleEndian, &header); err != nil {
		return nil, fmt.Errorf("read header: %w", err)
	}
	if got := header.TemplateId; got != obl50TemplateID {
		return nil, fmt.Errorf("unexpected templateId: %d", got)
	}

	// r is now positioned just past the header, at the start of the root block.
	event := new(quote.OBL50Event)
	if err := event.Decode(r, int(header.BlockLength), int(header.Version)); err != nil {
		return nil, fmt.Errorf("decode OBL50: %w", err)
	}
	return event, nil
}

func main() {
book := NewOrderBook()

dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
EnableCompression: false,
}

conn, _, err := dialer.Dial(WSURL, nil)
if err != nil {
log.Fatalf("dial: %v", err)
}
defer conn.Close()

// subscribe
sub := map[string]interface{}{
"op": "subscribe",
"args": []string{CHANNEL},
}
if err := conn.WriteJSON(sub); err != nil {
log.Fatalf("subscribe: %v", err)
}

for {
mt, data, err := conn.ReadMessage()
if err != nil {
log.Fatalf("read: %v", err)
}

if mt == websocket.TextMessage {
// control JSON or pong etc
var m map[string]interface{}
_ = json.Unmarshal(data, &m)
continue
}

// if server wraps SBE in per-message deflate, you may need to decompress:
if isDeflatedFrame(data) {
data, err = inflate(data)
if err != nil {
log.Printf("inflate error: %v", err)
continue
}
}

msg, err := decodeOBL50(data)
if err != nil {
log.Printf("decode error: %v", err)
continue
}

u := msg.U
pkgType := msg.PkgType // 0 snapshot, 1 delta
pxExp := msg.PriceExponent
szExp := msg.SizeExponent

// extract levels
var asks, bids [][2]float64
for _, a := range msg.Asks {
p := toReal(a.Price, pxExp)
sz := toReal(a.Size, szExp)
asks = append(asks, [2]float64{p, sz})
}
for _, b := range msg.Bids {
p := toReal(b.Price, pxExp)
sz := toReal(b.Size, szExp)
bids = append(bids, [2]float64{p, sz})
}

// continuity logic:
if u == 1 {
// service restart / precision change snapshot
book.Asks.SnapshotFrom(asks)
book.Bids.SnapshotFrom(bids)
book.LastU = 1
fmt.Printf("[RESET SNAPSHOT] u=%d seq=%d symbol=%s\n", u, msg.Seq, msg.Symbol)
continue
}

if book.LastU != 0 && u != book.LastU+1 {
log.Printf("[WARN] u jump: lastU=%d newU=%d – consider resync", book.LastU, u)
}

if pkgType == quote.PkgTypeEnum_SNAPSHOT {
book.Asks.SnapshotFrom(asks)
book.Bids.SnapshotFrom(bids)
} else {
for _, lv := range asks {
book.Asks.Apply(lv[0], lv[1])
}
for _, lv := range bids {
book.Bids.Apply(lv[0], lv[1])
}
}

book.LastU = u
bestBid := book.Bids.BestBid()
bestAsk := book.Asks.BestAsk()
fmt.Printf("u=%d pkgType=%d bestBid=%.5f bestAsk=%.5f\n", u, pkgType, bestBid, bestAsk)
}
}

// helpers (optional, depending on ws framing)
// isDeflatedFrame reports whether data must be inflated before SBE decoding.
//
// Placeholder: it inspects nothing and always reports false. Replace it with a
// real check (for example one based on the negotiated WebSocket sub-protocol)
// if the server delivers deflated frames.
func isDeflatedFrame(data []byte) bool {
	const deflated = false
	return deflated
}

// inflate decompresses a raw DEFLATE stream and returns the decompressed
// bytes. On a decompression error it returns a nil slice and that error.
func inflate(data []byte) ([]byte, error) {
	zr := flate.NewReader(bytes.NewReader(data))
	defer zr.Close()

	inflated := new(bytes.Buffer)
	_, err := inflated.ReadFrom(zr)
	if err != nil {
		return nil, err
	}
	return inflated.Bytes(), nil
}