Rate Limiting Guide
Understanding and properly handling rate limits is crucial for building robust applications with the Vysion API. This guide covers rate limit policies, best practices, and implementation strategies.
Rate Limit Overview
The Vysion API implements rate limiting to ensure fair usage and maintain service quality for all users. Rate limits are applied on a per-API-key basis and vary by endpoint and subscription tier.
Current Rate Limits
Endpoint Category | Requests per Minute | Burst Limit |
---|---|---|
Search APIs | 30 | 50 |
Document Retrieval | 60 | 100 |
Statistics & Histograms | 20 | 30 |
Instant Messaging | 30 | 50 |
Feeds | 10 | 15 |
Rate Limit Headers
The API returns rate limit information in response headers:

```http
HTTP/1.1 200 OK
X-RateLimit-Limit: 30
X-RateLimit-Remaining: 25
X-RateLimit-Reset: 1640995200
X-RateLimit-Window: 60
```
Header | Description |
---|---|
X-RateLimit-Limit | Maximum requests allowed in the current window |
X-RateLimit-Remaining | Requests remaining in the current window |
X-RateLimit-Reset | Unix timestamp when the rate limit resets |
X-RateLimit-Window | Rate limit window duration in seconds |
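For example, these headers can be collected into a dictionary from a `requests` response (a minimal sketch; the helper name is illustrative):

```python
def read_rate_limit(response):
    """Extract rate limit info from a Vysion API response, using the header names above."""
    headers = response.headers
    return {
        "limit": int(headers.get("X-RateLimit-Limit", 0)),
        "remaining": int(headers.get("X-RateLimit-Remaining", 0)),
        "reset": int(headers.get("X-RateLimit-Reset", 0)),
        "window": int(headers.get("X-RateLimit-Window", 60)),
    }
```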
Error Response
When rate limits are exceeded, the API returns a 429 Too Many Requests status:
{ "error": { "code": 429, "message": "Too Many Requests", "details": "Rate limit exceeded. Try again in 45 seconds." }}
Implementation Strategies
1. Basic Rate Limiting
```python
import time
import requests

class RateLimitedClient:
    def __init__(self, api_key, requests_per_minute=30):
        self.api_key = api_key
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0

    def make_request(self, url, **kwargs):
        # Wait long enough to stay under the per-minute limit
        now = time.time()
        time_since_last = now - self.last_request_time
        if time_since_last < self.min_interval:
            time.sleep(self.min_interval - time_since_last)

        # Attach the API key header and send the request
        headers = kwargs.get('headers', {})
        headers['x-api-key'] = self.api_key
        kwargs['headers'] = headers

        response = requests.get(url, **kwargs)
        self.last_request_time = time.time()
        return response

# Usage
client = RateLimitedClient('your_api_key')
response = client.make_request('https://api.vysion.ai/api/v2/documents/search')
```
```javascript
class RateLimitedClient {
  constructor(apiKey, requestsPerMinute = 30) {
    this.apiKey = apiKey;
    this.requestsPerMinute = requestsPerMinute;
    this.minInterval = 60000 / requestsPerMinute; // milliseconds
    this.lastRequestTime = 0;
  }

  async makeRequest(url, options = {}) {
    // Wait long enough to stay under the per-minute limit
    const now = Date.now();
    const timeSinceLast = now - this.lastRequestTime;
    if (timeSinceLast < this.minInterval) {
      const sleepTime = this.minInterval - timeSinceLast;
      await new Promise(resolve => setTimeout(resolve, sleepTime));
    }

    // Attach the API key header and send the request
    const headers = {
      'x-api-key': this.apiKey,
      'Accept': 'application/json',
      ...options.headers
    };
    const response = await fetch(url, { ...options, headers });

    this.lastRequestTime = Date.now();
    return response;
  }
}

// Usage
const client = new RateLimitedClient('your_api_key');
const response = await client.makeRequest('https://api.vysion.ai/api/v2/documents/search');
```
2. Advanced Rate Limiting with Retry Logic
```python
import time
import random
import requests
from functools import wraps

def exponential_backoff_retry(max_retries=3, base_delay=1):
    """Decorator for exponential backoff retry logic."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    response = func(*args, **kwargs)

                    # Check for rate limiting
                    if response.status_code == 429:
                        if attempt == max_retries:
                            raise Exception("Max retries exceeded")

                        # Prefer the server-provided delay, otherwise use jittered backoff
                        retry_after = response.headers.get('Retry-After')
                        if retry_after:
                            delay = int(retry_after)
                        else:
                            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)

                        print(f"Rate limited. Waiting {delay:.2f} seconds...")
                        time.sleep(delay)
                        continue

                    return response

                except Exception as e:
                    if attempt == max_retries:
                        raise e
                    delay = base_delay * (2 ** attempt)
                    print(f"Request failed. Retrying in {delay} seconds...")
                    time.sleep(delay)
        return wrapper
    return decorator

@exponential_backoff_retry(max_retries=3)
def make_api_request(url, headers=None):
    # headers is optional so later examples can call this with just a URL
    return requests.get(url, headers=headers)
```
3. Rate Limit Monitoring
```python
import time
import requests

class RateLimitMonitor:
    def __init__(self):
        self.rate_limit_info = {}

    def update_from_headers(self, response_headers):
        """Update rate limit info from response headers."""
        self.rate_limit_info = {
            'limit': int(response_headers.get('X-RateLimit-Limit', 0)),
            'remaining': int(response_headers.get('X-RateLimit-Remaining', 0)),
            'reset': int(response_headers.get('X-RateLimit-Reset', 0)),
            'window': int(response_headers.get('X-RateLimit-Window', 60))
        }

    def should_wait(self, buffer_requests=5):
        """Check if we should wait before making another request."""
        if not self.rate_limit_info:
            return False
        return self.rate_limit_info['remaining'] <= buffer_requests

    def wait_time(self):
        """Calculate how long to wait until the window resets."""
        if not self.rate_limit_info:
            return 0
        now = time.time()
        reset_time = self.rate_limit_info['reset']
        if now >= reset_time:
            return 0
        return reset_time - now

    def get_status(self):
        """Get current rate limit status."""
        if not self.rate_limit_info:
            return "No rate limit data available"
        return f"Rate Limit: {self.rate_limit_info['remaining']}/{self.rate_limit_info['limit']} remaining"

# Usage
monitor = RateLimitMonitor()

def monitored_request(url, headers):
    # Pause if we are close to exhausting the window
    if monitor.should_wait():
        wait_time = monitor.wait_time()
        print(f"Rate limit buffer reached. Waiting {wait_time:.2f} seconds...")
        time.sleep(wait_time)

    response = requests.get(url, headers=headers)
    monitor.update_from_headers(response.headers)
    print(monitor.get_status())
    return response
```
Batch Processing Strategies
1. Chunked Processing
```python
import time

def process_items_in_chunks(items, chunk_size=50, delay_between_chunks=60):
    """Process items in chunks with delays to respect rate limits."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i + 1}/{len(chunks)} ({len(chunk)} items)")

        for item in chunk:
            # Process each item with a small delay between requests
            result = process_single_item(item)
            time.sleep(2)

        # Wait between chunks (except after the last one)
        if i < len(chunks) - 1:
            print(f"Waiting {delay_between_chunks} seconds before next chunk...")
            time.sleep(delay_between_chunks)

def process_single_item(item):
    """Process a single item with the API (uses make_api_request from above)."""
    try:
        response = make_api_request(f"https://api.vysion.ai/api/v2/documents/{item}")
        return response.json()
    except Exception as e:
        print(f"Error processing item {item}: {e}")
        return None
```
2. Priority-Based Processing
```python
import time
import threading
from queue import PriorityQueue, Empty

class PriorityAPIProcessor:
    def __init__(self, requests_per_minute=30):
        self.queue = PriorityQueue()
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.running = False
        self.worker_thread = None

    def add_request(self, priority, url, callback=None):
        """Add a request to the priority queue (lower number = higher priority)."""
        self.queue.put((priority, url, callback))

    def start_processing(self):
        """Start the background processing thread."""
        self.running = True
        self.worker_thread = threading.Thread(target=self._process_queue)
        self.worker_thread.start()

    def stop_processing(self):
        """Stop the background processing."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()

    def _process_queue(self):
        """Process requests from the queue with rate limiting."""
        while self.running:
            try:
                # Block for up to a second waiting for work
                priority, url, callback = self.queue.get(timeout=1)

                # Make the API request
                response = make_api_request(url)

                # Execute callback if provided
                if callback:
                    callback(response)

                # Rate limiting delay
                time.sleep(self.min_interval)

            except Empty:
                continue
            except Exception as e:
                print(f"Error processing queue: {e}")
                time.sleep(1)

# Usage
processor = PriorityAPIProcessor(requests_per_minute=30)

# Add requests (lower number = higher priority)
processor.add_request(1, "https://api.vysion.ai/api/v2/urgent-data")
processor.add_request(5, "https://api.vysion.ai/api/v2/normal-data")
processor.add_request(1, "https://api.vysion.ai/api/v2/another-urgent")

processor.start_processing()
```
Optimizing Request Patterns
1. Efficient Pagination
```python
def efficient_pagination(base_url, page_size=100):
    """Paginate through all results.

    make_rate_limited_request is any rate-limited GET helper,
    such as RateLimitedClient.make_request above.
    """
    page = 1
    all_results = []

    while True:
        url = f"{base_url}?page={page}&page_size={page_size}"
        response = make_rate_limited_request(url)

        if response.status_code != 200:
            break

        data = response.json()
        hits = data.get('data', {}).get('hits', [])
        if not hits:
            break

        all_results.extend(hits)

        # Show progress and stop once every item has been retrieved
        total = data.get('data', {}).get('total', 0)
        print(f"Retrieved {len(all_results)}/{total} items")
        if total and len(all_results) >= total:
            break

        page += 1

    return all_results
```
2. Caching Strategy
```python
import hashlib
import json
import time
from functools import wraps

def cached_api_call(cache_duration=3600):  # 1 hour default
    """Decorator to cache API responses."""
    cache = {}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create a cache key from the call arguments
            cache_key = hashlib.md5(
                json.dumps([args, kwargs], sort_keys=True).encode()
            ).hexdigest()

            # Return the cached result if it is still fresh
            if cache_key in cache:
                cached_result, timestamp = cache[cache_key]
                if time.time() - timestamp < cache_duration:
                    print(f"Cache hit for {func.__name__}")
                    return cached_result

            # Make the API call and cache the result
            result = func(*args, **kwargs)
            cache[cache_key] = (result, time.time())
            return result

        return wrapper
    return decorator

@cached_api_call(cache_duration=1800)  # 30 minutes
def get_ransomware_stats(countries):
    """Cached ransomware statistics call."""
    url = f"https://api.vysion.ai/api/v2/stats/countries?countries={countries}"
    response = make_rate_limited_request(url)
    return response.json()
```
Monitoring and Alerting
1. Rate Limit Dashboard
```python
import threading
import time
from collections import deque
import matplotlib.pyplot as plt

class RateLimitDashboard:
    def __init__(self, history_size=100):
        self.history_size = history_size
        self.rate_limit_history = deque(maxlen=history_size)
        self.request_times = deque(maxlen=history_size)
        self.monitoring = False

    def record_request(self, rate_limit_info):
        """Record a request and its rate limit info."""
        self.request_times.append(time.time())
        self.rate_limit_history.append(rate_limit_info)

    def start_monitoring(self):
        """Start background monitoring."""
        self.monitoring = True
        monitor_thread = threading.Thread(target=self._monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()

    def _monitor_loop(self):
        """Background monitoring loop."""
        while self.monitoring:
            self.generate_dashboard()
            time.sleep(60)  # Update every minute

    def generate_dashboard(self):
        """Print a simple text dashboard."""
        if not self.rate_limit_history:
            return

        # Current status
        latest = self.rate_limit_history[-1]
        print(f"\n{'=' * 50}")
        print(f"Rate Limit Dashboard - {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'=' * 50}")
        print(f"Current: {latest['remaining']}/{latest['limit']} requests remaining")
        print(f"Reset in: {latest['reset'] - time.time():.0f} seconds")

        # Recent activity
        recent_requests = sum(1 for t in self.request_times if time.time() - t < 60)
        print(f"Requests in last minute: {recent_requests}")

        # Trend analysis
        if len(self.rate_limit_history) >= 2:
            trend = self.rate_limit_history[-1]['remaining'] - self.rate_limit_history[-2]['remaining']
            if trend < 0:
                print(f"Trend: ↓ (consuming {abs(trend)} requests)")
            elif trend > 0:
                print(f"Trend: ↑ (replenished {trend} requests)")
            else:
                print("Trend: → (stable)")

    def plot_usage(self):
        """Plot rate limit usage over time."""
        if len(self.rate_limit_history) < 2:
            return

        timestamps = list(self.request_times)
        remaining = [info['remaining'] for info in self.rate_limit_history]
        limits = [info['limit'] for info in self.rate_limit_history]

        plt.figure(figsize=(12, 6))
        plt.plot(timestamps, remaining, label='Remaining Requests', color='blue')
        plt.plot(timestamps, limits, label='Rate Limit', color='red', linestyle='--')
        plt.xlabel('Time')
        plt.ylabel('Requests')
        plt.title('API Rate Limit Usage')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
```
2. Alerting System
```python
import smtplib
import time
import requests
from email.mime.text import MIMEText

class RateLimitAlerting:
    def __init__(self, email_config=None, slack_webhook=None):
        self.email_config = email_config
        self.slack_webhook = slack_webhook
        self.alert_thresholds = {
            'low_remaining': 5,       # Alert when < 5 requests remaining
            'high_usage': 0.9,        # Alert when > 90% of rate limit used
            'consecutive_limits': 3   # Alert after 3 consecutive rate limits
        }
        self.consecutive_limits = 0

    def check_and_alert(self, rate_limit_info):
        """Check rate limit status and send alerts if needed."""
        remaining = rate_limit_info['remaining']
        limit = rate_limit_info['limit']
        usage_ratio = (limit - remaining) / limit

        alerts = []

        # Check low remaining requests
        if remaining <= self.alert_thresholds['low_remaining']:
            alerts.append(f"LOW: Only {remaining} requests remaining")

        # Check high usage
        if usage_ratio >= self.alert_thresholds['high_usage']:
            alerts.append(f"HIGH USAGE: {usage_ratio:.1%} of rate limit used")

        # Check for repeated rate limiting
        if remaining == 0:
            self.consecutive_limits += 1
            if self.consecutive_limits >= self.alert_thresholds['consecutive_limits']:
                alerts.append(f"RATE LIMITED: {self.consecutive_limits} consecutive rate limits")
        else:
            self.consecutive_limits = 0

        # Send alerts
        for alert in alerts:
            self.send_alert(alert, rate_limit_info)

    def send_alert(self, message, rate_limit_info):
        """Send alert via configured channels."""
        full_message = f"Vysion API Rate Limit Alert: {message}\n"
        full_message += f"Status: {rate_limit_info['remaining']}/{rate_limit_info['limit']}\n"
        full_message += f"Reset: {time.strftime('%H:%M:%S', time.localtime(rate_limit_info['reset']))}"

        if self.email_config:
            self._send_email_alert(full_message)
        if self.slack_webhook:
            self._send_slack_alert(full_message)

        print(f"ALERT: {full_message}")

    def _send_email_alert(self, message):
        """Send email alert."""
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Vysion API Rate Limit Alert'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']

            server = smtplib.SMTP(self.email_config['smtp_server'])
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def _send_slack_alert(self, message):
        """Send Slack alert."""
        try:
            payload = {"text": message}
            requests.post(self.slack_webhook, json=payload)
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")
```
Best Practices Summary
1. Design Patterns
- Implement exponential backoff for retry logic
- Use queuing systems for high-volume processing
- Cache frequently accessed data to reduce API calls
- Monitor rate limit headers proactively
2. Request Optimization
- Batch operations when possible
- Use appropriate page sizes for pagination
- Prioritize critical requests in high-volume scenarios
- Implement circuit breakers for failed endpoints (see the sketch below)
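The circuit-breaker pattern is not implemented elsewhere in this guide, so here is a minimal sketch; the thresholds are illustrative, not Vysion-recommended values:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: stop calling a failing endpoint for a cool-down
    period. failure_threshold and cooldown are illustrative defaults."""

    def __init__(self, failure_threshold=5, cooldown=120):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        # While open, refuse calls until the cool-down has elapsed
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.cooldown:
                raise RuntimeError("Circuit open; skipping request")
            self.opened_at = None  # half-open: allow one trial request

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise

        self.failures = 0  # a success closes the circuit
        return result
```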
3. Error Handling
- Handle 429 responses gracefully with appropriate delays
- Implement fallback mechanisms for critical operations
- Log rate limit events for analysis and optimization (see the sketch after this list)
- Set up alerting for rate limit violations
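For the logging point above, a small sketch using the standard `logging` module (the logger name and field layout are assumptions):

```python
import logging

logger = logging.getLogger("vysion.ratelimit")  # hypothetical logger name

def log_rate_limit_event(rate_limit_info):
    """Record each response's rate limit state for later analysis."""
    logger.info(
        "rate_limit remaining=%s limit=%s reset=%s",
        rate_limit_info["remaining"],
        rate_limit_info["limit"],
        rate_limit_info["reset"],
    )
```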
4. Performance Monitoring
- Track request patterns and identify optimization opportunities
- Monitor API usage trends to predict capacity needs
- Set up dashboards for real-time rate limit visibility
- Analyze error rates and response times
Enterprise Solutions
For high-volume applications requiring higher rate limits:
- Contact Vysion support for enterprise rate limit increases
- Implement dedicated API pools for different application components
- Use multiple API keys strategically where permitted (see the round-robin sketch below)
- Consider API gateway solutions for traffic management
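If your agreement permits multiple keys, a simple round-robin pool could spread load across them. This is purely illustrative; the keys are placeholders, and the x-api-key header matches the examples above:

```python
import itertools
import requests

class ApiKeyPool:
    """Round-robin over several API keys (only where your Vysion agreement
    permits multiple keys)."""

    def __init__(self, api_keys):
        self._keys = itertools.cycle(api_keys)

    def get(self, url, **kwargs):
        headers = kwargs.pop('headers', {})
        headers['x-api-key'] = next(self._keys)
        return requests.get(url, headers=headers, **kwargs)

# Usage (hypothetical keys)
pool = ApiKeyPool(['key_one', 'key_two'])
response = pool.get('https://api.vysion.ai/api/v2/documents/search')
```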
Tools and Libraries
Recommended tools for rate limit management:
- Python: `ratelimit`, `tenacity`, `backoff` (see the `tenacity` sketch after this list)
- JavaScript: `bottleneck`, `p-limit`, `retry`
- Monitoring: Grafana, DataDog, New Relic
- Alerting: PagerDuty, Slack, email notifications
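As an example of the Python libraries above, a `tenacity`-based wrapper that retries on 429 with exponential backoff might look like this (a sketch; the function name and parameters are placeholders):

```python
import requests
from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential

# Retry up to 5 times, backing off exponentially, whenever the API returns 429
@retry(
    retry=retry_if_result(lambda response: response.status_code == 429),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    stop=stop_after_attempt(5),
)
def fetch(url, api_key):
    return requests.get(url, headers={'x-api-key': api_key})
```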
By following these practices and rate limiting strategies, you can build robust applications that use the Vysion API efficiently while respecting service limits.