69 lines
2.5 KiB
Python
69 lines
2.5 KiB
Python
import asyncio
|
|
import json
|
|
import websockets
|
|
import requests
|
|
|
|
class BybitPerpStream:
|
|
def __init__(self, quote_filters, chunk_size=50):
|
|
self.url = "wss://stream.bybit.com/v5/public/linear"
|
|
self.quote_filters = quote_filters
|
|
self.chunk_size = chunk_size
|
|
self.symbol_chunks = self.get_symbol_chunks()
|
|
|
|
def get_symbol_chunks(self):
|
|
try:
|
|
url = "https://api.bybit.com/v5/market/instruments-info"
|
|
params = {"category": "linear"}
|
|
data = requests.get(url, params=params).json()
|
|
symbols = [
|
|
item["symbol"]
|
|
for item in data["result"]["list"]
|
|
if any(q in item["symbol"] for q in self.quote_filters)
|
|
]
|
|
print("[BYBIT] Starting stream with", len(symbols), "symbols")
|
|
return [symbols[i:i + self.chunk_size] for i in range(0, len(symbols), self.chunk_size)]
|
|
except Exception as e:
|
|
print("[BYBIT REST ERROR]", e)
|
|
return []
|
|
|
|
def normalize_symbol(self, symbol):
|
|
return symbol.replace("USDT", "/USDT")
|
|
|
|
def parse_tick(self, topic, data):
|
|
if data.get("bid1Price") is None or data.get("ask1Price") is None:
|
|
return None
|
|
symbol = topic.split(".")[-1]
|
|
return {
|
|
"exchange": "bybit",
|
|
"symbol": self.normalize_symbol(symbol),
|
|
"bid": float(data.get("bid1Price", 0)),
|
|
"ask": float(data.get("ask1Price", 0)),
|
|
"bid_size": float(data.get("bid1Size", 0)),
|
|
"ask_size": float(data.get("ask1Size", 0))
|
|
}
|
|
|
|
async def stream_chunk(self, symbols):
|
|
args = [f"tickers.{s}" for s in symbols]
|
|
async with websockets.connect(self.url) as ws:
|
|
await ws.send(json.dumps({"op": "subscribe", "args": args}))
|
|
async for msg in ws:
|
|
try:
|
|
data = json.loads(msg)
|
|
if "data" in data and "topic" in data:
|
|
tick = self.parse_tick(data["topic"], data["data"])
|
|
if tick:
|
|
yield tick
|
|
except Exception as e:
|
|
print("[BYBIT PARSE ERROR]", e)
|
|
|
|
async def stream(self):
|
|
queue = asyncio.Queue()
|
|
|
|
async def worker(symbols):
|
|
async for tick in self.stream_chunk(symbols):
|
|
await queue.put(tick)
|
|
|
|
tasks = [asyncio.create_task(worker(chunk)) for chunk in self.symbol_chunks]
|
|
|
|
while True:
|
|
tick = await queu |