Fast Order Integration

Overview

The Fast Order SBE Channel provides ultra-low-latency order push updates for high-frequency trading (HFT) clients through Market Maker WebSocket (MMWS).

It delivers binary-encoded SBE (Simple Binary Encoding) messages directly from the matching engine for fast order submission, amendment, and cancellation acknowledgements.

This channel is designed for speed and efficiency: it pushes only the latest updates and omits passive events such as fills and most system-initiated cancellations (see Scenario Coverage for the exceptions).

Release windows

Futures (linear & inverse)

  • Testnet: 21st Nov 7am UTC+0
  • Mainnet: 2026 Q1

Connection

Testnet

wss://stream-testnet.bybits.org/v5/private

Mainnet

wss://<your-dedicated-MMWS-host>.bybit-aws.com/v5/private

⚠️ Use your dedicated MMWS host.
This channel is not accessible from the standard WebSocket endpoints.

Frame Type

  • SBE messages are sent as binary frames (opcode = 2).
  • Control messages (auth, ping/pong, subscribe/unsubscribe) are sent as text frames and follow the Bybit V5 API JSON format.
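A receive loop therefore has to branch on the frame type before parsing anything. The sketch below assumes a client library that surfaces binary frames as bytes and text frames as str (the Python websockets package behaves this way); route_frame is a hypothetical helper name, not part of any Bybit SDK.

```python
import json
from typing import Any, Tuple, Union


def route_frame(frame: Union[bytes, bytearray, str]) -> Tuple[str, Any]:
    """Classify one received WebSocket frame.

    Binary frames carry SBE payloads and are returned untouched as bytes.
    Text frames carry JSON control messages and are parsed.
    """
    if isinstance(frame, (bytes, bytearray)):
        return "sbe", bytes(frame)
    return "control", json.loads(frame)


kind, body = route_frame('{"success": true, "op": "auth"}')
print(kind, body["op"])
```

Keeping the routing separate from decoding makes it easy to hand the raw SBE bytes to a dedicated decoder without touching the JSON path.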

Authentication

Authentication is required immediately after establishing a connection.

Auth Request

{
"req_id": "10001",
"op": "auth",
"args": [
"api_key",
1662350400000,
"signature"
]
}
  • req_id — optional client identifier.
  • timestamp — expiry time in milliseconds (the second element of args); must be later than the current time.
  • signature — generated using the Bybit API signing algorithm.
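The integration scripts at the end of this page sign the string GET/realtime followed by the expiry timestamp with HMAC-SHA256 and send the hex digest. Here is a minimal sketch of that step in isolation. build_auth_message is a hypothetical helper name, and the 10-second validity window is an assumption chosen for illustration.

```python
import hashlib
import hmac
import time
from typing import Optional


def build_auth_message(api_key: str, api_secret: str,
                       expires_ms: Optional[int] = None,
                       req_id: str = "10001") -> dict:
    """Build the JSON auth request: args = [api_key, expires, signature]."""
    if expires_ms is None:
        # Assumed window: expire 10 seconds from now (milliseconds).
        expires_ms = int((time.time() + 10) * 1000)
    payload = f"GET/realtime{expires_ms}"
    signature = hmac.new(
        api_secret.encode("utf-8"),
        payload.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    return {"req_id": req_id, "op": "auth", "args": [api_key, expires_ms, signature]}


msg = build_auth_message("api_key", "api_secret", expires_ms=1662350400000)
print(msg["op"], msg["args"][1], len(msg["args"][2]))
```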

Auth Success Response

{
"success": true,
"ret_msg": "",
"op": "auth",
"conn_id": "cejreaspqfh3sjdnldmg-p"
}

Heartbeat

Send Ping

{"req_id": "100001", "op": "ping"}

Receive Pong

{
"success": true,
"ret_msg": "pong",
"conn_id": "465772b1-7630-4fdc-a492-e003e6f0f260",
"req_id": "100001",
"op": "ping"
}
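In the pong shown above, op is still "ping" and the pong marker sits in ret_msg, so a check on op alone would miss it. The sketch below matches on ret_msg and also accepts op == "pong"; accepting the second form is an assumption made for tolerance, and is_pong is a hypothetical helper name.

```python
def is_pong(message: dict) -> bool:
    """Return True if a parsed control message is a heartbeat reply."""
    return message.get("ret_msg") == "pong" or message.get("op") == "pong"


pong = {
    "success": True,
    "ret_msg": "pong",
    "conn_id": "465772b1-7630-4fdc-a492-e003e6f0f260",
    "req_id": "100001",
    "op": "ping",
}
print(is_pong(pong))
```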

Subscription

Available Topics

  • order.sbe.resp.spot
  • order.sbe.resp.linear
  • order.sbe.resp.inverse
  • order.sbe.resp.option

Subscribe Example

{
"op": "subscribe",
"args": [
"order.sbe.resp.linear",
"order.sbe.resp.spot",
"order.sbe.resp.option"
]
}

Subscription Acknowledgment

{
"success": true,
"ret_msg": "",
"conn_id": "d30fdpbboasp1pjbe7r0",
"req_id": "abc123",
"op": "subscribe"
}
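The acknowledgment echoes a req_id, which suggests the request can carry one so that the two can be paired. The sketch below builds a subscribe request from the topics listed above and checks an acknowledgment against it. Both helper names are hypothetical, and sending req_id in the subscribe request is an assumption based on the acknowledgment sample.

```python
from typing import Iterable

AVAILABLE_TOPICS = {
    "order.sbe.resp.spot",
    "order.sbe.resp.linear",
    "order.sbe.resp.inverse",
    "order.sbe.resp.option",
}


def build_subscribe(topics: Iterable[str], req_id: str) -> dict:
    """Build a subscribe request, rejecting topics not listed above."""
    topics = list(topics)
    unknown = [t for t in topics if t not in AVAILABLE_TOPICS]
    if unknown:
        raise ValueError(f"unknown topics: {unknown}")
    return {"req_id": req_id, "op": "subscribe", "args": topics}


def is_subscribe_ack(message: dict, req_id: str) -> bool:
    """Return True if message is a successful ack for the given req_id."""
    return (
        message.get("op") == "subscribe"
        and message.get("req_id") == req_id
        and message.get("success") is True
    )


request = build_subscribe(["order.sbe.resp.linear"], req_id="abc123")
ack = {"success": True, "ret_msg": "", "conn_id": "d30fdpbboasp1pjbe7r0",
       "req_id": "abc123", "op": "subscribe"}
print(request["args"], is_subscribe_ack(ack, "abc123"))
```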

Push Logic (fast.resp.order)

fast.resp.order messages are pushed when the user initiates the order action, i.e. an active place, amend, or cancel.

Scenario Coverage

| Scenario / Event | Push via Fast Order Channel | Notes |
| --- | --- | --- |
| Maker order new (accepted / ack) | Yes | Pushes orderStatus = New when maker order is accepted. |
| Maker order filled / partial filled | No | Fast Order channel does not push fill updates; only active place/amend/cancel. |
| Taker order (active side) | Yes | All active actions initiated by client (place / amend / cancel) as taker or active side. |
| COT (CloseOnTrigger) order | Yes (triggered order) | Triggered COT order acts like a new taker order; pushed when it becomes active. |
| RO / ReduceOnly order | Yes | Normal push; if rejected due to cost or position, rejection is returned. |
| Condition / TP-SL triggered order | Yes | Once condition triggers and order becomes active, it is pushed. |
| DCP (Disconnect All Protection) | Yes | Pushes when DCP forcibly cancels orders on disconnect. |
| SMP cancel-taker / Cancel Both (Self Match Protection) | Yes | Only the cancel-taker variant pushes. |
| SMP cancel-maker | No | Maker-side cancellation not pushed. |
| MMP (Market Maker Protection) | No | MMP trigger cancels not pushed in fast order channel. |
| Delist / Contract expiry / Option delivery | No | System-initiated close; no fast order push. |
| Order reject (matching / validation reject) | Yes | Pushed immediately with rejectReason. |
| Amend success / reject | Yes | Active amend ack / reject are pushed. |
| Cancel success / reject | Yes | Active cancel ack / reject are pushed. |

💡 Note:
Upon channel restart or re-subscription, pushes start from the latest matching event — the focus is speed, not backfill.


OrderLinkId Behavior by Version

| Scenario | 2025 Testnet | 2025 Mainnet | Notes |
| --- | --- | --- | --- |
| Active new order (user-initiated) | Present | Present | Client-initiated place includes user orderLinkId. |
| Amend / Cancel (user-initiated) | Present | Present | Client-initiated amend/cancel includes orderLinkId. |
| Maker → Taker transition (e.g., price amend) | Present | Present | Transition caused by client action includes linkId. |
| Active new conditional order (user-initiated) | Present | Present | Conditional orders initiated by user include linkId. |
| Position set trading stop order | Empty | Empty | System-created; no orderLinkId. |
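Because user-initiated pushes carry the orderLinkId and system-created orders leave it empty, a client can use it to pair each push with the request that caused it. The sketch below is one way to do that. PendingRequests is a hypothetical class, and the push is assumed to be an already-decoded dict with orderLinkId and orderId keys.

```python
from typing import Dict, Optional


class PendingRequests:
    """Track in-flight client requests keyed by orderLinkId."""

    def __init__(self) -> None:
        self._by_link_id: Dict[str, str] = {}

    def register(self, order_link_id: str, action: str) -> None:
        """Remember that an action (place/amend/cancel) was sent."""
        self._by_link_id[order_link_id] = action

    def resolve(self, push: dict) -> Optional[str]:
        """Return the pending action acknowledged by this push, if any.

        An empty orderLinkId means the order was not created by a client
        request tracked here, so nothing is resolved.
        """
        link_id = push.get("orderLinkId", "")
        if not link_id:
            return None
        return self._by_link_id.pop(link_id, None)


pending = PendingRequests()
pending.register("mm-0001", "place")
print(pending.resolve({"orderId": "xxxx", "orderLinkId": "mm-0001"}))
print(pending.resolve({"orderId": "yyyy", "orderLinkId": ""}))
```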

Message Structure (SBE)

Fast Order pushes are encoded using a dedicated SBE schema.

FastOrderResp Field Reference

Message: FastOrderResp (template id = 21000)

| Field ID | Name | Type | Unit / Format | Description / Notes |
| --- | --- | --- | --- | --- |
| 1 | category | uint8 | enum (1–4) | Market category: 1=spot, 2=linear, 3=inverse, 4=option. |
| 2 | side | uint8 | enum (1–2) | Order side: 1=Buy, 2=Sell. |
| 3 | orderStatus | uint8 | enum (5-9) | Order state (e.g., 5=Rejected, 6=New, 7=Cancelled, 8=PartiallyFilled, 9=Filled, 0=Others). |
| 4 | priceExponent | int8 | exponent | Decimal places for price. displayPrice = mantissa / 10^priceExponent. |
| 5 | sizeExponent | int8 | exponent | Decimal places for size (quantity). displaySize = mantissa / 10^sizeExponent. |
| 6 | valueExponent | int8 | exponent | Decimal places for value (spot buy notional). |
| 7 | rejectReason | uint16 | code | Rejection code (0 if not applicable). See rejectReason mapping below. |
| 8 | price | int64 | mantissa | Order price mantissa; apply priceExponent. |
| 9 | qty | int64 | mantissa | Original order quantity mantissa; apply sizeExponent. |
| 10 | leavesQty | int64 | mantissa | Remaining unfilled quantity mantissa; apply sizeExponent. |
| 11 | value | int64 | mantissa | Spot buy only: initial value mantissa (apply valueExponent); otherwise 0. |
| 12 | leavesValue | int64 | mantissa | Spot buy only: remaining value mantissa (apply valueExponent); otherwise 0. |
| 13 | creationTime | int64 | microseconds | Order creation timestamp in Fast Order channel. |
| 14 | updatedTime | int64 | microseconds | Matching engine timestamp when order was updated. |
| 15 | seq | int64 | sequential ID | Cross sequence ID. |
| 100 | symbolName | varString8 | UTF-8 string (1-byte length) | Symbol name (e.g., BTCUSDT). |
| 101 | orderId | varString8 | UTF-8 string | Order unique identifier (UUID). |
| 102 | orderLinkId | varString8 | UTF-8 string | Optional client-side identifier (present for user-initiated orders). |
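Fields 1 to 15 form the fixed-length block, so they can be read in one call instead of field by field. The sketch below expresses the table as a Python struct format and decodes a synthetic block. It assumes the fields are packed back to back in little-endian order with no padding, as the table and schema suggest. The blockLength in the wire header remains the authority on the actual block size.

```python
import struct

# Little-endian, no padding: 3 x uint8, 3 x int8, 1 x uint16, 8 x int64.
FIXED_BLOCK = struct.Struct("<BBBbbbH8q")

FIELD_NAMES = (
    "category", "side", "orderStatus",
    "priceExponent", "sizeExponent", "valueExponent",
    "rejectReason",
    "price", "qty", "leavesQty", "value", "leavesValue",
    "creationTime", "updatedTime", "seq",
)


def unpack_fixed_block(buf: bytes, offset: int = 0) -> dict:
    """Decode fields 1-15 starting at offset (just past the 8-byte header)."""
    return dict(zip(FIELD_NAMES, FIXED_BLOCK.unpack_from(buf, offset)))


# Synthetic block built locally for illustration; not captured from the wire.
sample = FIXED_BLOCK.pack(2, 1, 6, 2, 3, 4, 0, 30123, 100000, 40000, 0, 0,
                          1710000000000000, 1710000000000500, 123456789)
fields = unpack_fixed_block(sample)
print(FIXED_BLOCK.size, fields["price"], fields["priceExponent"])
```

Under these assumptions the fixed block occupies 72 bytes (3 + 3 + 2 + 8 × 8).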

rejectReason mapping

0 EC_NoError
1 EC_Others
2 EC_UnknownMessageType
3 EC_MissingClOrdID
4 EC_OrderNotExist
5 EC_MissingOrigClOrdID
6 EC_ClOrdIDOrigClOrdIDAreTheSame
7 EC_OrigClOrdIDDoesNotExist
8 EC_TooLateToCancel
9 EC_UnknownOrderType
10 EC_UnknownSide
11 EC_UnknownTimeInForce
12 EC_WronglyRouted
13 EC_MarketOrderPriceIsNotZero
14 EC_LimitOrderInvalidPrice
15 EC_NoEnoughQtyToFill
16 EC_NoImmediateQtyToFill
17 EC_QtyCannotBeZero
18 EC_PerCancelRequest
19 EC_MarketOrderCannotBePostOnly
20 EC_PostOnlyWillTakeLiquidity
21 EC_CancelReplaceOrder
22 EC_InvalidSymbolStatus
23 EC_MarketOrderNoSupportTIF
24 EC_ReachMaxTradeNum
25 EC_InvalidPriceScale
28 EC_BySelfMatch
29 EC_InvalidSmpType
30 EC_CancelByMMP
31 EC_InCallAuctionStatus
34 EC_InvalidUserType
35 EC_InvalidMirrorOid
36 EC_InvalidMirrorUid
100 EC_EcInvalidQty
101 EC_InvalidAmount
102 EC_LoadOrderCancel
103 EC_CancelForNoFullFill
104 EC_MarketQuoteNoSuppSell
105 EC_DisorderOrderID
106 EC_InvalidBaseValue
107 EC_LoadOrderCanMatch
108 EC_SecurityStatusFail
110 EC_ReachRiskPriceLimit
111 EC_CancelByOrderValueZero
112 EC_CancelByMatchValueZero
113 EC_CancelByMatchValueZero
200 EC_ReachMarketPriceLimit
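When logging rejections it helps to translate the numeric code into its name. The sketch below holds a small subset of the mapping above in a dict and falls back to a placeholder for codes it does not know; a real client would load the full list. reject_reason_name and the EC_Unmapped placeholder are hypothetical names.

```python
# Subset of the rejectReason mapping above, for illustration only.
REJECT_REASONS = {
    0: "EC_NoError",
    1: "EC_Others",
    4: "EC_OrderNotExist",
    8: "EC_TooLateToCancel",
    20: "EC_PostOnlyWillTakeLiquidity",
    28: "EC_BySelfMatch",
    30: "EC_CancelByMMP",
}


def reject_reason_name(code: int) -> str:
    """Return the symbolic name for a rejectReason code.

    Unknown codes are reported rather than raised, so that a code added
    later by the exchange does not break message handling.
    """
    return REJECT_REASONS.get(code, f"EC_Unmapped({code})")


print(reject_reason_name(20))
print(reject_reason_name(999))
```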

SBE XML Template (Fast Order Response)

<?xml version="1.0" encoding="UTF-8"?>
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
                   xmlns:mbx="https://bybit-exchange.github.io/docs/v5/intro"
                   package="order.fast.sbe"
                   id="1"
                   version="0"
                   semanticVersion="1.0.0"
                   description="Bybit fast order response SBE schema"
                   byteOrder="littleEndian"
                   headerType="messageHeader">

  <types>
    <composite name="messageHeader" description="Template ID and length of message root">
      <type name="blockLength" primitiveType="uint16"/>
      <type name="templateId" primitiveType="uint16"/>
      <type name="schemaId" primitiveType="uint16"/>
      <type name="version" primitiveType="uint16"/>
    </composite>

    <composite name="varString8" description="Variable length UTF-8 string">
      <type name="length" primitiveType="uint8"/>
      <type name="varData"
            length="0"
            primitiveType="uint8"
            semanticType="String"
            characterEncoding="UTF-8"/>
    </composite>
  </types>

  <!-- Fast order response: active place/cancel/amend acknowledgements -->
  <sbe:message name="FastOrderResp" id="21000">

    <!-- Routing / classification -->
    <field id="1" name="category" type="uint8" description="1=spot, 2=linear, 3=inverse, 4=option"/>

    <!-- Side / status / rejection -->
    <field id="2" name="side" type="uint8" description="1=Buy, 2=Sell"/>
    <field id="3" name="orderStatus" type="uint8" description="Order state enum"/>

    <!-- Price / size (mantissas) with exponents -->
    <field id="4" name="priceExponent" type="int8" description="Decimal places for price"/>
    <field id="5" name="sizeExponent" type="int8" description="Decimal places for size"/>
    <field id="6" name="valueExponent" type="int8" description="Decimal places for value"/>
    <field id="7" name="rejectReason" type="uint16" description="0 if N/A"/>

    <field id="8" name="price" type="int64" mbx:exponent="priceExponent" description="Price mantissa"/>
    <field id="9" name="qty" type="int64" mbx:exponent="sizeExponent" description="Order quantity mantissa"/>
    <field id="10" name="leavesQty" type="int64" mbx:exponent="sizeExponent" description="Remaining quantity mantissa"/>
    <field id="11" name="value" type="int64" mbx:exponent="valueExponent" description="Spot market buy only; otherwise 0"/>
    <field id="12" name="leavesValue" type="int64" mbx:exponent="valueExponent" description="Spot market buy only; otherwise 0"/>

    <!-- Timing -->
    <field id="13" name="creationTime" type="int64" description="Order creation timestamp in Fast order channel (microseconds)"/>
    <field id="14" name="updatedTime" type="int64" description="Matching timestamp (microseconds)"/>
    <field id="15" name="seq" type="int64" description="Cross sequence ID"/>

    <!-- SymbolName -->
    <data id="100" name="symbolName" type="varString8" description="Symbol name"/>

    <!-- Order identifiers -->
    <data id="101" name="orderId" type="varString8" description="Order ID"/>
    <data id="102" name="orderLinkId" type="varString8" description="Optional; present for user-initiated orders"/>

  </sbe:message>
</sbe:messageSchema>
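The messageHeader carries blockLength, the size of the fixed-length root block. SBE decoders conventionally use it to locate the first variable-length field instead of assuming a hard-coded size, which keeps a decoder working if fixed fields are appended in a later schema version. The sketch below splits a payload that way. split_message is a hypothetical helper and the payload is synthetic.

```python
import struct
from typing import Tuple

# messageHeader: blockLength, templateId, schemaId, version (uint16 LE each).
HEADER = struct.Struct("<HHHH")


def split_message(payload: bytes) -> Tuple[dict, bytes, bytes]:
    """Split a payload into (header, fixed block, variable-length tail)."""
    if len(payload) < HEADER.size:
        raise ValueError("payload too short for SBE header")
    block_length, template_id, schema_id, version = HEADER.unpack_from(payload, 0)
    var_start = HEADER.size + block_length
    if var_start > len(payload):
        raise ValueError("blockLength exceeds payload size")
    header = {
        "blockLength": block_length,
        "templateId": template_id,
        "schemaId": schema_id,
        "version": version,
    }
    return header, payload[HEADER.size:var_start], payload[var_start:]


# Synthetic payload: 4-byte fixed block followed by one varString8.
demo = HEADER.pack(4, 21000, 1, 0) + b"\x01\x02\x03\x04" + b"\x07BTCUSDT"
header, fixed, tail = split_message(demo)
print(header["templateId"], len(fixed), tail)
```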

Binary Sample (as received)

b"R\x00\nN\x01\x00\x00\x00\xdb\x84\xd0k\x00\x00\x00\x00f\xb7\x003\x99\x01\x00\x00\x02\x06\xa1\xcb\xa1\x00\x00\x00\x00\x00\xe7\xda\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\xc8\xa1\x00\x00\x00\x00\x00\nN\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008\x01\x00\x00\x00\x00\x00\x00v\xba\x003\x99\x01\x00\x00\x07BTCUSDT"

This shows: header (8 bytes), body (fixed-length fields), then varString8 for the symbol (length byte 0x07, followed by BTCUSDT).
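The varString8 layout described here (one length byte, then that many UTF-8 bytes) can be read with a few lines. The sketch below parses three consecutive strings the way symbolName, orderId and orderLinkId follow each other. read_var_string8 is a hypothetical helper, and the tail bytes, including the order ID "abc", are made up for illustration.

```python
from typing import Tuple


def read_var_string8(buf: bytes, offset: int) -> Tuple[str, int]:
    """Read one varString8 at offset; return (text, offset after it)."""
    if offset >= len(buf):
        raise ValueError("no length byte available")
    length = buf[offset]
    start = offset + 1
    end = start + length
    if end > len(buf):
        raise ValueError("declared length runs past end of buffer")
    return buf[start:end].decode("utf-8"), end


# symbolName = "BTCUSDT", orderId = "abc" (invented), orderLinkId = "" (empty).
tail = b"\x07BTCUSDT\x03abc\x00"
offset = 0
symbol, offset = read_var_string8(tail, offset)
order_id, offset = read_var_string8(tail, offset)
order_link_id, offset = read_var_string8(tail, offset)
print(symbol, order_id, repr(order_link_id), offset)
```

An empty string is encoded as a single zero length byte, which is how an absent orderLinkId would appear under this layout.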


SBE Binary Message

Each push on the Fast Order channel is an SBE-encoded binary frame (opcode = 2).

Example (pseudo-decoded):

{
"category": 1,
"side": 1,
"orderStatus": 8,
"priceExponent": 2,
"sizeExponent": 3,
"valueExponent": 4,
"rejectReason": 0,
"price": 301.23,
"qty": 100.0,
"leavesQty": 40.0,
"value": 30123.0,
"leavesValue": 12049.2,
"creationTime": 1710000000000000,
"updatedTime": 1710000000000500,
"seq": 123456789,
"symbolName": "BTCUSDT",
"orderId": "xxxx",
"orderLinkId": "xxxx"
}
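The price, qty and value figures in this example are display values, i.e. the int64 mantissa already divided by 10 raised to the matching exponent. For order bookkeeping an exact decimal type is often preferable to float. The sketch below performs the scaling with Python's decimal module; to_decimal is a hypothetical helper and the mantissas are chosen to reproduce the figures above.

```python
from decimal import Decimal


def to_decimal(mantissa: int, exponent: int) -> Decimal:
    """Return mantissa / 10**exponent as an exact Decimal.

    scaleb shifts the decimal exponent without any binary rounding, so the
    result carries exactly the digits that were sent on the wire.
    """
    return Decimal(mantissa).scaleb(-exponent)


price = to_decimal(30123, 2)    # priceExponent = 2
qty = to_decimal(100000, 3)     # sizeExponent = 3
print(price, qty, price * qty)
```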

Key Takeaways

| Aspect | Fast Order SBE Channel | Standard WS (JSON) |
| --- | --- | --- |
| Encoding | SBE (binary) | JSON |
| Latency | Ultra-low (microseconds) | Milliseconds |
| Data Scope | Active user actions only | Full lifecycle |
| Restart Behavior | Starts from latest state | Syncs with trading system |
| Use Case | HFT, Market Makers | General trading |

Integration Script

Golang

package main

import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"math"
	"os"
	"os/signal"
	"time"

	"github.com/gorilla/websocket"
)

// ---------- Config ----------

const (
	MMWSURLTestnetBybits = "wss://stream-testnet.bybits.org/v5/private"
)

// TODO: fill in your real keys
const (
	APIKey    = "xxx"
	APISecret = "xxxxx"
)

var subTopics = []string{
	"order.sbe.resp.linear",
}

// ---------- SBE helpers ----------

// Each reader checks bounds, decodes one value at *off, and advances *off.

func readU8(buf []byte, off *int) (uint8, error) {
	if *off+1 > len(buf) {
		return 0, fmt.Errorf("readU8: out of range")
	}
	v := buf[*off]
	*off++
	return v, nil
}

func readI8(buf []byte, off *int) (int8, error) {
	if *off+1 > len(buf) {
		return 0, fmt.Errorf("readI8: out of range")
	}
	v := int8(buf[*off])
	*off++
	return v, nil
}

func readU16LE(buf []byte, off *int) (uint16, error) {
	if *off+2 > len(buf) {
		return 0, fmt.Errorf("readU16LE: out of range")
	}
	v := binary.LittleEndian.Uint16(buf[*off : *off+2])
	*off += 2
	return v, nil
}

func readI64LE(buf []byte, off *int) (int64, error) {
	if *off+8 > len(buf) {
		return 0, fmt.Errorf("readI64LE: out of range")
	}
	v := int64(binary.LittleEndian.Uint64(buf[*off : *off+8]))
	*off += 8
	return v, nil
}

// readVarString8 reads a 1-byte length followed by that many UTF-8 bytes.
func readVarString8(buf []byte, off *int) (string, error) {
	if *off+1 > len(buf) {
		return "", fmt.Errorf("readVarString8: no length byte")
	}
	ln := int(buf[*off])
	*off++
	if ln == 0 {
		return "", nil
	}
	if *off+ln > len(buf) {
		return "", fmt.Errorf("readVarString8: length out of range")
	}
	s := string(buf[*off : *off+ln])
	*off += ln
	return s, nil
}

// applyExp replicates the Python apply_exp(mantissa, exp)
func applyExp(mantissa int64, exp int8) float64 {
	e := int(exp)
	if e >= 0 {
		return float64(mantissa) / math.Pow10(e)
	}
	return float64(mantissa) * math.Pow10(-e)
}

// ---------- Fast Order SBE decode ----------

type FastOrderSBEResp struct {
	SBEHeader struct {
		BlockLength uint16 `json:"blockLength"`
		TemplateID  uint16 `json:"templateId"`
		SchemaID    uint16 `json:"schemaId"`
		Version     uint16 `json:"version"`
	} `json:"_sbe_header"`

	Category            uint8  `json:"category"`
	Side                uint8  `json:"side"`
	OrderStatus         uint8  `json:"orderStatus"`
	PriceExponent       int8   `json:"priceExponent"`
	SizeExponent        int8   `json:"sizeExponent"`
	ValExponent         int8   `json:"valueExponent"`
	RejectReason        uint16 `json:"rejectReason"`
	PriceMantissa       int64  `json:"priceMantissa"`
	QtyMantissa         int64  `json:"qtyMantissa"`
	LeavesQtyMantissa   int64  `json:"leavesQtyMantissa"`
	ValueMantissa       int64  `json:"valueMantissa"`
	LeavesValueMantissa int64  `json:"leavesValueMantissa"`
	CreationTime        int64  `json:"creationTime"`
	UpdatedTime         int64  `json:"updatedTime"`
	Seq                 int64  `json:"seq"`
	SymbolName          string `json:"symbolName"`
	OrderID             string `json:"orderId"`
	OrderLinkID         string `json:"orderLinkId"`

	Price       float64 `json:"price"`
	Qty         float64 `json:"qty"`
	LeavesQty   float64 `json:"leavesQty"`
	Value       float64 `json:"value"`
	LeavesValue float64 `json:"leavesValue"`

	RawOffsetEnd int `json:"_raw_offset_end"`
}

func decodeFastOrderResp(payload []byte, debug bool) (*FastOrderSBEResp, error) {
	if len(payload) < 8 {
		return nil, fmt.Errorf("payload too short for SBE header")
	}

	off := 0
	blockLen := binary.LittleEndian.Uint16(payload[off : off+2])
	templateID := binary.LittleEndian.Uint16(payload[off+2 : off+4])
	schemaID := binary.LittleEndian.Uint16(payload[off+4 : off+6])
	version := binary.LittleEndian.Uint16(payload[off+6 : off+8])
	off += 8

	if debug {
		log.Printf("HEADER: block_len=%d, template_id=%d, schema_id=%d, version=%d",
			blockLen, templateID, schemaID, version)
	}

	// Only handle template 21000 for now
	if templateID != 21000 {
		return nil, fmt.Errorf("unexpected templateId: %d", templateID)
	}

	var err error
	resp := &FastOrderSBEResp{}
	resp.SBEHeader.BlockLength = blockLen
	resp.SBEHeader.TemplateID = templateID
	resp.SBEHeader.SchemaID = schemaID
	resp.SBEHeader.Version = version

	// fixed fields in order
	if resp.Category, err = readU8(payload, &off); err != nil {
		return nil, err
	}
	if resp.Side, err = readU8(payload, &off); err != nil {
		return nil, err
	}
	if resp.OrderStatus, err = readU8(payload, &off); err != nil {
		return nil, err
	}
	if resp.PriceExponent, err = readI8(payload, &off); err != nil {
		return nil, err
	}
	if resp.SizeExponent, err = readI8(payload, &off); err != nil {
		return nil, err
	}
	if resp.ValExponent, err = readI8(payload, &off); err != nil {
		return nil, err
	}
	if resp.RejectReason, err = readU16LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.PriceMantissa, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.QtyMantissa, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.LeavesQtyMantissa, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.ValueMantissa, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.LeavesValueMantissa, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.CreationTime, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.UpdatedTime, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}
	if resp.Seq, err = readI64LE(payload, &off); err != nil {
		return nil, err
	}

	// strings
	if resp.SymbolName, err = readVarString8(payload, &off); err != nil {
		return nil, err
	}
	if resp.OrderID, err = readVarString8(payload, &off); err != nil {
		return nil, err
	}
	if resp.OrderLinkID, err = readVarString8(payload, &off); err != nil {
		return nil, err
	}

	// derived fields
	resp.Price = applyExp(resp.PriceMantissa, resp.PriceExponent)
	resp.Qty = applyExp(resp.QtyMantissa, resp.SizeExponent)
	resp.LeavesQty = applyExp(resp.LeavesQtyMantissa, resp.SizeExponent)
	resp.Value = applyExp(resp.ValueMantissa, resp.ValExponent)
	resp.LeavesValue = applyExp(resp.LeavesValueMantissa, resp.ValExponent)
	resp.RawOffsetEnd = off

	return resp, nil
}

// ---------- WebSocket helpers ----------

// sendJSON marshals v and sends it as a text frame.
func sendJSON(conn *websocket.Conn, v interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return conn.WriteMessage(websocket.TextMessage, data)
}

// signAuth returns the hex-encoded HMAC-SHA256 of value keyed by secret.
func signAuth(secret, value string) string {
	h := hmac.New(sha256.New, []byte(secret))
	h.Write([]byte(value))
	return hex.EncodeToString(h.Sum(nil))
}

// heartbeat sends a JSON ping every 10 seconds until ctx is cancelled.
func heartbeat(ctx context.Context, conn *websocket.Conn) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			reqID := fmt.Sprintf("%d", time.Now().UnixMilli())
			err := sendJSON(conn, map[string]interface{}{
				"req_id": reqID,
				"op":     "ping",
			})
			if err != nil {
				log.Printf("[heartbeat] error sending ping: %v", err)
				return
			}
		}
	}
}

// ---------- Main run ----------

func run(ctx context.Context, url string) error {
	dialer := websocket.Dialer{
		HandshakeTimeout:  10 * time.Second,
		EnableCompression: false,
	}

	conn, _, err := dialer.Dial(url, nil)
	if err != nil {
		return fmt.Errorf("dial error: %w", err)
	}
	defer conn.Close()
	log.Printf("Connected to %s", url)

	// auth: sign "GET/realtime<expires>" with HMAC-SHA256
	expires := (time.Now().Unix() + 10000) * 1000
	val := fmt.Sprintf("GET/realtime%d", expires)
	sig := signAuth(APISecret, val)

	authMsg := map[string]interface{}{
		"req_id": "10001",
		"op":     "auth",
		"args":   []interface{}{APIKey, expires, sig},
	}
	if err := sendJSON(conn, authMsg); err != nil {
		return fmt.Errorf("send auth error: %w", err)
	}

	// auth ack
	if _, msg, err := conn.ReadMessage(); err != nil {
		return fmt.Errorf("read auth ack error: %w", err)
	} else {
		log.Printf("auth-ack: %s", string(msg))
	}

	// subscribe
	subMsg := map[string]interface{}{
		"op":   "subscribe",
		"args": subTopics,
	}
	if err := sendJSON(conn, subMsg); err != nil {
		return fmt.Errorf("send subscribe error: %w", err)
	}

	// heartbeat
	hbCtx, hbCancel := context.WithCancel(ctx)
	defer hbCancel()
	go heartbeat(hbCtx, conn)

	// Close the connection when the context is cancelled so that the
	// blocking ReadMessage call below returns promptly.
	go func() {
		<-hbCtx.Done()
		conn.Close()
	}()

	// read loop
	for {
		select {
		case <-ctx.Done():
			log.Printf("context canceled, exit read loop")
			return nil
		default:
		}

		mt, data, err := conn.ReadMessage()
		if err != nil {
			if ctx.Err() != nil {
				// The read failed because we closed the connection on cancel.
				return nil
			}
			return fmt.Errorf("read message error: %w", err)
		}

		switch mt {
		case websocket.BinaryMessage:
			resp, err := decodeFastOrderResp(data, false)
			if err != nil {
				log.Printf("binary decode error: %v", err)
			} else {
				j, _ := json.Marshal(resp)
				log.Printf("FAST_ORDER_SBE: %s", string(j))
			}
		case websocket.TextMessage:
			var obj map[string]interface{}
			if err := json.Unmarshal(data, &obj); err != nil {
				log.Printf("text-nonjson: %s", string(data))
				continue
			}
			// The documented pong has op "ping" and ret_msg "pong",
			// so check both fields.
			op, _ := obj["op"].(string)
			retMsg, _ := obj["ret_msg"].(string)
			if op == "pong" || retMsg == "pong" {
				// ignore pong
				continue
			}
			j, _ := json.Marshal(obj)
			log.Printf("control: %s", string(j))
		default:
			log.Printf("unknown message type %d", mt)
		}
	}
}

// ---------- Entry ----------

func main() {
	url := flag.String("url", MMWSURLTestnetBybits, "WebSocket URL")
	flag.Parse()

	// Warn if the placeholder credentials from the Config section are unchanged.
	if APIKey == "xxx" || APISecret == "xxxxx" {
		log.Println("⚠️ Please set APIKey and APISecret in the source before running.")
	}

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	if err := run(ctx, *url); err != nil {
		log.Fatalf("run error: %v", err)
	}
}

Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import asyncio
import hmac
import json
import logging
import struct
import time
from typing import Any, Dict, Tuple

import websockets

logger = logging.getLogger("fast_order")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s"))
logger.addHandler(handler)

# Testnet MMWS endpoint (see Connection).
MMWS_URL_TESTNET = "wss://stream-testnet.bybits.org/v5/private"
# Standard testnet endpoint, kept for reference; not used by this script.
URL_TESTNET = "wss://stream-testnet.bybit.com/v5/private"
# Mainnet requires your dedicated MMWS host; replace the placeholder.
MMWS_URL_MAINNET = "wss://<your-dedicated-MMWS-host>.bybit-aws.com/v5/private"

# TODO: fill in your real keys
API_KEY = "xxx"
API_SECRET = "xxx"

SUB_TOPICS = ["order.sbe.resp.linear"]


def read_u8(buf: memoryview, off: int) -> Tuple[int, int]:
    return buf[off], off + 1


def read_i8(buf: memoryview, off: int) -> Tuple[int, int]:
    b = struct.unpack_from("<b", buf, off)[0]
    return b, off + 1


def read_u16_le(buf: memoryview, off: int) -> Tuple[int, int]:
    v = struct.unpack_from("<H", buf, off)[0]
    return v, off + 2


def read_i64_le(buf: memoryview, off: int) -> Tuple[int, int]:
    v = struct.unpack_from("<q", buf, off)[0]
    return v, off + 8


def read_varstring8(buf: memoryview, off: int) -> Tuple[str, int]:
    ln = buf[off]  # uint8 length
    off += 1
    if ln == 0:
        return "", off
    s = bytes(buf[off : off + ln]).decode("utf-8", "replace")
    return s, off + ln


def apply_exp(mantissa: int, exp: int) -> float:
    if exp >= 0:
        return mantissa / (10 ** exp)
    else:
        return mantissa * (10 ** (-exp))


def decode_fast_order_resp(payload: bytes, debug: bool = False) -> Dict[str, Any]:
    mv = memoryview(payload)
    off = 0

    # header (8 bytes)
    if len(mv) < 8:
        raise ValueError("payload too short for SBE header")

    block_len, template_id, schema_id, version = struct.unpack_from("<HHHH", mv, off)
    off += 8

    if debug:
        print(
            f"HEADER: block_len={block_len}, template_id={template_id}, "
            f"schema_id={schema_id}, version={version}"
        )
        print("payload hex:", payload.hex())

    if template_id != 21000:
        return {"_warn": f"unexpected_template_id:{template_id}", "_raw": payload.hex()}

    # fixed fields in schema order (trailing comments are the field IDs)
    category, off = read_u8(mv, off)  # 1
    side, off = read_u8(mv, off)  # 2
    order_status, off = read_u8(mv, off)  # 3
    price_exp, off = read_i8(mv, off)  # 4
    size_exp, off = read_i8(mv, off)  # 5
    value_exp, off = read_i8(mv, off)  # 6
    reject_reason, off = read_u16_le(mv, off)  # 7
    price, off = read_i64_le(mv, off)  # 8
    qty, off = read_i64_le(mv, off)  # 9
    leaves_qty, off = read_i64_le(mv, off)  # 10
    value, off = read_i64_le(mv, off)  # 11
    leaves_value, off = read_i64_le(mv, off)  # 12
    creation_time_us, off = read_i64_le(mv, off)  # 13
    updated_time_us, off = read_i64_le(mv, off)  # 14
    seq, off = read_i64_le(mv, off)  # 15

    if debug:
        print("after fixed fields offset:", off)

    # variable-length strings, in schema order
    symbol_name, off = read_varstring8(mv, off)
    order_id, off = read_varstring8(mv, off)
    order_link_id, off = read_varstring8(mv, off)

    out = {
        "_sbe_header": {
            "blockLength": block_len,
            "templateId": template_id,
            "schemaId": schema_id,
            "version": version,
        },
        "category": category,
        "side": side,
        "orderStatus": order_status,
        "priceExponent": price_exp,
        "sizeExponent": size_exp,
        "valueExponent": value_exp,
        "rejectReason": reject_reason,
        "priceMantissa": price,
        "qtyMantissa": qty,
        "leavesQtyMantissa": leaves_qty,
        "valueMantissa": value,
        "leavesValueMantissa": leaves_value,
        "price": apply_exp(price, price_exp),
        "qty": apply_exp(qty, size_exp),
        "leavesQty": apply_exp(leaves_qty, size_exp),
        "value": apply_exp(value, value_exp),
        "leavesValue": apply_exp(leaves_value, value_exp),
        "creationTime": creation_time_us,
        "updatedTime": updated_time_us,
        "seq": seq,
        "symbolName": symbol_name,
        "orderId": order_id,
        "orderLinkId": order_link_id,
        "_raw_offset_end": off,
    }
    return out


async def send_json(ws, obj):
    await ws.send(json.dumps(obj, separators=(",", ":")))


async def heartbeat(ws):
    # Send a JSON ping every 10 seconds; stop quietly if the send fails.
    while True:
        await asyncio.sleep(10)
        try:
            await send_json(
                ws,
                {"req_id": str(int(time.time() * 1000)), "op": "ping"},
            )
        except Exception:
            return


async def run(url: str):
    async with websockets.connect(url, max_size=None) as ws:
        # auth: sign "GET/realtime<expires>" with HMAC-SHA256
        expires = int((time.time() + 10000) * 1000)
        _val = f"GET/realtime{expires}"
        signature = hmac.new(
            API_SECRET.encode("utf-8"),
            _val.encode("utf-8"),
            digestmod="sha256",
        ).hexdigest()

        await send_json(
            ws,
            {
                "req_id": "10001",
                "op": "auth",
                "args": [API_KEY, expires, signature],
            },
        )

        # auth ack
        ack = await ws.recv()
        logger.info(ack)

        # subscribe topics
        await send_json(ws, {"op": "subscribe", "args": SUB_TOPICS})

        # heartbeat; keep a reference so the task can be cancelled on exit
        hb_task = asyncio.create_task(heartbeat(ws))

        try:
            while True:
                frame = await ws.recv()

                if isinstance(frame, (bytes, bytearray)):
                    # binary frame: SBE payload
                    try:
                        decoded = decode_fast_order_resp(frame)
                        logger.info(json.dumps(decoded, ensure_ascii=False))
                    except Exception as e:
                        logger.error("binary-decode-error: %s", e)
                else:
                    # text frame: JSON control message
                    try:
                        obj = json.loads(frame)
                    except Exception:
                        logger.error("text-nonjson: %s", frame)
                        continue
                    # The documented pong has op "ping" and ret_msg "pong",
                    # so check both fields.
                    if isinstance(obj, dict) and (
                        obj.get("op") == "pong" or obj.get("ret_msg") == "pong"
                    ):
                        continue
                    logger.info(obj)
        finally:
            hb_task.cancel()

if __name__ == "__main__":
    asyncio.run(run(MMWS_URL_TESTNET))