こんにちは。ナナオです。
アービトラージに興味があり、開発してみたいな~と思いつつなかなか手が出せなかったのですが、この度重い腰をあげて開発してみました。
使用技術
監視にはPrometheus + Grafanaを使用しました。
実装はPythonを使い、HTTPリクエストにrequestsを使っています。
実装
早速実装です。
メインになる実装は以下の通りです。
ここの実装が大体100行くらいです。
import time
import logging
import threading
from itertools import combinations
# リクエストに使うAPI(自作)
from vc_bot import exchange_api
from prometheus_client import start_http_server, Gauge, Counter
# 必要に応じてログの設定(これは標準エラー出力に出す設定です)
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
PROFIT_GAUGE = Gauge("profit", "利益", ["exchange", "symbol"])
PROFIT_RATE_GAUGE = Gauge("profit_rate", "利益率", ["exchange", "symbol"])
REQUEST_ERROR_GAUGE = Counter("request_error", "リクエストエラー", ["exchange"])
def worker():
# 対象の取引所
exchange_pair = {
"BTC_JPY": ["gmo", "coincheck", "binance", "bitflyer", "zaif", "bitbank", "okcoin"],
"ETH_JPY": ["gmo", "coincheck", "binance", "bitflyer", "zaif", "bitbank", "okcoin"],
"XRP_JPY": ['gmo', 'coincheck', 'binance', 'bitflyer', 'bitbank', 'okcoin'],
"MONA_JPY": ['coincheck', 'bitflyer', 'zaif', 'bitbank'],
}
while True:
for pair, exchanges in exchange_pair.items():
prices = {}
# リクエストが成功した取引所のみ格納
success_exchanges = []
for exchange in exchanges:
try:
api = getattr(exchange_api, exchange)()
prices.update({exchange: api.fetch_ticker(pair=pair)})
success_exchanges += [exchange]
except Exception:
logger.exception("リクエスト中にエラーが発生しました")
REQUEST_ERROR_GAUGE.labels(
exchange=exchange,
).inc()
continue
# 各取引所の比較結果を格納するGaugeオブジェクトを初期化
exchange_combination = list(combinations(success_exchanges, 2))
for exchange1, exchange2 in exchange_combination:
ex1 = prices[exchange1]
ex2 = prices[exchange2]
# パターンA: Ex1で買って(Ask)、Ex2で売る(Bid)
profit_a = ex2["bid"] - ex1["ask"]
profit_rate_a = (profit_a / ex1["ask"]) * 100
print(f"{pair} {exchange1}-{exchange2} profit : {profit_a}")
print(f"{pair} {exchange1}-{exchange2} profit rate: {profit_rate_a}")
PROFIT_GAUGE.labels(
exchange=f"{exchange1}_{exchange2}",
symbol=pair
).set(profit_a)
PROFIT_RATE_GAUGE.labels(
exchange=f"{exchange1}_{exchange2}",
symbol=pair
).set(profit_rate_a)
# パターンB: Ex2で買って(Ask)、Ex1で売る(Bid)
profit_b = ex1["bid"] - ex2["ask"]
profit_rate_b = (profit_a / ex2["ask"]) * 100
print(f"{pair} {exchange2}-{exchange1} profit : {profit_b}")
print(f"{pair} {exchange2}-{exchange1} profit rate: {profit_rate_b}")
PROFIT_GAUGE.labels(
exchange=f"{exchange2}_{exchange1}",
symbol=pair
).set(profit_b)
PROFIT_RATE_GAUGE.labels(
exchange=f"{exchange2}_{exchange1}",
symbol=pair
).set(profit_rate_b)
def main():
# Prometheus ExporterのHTTPサーバーをポート8000で起動
start_http_server(8000)
# メトリクス更新ワーカーを別スレッドで実行
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()
print("Prometheus metrics server running on port 8000")
# メインスレッドを維持
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Exiting.")
if __name__ == "__main__":
main()
exchange_apiの実装は以下の通りです。
from abc import ABC, abstractmethod
from typing import Dict
import requests
class Exchange(ABC):
@abstractmethod
def fetch_ticker(self, pair: str) -> Dict[str, float]:
"""
取引所のTicker情報を取得し、BidとAskを返します。
Args:
pair (str): 通貨ペア
Returns:
Dict[str, float]: 売り価格と買い価格の辞書
例: {'bid': float, 'ask': float}
"""
class gmo(Exchange):
def __init__(self):
self._api_base = "https://api.coin.z.com/"
def fetch_ticker(self, pair: str = "btc_jpy") -> Dict[str, float]:
pair = pair.upper()
endpoint = f"{self._api_base}public/v1/ticker?symbol={pair}"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["data"][0]["ask"]),
"bid": float(res_json["data"][0]["bid"]),
}
class coincheck(Exchange):
def __init__(self):
self._api_base = "https://coincheck.com/"
def fetch_ticker(self, pair: str = "btc_jpy") -> Dict[str, float]:
pair = pair.lower()
endpoint = f"{self._api_base}api/ticker?pair={pair}"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["ask"]),
"bid": float(res_json["bid"]),
}
class binance(Exchange):
"""参考サイト: https://developers.binance.com/docs/binance-spot-api-docs/testnet/rest-api/market-data-endpoints
"""
def __init__(self):
self._api_base = "https://api.binance.com/"
def fetch_ticker(self, pair: str = "btc_jpy") -> Dict[str, float]:
pair = pair.replace("_", "").upper()
endpoint = f"{self._api_base}api/v3/ticker/bookTicker?symbol={pair}"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["askPrice"]),
"bid": float(res_json["bidPrice"]),
}
class bitflyer(Exchange):
def __init__(self):
self._api_base = "https://api.bitflyer.com/"
def fetch_ticker(self, pair: str = "btc_jpy") -> Dict[str, float]:
"""https://lightning.bitflyer.com/docs?lang=ja#ticker"""
endpoint = f"{self._api_base}v1/ticker?product_code={pair}"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["best_ask"]),
"bid": float(res_json["best_bid"]),
}
class zaif(Exchange):
def __init__(self):
self._api_base = "https://api.zaif.jp/"
def fetch_ticker(self, pair: str = "btc_jpy"):
pair = pair.lower()
endpoint = f"{self._api_base}api/1/ticker/{pair}"
res = requests.get(endpoint)
return {
"ask": float(res.json()["ask"]),
"bid": float(res.json()["bid"])
}
class bitbank(Exchange):
def __init__(self):
self._api_base = "https://public.bitbank.cc/"
def fetch_ticker(self, pair: str = "btc_jpy"):
"""https://github.com/bitbankinc/bitbank-api-docs/blob/master/public-api_JP.md#%E3%83%86%E3%82%A3%E3%83%83%E3%82%AB%E3%83%BC"""
pair = pair.lower()
endpoint = f"{self._api_base}{pair}/ticker"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["data"]["sell"]),
"bid": float(res_json["data"]["buy"]),
}
class okcoin(Exchange):
def __init__(self):
self._api_base = "https://www.okcoin.jp/"
def fetch_ticker(self, pair: str = "btc_jpy"):
endpoint = f"{self._api_base}api/spot/v3/instruments/{pair}/ticker"
res = requests.get(endpoint)
res_json = res.json()
return {
"ask": float(res_json["best_ask"]),
"bid": float(res_json["best_bid"]),
}
実装のポイント
いろいろ書いてますが、ポイントはメインファイルの以下の処理です。
for exchange1, exchange2 in exchange_combination:
ex1 = prices[exchange1]
ex2 = prices[exchange2]
# パターンA: Ex1で買って(Ask)、Ex2で売る(Bid)
profit_a = ex2["bid"] - ex1["ask"]
profit_rate_a = (profit_a / ex1["ask"]) * 100
print(f"{pair} {exchange1}-{exchange2} profit : {profit_a}")
print(f"{pair} {exchange1}-{exchange2} profit rate: {profit_rate_a}")
PROFIT_GAUGE.labels(
exchange=f"{exchange1}_{exchange2}",
symbol=pair
).set(profit_a)
PROFIT_RATE_GAUGE.labels(
exchange=f"{exchange1}_{exchange2}",
symbol=pair
).set(profit_rate_a)
# パターンB: Ex2で買って(Ask)、Ex1で売る(Bid)
profit_b = ex1["bid"] - ex2["ask"]
profit_rate_b = (profit_a / ex2["ask"]) * 100
print(f"{pair} {exchange2}-{exchange1} profit : {profit_b}")
print(f"{pair} {exchange2}-{exchange1} profit rate: {profit_rate_b}")
PROFIT_GAUGE.labels(
exchange=f"{exchange2}_{exchange1}",
symbol=pair
).set(profit_b)
PROFIT_RATE_GAUGE.labels(
exchange=f"{exchange2}_{exchange1}",
symbol=pair
).set(profit_rate_b)
bidとは板の中で最も安い値段のこと、askとは板の中で最も高い値段のことです。
暗号資産の価格はこれらの価格から決定されていますが、アービトラージでは現在の価格を見るのではなく対象の暗号資産の売り価格と買い価格を比較することで利益がどれだけ出るかを見ます。
また、PROFIT_GAUGEなどのオブジェクトで監視している暗号資産の価格をセットしています。
prometheusとgrafanaの起動はdocker-composeで行っています。
version: '3.7'
services:
# 1. メトリクスを公開するPythonアプリケーション (Exporter)
app:
build:
context: ..
dockerfile: ./docker/Dockerfile
image: python-prometheus-exporter:latest
container_name: python-exporter
# Prometheusがscrapeするポート
ports:
- "8000:8000"
restart: always
# 2. メトリクスを収集・保存する Prometheus
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ../prometheus.yml:/etc/prometheus/prometheus.yml # 設定ファイルをマウント
- ../prometheus_data:/prometheus # データ永続化
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
restart: always
# 3. グラフ表示用 Web UI (Grafana)
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
volumes:
- ../grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
restart: always
volumes:
prometheus_data:
prometheusの設定ファイルは以下の通りです。
global:
scrape_interval: 5s # 5秒ごとにメトリクスを取得
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090'] # Prometheus自身の監視
- job_name: 'python-app'
static_configs:
# appサービスのコンテナ名と公開ポートを指定
- targets: ['app:8000']
grafanaの設定ファイルは以下の通りです。
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
orgId: 1
url: http://prometheus:9090 # Prometheusのサービス名とポート
isDefault: true
version: 1
editable: true
pythonのDockerfileでは普通に実行しているだけです。
# ==========================================
# Stage 1: Builder
# ==========================================
FROM python:3.13.9-slim AS builder
WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/
# 【修正点】gcc ではなく build-essential をインストール
# これにより、リンカーエラー(crti.o not found等)が解消されます
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
COPY pyproject.toml uv.lock README.md ./
# 依存ライブラリのみインストール
RUN uv sync --frozen --no-dev --no-install-project --compile
COPY src src
# プロジェクト本体をインストール
RUN uv sync --frozen --no-dev --compile
# ==========================================
# Stage 2: Runner
# ==========================================
FROM python:3.13.9-slim
WORKDIR /app
# builderステージで作った仮想環境をコピー
COPY --from=builder /app/.venv /app/.venv
# ソースコード等をコピー
COPY src src
COPY pyproject.toml .
# パスを通す
ENV PATH="/app/.venv/bin:$PATH"
# pyproject.tomlで定義された simple_serve コマンドを実行
CMD ["simple_serve"]
pyproject.tomlでsimple_serveスクリプトを定義しています。
# ...中略...
[project.scripts]
simple_serve = "vc_bot:main"
起動するとExplorerからグラフ出力することが可能です。
感想
最初は現在価格を比較して利益を算出していたのですが、途中でそれだと実際に出せる利益がちゃんと計算できないことがわかり、修正したりしています。
これを実運用に回していきたいとは考えているのですが、利益を出せる最低ラインが0.5%くらいで、1か月ほど監視してみてもなかなかそれ以上になる機会がありません。。
うーん、どうするか。。