Skip to content

Rate Limiting Guide

Understanding and properly handling rate limits is crucial for building robust applications with the Vysion API. This guide covers rate limit policies, best practices, and implementation strategies.

The Vysion API implements rate limiting to ensure fair usage and maintain service quality for all users. Rate limits are applied on a per-API-key basis and vary by endpoint and subscription tier.

| Endpoint Category        | Requests per Minute | Burst Limit |
| ------------------------ | ------------------- | ----------- |
| Search APIs              | 30                  | 50          |
| Document Retrieval       | 60                  | 100         |
| Statistics & Histograms  | 20                  | 30          |
| Instant Messaging        | 30                  | 50          |
| Feeds                    | 10                  | 15          |

The API returns rate limit information in response headers:

HTTP/1.1 200 OK
X-RateLimit-Limit: 30
X-RateLimit-Remaining: 25
X-RateLimit-Reset: 1640995200
X-RateLimit-Window: 60
| Header                | Description                                    |
| --------------------- | ---------------------------------------------- |
| X-RateLimit-Limit     | Maximum requests allowed in the current window |
| X-RateLimit-Remaining | Requests remaining in the current window       |
| X-RateLimit-Reset     | Unix timestamp when the rate limit resets      |
| X-RateLimit-Window    | Rate limit window duration in seconds          |

When rate limits are exceeded, the API returns a 429 Too Many Requests status:

{
"error": {
"code": 429,
"message": "Too Many Requests",
"details": "Rate limit exceeded. Try again in 45 seconds."
}
}
import time
import requests
from datetime import datetime, timedelta
class RateLimitedClient:
    """Client that spaces requests out to stay under a per-minute quota."""

    def __init__(self, api_key, requests_per_minute=30):
        self.api_key = api_key
        self.requests_per_minute = requests_per_minute
        # Minimum number of seconds that must elapse between two requests.
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0

    def make_request(self, url, **kwargs):
        """Issue a GET request, sleeping first if the last call was too recent."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        # Inject the API key into whatever headers the caller supplied.
        request_headers = kwargs.get('headers', {})
        request_headers['x-api-key'] = self.api_key
        kwargs['headers'] = request_headers
        response = requests.get(url, **kwargs)
        self.last_request_time = time.time()
        return response
# Usage example: a single request, automatically throttled by the client.
client = RateLimitedClient('your_api_key')
response = client.make_request('https://api.vysion.ai/api/v2/documents/search')

2. Advanced Rate Limiting with Retry Logic

Section titled “2. Advanced Rate Limiting with Retry Logic”
import time
import random
from functools import wraps
def exponential_backoff_retry(max_retries=3, base_delay=1):
    """Decorator for exponential backoff retry logic.

    Retries the wrapped requests call when it returns HTTP 429 (honouring
    the Retry-After header when present, otherwise exponential backoff with
    jitter) and when it raises (plain exponential backoff). Gives up after
    ``max_retries`` retries.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    response = func(*args, **kwargs)
                    if response.status_code != 429:
                        return response
                    # Rate limited: stop retrying once the budget is spent.
                    if attempt == max_retries:
                        raise Exception("Max retries exceeded")
                    retry_after = response.headers.get('Retry-After')
                    if retry_after:
                        # Server told us exactly how long to wait.
                        delay = int(retry_after)
                    else:
                        # Exponential backoff plus jitter to avoid thundering herd.
                        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Rate limited. Waiting {delay:.2f} seconds...")
                    time.sleep(delay)
                except Exception as exc:
                    if attempt == max_retries:
                        raise exc
                    delay = base_delay * (2 ** attempt)
                    print(f"Request failed. Retrying in {delay} seconds...")
                    time.sleep(delay)
        return wrapper
    return decorator
@exponential_backoff_retry(max_retries=3)
def make_api_request(url, headers=None):
    """GET ``url`` with retry/backoff applied by the decorator above.

    Fix: ``headers`` now defaults to None. Other examples in this guide call
    ``make_api_request(url)`` with a single argument, which previously raised
    TypeError on every call because ``headers`` was required.

    Args:
        url: fully qualified endpoint URL.
        headers: optional dict of HTTP headers (e.g. the x-api-key header).

    Returns:
        The ``requests.Response`` from the final (successful) attempt.
    """
    return requests.get(url, headers=headers)
class RateLimitMonitor:
    """Tracks the most recent X-RateLimit-* header values from responses."""

    def __init__(self):
        self.rate_limit_info = {}

    def update_from_headers(self, response_headers):
        """Refresh the stored snapshot from a response's headers."""
        get = response_headers.get
        self.rate_limit_info = {
            'limit': int(get('X-RateLimit-Limit', 0)),
            'remaining': int(get('X-RateLimit-Remaining', 0)),
            'reset': int(get('X-RateLimit-Reset', 0)),
            'window': int(get('X-RateLimit-Window', 60)),
        }

    def should_wait(self, buffer_requests=5):
        """Return True when remaining quota is at or below the safety buffer."""
        info = self.rate_limit_info
        return bool(info) and info['remaining'] <= buffer_requests

    def wait_time(self):
        """Seconds until the window resets; 0 when already reset or unknown."""
        if not self.rate_limit_info:
            return 0
        seconds_left = self.rate_limit_info['reset'] - time.time()
        return seconds_left if seconds_left > 0 else 0

    def get_status(self):
        """Return a one-line human-readable quota summary."""
        if not self.rate_limit_info:
            return "No rate limit data available"
        info = self.rate_limit_info
        return f"Rate Limit: {info['remaining']}/{info['limit']} remaining"
# Usage: a module-level monitor shared by all monitored requests.
monitor = RateLimitMonitor()


def monitored_request(url, headers):
    """GET ``url`` while keeping the shared monitor up to date.

    Sleeps before the request when the tracked remaining quota has dropped
    into the safety buffer, then records the fresh rate-limit headers.
    """
    if monitor.should_wait():
        pause = monitor.wait_time()
        print(f"Rate limit buffer reached. Waiting {pause:.2f} seconds...")
        time.sleep(pause)
    response = requests.get(url, headers=headers)
    monitor.update_from_headers(response.headers)
    print(monitor.get_status())
    return response
def process_items_in_chunks(items, chunk_size=50, delay_between_chunks=60):
    """Process items in chunks with delays to respect rate limits.

    Fix: the per-item results were previously computed into a local variable
    and discarded; they are now collected and returned. Callers that ignored
    the old ``None`` return value are unaffected.

    Args:
        items: sequence of item identifiers to process.
        chunk_size: number of items per chunk.
        delay_between_chunks: pause, in seconds, between consecutive chunks.

    Returns:
        A list with one entry per item: whatever ``process_single_item``
        returned for it (its JSON payload, or None on error).
    """
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    results = []
    for index, chunk in enumerate(chunks):
        print(f"Processing chunk {index + 1}/{len(chunks)} ({len(chunk)} items)")
        for item in chunk:
            results.append(process_single_item(item))
            time.sleep(2)  # small delay between individual requests
        # Wait between chunks (except after the last one).
        if index < len(chunks) - 1:
            print(f"Waiting {delay_between_chunks} seconds before next chunk...")
            time.sleep(delay_between_chunks)
    return results
def process_single_item(item):
    """Fetch one document by id; return its JSON payload, or None on error."""
    url = f"https://api.vysion.ai/api/v2/documents/{item}"
    try:
        # NOTE(review): make_api_request is declared above with a ``headers``
        # parameter; this one-argument call relies on that parameter having a
        # default value — confirm the two signatures agree.
        return make_api_request(url).json()
    except Exception as e:
        print(f"Error processing item {item}: {e}")
        return None
import threading
from queue import Empty, PriorityQueue
class PriorityAPIProcessor:
    """Background worker that issues API requests in priority order.

    Lower priority numbers are served first. Consecutive requests are spaced
    at least ``60 / requests_per_minute`` seconds apart.
    """

    def __init__(self, requests_per_minute=30):
        self.queue = PriorityQueue()
        self.requests_per_minute = requests_per_minute
        # Minimum spacing between two consecutive requests, in seconds.
        self.min_interval = 60.0 / requests_per_minute
        self.running = False
        self.worker_thread = None

    def add_request(self, priority, url, callback=None):
        """Queue a request; lower ``priority`` values are processed first."""
        self.queue.put((priority, url, callback))

    def start_processing(self):
        """Start the background processing thread."""
        self.running = True
        self.worker_thread = threading.Thread(target=self._process_queue)
        self.worker_thread.start()

    def stop_processing(self):
        """Signal the worker to stop and wait for it to finish."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()

    def _process_queue(self):
        """Drain the queue with rate limiting.

        Fix: the original polled ``queue.empty()`` in a tight loop, which
        busy-spun at 100% CPU whenever the queue was idle (and raced with the
        subsequent ``get``). A blocking ``get(timeout=1)`` now parks the
        thread instead, waking once a second to re-check ``self.running``.
        """
        while self.running:
            try:
                priority, url, callback = self.queue.get(timeout=1)
            except Empty:
                continue  # queue idle: loop back and re-check the stop flag
            try:
                response = make_api_request(url)
                if callback:
                    callback(response)
                # Space requests out to honour the per-minute budget.
                time.sleep(self.min_interval)
            except Exception as e:
                print(f"Error processing queue: {e}")
                time.sleep(1)
# Usage: queue a mix of priorities, then let the worker drain them.
processor = PriorityAPIProcessor(requests_per_minute=30)

# Lower numbers are dequeued first.
processor.add_request(1, "https://api.vysion.ai/api/v2/urgent-data")
processor.add_request(5, "https://api.vysion.ai/api/v2/normal-data")
processor.add_request(1, "https://api.vysion.ai/api/v2/another-urgent")

processor.start_processing()
def efficient_pagination(base_url, total_items=None, page_size=100):
    """Paginate through all results of a list endpoint.

    Fixes over the original version:
      * the page size is kept constant for the whole run — shrinking it on
        the last page changed the server-side ``page * page_size`` offset
        arithmetic and could skip or duplicate records;
      * ``total_items`` (previously accepted but ignored) now acts as an
        optional cap on the number of items fetched.

    Args:
        base_url: endpoint URL without query parameters.
        total_items: optional maximum number of items to return.
        page_size: number of items requested per page.

    Returns:
        List of hit dicts, truncated to ``total_items`` when a cap is given.
    """
    page = 1
    all_results = []
    while True:
        url = f"{base_url}?page={page}&page_size={page_size}"
        # NOTE(review): relies on a make_rate_limited_request helper that is
        # not defined in this guide — confirm which wrapper is intended.
        response = make_rate_limited_request(url)
        if response.status_code != 200:
            break
        payload = response.json().get('data', {})
        hits = payload.get('hits', [])
        if not hits:
            break
        all_results.extend(hits)
        total = payload.get('total', 0)
        print(f"Retrieved {len(all_results)}/{total} items")
        # Stop once the server says we have everything, or the cap is hit.
        if len(all_results) >= total:
            break
        if total_items is not None and len(all_results) >= total_items:
            all_results = all_results[:total_items]
            break
        page += 1
    return all_results
import hashlib
import json
import time
from functools import wraps
def cached_api_call(cache_duration=3600):  # 1 hour default
    """Decorator that memoises a function's result for ``cache_duration`` seconds.

    The cache key is an MD5 digest of the JSON-serialised positional and
    keyword arguments, so every argument must be JSON-serialisable.
    """
    cache = {}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = hashlib.md5(
                json.dumps([args, kwargs], sort_keys=True).encode()
            ).hexdigest()
            entry = cache.get(key)
            if entry is not None:
                value, stored_at = entry
                # Serve from cache only while the entry is still fresh.
                if time.time() - stored_at < cache_duration:
                    print(f"Cache hit for {func.__name__}")
                    return value
            value = func(*args, **kwargs)
            cache[key] = (value, time.time())
            return value
        return wrapper
    return decorator
@cached_api_call(cache_duration=1800)  # 30 minutes
def get_ransomware_stats(countries):
    """Fetch country statistics, served from cache for up to 30 minutes."""
    endpoint = f"https://api.vysion.ai/api/v2/stats/countries?countries={countries}"
    # NOTE(review): relies on a make_rate_limited_request helper that is not
    # defined in this guide — confirm which wrapper is intended.
    return make_rate_limited_request(endpoint).json()
import matplotlib.pyplot as plt
from collections import deque
import threading
import time
class RateLimitDashboard:
    """Rolling view of recent requests and their rate-limit headroom."""

    def __init__(self, history_size=100):
        self.history_size = history_size
        # Bounded histories: oldest entries fall off automatically.
        self.rate_limit_history = deque(maxlen=history_size)
        self.request_times = deque(maxlen=history_size)
        self.monitoring = False

    def record_request(self, rate_limit_info):
        """Store one request's timestamp and rate-limit snapshot."""
        self.request_times.append(time.time())
        self.rate_limit_history.append(rate_limit_info)

    def start_monitoring(self):
        """Spawn a daemon thread that re-renders the dashboard every minute."""
        self.monitoring = True
        worker = threading.Thread(target=self._monitor_loop)
        worker.daemon = True
        worker.start()

    def _monitor_loop(self):
        """Render the dashboard once a minute while monitoring is on."""
        while self.monitoring:
            self.generate_dashboard()
            time.sleep(60)

    def generate_dashboard(self):
        """Print current status, last-minute activity and the usage trend."""
        if not self.rate_limit_history:
            return
        latest = self.rate_limit_history[-1]
        divider = '=' * 50
        print(f"\n{divider}")
        print(f"Rate Limit Dashboard - {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{divider}")
        print(f"Current: {latest['remaining']}/{latest['limit']} requests remaining")
        print(f"Reset in: {latest['reset'] - time.time():.0f} seconds")
        # Requests recorded within the last 60 seconds.
        recent_requests = sum(1 for t in self.request_times
                              if time.time() - t < 60)
        print(f"Requests in last minute: {recent_requests}")
        # Trend: compare the two most recent snapshots.
        if len(self.rate_limit_history) >= 2:
            previous = self.rate_limit_history[-2]['remaining']
            trend = latest['remaining'] - previous
            if trend < 0:
                print(f"Trend: ↓ (consuming {abs(trend)} requests)")
            elif trend > 0:
                print(f"Trend: ↑ (replenished {trend} requests)")
            else:
                print("Trend: → (stable)")

    def plot_usage(self):
        """Plot remaining requests against the limit over time (matplotlib)."""
        if len(self.rate_limit_history) < 2:
            return
        xs = list(self.request_times)
        remaining = [info['remaining'] for info in self.rate_limit_history]
        limits = [info['limit'] for info in self.rate_limit_history]
        plt.figure(figsize=(12, 6))
        plt.plot(xs, remaining, label='Remaining Requests', color='blue')
        plt.plot(xs, limits, label='Rate Limit', color='red', linestyle='--')
        plt.xlabel('Time')
        plt.ylabel('Requests')
        plt.title('API Rate Limit Usage')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
import smtplib
from email.mime.text import MIMEText
class RateLimitAlerting:
    """Emits alerts (stdout, optional email/Slack) when rate limits run low."""

    def __init__(self, email_config=None, slack_webhook=None):
        # email_config keys used: 'from', 'to', 'smtp_server'.
        self.email_config = email_config
        self.slack_webhook = slack_webhook
        self.alert_thresholds = {
            'low_remaining': 5,       # Alert when < 5 requests remaining
            'high_usage': 0.9,        # Alert when > 90% of rate limit used
            'consecutive_limits': 3   # Alert after 3 consecutive rate limits
        }
        self.consecutive_limits = 0

    def check_and_alert(self, rate_limit_info):
        """Check a rate-limit snapshot and send any triggered alerts.

        Fix: ``limit`` can be 0 (e.g. the X-RateLimit-Limit header was
        missing and defaulted to 0); the usage ratio previously divided by
        it unconditionally and raised ZeroDivisionError. Usage is treated
        as 0 when no positive limit is known.
        """
        remaining = rate_limit_info['remaining']
        limit = rate_limit_info['limit']
        usage_ratio = (limit - remaining) / limit if limit > 0 else 0.0
        alerts = []
        # Check low remaining requests
        if remaining <= self.alert_thresholds['low_remaining']:
            alerts.append(f"LOW: Only {remaining} requests remaining")
        # Check high usage
        if usage_ratio >= self.alert_thresholds['high_usage']:
            alerts.append(f"HIGH USAGE: {usage_ratio:.1%} of rate limit used")
        # Track back-to-back exhaustion events
        if remaining == 0:
            self.consecutive_limits += 1
            if self.consecutive_limits >= self.alert_thresholds['consecutive_limits']:
                alerts.append(f"RATE LIMITED: {self.consecutive_limits} consecutive rate limits")
        else:
            self.consecutive_limits = 0
        # Send alerts
        for alert in alerts:
            self.send_alert(alert, rate_limit_info)

    def send_alert(self, message, rate_limit_info):
        """Send alert via all configured channels (always echoes to stdout)."""
        full_message = f"Vysion API Rate Limit Alert: {message}\n"
        full_message += f"Status: {rate_limit_info['remaining']}/{rate_limit_info['limit']}\n"
        full_message += f"Reset: {time.strftime('%H:%M:%S', time.localtime(rate_limit_info['reset']))}"
        if self.email_config:
            self._send_email_alert(full_message)
        if self.slack_webhook:
            self._send_slack_alert(full_message)
        print(f"ALERT: {full_message}")

    def _send_email_alert(self, message):
        """Best-effort email delivery; failures are logged, never raised.

        Fix: the SMTP connection is now closed via the context manager even
        when ``send_message`` fails (the original leaked it on error).
        """
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Vysion API Rate Limit Alert'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            with smtplib.SMTP(self.email_config['smtp_server']) as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def _send_slack_alert(self, message):
        """Best-effort Slack webhook delivery; failures are logged, never raised."""
        try:
            import requests
            payload = {"text": message}
            requests.post(self.slack_webhook, json=payload)
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")
  • Implement exponential backoff for retry logic
  • Use queuing systems for high-volume processing
  • Cache frequently accessed data to reduce API calls
  • Monitor rate limit headers proactively
  • Batch operations when possible
  • Use appropriate page sizes for pagination
  • Prioritize critical requests in high-volume scenarios
  • Implement circuit breakers for failed endpoints
  • Handle 429 responses gracefully with appropriate delays
  • Implement fallback mechanisms for critical operations
  • Log rate limit events for analysis and optimization
  • Set up alerting for rate limit violations
  • Track request patterns and identify optimization opportunities
  • Monitor API usage trends to predict capacity needs
  • Set up dashboards for real-time rate limit visibility
  • Analyze error rates and response times

For high-volume applications requiring higher rate limits:

  • Contact Vysion support for enterprise rate limit increases
  • Implement dedicated API pools for different application components
  • Use multiple API keys strategically (where permitted)
  • Consider API gateway solutions for traffic management

Recommended tools for rate limit management:

  • Python: ratelimit, tenacity, backoff
  • JavaScript: bottleneck, p-limit, retry
  • Monitoring: Grafana, DataDog, New Relic
  • Alerting: PagerDuty, Slack, email notifications

By following these practices and implementing proper rate limiting strategies, you can build robust applications that efficiently use the Vysion API while respecting service limits and ensuring optimal performance.