跳至主要内容

SBE Level 50 接入指南

總覽

  • Channel: 僅支援MMWS域名
  • Topic: ob.50.sbe.{symbol} (每 20 ms 推送 snapshot 或 delta).
  • Format: SBE 二進制 frame (opcode = 2), little-endian.
  • Depth: 每一側 50 檔深度, 此頻道內不包含 RPI.
  • Units: 時間戳為 microseconds (µs), price/size 為 mantissa 搭配 exponent.

SBE 連接數限制

  • 現貨: 每個專屬 MMWS host 限制 1500 條連接.
  • 合約 (linear + inverse): 每個專屬 MMWS host 限制 3000 條連接.
  • 一旦超過連接上限, 新連接會返回 HTTP 429

流程

Ping / Pong (JSON 控制 frame)

Send Ping

{"req_id": "100001", "op": "ping"}

Receive Pong

{"success": true,"ret_msg": "pong","conn_id": "xxxxx-xx","req_id": "","op": "ping"}

訂閱

  • Topic 格式: ob.50.sbe.<symbol>

訂閱示例

{"op": "subscribe", "args": ["ob.50.sbe.BTCUSDT"]}

訂閱回報

{"success": true,"ret_msg": "","conn_id": "d30fdpbboasp1pjbe7r0","req_id": "xxx","op": "subscribe"}

SBE XML 模板 (L50 OB)

<?xml version="1.0" encoding="UTF-8"?>
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
xmlns:mbx="https://bybit-exchange.github.io/docs/v5/intro"
package="quote.sbe"
id="1"
version="0"
semanticVersion="1.0.0"
description="Bybit market data streams SBE message schema"
byteOrder="littleEndian"
headerType="messageHeader">
<types>
<composite name="messageHeader" description="Template ID and length of message root">
<type name="blockLength" primitiveType="uint16"/>
<type name="templateId" primitiveType="uint16"/>
<type name="schemaId" primitiveType="uint16"/>
<type name="version" primitiveType="uint16"/>
</composite>

<composite name="varString8" description="Variable length UTF-8 string.">
<type name="length" primitiveType="uint8"/>
<type name="varData" length="0" primitiveType="uint8" semanticType="String" characterEncoding="UTF-8"/>
</composite>

<composite name="groupSize16Encoding" description="Repeating group dimensions.">
<type name="blockLength" primitiveType="uint16"/>
<type name="numInGroup" primitiveType="uint16"/>
</composite>

<!-- NEW: package type enum -->
<enum name="pkgTypeEnum" encodingType="uint8">
<validValue name="SNAPSHOT">0</validValue>
<validValue name="DELTA">1</validValue>
</enum>
</types>

<!-- Stream event for "ob.rpi.1.sbe.<symbol>" channel -->
<sbe:message name="BestOBRpiEvent" id="20000">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="askNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask normal price"/>
<field id="6" name="askNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask normal size"/>
<field id="7" name="askRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best ask rpi price"/>
<field id="8" name="askRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best ask rpi size"/>
<field id="9" name="bidNormalPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid normal price"/>
<field id="10" name="bidNormalSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid normal size"/>
<field id="11" name="bidRpiPrice" type="int64" mbx:exponent="priceExponent" description="Mantissa for the best bid rpi price"/>
<field id="12" name="bidRpiSize" type="int64" mbx:exponent="sizeExponent" description="Mantissa for the best bid rpi size"/>
<field id="13" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="14" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<data id="55" name="symbol" type="varString8"/>
</sbe:message>

<!-- Stream event for "ob.50.sbe.<symbol>" channel -->
<sbe:message name="OBL50Event" id="20001">
<field id="1" name="ts" type="int64" description="The timestamp in microseconds that the system generates the data"/>
<field id="2" name="seq" type="int64" description="Cross sequence ID"/>
<field id="3" name="cts" type="int64" description="The timestamp in microseconds from the matching engine when this orderbook data is produced."/>
<field id="4" name="u" type="int64" description="Update Id"/>
<field id="5" name="priceExponent" type="int8" description="Price exponent for decimal point positioning"/>
<field id="6" name="sizeExponent" type="int8" description="Size exponent for decimal point positioning"/>
<!-- NEW: package type -->
<field id="7" name="pkgType" type="pkgTypeEnum" description="Package type: 0 = SNAPSHOT (full book), 1 = DELTA (partial update)"/>

<group id="40" name="asks" dimensionType="groupSize16Encoding" description="Sell side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>

<group id="41" name="bids" dimensionType="groupSize16Encoding" description="Buy side order book updates">
<field id="1" name="price" type="int64" description="Price mantissa"/>
<field id="2" name="size" type="int64" description="Size mantissa"/>
</group>

<data id="55" name="symbol" type="varString8"/>
</sbe:message>
</sbe:messageSchema>

SBE Level 50 欄位參考

Message: OBL50Event (id = 20001)

| Field Name | ID | SBE Type | Unit / Format | Notes |
| --- | --- | --- | --- | --- |
| ts | 1 | int64 | µs | 系統在推送側生成資料的時間戳 (dispatcher). |
| seq | 2 | int64 | integer | Cross-sequence id, 每個 feed 單調遞增, 但不保證連續. |
| cts | 3 | int64 | µs | 撮合引擎生成此 OB snapshot 或 delta 的時間戳, 可用於延遲測量. |
| u | 4 | int64 | integer | Update id, 每個 symbol 單調遞增, 用於檢查連續性. |
| priceExponent | 5 | int8 | exponent | 價格小數位數, 顯示價格 = mantissa × 10^priceExponent. |
| sizeExponent | 6 | int8 | exponent | 數量小數位數, 顯示數量 = mantissa × 10^sizeExponent. |
| pkgType | 7 | uint8 (pkgTypeEnum) | integer | 封包型別 (0 = snapshot, 1 = delta). |
| asks | 40 | group | (groupSize16Encoding) | 賣方 order book 更新, 最多 50 檔. |
| bids | 41 | group | (groupSize16Encoding) | 買方 order book 更新, 最多 50 檔. |
| symbol | 55 | varString8 | UTF-8 | 1 位長度 byte + 實際資料, 例如 0x07 "BTCUSDT". |

Asks Group

| Field (id) | Type | Description |
| --- | --- | --- |
| price (1) | int64 | 賣價 mantissa, 顯示價格 = price × 10^priceExponent. |
| size (2) | int64 | 賣量 mantissa, 顯示數量 = size × 10^sizeExponent. |

Bids Group

| Field (id) | Type | Description |
| --- | --- | --- |
| price (1) | int64 | 買價 mantissa, 顯示價格 = price × 10^priceExponent. |
| size (2) | int64 | 買量 mantissa, 顯示數量 = size × 10^sizeExponent. |

OrderBook 更新邏輯

u (Update ID) 欄位規則

u 的行為

  • 欄位 u 總是單調遞增.
  • 欄位 u 通常不會重置, 除非系統重啟或精度 (precision) 改變.
  • u = 1 時永遠代表一個 snapshot

連續性檢查

只有在 u != 1 時才需要進行連續性檢查.

| Condition | Action |
| --- | --- |
| u != 1 | 驗證連續性: 下一個 u 應該等於前一個 u + 1. |
| u == 1 | 特殊 snapshot (服務重啟或精度變更), 不應執行連續性檢查. |

OrderBook 維護規則

首次連結與重連

訂閱完成後, 第一條信息一定是 snapshot, 客戶端使用該 snapshot 初始化本地 orderbook.

快照處理

收到 Snapshot 時, 一定要完全取代本地 orderbook.

Snapshot 可能出現於:

  • 初始化訂閱之後
  • 當單次變更的價位數超過 100 檔 (極端行情自動回退機制)
  • 內部服務重啟之後
  • exponent / precision 變更之後

增量處理

Delta 以 增量方式 套用:

  • size > 0 時插入或更新對應價位,當 size == 0 時刪除對應價位,持續使用欄位 u 進行連續性檢查.

極端行情處理

  • 當某一筆 delta 包含 超過 100 檔 bid+ask 更新 (買賣加總), 系統會自動改為推送 完整 snapshot, 不再推送該筆 delta.
  • 確保客戶端 orderbook 能夠乾淨地重新同步.
  • 避免在高頻變動時產生大量 delta 封包.
  • Snapshot 大小固定, 解碼行為更可預期.

推送更新示例

以下是真實情境, 連接穩定且信息按順序到達:

| u | Type | Notes |
| --- | --- | --- |
| 10000 | snapshot | 訂閱後收到的第一則信息. |
| 10001 | delta | 增量更新, 必須將變更套用在現有本地 orderbook 上. |
| 10002 | delta | 正常的增量更新. |
| 10003 | snapshot | 市場大幅波動, 變更價位數 > 100, 系統改以 snapshot 推送並重建本地 orderbook. |
| 10004 | delta | 從最新 snapshot 繼續推送 delta. |
| 1 | snapshot | 服務重啟或精度變更, 導致 u 重設為 1. |
| 2 | delta | 新一輪連續序列的開始. |
| 3 | delta | |
| 4 | delta | |

接入示例

Python

import json
import logging
import struct
import threading
import time
from datetime import datetime
from typing import Dict, Any, List, Tuple

import websocket

# Send INFO-and-above log records to logfile_ob50.log, prefixed with timestamp and level.
logging.basicConfig(
filename='logfile_ob50.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s'
)

# -------------------------------------------------------------------
# Config
# -------------------------------------------------------------------

# L50 SBE order book topic sent in the subscribe request (format: ob.50.sbe.<symbol>).
TOPIC = "ob.50.sbe.BTCUSDT"

# WebSocket endpoint; adjust the URL for the spot / contract environment as needed.
# NOTE(review): the Go sample in this guide dials a different host and path -
# confirm the correct endpoint for your account before use.
WS_URL = "wss://stream-testnet.bybits.org/v5/public-sbe/spot"

# -------------------------------------------------------------------
# SBE Parser for OBL50Event (template_id = 20001)
#
# XML schema:
# ts(int64), seq(int64), cts(int64), u(int64),
# priceExponent(int8), sizeExponent(int8),
# pkgType(uint8) # 0 = SNAPSHOT, 1 = DELTA
# group asks: blockLen(uint16), numInGroup(uint16),
# then numInGroup * [ price(int64), size(int64) ]
# group bids: same as asks
# symbol(varString8)
# -------------------------------------------------------------------

class SBEOBL50Parser:
    """Decoder for one little-endian SBE ``OBL50Event`` frame (template id 20001).

    Frame layout, as read by :meth:`parse`:

    * message header: blockLength, templateId, schemaId, version (4 x uint16)
    * root block: ts, seq, cts, u (int64), priceExponent, sizeExponent (int8),
      pkgType (uint8)
    * ``asks`` group, then ``bids`` group: a (blockLength, numInGroup) uint16
      pair followed by ``numInGroup`` entries of price(int64), size(int64)
    * ``symbol``: one length byte followed by that many UTF-8 bytes
    """

    def __init__(self):
        # message header: blockLength, templateId, schemaId, version
        self.header_fmt = "<HHHH"
        self.header_sz = struct.calcsize(self.header_fmt)

        # fixed root fields:
        #   ts, seq, cts, u               -> 4 x int64
        #   priceExponent, sizeExponent   -> 2 x int8
        #   pkgType                       -> uint8
        self.body_fmt = "<qqqqbbB"
        self.body_sz = struct.calcsize(self.body_fmt)

        # repeating-group dimensions: blockLength(uint16), numInGroup(uint16)
        self.group_hdr_fmt = "<HH"
        self.group_hdr_sz = struct.calcsize(self.group_hdr_fmt)

        # one group entry: price(int64), size(int64)
        self.level_fmt = "<qq"
        self.level_sz = struct.calcsize(self.level_fmt)

        self.target_template_id = 20001

    # ---------------- core small helpers ----------------

    def _parse_header(self, data: bytes) -> Dict[str, Any]:
        """Unpack the 8-byte message header found at offset 0.

        Raises:
            ValueError: if ``data`` is shorter than the header.
        """
        if len(data) < self.header_sz:
            raise ValueError("insufficient data for SBE header")
        block_length, template_id, schema_id, version = struct.unpack_from(
            self.header_fmt, data, 0
        )
        return {
            "block_length": block_length,
            "template_id": template_id,
            "schema_id": schema_id,
            "version": version,
        }

    @staticmethod
    def _parse_varstring8(data: bytes, offset: int) -> Tuple[str, int]:
        """Read a varString8 (uint8 length + UTF-8 bytes) starting at ``offset``.

        Returns:
            ``(text, next_offset)``; ``text`` is ``""`` when the length byte is 0.

        Raises:
            ValueError: if the length byte or the payload runs past ``data``.
        """
        if offset + 1 > len(data):
            raise ValueError("insufficient data for varString8 length")
        (length,) = struct.unpack_from("<B", data, offset)
        offset += 1
        if length == 0:
            return "", offset
        if offset + length > len(data):
            raise ValueError("insufficient data for varString8 bytes")
        s = data[offset: offset + length].decode("utf-8")
        offset += length
        return s, offset

    @staticmethod
    def _apply_exponent(value: int, exponent: int) -> float:
        """Scale a mantissa by a decimal exponent; always returns a float.

        Computes ``value / 10**exponent``. For a negative exponent the
        equivalent ``value * 10**(-exponent)`` is used so the power of ten
        stays an exact integer.

        NOTE(review): the field reference in this guide and the Go sample's
        ``toReal`` use ``mantissa x 10^exponent``, i.e. the opposite sign
        convention. Confirm against live data which one the feed uses.
        """
        if exponent >= 0:
            return value / (10 ** exponent)
        # float() keeps the declared return type; int * int would yield an int.
        return float(value * (10 ** (-exponent)))

    def _parse_levels(self, data: bytes, offset: int) -> Tuple[List[Dict[str, int]], int]:
        """Parse one repeating group (asks or bids) starting at ``offset``.

        Layout:
            uint16 blockLength
            uint16 numInGroup
            numInGroup * [ price(int64), size(int64) ]  (within blockLength)

        Returns:
            ``(levels, next_offset)`` where each level is
            ``{"price_m": int, "size_m": int}`` (raw mantissas).

        Raises:
            ValueError: if the group header or its entries run past ``data``,
                or a non-empty group declares a blockLength below 16 bytes.
        """
        if offset + self.group_hdr_sz > len(data):
            raise ValueError("insufficient data for group header")
        block_len, num_in_group = struct.unpack_from(self.group_hdr_fmt, data, offset)
        offset += self.group_hdr_sz

        # An empty group carries no entries, so its blockLength is irrelevant;
        # do not reject a frame whose empty group reports blockLength 0.
        if num_in_group == 0:
            return [], offset

        if block_len < self.level_sz:
            raise ValueError(f"blockLength({block_len}) < level_sz({self.level_sz})")

        # All entries have the same stride, so one bounds check covers the group.
        if offset + block_len * num_in_group > len(data):
            raise ValueError("insufficient data for group entry")

        levels = []
        for _ in range(num_in_group):
            # only the first 16 bytes (price, size) of each entry are decoded
            price_m, size_m = struct.unpack_from(self.level_fmt, data, offset)
            # advance by the declared stride so extra trailing fields are skipped
            offset += block_len
            levels.append({
                "price_m": price_m,
                "size_m": size_m,
            })
        return levels, offset

    # ---------------- public parse ----------------

    def parse(self, data: bytes) -> Dict[str, Any]:
        """Decode one complete ``OBL50Event`` frame.

        Returns:
            A dict with ``header``, ``ts``, ``seq``, ``cts``, ``u``,
            ``price_exponent``, ``size_exponent``, ``pkg_type``
            (0 = SNAPSHOT, 1 = DELTA), ``symbol``, ``asks`` / ``bids``
            (lists of ``{"price": float, "size": float}``) and
            ``parsed_length`` (number of bytes consumed).

        Raises:
            NotImplementedError: if the header's template id is not 20001.
            ValueError: if the frame is truncated or structurally invalid.
        """
        hdr = self._parse_header(data)
        if hdr["template_id"] != self.target_template_id:
            raise NotImplementedError(f"unsupported template_id={hdr['template_id']}")

        # The header's blockLength says where the root block ends. Use it
        # rather than the size of the fields known here, so a frame whose root
        # block has grown still has its groups located correctly.
        block_length = hdr["block_length"]
        if block_length < self.body_sz:
            raise ValueError(
                f"root blockLength({block_length}) < body_sz({self.body_sz})"
            )
        if len(data) < self.header_sz + block_length:
            raise ValueError("insufficient data for OBL50Event body")

        (ts, seq, cts, u,
         price_exp, size_exp, pkg_type) = struct.unpack_from(
            self.body_fmt, data, self.header_sz
        )

        offset = self.header_sz + block_length

        # groups appear in schema order: asks, then bids, then the symbol
        asks_raw, offset = self._parse_levels(data, offset)
        bids_raw, offset = self._parse_levels(data, offset)
        symbol, offset = self._parse_varstring8(data, offset)

        def scale(levels: List[Dict[str, int]]) -> List[Dict[str, float]]:
            # convert raw mantissas using the exponents carried in this frame
            return [
                {
                    "price": self._apply_exponent(lv["price_m"], price_exp),
                    "size": self._apply_exponent(lv["size_m"], size_exp),
                }
                for lv in levels
            ]

        return {
            "header": hdr,
            "ts": ts,
            "seq": seq,
            "cts": cts,
            "u": u,
            "price_exponent": price_exp,
            "size_exponent": size_exp,
            "pkg_type": pkg_type,  # 0 = SNAPSHOT, 1 = DELTA
            "symbol": symbol,
            "asks": scale(asks_raw),
            "bids": scale(bids_raw),
            "parsed_length": offset,
        }


# Module-level parser instance used by on_message for every binary frame.
parser = SBEOBL50Parser()

# -------------------------------------------------------------------
# WebSocket handlers
# -------------------------------------------------------------------

def on_message(ws, message):
    """Handle one WebSocket frame.

    Binary frames are decoded with the module-level ``parser`` and summarised
    to the log file and stdout. Any other frame is treated as text and parsed
    as JSON (control / error / ping-pong replies). Every exception is caught,
    logged with its traceback and printed, so it does not propagate to the
    caller.

    Note: the values labelled "BEST" are the first ask/bid entry of *this*
    frame (0.0 when the side is empty); this sample does not maintain a local
    order book.
    """
    try:
        if isinstance(message, (bytes, bytearray)):
            decoded = parser.parse(message)

            pkg_type = decoded["pkg_type"]
            pkg_str = "SNAPSHOT" if pkg_type == 0 else "DELTA" if pkg_type == 1 else f"UNKNOWN({pkg_type})"

            asks = decoded["asks"]
            bids = decoded["bids"]

            # fall back to a zero level so the format strings below never fail
            best_ask = asks[0] if asks else {"price": 0.0, "size": 0.0}
            best_bid = bids[0] if bids else {"price": 0.0, "size": 0.0}

            logging.info(
                "SBE %s u=%s seq=%s type=%s asks=%d bids=%d "
                "BEST bid=%.8f@%.8f ask=%.8f@%.8f ts=%s",
                decoded["symbol"], decoded["u"], decoded["seq"], pkg_str,
                len(asks), len(bids),
                best_bid["price"], best_bid["size"],
                best_ask["price"], best_ask["size"],
                decoded["ts"],
            )

            print(
                f"{decoded['symbol']} u={decoded['u']} seq={decoded['seq']} {pkg_str} "
                f"levels: asks={len(asks)} bids={len(bids)} "
                f"BEST: bid {best_bid['price']:.8f} x {best_bid['size']:.8f} | "
                f"ask {best_ask['price']:.8f} x {best_ask['size']:.8f}"
            )

        else:
            # text frame: control / errors / ping-pong
            try:
                obj = json.loads(message)
                logging.info("TEXT %s", obj)
                print("TEXT:", obj)
            except json.JSONDecodeError:
                logging.warning("non-JSON text frame: %r", message)
                print("TEXT(non-json):", message)
    except Exception as e:
        logging.exception("decode error: %s", e)
        print("decode error:", e)


def on_error(ws, error):
    """Print the WebSocket error and record it in the log at ERROR level."""
    print("WS error:", error)
    logging.error("WS error: %s", error)


def on_close(ws, *_):
    """Print and log that the connection closed; extra callback arguments are ignored."""
    print("### connection closed ###")
    logging.info("connection closed")


def ping_per(ws):
    """Send a JSON ``{"op": "ping"}`` frame on ``ws`` every 10 seconds.

    Runs until a send raises, then logs the reason and returns so the calling
    thread ends instead of looping on a dead connection.
    """
    while True:
        try:
            ws.send(json.dumps({"op": "ping"}))
        except Exception as e:
            # Best-effort keepalive: stop quietly, but leave a trace of why.
            logging.info("ping loop stopped: %s", e)
            return
        time.sleep(10)


def on_open(ws):
    """Subscribe to ``TOPIC`` and start the background JSON ping thread."""
    print("opened")
    sub = {"op": "subscribe", "args": [TOPIC]}
    ws.send(json.dumps(sub))
    print("subscribed:", TOPIC)

    # daemon thread: it will not keep the process alive once the main thread exits
    threading.Thread(target=ping_per, args=(ws,), daemon=True).start()


def on_pong(ws, *_):
    """Print a notice when the pong callback fires; extra arguments are ignored."""
    print("pong received")


def on_ping(ws, *_):
    """Print a local-time-stamped notice when the ping callback fires."""
    print("ping received @", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


def connWS():
    """Create the WebSocketApp for ``WS_URL`` with this module's handlers and run it.

    ``run_forever`` is called with ``ping_interval=20`` and ``ping_timeout=10``;
    the call blocks until ``run_forever`` returns.
    """
    ws = websocket.WebSocketApp(
        WS_URL,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_ping=on_ping,
        on_pong=on_pong,
    )
    ws.run_forever(ping_interval=20, ping_timeout=10)


if __name__ == "__main__":
    # Script entry point: keep websocket-client frame tracing off, then connect.
    websocket.enableTrace(False)
    connWS()

Golang

// sbe_ob50_client.go
package main

import (
"bytes"
"compress/flate"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"math"
"time"

"github.com/gorilla/websocket"
"yourmodule/quote" // generated SBE package
)

const (
// WSURL is the WebSocket endpoint dialed by main.
// NOTE(review): the Python sample in this guide connects to a different
// host and path - confirm the correct endpoint before use.
WSURL = "wss://stream.bybit.com/v5/market/sbe"
// CHANNEL is the topic sent in the subscribe request.
CHANNEL = "ob.50.sbe.BTCUSDT"
)

// toReal converts a mantissa/exponent pair into a float64 by computing
// mantissa × 10^exponent.
func toReal(mantissa int64, exponent int8) float64 {
	scale := math.Pow10(int(exponent))
	value := float64(mantissa)
	return value * scale
}

// decodeOBL50 decodes one binary frame into a quote.OBL50Event.
//
// It reads the message header from buf in little-endian order, rejects any
// template id other than 20001, and then hands the remaining bytes to the
// generated Decode method together with the header's block length and
// version. On any failure it returns a nil event and a wrapped error.
func decodeOBL50(buf []byte) (*quote.OBL50Event, error) {
	const obl50TemplateID = 20001

	r := bytes.NewReader(buf)

	// The header is read first; r is then positioned at the message body.
	var header quote.MessageHeader
	if err := binary.Read(r, binary.LittleEndian, &header); err != nil {
		return nil, fmt.Errorf("read header: %w", err)
	}

	if id := header.TemplateId; id != obl50TemplateID {
		return nil, fmt.Errorf("unexpected templateId: %d", id)
	}

	// NOTE(review): the Decode signature depends on the SBE generator used
	// to produce the quote package - confirm it against the generated code.
	event := new(quote.OBL50Event)
	if err := event.Decode(r, int(header.BlockLength), int(header.Version)); err != nil {
		return nil, fmt.Errorf("decode OBL50: %w", err)
	}

	return event, nil
}

// main runs the sample client and exits with a non-zero status if it stops
// with an error.
func main() {
	// log.Fatalf is called only here, after runOB50Client has returned, so
	// the deferred connection close inside it has already run.
	if err := runOB50Client(); err != nil {
		log.Fatalf("%v", err)
	}
}

// runOB50Client dials WSURL, subscribes to CHANNEL and keeps a local order
// book updated from the decoded frames until a dial, subscribe or read error
// occurs, which it returns wrapped with the failing step.
//
// Frames that fail to inflate or decode are logged and skipped.
func runOB50Client() error {
	book := NewOrderBook()

	dialer := websocket.Dialer{
		HandshakeTimeout:  10 * time.Second,
		EnableCompression: false,
	}

	conn, _, err := dialer.Dial(WSURL, nil)
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer conn.Close()

	// subscribe
	sub := map[string]interface{}{
		"op":   "subscribe",
		"args": []string{CHANNEL},
	}
	if err := conn.WriteJSON(sub); err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}

	// NOTE(review): unlike the Python sample, no periodic ping is sent here -
	// confirm whether the server needs one to keep the connection open.
	for {
		mt, data, err := conn.ReadMessage()
		if err != nil {
			return fmt.Errorf("read: %w", err)
		}

		if mt == websocket.TextMessage {
			// Control JSON (subscribe ack, pong, errors). Log it so a failed
			// subscription is visible instead of being silently dropped.
			var m map[string]interface{}
			if err := json.Unmarshal(data, &m); err != nil {
				log.Printf("non-JSON text frame: %v", err)
				continue
			}
			log.Printf("control frame: %v", m)
			continue
		}

		// if server wraps SBE in per-message deflate, you may need to decompress:
		if isDeflatedFrame(data) {
			data, err = inflate(data)
			if err != nil {
				log.Printf("inflate error: %v", err)
				continue
			}
		}

		msg, err := decodeOBL50(data)
		if err != nil {
			log.Printf("decode error: %v", err)
			continue
		}

		u := msg.U
		pkgType := msg.PkgType // 0 snapshot, 1 delta
		pxExp := msg.PriceExponent
		szExp := msg.SizeExponent

		// convert every level of this frame to real price/size values
		var asks, bids [][2]float64
		for _, a := range msg.Asks {
			p := toReal(a.Price, pxExp)
			sz := toReal(a.Size, szExp)
			asks = append(asks, [2]float64{p, sz})
		}
		for _, b := range msg.Bids {
			p := toReal(b.Price, pxExp)
			sz := toReal(b.Size, szExp)
			bids = append(bids, [2]float64{p, sz})
		}

		// u == 1 marks the snapshot sent after a service restart or precision
		// change: replace the book and skip the continuity check.
		if u == 1 {
			book.Asks.SnapshotFrom(asks)
			book.Bids.SnapshotFrom(bids)
			book.LastU = 1
			fmt.Printf("[RESET SNAPSHOT] u=%d seq=%d symbol=%s\n", u, msg.Seq, msg.Symbol)
			continue
		}

		// LastU == 0 means no frame has been applied yet, so there is nothing
		// to compare against. A gap is only reported; the frame is still
		// applied below. A production client should resynchronise instead.
		if book.LastU != 0 && u != book.LastU+1 {
			log.Printf("[WARN] u jump: lastU=%d newU=%d – consider resync", book.LastU, u)
		}

		if pkgType == quote.PkgTypeEnum_SNAPSHOT {
			book.Asks.SnapshotFrom(asks)
			book.Bids.SnapshotFrom(bids)
		} else {
			for _, lv := range asks {
				book.Asks.Apply(lv[0], lv[1])
			}
			for _, lv := range bids {
				book.Bids.Apply(lv[0], lv[1])
			}
		}

		book.LastU = u
		bestBid := book.Bids.BestBid()
		bestAsk := book.Asks.BestAsk()
		fmt.Printf("u=%d pkgType=%d bestBid=%.5f bestAsk=%.5f\n", u, pkgType, bestBid, bestAsk)
	}
}

// helpers (optional, depending on ws framing)
// isDeflatedFrame reports whether data must be inflated before SBE decoding.
//
// It is a placeholder that always returns false; real detection depends on
// the protocol in use (many setups know it from the WS sub-protocol).
func isDeflatedFrame(data []byte) bool {
	_ = data // not inspected until real detection is implemented
	return false
}

// inflate decompresses a raw DEFLATE stream and returns the decompressed
// bytes. If reading the stream fails it returns a nil slice and the error.
func inflate(data []byte) ([]byte, error) {
	zr := flate.NewReader(bytes.NewReader(data))
	defer zr.Close()

	plain := new(bytes.Buffer)
	_, err := plain.ReadFrom(zr)
	if err != nil {
		return nil, err
	}
	return plain.Bytes(), nil
}