100行でアービトラージ監視Botを作った
こんにちは。ナナオです。 アービトラージに興味があり、開発してみたいな~と思いつつなかなか手が出せなかったのですが、この度重い腰をあげて開発してみました。 使用技術 監視にはPrometheus + Grafanaを使用しました。 実装はPythonを使い、HTTPリクエストにrequestsを使っています。 実装 早速実装です。 メインになる実装は以下の通りです。 ここの実装が大体100行くらいです。 import time import logging import threading from itertools import combinations # リクエストに使うAPI(自作) from vc_bot import exchange_api from prometheus_client import start_http_server, Gauge, Counter # 必要に応じてログの設定(これは標準エラー出力に出す設定です) logging.basicConfig(level=logging.ERROR) logger = logging.getLogger(__name__) PROFIT_GAUGE = Gauge("profit", "利益", ["exchange", "symbol"]) PROFIT_RATE_GAUGE = Gauge("profit_rate", "利益率", ["exchange", "symbol"]) REQUEST_ERROR_GAUGE = Counter("request_error", "リクエストエラー", ["exchange"]) def worker(): # 対象の取引所 exchange_pair = { "BTC_JPY": ["gmo", "coincheck", "binance", "bitflyer", "zaif", "bitbank", "okcoin"], "ETH_JPY": ["gmo", "coincheck", "binance", "bitflyer", "zaif", "bitbank", "okcoin"], "XRP_JPY": ['gmo', 'coincheck', 'binance', 'bitflyer', 'bitbank', 'okcoin'], "MONA_JPY": ['coincheck', 'bitflyer', 'zaif', 'bitbank'], } while True: for pair, exchanges in exchange_pair.items(): prices = {} # リクエストが成功した取引所のみ格納 success_exchanges = [] for exchange in exchanges: try: api = getattr(exchange_api, exchange)() prices.update({exchange: api.fetch_ticker(pair=pair)}) success_exchanges += [exchange] except Exception: logger.exception("リクエスト中にエラーが発生しました") REQUEST_ERROR_GAUGE.labels( exchange=exchange, ).inc() continue # 各取引所の比較結果を格納するGaugeオブジェクトを初期化 exchange_combination = list(combinations(success_exchanges, 2)) for exchange1, exchange2 in exchange_combination: ex1 = prices[exchange1] ex2 = prices[exchange2] # パターンA: Ex1で買って(Ask)、Ex2で売る(Bid) profit_a = ex2["bid"] - ex1["ask"] profit_rate_a = (profit_a / ex1["ask"]) * 100 print(f"{pair} {exchange1}-{exchange2} profit : {profit_a}") print(f"{pair} {exchange1}-{exchange2} profit rate: {profit_rate_a}") PROFIT_GAUGE.labels( exchange=f"{exchange1}_{exchange2}", symbol=pair ).set(profit_a) PROFIT_RATE_GAUGE.labels( exchange=f"{exchange1}_{exchange2}", symbol=pair ).set(profit_rate_a) # パターンB: Ex2で買って(Ask)、Ex1で売る(Bid) profit_b = ex1["bid"] - ex2["ask"] profit_rate_b = (profit_a / ex2["ask"]) * 100 print(f"{pair} {exchange2}-{exchange1} profit : {profit_b}") print(f"{pair} {exchange2}-{exchange1} profit rate: {profit_rate_b}") PROFIT_GAUGE.labels( exchange=f"{exchange2}_{exchange1}", symbol=pair ).set(profit_b) PROFIT_RATE_GAUGE.labels( exchange=f"{exchange2}_{exchange1}", symbol=pair ).set(profit_rate_b) def main(): # Prometheus ExporterのHTTPサーバーをポート8000で起動 start_http_server(8000) # メトリクス更新ワーカーを別スレッドで実行 worker_thread = threading.Thread(target=worker, daemon=True) worker_thread.start() print("Prometheus metrics server running on port 8000") # メインスレッドを維持 try: while True: time.sleep(1) except KeyboardInterrupt: print("Exiting.") if __name__ == "__main__": main() exchange_apiの実装は以下の通りです。 ...