commit e7c36f5018c60a6c4f2e53236f0b663e9e1010ce Author: lucas Date: Wed Mar 26 12:24:14 2025 +0000 Enviar arquivos para "exchanges" diff --git a/exchanges/binance.py b/exchanges/binance.py new file mode 100644 index 0000000..46621d0 --- /dev/null +++ b/exchanges/binance.py @@ -0,0 +1,52 @@ +import asyncio +import json +import websockets +import requests + +class BinancePerpStream: + def __init__(self, quote_filters, chunk_size=50): + self.base_url = "wss://fstream.binance.com/stream" + self.quote_filters = quote_filters + self.chunk_size = chunk_size + self.symbol_chunks = self.get_symbol_chunks() + + def get_symbol_chunks(self): + try: + url = "https://fapi.binance.com/fapi/v1/exchangeInfo" + data = requests.get(url).json() + symbols = [s["symbol"] for s in data["symbols"] if s["quoteAsset"] in self.quote_filters] + print("[BINANCE] Starting stream with", len(symbols), "symbols") + return [symbols[i:i+self.chunk_size] for i in range(0, len(symbols), self.chunk_size)] + except Exception as e: + print("[BINANCE REST ERROR]", e) + return [] + + def normalize_symbol(self, symbol): + return symbol.replace("USDT", "/USDT") + + def parse_tick(self, msg): + data = msg.get("data") + if not data: + return None + symbol = data["s"] + return { + "exchange": "binance", + "symbol": self.normalize_symbol(symbol), + "bid": float(data["b"]), + "ask": float(data["a"]), + "bid_size": float(data["B"]), + "ask_size": float(data["A"]) + } + + async def stream_chunk(self, symbols): + streams = "/".join([f"{s.lower()}@bookTicker" for s in symbols]) + url = f"{self.base_url}?streams={streams}" + async with websockets.connect(url) as ws: + async for msg in ws: + try: + data = json.loads(msg) + tick = self.parse_tick(data) + if tick: + yield tick + except Exception as e: + print("[BINANCE PAR \ No newline at end of file diff --git a/exchanges/bybit.py b/exchanges/bybit.py new file mode 100644 index 0000000..96ff4cd --- /dev/null +++ b/exchanges/bybit.py @@ -0,0 +1,69 @@ +import asyncio +import json +import websockets +import requests + +class BybitPerpStream: + def __init__(self, quote_filters, chunk_size=50): + self.url = "wss://stream.bybit.com/v5/public/linear" + self.quote_filters = quote_filters + self.chunk_size = chunk_size + self.symbol_chunks = self.get_symbol_chunks() + + def get_symbol_chunks(self): + try: + url = "https://api.bybit.com/v5/market/instruments-info" + params = {"category": "linear"} + data = requests.get(url, params=params).json() + symbols = [ + item["symbol"] + for item in data["result"]["list"] + if any(q in item["symbol"] for q in self.quote_filters) + ] + print("[BYBIT] Starting stream with", len(symbols), "symbols") + return [symbols[i:i + self.chunk_size] for i in range(0, len(symbols), self.chunk_size)] + except Exception as e: + print("[BYBIT REST ERROR]", e) + return [] + + def normalize_symbol(self, symbol): + return symbol.replace("USDT", "/USDT") + + def parse_tick(self, topic, data): + if data.get("bid1Price") is None or data.get("ask1Price") is None: + return None + symbol = topic.split(".")[-1] + return { + "exchange": "bybit", + "symbol": self.normalize_symbol(symbol), + "bid": float(data.get("bid1Price", 0)), + "ask": float(data.get("ask1Price", 0)), + "bid_size": float(data.get("bid1Size", 0)), + "ask_size": float(data.get("ask1Size", 0)) + } + + async def stream_chunk(self, symbols): + args = [f"tickers.{s}" for s in symbols] + async with websockets.connect(self.url) as ws: + await ws.send(json.dumps({"op": "subscribe", "args": args})) + async for msg in ws: + try: + data = json.loads(msg) + if "data" in data and "topic" in data: + tick = self.parse_tick(data["topic"], data["data"]) + if tick: + yield tick + except Exception as e: + print("[BYBIT PARSE ERROR]", e) + + async def stream(self): + queue = asyncio.Queue() + + async def worker(symbols): + async for tick in self.stream_chunk(symbols): + await queue.put(tick) + + tasks = [asyncio.create_task(worker(chunk)) for chunk in self.symbol_chunks] + + while True: + tick = await queu \ No newline at end of file diff --git a/exchanges/hyperliquid.py b/exchanges/hyperliquid.py new file mode 100644 index 0000000..3eab89a --- /dev/null +++ b/exchanges/hyperliquid.py @@ -0,0 +1,72 @@ +import asyncio +import json +import websockets +import requests + +class HyperliquidPerpStream: + def __init__(self, quote_filters): + self.url = "wss://api.hyperliquid.xyz/ws" + self.quote_filters = quote_filters + self.symbols = self.get_symbols() + + def get_symbols(self): + try: + url = "https://api.hyperliquid.xyz/info" + headers = {"Content-Type": "application/json"} + payload = {"type": "metaAndAssetCtxs"} + response = requests.post(url, headers=headers, json=payload) + data = response.json() + print("[HYPERLIQUID DEBUG] type:", type(data)) + assets = data[0].get("universe", []) + print("[HYPERLIQUID DEBUG] universe len:", len(assets)) + symbols = [a["name"] for a in assets] + print("[HYPERLIQUID SYMBOLS]", symbols[:10]) + return symbols + except Exception as e: + print("[HYPERLIQUID REST ERROR]", e) + return [] + + def normalize_symbol(self, symbol): + return symbol + "/USD" + + async def stream(self): + symbols = self.symbols + print(f"[hyperliquid] Starting stream with {len(symbols)} symbols") + + while True: + try: + async with websockets.connect(self.url) as ws: + chunk_size = 1 + for i in range(0, len(symbols), chunk_size): + chunk = symbols[i:i + chunk_size] + await ws.send(json.dumps({ + "type": "subscribe", + "channels": [{"name": "l2Book", "symbols": chunk}] + })) + await asyncio.sleep(0.1) + + async for msg in ws: + try: + print(msg) + data = json.loads(msg) + if data.get("type") == "l2Book" and "impactPxs" in data: + symbol = data.get("symbol") + impact = data.get("impactPxs", []) + if len(impact) < 2: + continue + bid = float(impact[0]) + ask = float(impact[1]) + + yield { + "exchange": "hyperliquid", + "symbol": self.normalize_symbol(symbol), + "bid": bid, + "ask": ask, + "bid_size": 0.0, + "ask_size": 0.0 + } + except Exception as e: + print("[HYPERLIQUID PARSE ERROR]", e) + except Exception as e: + print("[HYPERLIQUID WS ERROR]", e) + await asyncio.sleep(5) \ No newline at end of file diff --git a/exchanges/okx.py b/exchanges/okx.py new file mode 100644 index 0000000..68ec670 --- /dev/null +++ b/exchanges/okx.py @@ -0,0 +1,54 @@ +import asyncio +import json +import websockets +import requests + +class OKXPerpStream: + def __init__(self, quote_filters, chunk_size=50): + self.url = "wss://ws.okx.com:8443/ws/v5/public" + self.quote_filters = quote_filters + self.chunk_size = chunk_size + self.symbol_chunks = self.get_symbol_chunks() + + def get_symbol_chunks(self): + try: + url = "https://www.okx.com/api/v5/public/instruments" + params = {"instType": "SWAP"} + data = requests.get(url, params=params).json() + symbols = [item["instId"] for item in data["data"] if any(q in item["settleCcy"] for q in self.quote_filters)] + print("[OKX] Starting stream with", len(symbols), "symbols") + return [symbols[i:i+self.chunk_size] for i in range(0, len(symbols), self.chunk_size)] + except Exception as e: + print("[OKX REST ERROR]", e) + return [] + + def normalize_symbol(self, symbol): + return symbol.replace("-SWAP", "").replace("-", "/") + + def parse_tick(self, data): + symbol = data["instId"] + return { + "exchange": "okx", + "symbol": self.normalize_symbol(symbol), + "bid": float(data["bidPx"]), + "ask": float(data["askPx"]), + "bid_size": float(data["bidSz"]), + "ask_size": float(data["askSz"]) + } + + async def stream_chunk(self, symbols): + args = [{"channel": "tickers", "instId": s} for s in symbols] + async with websockets.connect(self.url) as ws: + await ws.send(json.dumps({"op": "subscribe", "args": args})) + async for msg in ws: + try: + data = json.loads(msg) + if "data" in data: + for item in data["data"]: + tick = self.parse_tick(item) + if tick: + yield tick + except Exception as e: + print("[OKX PARSE ERROR]", e) + + async def stream(se \ No newline at end of file