import asyncio
import json

import requests
import websockets


class BybitPerpStream:
    """Stream best bid/ask ticks for Bybit linear contracts over WebSocket.

    The symbol universe is fetched once from the REST API at construction
    time, filtered by ``quote_filters`` and split into chunks; each chunk is
    served by its own WebSocket connection.
    """

    REST_URL = "https://api.bybit.com/v5/market/instruments-info"
    REST_TIMEOUT = 10  # seconds; requests applies no timeout unless told to
    REST_PAGE_LIMIT = 1000
    RECONNECT_DELAY_MIN = 1  # seconds
    RECONNECT_DELAY_MAX = 30  # seconds

    def __init__(self, quote_filters, chunk_size=50):
        """Fetch the symbol list and prepare subscription chunks.

        Args:
            quote_filters: Iterable of substrings; a symbol is kept when it
                contains at least one of them (e.g. ``["USDT"]``).
            chunk_size: Maximum number of symbols per WebSocket connection.
        """
        self.url = "wss://stream.bybit.com/v5/public/linear"
        self.quote_filters = quote_filters
        self.chunk_size = chunk_size
        self.symbol_chunks = self.get_symbol_chunks()

    def get_symbol_chunks(self):
        """Return the filtered symbols split into lists of ``chunk_size``.

        Every page of the instruments endpoint is read by following
        ``nextPageCursor``; a single request only returns one page.

        Returns:
            A list of symbol lists, or ``[]`` if the REST call fails for any
            reason (the error is printed, not raised).
        """
        try:
            symbols = []
            cursor = None
            while True:
                params = {"category": "linear", "limit": self.REST_PAGE_LIMIT}
                if cursor:
                    params["cursor"] = cursor
                response = requests.get(
                    self.REST_URL, params=params, timeout=self.REST_TIMEOUT
                )
                response.raise_for_status()
                result = response.json()["result"]
                page = result["list"]
                symbols.extend(
                    item["symbol"]
                    for item in page
                    if any(q in item["symbol"] for q in self.quote_filters)
                )
                cursor = result.get("nextPageCursor")
                # Stop on the last page; the empty-page check guards against
                # looping forever should a cursor ever be echoed back.
                if not cursor or not page:
                    break
            print("[BYBIT] Starting stream with", len(symbols), "symbols")
            return [
                symbols[i:i + self.chunk_size]
                for i in range(0, len(symbols), self.chunk_size)
            ]
        except Exception as e:
            # Best-effort by design: a failed lookup yields no chunks rather
            # than aborting construction.
            print("[BYBIT REST ERROR]", e)
            return []

    def normalize_symbol(self, symbol):
        """Insert a ``/`` before ``USDT`` (``BTCUSDT`` -> ``BTC/USDT``).

        Symbols that do not contain ``USDT`` are returned unchanged.
        """
        return symbol.replace("USDT", "/USDT")

    def parse_tick(self, topic, data):
        """Convert one ticker payload into a normalized tick dict.

        Args:
            topic: Topic string of the message; the symbol is the part after
                the last ``.``.
            data: Mapping holding ``bid1Price``/``ask1Price`` and optionally
                ``bid1Size``/``ask1Size``.

        Returns:
            A dict with ``exchange``, ``symbol``, ``bid``, ``ask``,
            ``bid_size`` and ``ask_size``, or ``None`` when either price is
            missing or empty.

        Raises:
            ValueError: If a present value is not numeric.
        """
        bid = data.get("bid1Price")
        ask = data.get("ask1Price")
        # An empty string is treated like a missing value instead of being
        # passed to float(), which would raise.
        if bid in (None, "") or ask in (None, ""):
            return None
        # NOTE(review): Bybit documents linear ticker pushes as snapshot +
        # delta, so a delta that omits either price is dropped here and an
        # absent size is reported as 0 - confirm that suits consumers.
        symbol = topic.split(".")[-1]
        return {
            "exchange": "bybit",
            "symbol": self.normalize_symbol(symbol),
            "bid": float(bid),
            "ask": float(ask),
            "bid_size": float(data.get("bid1Size") or 0),
            "ask_size": float(data.get("ask1Size") or 0),
        }

    async def stream_chunk(self, symbols):
        """Yield ticks for ``symbols`` from one WebSocket connection.

        The connection is re-established with exponential backoff whenever it
        closes or fails, so this generator does not finish on its own.
        Messages that cannot be parsed are reported and skipped.
        """
        args = [f"tickers.{s}" for s in symbols]
        subscribe = json.dumps({"op": "subscribe", "args": args})
        delay = self.RECONNECT_DELAY_MIN
        while True:
            try:
                async with websockets.connect(self.url) as ws:
                    await ws.send(subscribe)
                    # Connected and subscribed: start the backoff over.
                    delay = self.RECONNECT_DELAY_MIN
                    async for msg in ws:
                        try:
                            payload = json.loads(msg)
                            if "data" in payload and "topic" in payload:
                                tick = self.parse_tick(
                                    payload["topic"], payload["data"]
                                )
                            else:
                                tick = None
                        except Exception as e:
                            print("[BYBIT PARSE ERROR]", e)
                            continue
                        if tick:
                            yield tick
            except Exception as e:
                # Cancellation is a BaseException and still propagates.
                print("[BYBIT WS ERROR]", e)
            await asyncio.sleep(delay)
            delay = min(delay * 2, self.RECONNECT_DELAY_MAX)

    async def stream(self):
        """Yield ticks from every symbol chunk, merged through one queue."""
        queue = asyncio.Queue()

        async def worker(symbols):
            async for tick in self.stream_chunk(symbols):
                await queue.put(tick)

        # Keep references so the event loop does not garbage-collect the
        # worker tasks while they are running.
        tasks = [asyncio.create_task(worker(chunk)) for chunk in self.symbol_chunks]

        # NOTE(review): the reviewed source was cut off after
        # `tick = await queu`; the `queue.get()` / `yield tick` tail below is
        # reconstructed - confirm against the complete file.
        while True:
            tick = await queue.get()
            yield tick