Enviar arquivos para "exchanges"
commit
e7c36f5018
|
@ -0,0 +1,52 @@
|
|||
import asyncio
|
||||
import json
|
||||
import websockets
|
||||
import requests
|
||||
|
||||
class BinancePerpStream:
|
||||
def __init__(self, quote_filters, chunk_size=50):
|
||||
self.base_url = "wss://fstream.binance.com/stream"
|
||||
self.quote_filters = quote_filters
|
||||
self.chunk_size = chunk_size
|
||||
self.symbol_chunks = self.get_symbol_chunks()
|
||||
|
||||
def get_symbol_chunks(self):
|
||||
try:
|
||||
url = "https://fapi.binance.com/fapi/v1/exchangeInfo"
|
||||
data = requests.get(url).json()
|
||||
symbols = [s["symbol"] for s in data["symbols"] if s["quoteAsset"] in self.quote_filters]
|
||||
print("[BINANCE] Starting stream with", len(symbols), "symbols")
|
||||
return [symbols[i:i+self.chunk_size] for i in range(0, len(symbols), self.chunk_size)]
|
||||
except Exception as e:
|
||||
print("[BINANCE REST ERROR]", e)
|
||||
return []
|
||||
|
||||
def normalize_symbol(self, symbol):
|
||||
return symbol.replace("USDT", "/USDT")
|
||||
|
||||
def parse_tick(self, msg):
|
||||
data = msg.get("data")
|
||||
if not data:
|
||||
return None
|
||||
symbol = data["s"]
|
||||
return {
|
||||
"exchange": "binance",
|
||||
"symbol": self.normalize_symbol(symbol),
|
||||
"bid": float(data["b"]),
|
||||
"ask": float(data["a"]),
|
||||
"bid_size": float(data["B"]),
|
||||
"ask_size": float(data["A"])
|
||||
}
|
||||
|
||||
async def stream_chunk(self, symbols):
|
||||
streams = "/".join([f"{s.lower()}@bookTicker" for s in symbols])
|
||||
url = f"{self.base_url}?streams={streams}"
|
||||
async with websockets.connect(url) as ws:
|
||||
async for msg in ws:
|
||||
try:
|
||||
data = json.loads(msg)
|
||||
tick = self.parse_tick(data)
|
||||
if tick:
|
||||
yield tick
|
||||
except Exception as e:
|
||||
print("[BINANCE PAR
|
|
@ -0,0 +1,69 @@
|
|||
import asyncio
|
||||
import json
|
||||
import websockets
|
||||
import requests
|
||||
|
||||
class BybitPerpStream:
|
||||
def __init__(self, quote_filters, chunk_size=50):
|
||||
self.url = "wss://stream.bybit.com/v5/public/linear"
|
||||
self.quote_filters = quote_filters
|
||||
self.chunk_size = chunk_size
|
||||
self.symbol_chunks = self.get_symbol_chunks()
|
||||
|
||||
def get_symbol_chunks(self):
|
||||
try:
|
||||
url = "https://api.bybit.com/v5/market/instruments-info"
|
||||
params = {"category": "linear"}
|
||||
data = requests.get(url, params=params).json()
|
||||
symbols = [
|
||||
item["symbol"]
|
||||
for item in data["result"]["list"]
|
||||
if any(q in item["symbol"] for q in self.quote_filters)
|
||||
]
|
||||
print("[BYBIT] Starting stream with", len(symbols), "symbols")
|
||||
return [symbols[i:i + self.chunk_size] for i in range(0, len(symbols), self.chunk_size)]
|
||||
except Exception as e:
|
||||
print("[BYBIT REST ERROR]", e)
|
||||
return []
|
||||
|
||||
def normalize_symbol(self, symbol):
|
||||
return symbol.replace("USDT", "/USDT")
|
||||
|
||||
def parse_tick(self, topic, data):
|
||||
if data.get("bid1Price") is None or data.get("ask1Price") is None:
|
||||
return None
|
||||
symbol = topic.split(".")[-1]
|
||||
return {
|
||||
"exchange": "bybit",
|
||||
"symbol": self.normalize_symbol(symbol),
|
||||
"bid": float(data.get("bid1Price", 0)),
|
||||
"ask": float(data.get("ask1Price", 0)),
|
||||
"bid_size": float(data.get("bid1Size", 0)),
|
||||
"ask_size": float(data.get("ask1Size", 0))
|
||||
}
|
||||
|
||||
async def stream_chunk(self, symbols):
|
||||
args = [f"tickers.{s}" for s in symbols]
|
||||
async with websockets.connect(self.url) as ws:
|
||||
await ws.send(json.dumps({"op": "subscribe", "args": args}))
|
||||
async for msg in ws:
|
||||
try:
|
||||
data = json.loads(msg)
|
||||
if "data" in data and "topic" in data:
|
||||
tick = self.parse_tick(data["topic"], data["data"])
|
||||
if tick:
|
||||
yield tick
|
||||
except Exception as e:
|
||||
print("[BYBIT PARSE ERROR]", e)
|
||||
|
||||
async def stream(self):
|
||||
queue = asyncio.Queue()
|
||||
|
||||
async def worker(symbols):
|
||||
async for tick in self.stream_chunk(symbols):
|
||||
await queue.put(tick)
|
||||
|
||||
tasks = [asyncio.create_task(worker(chunk)) for chunk in self.symbol_chunks]
|
||||
|
||||
while True:
|
||||
tick = await queu
|
|
@ -0,0 +1,72 @@
|
|||
import asyncio
|
||||
import json
|
||||
import websockets
|
||||
import requests
|
||||
|
||||
class HyperliquidPerpStream:
|
||||
def __init__(self, quote_filters):
|
||||
self.url = "wss://api.hyperliquid.xyz/ws"
|
||||
self.quote_filters = quote_filters
|
||||
self.symbols = self.get_symbols()
|
||||
|
||||
def get_symbols(self):
|
||||
try:
|
||||
url = "https://api.hyperliquid.xyz/info"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
payload = {"type": "metaAndAssetCtxs"}
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
data = response.json()
|
||||
print("[HYPERLIQUID DEBUG] type:", type(data))
|
||||
assets = data[0].get("universe", [])
|
||||
print("[HYPERLIQUID DEBUG] universe len:", len(assets))
|
||||
symbols = [a["name"] for a in assets]
|
||||
print("[HYPERLIQUID SYMBOLS]", symbols[:10])
|
||||
return symbols
|
||||
except Exception as e:
|
||||
print("[HYPERLIQUID REST ERROR]", e)
|
||||
return []
|
||||
|
||||
def normalize_symbol(self, symbol):
|
||||
return symbol + "/USD"
|
||||
|
||||
async def stream(self):
|
||||
symbols = self.symbols
|
||||
print(f"[hyperliquid] Starting stream with {len(symbols)} symbols")
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with websockets.connect(self.url) as ws:
|
||||
chunk_size = 1
|
||||
for i in range(0, len(symbols), chunk_size):
|
||||
chunk = symbols[i:i + chunk_size]
|
||||
await ws.send(json.dumps({
|
||||
"type": "subscribe",
|
||||
"channels": [{"name": "l2Book", "symbols": chunk}]
|
||||
}))
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
async for msg in ws:
|
||||
try:
|
||||
print(msg)
|
||||
data = json.loads(msg)
|
||||
if data.get("type") == "l2Book" and "impactPxs" in data:
|
||||
symbol = data.get("symbol")
|
||||
impact = data.get("impactPxs", [])
|
||||
if len(impact) < 2:
|
||||
continue
|
||||
bid = float(impact[0])
|
||||
ask = float(impact[1])
|
||||
|
||||
yield {
|
||||
"exchange": "hyperliquid",
|
||||
"symbol": self.normalize_symbol(symbol),
|
||||
"bid": bid,
|
||||
"ask": ask,
|
||||
"bid_size": 0.0,
|
||||
"ask_size": 0.0
|
||||
}
|
||||
except Exception as e:
|
||||
print("[HYPERLIQUID PARSE ERROR]", e)
|
||||
except Exception as e:
|
||||
print("[HYPERLIQUID WS ERROR]", e)
|
||||
await asyncio.sleep(5)
|
|
@ -0,0 +1,54 @@
|
|||
import asyncio
|
||||
import json
|
||||
import websockets
|
||||
import requests
|
||||
|
||||
class OKXPerpStream:
|
||||
def __init__(self, quote_filters, chunk_size=50):
|
||||
self.url = "wss://ws.okx.com:8443/ws/v5/public"
|
||||
self.quote_filters = quote_filters
|
||||
self.chunk_size = chunk_size
|
||||
self.symbol_chunks = self.get_symbol_chunks()
|
||||
|
||||
def get_symbol_chunks(self):
|
||||
try:
|
||||
url = "https://www.okx.com/api/v5/public/instruments"
|
||||
params = {"instType": "SWAP"}
|
||||
data = requests.get(url, params=params).json()
|
||||
symbols = [item["instId"] for item in data["data"] if any(q in item["settleCcy"] for q in self.quote_filters)]
|
||||
print("[OKX] Starting stream with", len(symbols), "symbols")
|
||||
return [symbols[i:i+self.chunk_size] for i in range(0, len(symbols), self.chunk_size)]
|
||||
except Exception as e:
|
||||
print("[OKX REST ERROR]", e)
|
||||
return []
|
||||
|
||||
def normalize_symbol(self, symbol):
|
||||
return symbol.replace("-SWAP", "").replace("-", "/")
|
||||
|
||||
def parse_tick(self, data):
|
||||
symbol = data["instId"]
|
||||
return {
|
||||
"exchange": "okx",
|
||||
"symbol": self.normalize_symbol(symbol),
|
||||
"bid": float(data["bidPx"]),
|
||||
"ask": float(data["askPx"]),
|
||||
"bid_size": float(data["bidSz"]),
|
||||
"ask_size": float(data["askSz"])
|
||||
}
|
||||
|
||||
async def stream_chunk(self, symbols):
|
||||
args = [{"channel": "tickers", "instId": s} for s in symbols]
|
||||
async with websockets.connect(self.url) as ws:
|
||||
await ws.send(json.dumps({"op": "subscribe", "args": args}))
|
||||
async for msg in ws:
|
||||
try:
|
||||
data = json.loads(msg)
|
||||
if "data" in data:
|
||||
for item in data["data"]:
|
||||
tick = self.parse_tick(item)
|
||||
if tick:
|
||||
yield tick
|
||||
except Exception as e:
|
||||
print("[OKX PARSE ERROR]", e)
|
||||
|
||||
async def stream(se
|
Loading…
Reference in New Issue