Master integration of MEV strategies with external APIs, data providers, and infrastructure systems for production deployment
By the end of this course, you will be able to:
import asyncio
import hashlib
import inspect
import json
import logging
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Callable, Any

import aiohttp
import kafka
import websockets
from cryptography.fernet import Fernet
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
class APIStatus(Enum):
    """Coarse health state of a remote API, as tracked by the client."""

    # Health check succeeded within the latency threshold.
    HEALTHY = "healthy"
    # Health check answered, but failed or was too slow.
    DEGRADED = "degraded"
    # Health check raised an exception.
    DOWN = "down"
    # Declared for completeness; nothing in this file assigns it.
    MAINTENANCE = "maintenance"
class RateLimitStrategy(Enum):
    """Algorithms a client-side rate limiter can be configured with."""

    # Count requests per discrete window.
    FIXED_WINDOW = "fixed_window"
    # Count requests whose timestamps fall inside a trailing window.
    SLIDING_WINDOW = "sliding_window"
    # Spend tokens that refill continuously up to a fixed capacity.
    TOKEN_BUCKET = "token_bucket"
@dataclass
class APIConfig:
    """Configuration for API client.

    Only ``base_url`` is required.  ``api_key`` and ``secret_key`` are
    excluded from the generated ``repr`` so that logging or printing a
    config object cannot leak credentials.
    """

    # Root URL; request URLs are built as <base_url>/<api_version>/<endpoint>.
    base_url: str
    # Sent as a Bearer token on authenticated requests when non-empty.
    api_key: str = field(default="", repr=False)
    # Used as the Fernet key when non-empty; otherwise a key is generated.
    secret_key: str = field(default="", repr=False)
    rate_limit: int = 100  # requests per minute
    rate_limit_strategy: RateLimitStrategy = RateLimitStrategy.SLIDING_WINDOW
    timeout: int = 30  # seconds
    # NOTE(review): max_retries / retry_delay are not read anywhere in this file.
    max_retries: int = 3
    retry_delay: float = 1.0
    # Failure count at which the circuit breaker opens.
    circuit_breaker_threshold: int = 5
    # Seconds the breaker stays open before a trial call is allowed.
    circuit_breaker_timeout: int = 60
    # Seconds slept between background health checks.
    health_check_interval: int = 30
    api_version: str = "v1"
@dataclass
class APIResponse:
    """Standardized API response.

    Carries the HTTP status, the decoded body, the response headers and
    timing information, plus the rate-limit counters when they were parsed
    from the response headers.
    """

    status_code: int
    data: Dict
    headers: Dict
    timestamp: datetime
    duration_ms: float
    rate_limit_remaining: Optional[int] = None
    rate_limit_reset: Optional[int] = None

    @property
    def success(self) -> bool:
        """True for any 2xx status code."""
        return 200 <= self.status_code < 300

    @property
    def rate_limited(self) -> bool:
        """True when the server answered 429 (Too Many Requests)."""
        return self.status_code == 429

    @property
    def retry_after(self) -> Optional[int]:
        """Seconds to wait before retrying, from the ``Retry-After`` header.

        Returns:
            A non-negative int when the header carries a number of seconds,
            or ``None`` when the header is absent or not an integer (for
            example the HTTP-date form, which is not interpreted here).
        """
        # ``headers`` is a plain dict, so match the name case-insensitively
        # rather than relying on the server's exact spelling.
        raw = next(
            (value for name, value in self.headers.items()
             if str(name).lower() == 'retry-after'),
            None,
        )
        if raw is None:
            return None
        try:
            return max(0, int(str(raw).strip()))
        except ValueError:
            return None
class CircuitBreaker:
    """Circuit breaker implementation for API calls.

    States:
        CLOSED     calls pass through; consecutive failures are counted.
        OPEN       calls are rejected until ``timeout`` seconds have passed
                   since the last failure.
        HALF_OPEN  one trial call is let through; success closes the
                   breaker, failure re-opens it.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        """
        Args:
            failure_threshold: Consecutive failures that open the breaker.
            timeout: Seconds to stay OPEN before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection.

        Args:
            func: Sync or async callable; an awaitable result is awaited.
            *args, **kwargs: Forwarded to ``func``.

        Returns:
            Whatever ``func`` returns (after awaiting, if awaitable).

        Raises:
            Exception: With message "Circuit breaker is OPEN" while the
                breaker is open and the timeout has not elapsed.
            Exception: Anything ``func`` raises is re-raised unchanged after
                being counted as a failure.
        """
        if self.state == "OPEN":
            if time.time() - self.last_failure_time < self.timeout:
                raise Exception("Circuit breaker is OPEN")
            self.state = "HALF_OPEN"

        try:
            result = func(*args, **kwargs)
            # Inspect the result rather than the callable so lambdas,
            # partials and other wrappers returning awaitables work too.
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial call re-opens immediately, whatever the count.
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise  # bare raise keeps the original traceback

        # A success ends the failure streak, so only *consecutive* failures
        # can open the breaker. If a concurrent call opened it while this
        # one was in flight, leave that decision alone.
        if self.state != "OPEN":
            self.state = "CLOSED"
            self.failure_count = 0
        return result
class RateLimiter:
    """Advanced rate limiter with multiple strategies.

    ``limit`` is interpreted as requests per ``WINDOW_SECONDS`` (one
    minute).  ``acquire`` never blocks: it reports whether a request may be
    sent right now.
    """

    # Length of the accounting window shared by all strategies, in seconds.
    WINDOW_SECONDS = 60.0

    def __init__(self, limit: int, strategy: RateLimitStrategy):
        """
        Args:
            limit: Maximum requests per window (also the token-bucket size).
            strategy: Which algorithm ``acquire`` applies.
        """
        self.limit = limit
        self.strategy = strategy
        # Sliding window: timestamps of admitted requests.
        self.requests = []
        # Token bucket: current tokens and time of the last refill.
        self.tokens = limit
        self.last_refill = time.time()
        # Fixed window: start of the current window and requests admitted in it.
        self.window_start = self.last_refill
        self.window_count = 0

    async def acquire(self) -> bool:
        """Acquire permission to make request.

        Returns:
            True when the request is admitted (and accounted for), False
            when the configured limit has been reached.  An unrecognised
            strategy admits everything.
        """
        now = time.time()

        if self.strategy == RateLimitStrategy.SLIDING_WINDOW:
            # Drop timestamps that have aged out of the trailing window.
            self.requests = [
                stamp for stamp in self.requests
                if now - stamp < self.WINDOW_SECONDS
            ]
            if len(self.requests) >= self.limit:
                return False
            self.requests.append(now)
            return True

        if self.strategy == RateLimitStrategy.TOKEN_BUCKET:
            # Refill proportionally to elapsed time, capped at capacity.
            refill = (now - self.last_refill) * (self.limit / self.WINDOW_SECONDS)
            self.tokens = min(self.limit, self.tokens + refill)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

        if self.strategy == RateLimitStrategy.FIXED_WINDOW:
            # Start a fresh window once the current one has fully elapsed.
            if now - self.window_start >= self.WINDOW_SECONDS:
                self.window_start = now
                self.window_count = 0
            if self.window_count >= self.limit:
                return False
            self.window_count += 1
            return True

        return True  # Unknown strategy: do not limit
class MEVAPIClient:
    """Comprehensive MEV API client with production features.

    Wraps an ``aiohttp`` session with client-side rate limiting, a circuit
    breaker, a background health-check task, request metrics and a small
    registry of handlers for WebSocket events.  Use it as an async context
    manager, or call ``initialize()`` and ``cleanup()`` explicitly.
    """

    # Seconds to back off after a 429 when Retry-After is missing or unusable.
    DEFAULT_RETRY_AFTER = 60

    def __init__(self, config: APIConfig):
        """Build the client from ``config``; no network I/O happens here."""
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.session = None
        self.rate_limiter = RateLimiter(config.rate_limit, config.rate_limit_strategy)
        self.circuit_breaker = CircuitBreaker(
            config.circuit_breaker_threshold,
            config.circuit_breaker_timeout
        )
        self.health_status = APIStatus.HEALTHY
        self.last_health_check = time.time()
        # Kept so the background task is not garbage-collected and can be
        # cancelled in cleanup().
        self._health_task = None
        # Metrics tracking; the derived values start at zero so get_metrics()
        # works before the first request.
        self.request_count = 0
        self.error_count = 0
        self.success_count = 0
        self.latency_samples = []
        self.success_rate = 0
        self.avg_latency = 0.0
        # Encryption for sensitive data
        self.cipher = Fernet(config.secret_key.encode() if config.secret_key else Fernet.generate_key())
        # WebSocket connections
        self.websocket_connections = {}
        # Event handlers
        self.event_handlers = {
            'price_update': [],
            'new_block': [],
            'mev_opportunity': [],
            'trade_executed': []
        }

    async def __aenter__(self):
        """Async context manager entry"""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        await self.cleanup()

    async def initialize(self):
        """Create the HTTP session and start the background health check."""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=30,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'MEVAPIClient/1.0',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
        )
        # Start health check
        self._health_task = asyncio.create_task(self._health_check_loop())
        self.logger.info(f"API client initialized for {self.config.base_url}")

    async def cleanup(self):
        """Stop the health check, close WebSockets and close the session."""
        # Stop the health check first so it cannot use a closed session.
        task, self._health_task = self._health_task, None
        if task is not None:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)
        # Close all WebSocket connections; iterate over a copy because
        # subscriptions remove their own entry when they finish.
        for ws in list(self.websocket_connections.values()):
            try:
                await ws.close()
            except Exception as e:
                self.logger.warning(f"Failed to close WebSocket: {e}")
        self.websocket_connections.clear()
        if self.session:
            await self.session.close()
            self.session = None
        self.logger.info("API client cleaned up")

    async def make_request(self,
                           method: str,
                           endpoint: str,
                           params: Dict = None,
                           data: Dict = None,
                           headers: Dict = None,
                           auth_required: bool = True) -> APIResponse:
        """Make HTTP request with full error handling.

        Args:
            method: HTTP verb.
            endpoint: Path appended to ``<base_url>/<api_version>/``.
            params: Optional query parameters.
            data: Optional payload (sent as JSON except for GET).
            headers: Optional extra headers; the dict is not modified.
            auth_required: Attach the Bearer token when an API key is set.

        Returns:
            The ``APIResponse``, including non-2xx responses.

        Raises:
            RuntimeError: If ``initialize()`` has not been called.
            Exception: When the local rate limiter rejects the request, when
                the circuit breaker is open, or when the request itself
                fails (the underlying error is re-raised).
        """
        if self.session is None:
            raise RuntimeError("API client is not initialized; call initialize() first")
        # Rate limiting check
        if not await self.rate_limiter.acquire():
            retry_after = 60  # Default retry after 1 minute
            raise Exception(f"Rate limit exceeded. Retry after {retry_after} seconds")
        start_time = time.time()
        try:
            response = await self.circuit_breaker.call(
                self._execute_request, method, endpoint, params, data, headers, auth_required
            )
        except Exception as e:
            # Count the attempt as well as the error so success_rate stays
            # a true ratio over all attempted requests.
            self.request_count += 1
            self.error_count += 1
            self.success_rate = self.success_count / self.request_count
            self.logger.error(f"Request failed: {e}")
            raise
        duration = (time.time() - start_time) * 1000
        self._update_metrics(response, duration)
        return response

    @staticmethod
    def _parse_int_header(value) -> Optional[int]:
        """Return ``value`` as an int, or None when absent or non-numeric."""
        if value is None:
            return None
        try:
            return int(str(value).strip())
        except ValueError:
            return None

    async def _execute_request(self, method: str, endpoint: str, params: Dict,
                               data: Dict, headers: Dict, auth_required: bool) -> APIResponse:
        """Execute the actual HTTP request.

        On a 429 response this waits for the advertised ``Retry-After``
        (``DEFAULT_RETRY_AFTER`` seconds when missing or non-numeric) before
        returning the 429 response; it does not retry.

        Raises:
            aiohttp.ClientError: On transport-level failures.
            Exception: Any other error raised while sending or reading.
        """
        url = (
            f"{self.config.base_url.rstrip('/')}/"
            f"{self.config.api_version}/{endpoint.lstrip('/')}"
        )
        # Copy so the caller's dict is never mutated.
        request_headers = dict(headers) if headers else {}
        if auth_required and self.config.api_key:
            request_headers['Authorization'] = f"Bearer {self.config.api_key}"
        # Add request ID for tracking
        request_headers['X-Request-ID'] = self._generate_request_id()
        self.logger.debug(f"Making {method} request to {url}")
        is_get = method.upper() == 'GET'
        start_time = time.time()
        try:
            async with self.session.request(
                method=method,
                url=url,
                params=params,
                json=None if is_get else data,
                data=data if is_get else None,
                headers=request_headers
            ) as response:
                try:
                    response_data = await response.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Not JSON (or malformed JSON): keep the raw text instead.
                    response_data = {"raw_response": await response.text()}
                duration = (time.time() - start_time) * 1000
                # Read while the case-insensitive aiohttp headers are in scope.
                retry_after_header = response.headers.get('Retry-After')
                api_response = APIResponse(
                    status_code=response.status,
                    data=response_data,
                    headers=dict(response.headers),
                    timestamp=datetime.now(),
                    duration_ms=duration,
                    rate_limit_remaining=self._parse_int_header(
                        response.headers.get('X-RateLimit-Remaining')),
                    rate_limit_reset=self._parse_int_header(
                        response.headers.get('X-RateLimit-Reset'))
                )
        except aiohttp.ClientError as e:
            self.logger.error(f"HTTP client error: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            raise
        # Back off only after the connection has been released to the pool.
        if api_response.rate_limited:
            delay = self._parse_int_header(retry_after_header)
            if delay is None:
                delay = self.DEFAULT_RETRY_AFTER
            await asyncio.sleep(max(0, delay))
        return api_response

    async def get_eth_prices(self) -> Dict[str, float]:
        """Get current ETH prices from multiple sources.

        Each source is queried as ``<source>/<endpoint>`` through
        ``make_request``.  Sources are tried in order and a later source
        overwrites ``ETH_USD`` from an earlier one; a source that fails or
        returns no price leaves existing values untouched.

        Returns:
            Mapping with any of ``ETH_USD``, ``ETH_BTC``, ``ETH_EUR``.
        """
        endpoints = {
            'coingecko': 'simple/price',
            'binance': 'ticker/price',
            'kraken': 'public/Ticker',
            'coinbase': 'products/ETH-USD/ticker'
        }
        prices = {}
        for source, endpoint in endpoints.items():
            try:
                params = {
                    'ids': 'ethereum',
                    'vs_currencies': 'usd,btc,eur'
                } if source == 'coingecko' else None
                response = await self.make_request('GET', f'{source}/{endpoint}', params=params)
                if not response.success:
                    continue
                data = response.data
                if source == 'coingecko':
                    eth_data = data.get('ethereum', {})
                    for key, currency in (('ETH_USD', 'usd'), ('ETH_BTC', 'btc'), ('ETH_EUR', 'eur')):
                        if eth_data.get(currency) is not None:
                            prices[key] = eth_data[currency]
                elif source == 'kraken':
                    quote = data.get('result', {}).get('XETHZUSD', {}).get('c')
                    if quote:
                        prices['ETH_USD'] = float(quote[0])
                else:
                    # binance and coinbase both expose a top-level 'price'.
                    # Skip a missing value rather than recording 0, which
                    # would clobber a good quote from an earlier source.
                    if data.get('price') is not None:
                        prices['ETH_USD'] = float(data['price'])
            except Exception as e:
                self.logger.warning(f"Failed to get price from {source}: {e}")
        return prices

    async def get_dex_prices(self, tokens: List[str]) -> Dict[str, Dict[str, float]]:
        """Get token prices from major DEXs.

        Returns:
            ``{token: {dex_name: price}}`` for every price obtained.  A
            response without a ``price`` field is skipped rather than stored
            as 0, which would look like a huge spread to any consumer that
            compares venues.  An error on one DEX is logged and aborts the
            remaining tokens for that DEX only.
        """
        # (result key, endpoint template, name used in the log message)
        sources = (
            ('uniswap', 'uniswap/v2/{token}/price', 'Uniswap'),
            ('sushiswap', 'sushiswap/v1/{token}/price', 'SushiSwap'),
            ('1inch', '1inch/v4/{token}/price', '1inch'),
        )
        dex_prices = {}
        for dex, template, label in sources:
            try:
                for token in tokens:
                    response = await self.make_request('GET', template.format(token=token))
                    if not response.success:
                        continue
                    raw_price = response.data.get('price')
                    if raw_price is None:
                        continue
                    dex_prices.setdefault(token, {})[dex] = float(raw_price)
            except Exception as e:
                self.logger.warning(f"Failed to get {label} prices: {e}")
        return dex_prices

    async def get_lending_positions(self, protocol: str = 'aave') -> List[Dict]:
        """Get lending protocol positions; returns [] on any failure."""
        try:
            response = await self.make_request('GET', f'lending/{protocol}/positions')
            if response.success:
                return response.data.get('positions', [])
            return []
        except Exception as e:
            self.logger.error(f"Failed to get {protocol} positions: {e}")
            return []

    async def get_gas_prices(self) -> Dict[str, int]:
        """Get current gas prices.

        Returns the response body on success, otherwise a fallback of
        ``{'standard': 20_000_000_000}`` (20 gwei).
        """
        try:
            response = await self.make_request('GET', 'gas/prices')
            if response.success:
                return response.data
            return {'standard': 20_000_000_000}  # Fallback: 20 gwei
        except Exception as e:
            self.logger.error(f"Failed to get gas prices: {e}")
            return {'standard': 20_000_000_000}

    async def subscribe_to_price_updates(self, tokens: List[str], callback: Callable):
        """Subscribe to real-time price updates via WebSocket.

        Runs until the connection closes or fails; connection errors are
        logged, not raised.  ``callback`` may be a plain function or a
        coroutine function and receives the ``data`` of each
        ``price_update`` message.  The handler and the connection entry are
        removed again when this coroutine finishes.
        """
        async def price_update_handler(data):
            result = callback(data)
            if inspect.isawaitable(result):
                await result

        self.event_handlers['price_update'].append(price_update_handler)
        # Swap only the scheme (http -> ws, https -> wss), not later matches.
        ws_url = self.config.base_url.replace('http', 'ws', 1) + '/ws/prices'
        websocket = None
        try:
            async with websockets.connect(ws_url) as websocket:
                self.websocket_connections['prices'] = websocket
                # Subscribe to tokens
                subscribe_msg = {
                    'action': 'subscribe',
                    'tokens': tokens
                }
                await websocket.send(json.dumps(subscribe_msg))
                # Listen for updates
                async for message in websocket:
                    try:
                        data = json.loads(message)
                    except json.JSONDecodeError:
                        self.logger.warning(f"Invalid JSON message: {message}")
                        continue
                    try:
                        await self._handle_websocket_message(data)
                    except Exception as e:
                        # One bad message or handler must not end the stream.
                        self.logger.error(f"Failed to handle WebSocket message: {e}")
        except Exception as e:
            self.logger.error(f"WebSocket connection failed: {e}")
        finally:
            if websocket is not None and self.websocket_connections.get('prices') is websocket:
                del self.websocket_connections['prices']
            handlers = self.event_handlers['price_update']
            if price_update_handler in handlers:
                handlers.remove(price_update_handler)

    async def submit_flashbots_bundle(self, bundle: Dict) -> Dict:
        """Submit bundle to Flashbots.

        Raises:
            Exception: If the request fails or returns a non-2xx status.
        """
        try:
            response = await self.make_request('POST', 'flashbots/bundle', data=bundle)
            if response.success:
                return response.data
            raise Exception(f"Bundle submission failed: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to submit bundle: {e}")
            raise

    async def get_mempool_transactions(self) -> List[Dict]:
        """Get transactions from mempool; returns [] on any failure."""
        try:
            response = await self.make_request('GET', 'mempool/transactions')
            if response.success:
                return response.data.get('transactions', [])
            return []
        except Exception as e:
            self.logger.error(f"Failed to get mempool transactions: {e}")
            return []

    def _generate_request_id(self) -> str:
        """Generate unique request ID (32 hex characters).

        A random UUID is used because a hash of time and request count can
        repeat for concurrent requests.
        """
        return uuid.uuid4().hex

    def _update_metrics(self, response: APIResponse, duration_ms: float):
        """Update performance metrics for one completed request."""
        self.request_count += 1
        self.latency_samples.append(duration_ms)
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
        if response.success:
            self.success_count += 1
        else:
            self.error_count += 1
        self.success_rate = self.success_count / self.request_count if self.request_count > 0 else 0
        self.avg_latency = sum(self.latency_samples) / len(self.latency_samples)

    async def _health_check_loop(self):
        """Continuous health check loop; runs until cancelled."""
        while True:
            try:
                await asyncio.sleep(self.config.health_check_interval)
                start_time = time.time()
                response = await self.make_request('GET', 'health', auth_required=False)
                health_time = (time.time() - start_time) * 1000
                self.last_health_check = time.time()
                if response.success and health_time < 5000:  # 5 second threshold
                    self.health_status = APIStatus.HEALTHY
                else:
                    self.health_status = APIStatus.DEGRADED
            except Exception as e:
                self.logger.warning(f"Health check failed: {e}")
                self.health_status = APIStatus.DOWN

    async def _handle_websocket_message(self, message: Dict):
        """Dispatch an incoming WebSocket message to its event handlers.

        The message's ``type`` selects the handler list and each handler is
        called with ``message['data']``.  Handlers may be sync or async.
        Messages with an unknown type, or a type with no handlers, are
        ignored.
        """
        handlers = self.event_handlers.get(message.get('type'))
        if not handlers:
            return
        payload = message['data']
        # Iterate over a copy: a handler may register or remove handlers.
        for handler in list(handlers):
            result = handler(payload)
            if inspect.isawaitable(result):
                await result

    def event_handler(self, event_type: str) -> Callable:
        """Decorator for event handlers.

        Registers the decorated function for ``event_type`` and returns it
        unchanged.  Unknown event types are not registered.
        """
        def decorator(func: Callable):
            if event_type in self.event_handlers:
                self.event_handlers[event_type].append(func)
            else:
                self.logger.warning(f"Unknown event type: {event_type}")
            return func
        return decorator

    def get_metrics(self) -> Dict:
        """Get client performance metrics.

        Safe to call before any request has been made.
        """
        return {
            'total_requests': self.request_count,
            'successful_requests': self.success_count,
            'error_count': self.error_count,
            'success_rate': self.success_rate,
            'average_latency_ms': self.avg_latency,
            'health_status': self.health_status.value,
            # Snapshot, so callers cannot mutate the limiter through it.
            'rate_limiter_state': dict(self.rate_limiter.__dict__),
            'circuit_breaker_state': self.circuit_breaker.state
        }
# Production Integration Example
class MEVTradingSystem:
    """Complete MEV trading system with API integration.

    Owns the API client, the Kafka producer/consumer and the background
    tasks that consume opportunity messages and scan DEX prices for
    arbitrage.  Call ``initialize()`` to start and ``cleanup()`` to stop.
    """

    def __init__(self, config: Dict):
        """Store ``config`` and create the components that need no I/O."""
        self.config = config
        self.logger = logging.getLogger(__name__)
        # Initialize API clients
        self.eth_client = None
        self.dex_clients = {}
        self.lending_clients = {}
        # Trading components
        self.strategies = []
        self.portfolio = PortfolioManager()
        self.risk_manager = RiskManager()
        # Data streaming
        self.kafka_producer = None
        self.kafka_consumer = None
        # All consumer calls go through this one thread: the consumer's
        # API is blocking and must not be used from several threads at once.
        self._kafka_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="kafka-consumer")
        # Background tasks, kept so they can be cancelled in cleanup().
        self._tasks = []
        # Monitoring
        self.prometheus_client = None
        self.grafana_client = None

    async def initialize(self):
        """Initialize the complete trading system and start its loops."""
        # Initialize API clients
        self.eth_client = MEVAPIClient(APIConfig(
            base_url=self.config['ethereum']['api_url'],
            api_key=self.config['ethereum']['api_key'],
            rate_limit=1000
        ))
        await self.eth_client.initialize()
        # Initialize Kafka for data streaming
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=self.config['kafka']['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.kafka_consumer = KafkaConsumer(
            'mev_opportunities',
            bootstrap_servers=self.config['kafka']['bootstrap_servers'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        # Start data processing
        self._tasks = [
            asyncio.create_task(self._data_processing_loop()),
            asyncio.create_task(self._opportunity_detection_loop()),
            asyncio.create_task(self._portfolio_monitoring_loop()),
        ]
        self.logger.info("MEV Trading System initialized")

    def _poll_consumer(self) -> Dict:
        """Blocking poll of the Kafka consumer; runs in the executor thread."""
        return self.kafka_consumer.poll(timeout_ms=1000)

    async def _data_processing_loop(self):
        """Process incoming market data until cancelled.

        The Kafka consumer is synchronous, so it is polled in a worker
        thread and the returned batches are dispatched on the event loop.
        """
        loop = asyncio.get_running_loop()
        while True:
            try:
                batches = await loop.run_in_executor(
                    self._kafka_executor, self._poll_consumer)
            except Exception as e:
                self.logger.error(f"Kafka poll error: {e}")
                await asyncio.sleep(1)
                continue
            for records in batches.values():
                for message in records:
                    try:
                        await self._dispatch_opportunity(message.value)
                    except Exception as e:
                        self.logger.error(f"Data processing error: {e}")

    async def _dispatch_opportunity(self, data: Dict):
        """Route one consumed message to the processor for its ``type``."""
        processors = {
            'arbitrage': self._process_arbitrage_opportunity,
            'liquidation': self._process_liquidation_opportunity,
            'sandwich': self._process_sandwich_opportunity,
        }
        processor = processors.get(data.get('type'))
        if processor is not None:
            await processor(data)

    async def _opportunity_detection_loop(self):
        """Detect and evaluate MEV opportunities until cancelled."""
        while True:
            try:
                # Get current market data
                prices = await self.eth_client.get_eth_prices()
                dex_prices = await self.eth_client.get_dex_prices(['ETH', 'BTC', 'UNI'])
                gas_prices = await self.eth_client.get_gas_prices()
                # Use the live ETH/USD quote for the gas estimate when there
                # is one; otherwise keep the rough 2000 USD assumption.
                eth_price_usd = prices.get('ETH_USD') or 2000.0
                opportunities = self._detect_arbitrage_opportunities(dex_prices)
                for opportunity in opportunities:
                    # Reuse this iteration's gas prices instead of one extra
                    # request per opportunity.
                    if await self._evaluate_opportunity(
                            opportunity,
                            gas_prices=gas_prices,
                            eth_price_usd=eth_price_usd):
                        await self._send_opportunity_to_strategies(opportunity)
                # Sleep before next iteration
                await asyncio.sleep(1)
            except Exception as e:
                self.logger.error(f"Opportunity detection error: {e}")
                await asyncio.sleep(5)

    async def _portfolio_monitoring_loop(self):
        """Placeholder for portfolio monitoring.

        ``initialize()`` schedules this coroutine; it returns immediately
        until real monitoring logic is implemented.
        """
        self.logger.debug("Portfolio monitoring is not implemented")

    def _detect_arbitrage_opportunities(self, dex_prices: Dict,
                                        min_difference: float = 10) -> List[Dict]:
        """Detect arbitrage opportunities from price data.

        Args:
            dex_prices: ``{token: {exchange: price}}``.
            min_difference: The spread between the cheapest and the most
                expensive venue must exceed this to be reported.

        Returns:
            One opportunity dict per token whose spread qualifies; tokens
            quoted on fewer than two exchanges are skipped.
        """
        opportunities = []
        for token, prices in dex_prices.items():
            if len(prices) < 2:
                continue
            buy_exchange = min(prices, key=prices.get)
            sell_exchange = max(prices, key=prices.get)
            buy_price = prices[buy_exchange]
            sell_price = prices[sell_exchange]
            spread = sell_price - buy_price
            if spread > min_difference:
                opportunities.append({
                    'type': 'arbitrage',
                    'token': token,
                    'buy_exchange': buy_exchange,
                    'sell_exchange': sell_exchange,
                    'buy_price': buy_price,
                    'sell_price': sell_price,
                    'price_difference': spread,
                    'potential_profit': spread,
                    'timestamp': datetime.now().isoformat()
                })
        return opportunities

    async def _evaluate_opportunity(self, opportunity: Dict,
                                    gas_prices: Optional[Dict] = None,
                                    eth_price_usd: float = 2000.0) -> bool:
        """Evaluate if opportunity should be traded.

        Args:
            opportunity: Must contain ``potential_profit``.
            gas_prices: Gas prices already fetched by the caller; fetched
                from the API client when omitted.
            eth_price_usd: ETH/USD rate used to price the gas estimate.

        Returns:
            True when the profit meets the configured threshold, stays
            positive after the estimated gas cost, and the risk manager
            accepts the opportunity.
        """
        # Check minimum profit threshold
        if opportunity['potential_profit'] < self.config['min_profit_threshold']:
            return False
        if gas_prices is None:
            gas_prices = await self.eth_client.get_gas_prices()
        # Fall back to 20 gwei when the payload has no 'standard' entry.
        gas_price = gas_prices.get('standard', 20_000_000_000)
        # Rough estimate: gas price * 200000, divided by 1e18, times ETH/USD.
        estimated_gas_cost = gas_price * 200000 / 1e18 * eth_price_usd
        # Check if profit after gas is still positive
        net_profit = opportunity['potential_profit'] - estimated_gas_cost
        if net_profit <= 0:
            return False
        # Risk checks
        return bool(await self.risk_manager.validate_opportunity(opportunity))

    async def _send_opportunity_to_strategies(self, opportunity: Dict):
        """Send opportunity to registered strategies.

        A failing strategy is logged and does not stop the others.
        """
        for strategy in self.strategies:
            try:
                await strategy.handle_opportunity(opportunity)
            except Exception as e:
                self.logger.error(f"Strategy error: {e}")

    async def _process_arbitrage_opportunity(self, data: Dict):
        """Process arbitrage opportunity data"""
        self.logger.info(f"Processing arbitrage: {data}")
        # Implement arbitrage execution logic
        pass

    async def _process_liquidation_opportunity(self, data: Dict):
        """Process liquidation opportunity data (placeholder)."""
        self.logger.info(f"Processing liquidation: {data}")
        # Implement liquidation execution logic

    async def _process_sandwich_opportunity(self, data: Dict):
        """Process sandwich opportunity data (placeholder)."""
        self.logger.info(f"Processing sandwich: {data}")
        # Implement sandwich execution logic

    async def cleanup(self):
        """Cleanup system resources: tasks, Kafka clients and API client."""
        # Stop the loops first so nothing uses a client while it closes.
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        if self.kafka_consumer:
            # Close on the consumer's own thread so the close is queued
            # behind any poll that is still in flight.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(self._kafka_executor, self.kafka_consumer.close)
            self.kafka_consumer = None
        self._kafka_executor.shutdown(wait=False)
        if self.eth_client:
            await self.eth_client.cleanup()
        if self.kafka_producer:
            self.kafka_producer.close()
        self.logger.info("MEV Trading System cleaned up")
# Configuration example
config = {
    # API endpoint and key used to build the Ethereum API client.
    'ethereum': {
        'api_url': 'https://api.etherscan.io',
        'api_key': 'YOUR_ETHERSCAN_API_KEY',
    },
    # Per-DEX credentials (placeholders to be replaced).
    'dex': {
        'uniswap': {'api_key': 'YOUR_UNISWAP_API_KEY'},
        'sushiswap': {'api_key': 'YOUR_SUSHISWAP_API_KEY'},
    },
    # Brokers handed to the Kafka producer and consumer.
    'kafka': {
        'bootstrap_servers': ['localhost:9092'],
    },
    # Opportunities whose potential profit is below this are rejected.
    'min_profit_threshold': 5.0,
    'risk_limits': {
        'max_position_size': 50000,
        'max_daily_loss': 5000,
    },
}
# Usage example
async def main():
    """Run the trading system until the program is interrupted.

    Cleanup runs in a ``finally`` block so it happens however the
    coroutine ends: on Ctrl-C (which may surface here as a cancellation
    rather than ``KeyboardInterrupt``), or when ``initialize()`` or the
    loop raises.
    """
    # Initialize trading system
    system = MEVTradingSystem(config)
    try:
        await system.initialize()
        # Keep running
        while True:
            await asyncio.sleep(1)
    finally:
        await system.cleanup()
if __name__ == "__main__":
    # Script entry point: run main() to completion on a new event loop.
    asyncio.run(main())