perp-arbitrage/exchanges/bybit.py

69 lines
2.5 KiB
Python

import asyncio
import json
import websockets
import requests
class BybitPerpStream:
def __init__(self, quote_filters, chunk_size=50):
self.url = "wss://stream.bybit.com/v5/public/linear"
self.quote_filters = quote_filters
self.chunk_size = chunk_size
self.symbol_chunks = self.get_symbol_chunks()
def get_symbol_chunks(self):
    """Fetch Bybit linear instruments and group the matching symbols.

    A symbol is kept when any entry of ``self.quote_filters`` occurs in it
    as a substring. The kept symbols are split into consecutive lists of at
    most ``self.chunk_size`` items.

    Returns:
        list[list[str]]: The symbol chunks, or ``[]`` if the request,
        response validation, or chunking fails for any reason (the error
        is printed, never raised).
    """
    url = "https://api.bybit.com/v5/market/instruments-info"
    # The endpoint is paginated; ask for the largest page and follow the
    # cursor so symbols beyond the first page are not silently dropped.
    params = {"category": "linear", "limit": 1000}
    symbols = []
    try:
        while True:
            # A timeout keeps a stalled connection from blocking the
            # constructor forever.
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            payload = response.json()
            if payload.get("retCode", 0) != 0:
                # Surface the API-level error instead of an opaque KeyError.
                raise RuntimeError(
                    f"retCode={payload.get('retCode')} "
                    f"retMsg={payload.get('retMsg')}"
                )
            result = payload["result"]
            page = result["list"]
            symbols.extend(
                item["symbol"]
                for item in page
                if any(q in item["symbol"] for q in self.quote_filters)
            )
            cursor = result.get("nextPageCursor")
            # Stop on the last page; also stop if the cursor does not
            # advance, so a misbehaving response cannot loop forever.
            if not cursor or not page or cursor == params.get("cursor"):
                break
            params["cursor"] = cursor
        print("[BYBIT] Starting stream with", len(symbols), "symbols")
        return [
            symbols[i:i + self.chunk_size]
            for i in range(0, len(symbols), self.chunk_size)
        ]
    except Exception as e:
        # Best-effort by design: callers get an empty chunk list on failure.
        print("[BYBIT REST ERROR]", e)
        return []
def normalize_symbol(self, symbol):
    """Return ``symbol`` with a ``/`` inserted before every ``USDT``.

    For example ``"BTCUSDT"`` becomes ``"BTC/USDT"``. Symbols that do not
    contain ``USDT`` are returned unchanged.
    """
    # Splitting on the quote and re-joining with the slashed form is
    # equivalent to replacing every occurrence.
    pieces = symbol.split("USDT")
    return "/USDT".join(pieces)
def parse_tick(self, topic, data):
if data.get("bid1Price") is None or data.get("ask1Price") is None:
return None
symbol = topic.split(".")[-1]
return {
"exchange": "bybit",
"symbol": self.normalize_symbol(symbol),
"bid": float(data.get("bid1Price", 0)),
"ask": float(data.get("ask1Price", 0)),
"bid_size": float(data.get("bid1Size", 0)),
"ask_size": float(data.get("ask1Size", 0))
}
async def stream_chunk(self, symbols, reconnect_delay=5.0):
    """Yield parsed ticks for ``symbols`` from one websocket connection.

    Opens a connection to ``self.url``, subscribes to ``tickers.<symbol>``
    for every symbol, and yields each tick that ``parse_tick`` accepts.
    When the connection ends or fails, the error is printed and a new
    connection is opened after ``reconnect_delay`` seconds, so the
    generator keeps producing ticks instead of ending on the first
    disconnect.

    Args:
        symbols: Exchange symbols to subscribe to on this connection.
        reconnect_delay: Seconds to wait before reconnecting.

    Yields:
        dict: Normalized tick as returned by ``parse_tick``.
    """
    subscribe_msg = json.dumps(
        {"op": "subscribe", "args": [f"tickers.{s}" for s in symbols]}
    )
    while True:
        try:
            async with websockets.connect(self.url) as ws:
                await ws.send(subscribe_msg)
                async for msg in ws:
                    # Keep the guarded region to decoding/parsing only, so
                    # one malformed message is logged and skipped.
                    try:
                        data = json.loads(msg)
                        tick = None
                        if "data" in data and "topic" in data:
                            tick = self.parse_tick(data["topic"], data["data"])
                    except Exception as e:
                        print("[BYBIT PARSE ERROR]", e)
                        continue
                    if tick:
                        yield tick
        except Exception as e:
            # Connection-level failure. Task cancellation is not swallowed
            # here: CancelledError derives from BaseException on 3.8+.
            print("[BYBIT WS ERROR]", e)
        print("[BYBIT] Reconnecting in", reconnect_delay, "seconds")
        await asyncio.sleep(reconnect_delay)
async def stream(self):
queue = asyncio.Queue()
async def worker(symbols):
async for tick in self.stream_chunk(symbols):
await queue.put(tick)
tasks = [asyncio.create_task(worker(chunk)) for chunk in self.symbol_chunks]
while True:
tick = await queu