Python Library Guide
The Vysion Python library provides a convenient wrapper around the Vysion API, making it easy to integrate threat intelligence data into your Python applications. This guide covers installation, configuration, and usage patterns.
Installation
Section titled “Installation”Install the Vysion Python library using pip:
pip install vysion
For development or the latest features, you can install from the source repository:
pip install git+https://github.com/vysion-ai/vysion-python.git
Quick Start
Section titled “Quick Start”Basic Configuration
Section titled “Basic Configuration”import osfrom vysion import client
# Set your API key as an environment variableos.environ['VYSION_API_KEY'] = 'your_api_key_here'
# Initialize the clientc = client.Client()
from vysion import client
# Initialize the client with API keyc = client.Client(api_key='your_api_key_here')
from vysion import clientimport config # Your config module
# Initialize the client with configc = client.Client(api_key=config.API_KEY)
First API Call
Section titled “First API Call”from vysion import client
c = client.Client(api_key='your_api_key_here')
# Search for documents containing a specific termresult = c.search_documents(query='lockbit')print(f"Found {result['data']['total']} documents")
# Display first resultif result['data']['hits']: first_doc = result['data']['hits'][0] print(f"Title: {first_doc['page']['title']}") print(f"URL: {first_doc['page']['url']}")
Core Methods
Section titled “Core Methods”Document Search
Section titled “Document Search”The search_documents
method is the most versatile way to find documents:
# Simple text searchresult = c.search_documents(query='ransomware')
# Search with paginationresult = c.search_documents( query='alphv', page=1, page_size=50)
# Search with operators and filtersresult = c.search_documents( query='lockbit AND (leak OR victim)', gte='2024-01-01', lte='2024-12-31', tags='ransomware', countries='US,UK,DE')
# Search for documents containing emailsresult = c.search_documents( query='email:*@company.com')
# Search for documents with Bitcoin addressesresult = c.search_documents( query='bitcoin_address:bc1q*')
Document Retrieval
Section titled “Document Retrieval”Get specific documents by various identifiers:
# Get document by IDdoc = c.get_document_by_id('document_id_here')
# Get document by URLdoc = c.get_document_by_url('https://example.onion/page')
# Get document by tagdocs = c.get_documents_by_tag('ransomware')
# Get documents by emaildocs = c.get_documents_by_email('admin@company.com')
# Get documents by wallet addressdocs = c.get_documents_by_wallet('BTC', 'bc1q026rl6hjkdywnsrtva26mq2w0avs9k850ew2d6')
# Get documents by phone numberdocs = c.get_documents_by_phone('+1234567890')
HTML Content
Section titled “HTML Content”Retrieve the full HTML content of documents:
# Get HTML content by document IDhtml_content = c.get_document_html('document_id_here')
# Get HTML content by URLhtml_content = c.get_document_html_by_url('https://example.onion/page')
Ransomware Intelligence
Section titled “Ransomware Intelligence”Access ransomware-specific intelligence:
# Search ransomware victimsvictims = c.search_ransomware_victims( query='healthcare', countries='US', groups='lockbit,alphv')
# Get specific victim by IDvictim = c.get_ransomware_victim('victim_id_here')
# Get country statisticscountry_stats = c.ransomware_countries_stats('US,UK,DE')
# Get group statisticsgroup_stats = c.ransomware_groups_stats()
# Get sector statisticssector_stats = c.ransomware_groups_sector('62') # Healthcare NAICS
# Get attacks histogramattacks_hist = c.get_ransomware_attacks_histogram()
# Get groups histogramgroups_hist = c.get_ransomware_groups_histogram()
Instant Messaging
Section titled “Instant Messaging”Access Telegram and Discord data:
# Get chat messages from a channelchat = c.get_im_chat( platform='Telegram', channel_id='-1002018436281', gte='2024-05-16T10:57:58.632466', lte='2024-05-16T23:00:00')
# Get specific messagemessage = c.get_im_message('Telegram', '-1002018336281_85')
# Get user profileprofile = c.get_im_profile('Telegram', '1595116601')
# Get channel informationchannel = c.get_im_channel('Telegram', '-1012018336281')
# Get Discord server informationserver = c.get_im_server('discord', '1031841869195395175')
# Search by emailemail_results = c.im_find_email('help@coinbase.com')
# Search by walletwallet_results = c.im_find_wallet( 'BTC', 'bc1q026rl6hjkdywnsrtva26mq2w0avs9k850ew2d6')
Access daily threat intelligence feeds:
from datetime import datetime, timedelta
# Get yesterday's feedsyesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# Get ransomware feedransomware_feed = c.get_ransomware_feed(batch=yesterday)
# Get Telegram feedtelegram_feed = c.get_telegram_feed(batch=yesterday)
# Get wallets feedwallets_feed = c.get_wallets_feed(batch=yesterday)
Advanced Usage Patterns
Section titled “Advanced Usage Patterns”Bulk Data Processing
Section titled “Bulk Data Processing”from vysion import clientimport time
c = client.Client(api_key='your_api_key_here')
def process_all_victims(query, page_size=100): """Process all ransomware victims matching a query.""" page = 1 all_victims = []
while True: try: result = c.search_ransomware_victims( query=query, page=page, page_size=page_size )
hits = result['data']['hits'] all_victims.extend(hits)
print(f"Processed page {page}, got {len(hits)} victims")
# Check if we've reached the end if len(hits) < page_size: break
page += 1
# Rate limiting - be respectful to the API time.sleep(0.1)
except Exception as e: print(f"Error on page {page}: {e}") break
return all_victims
# Usagehealthcare_victims = process_all_victims('healthcare')print(f"Total healthcare victims found: {len(healthcare_victims)}")
Error Handling
Section titled “Error Handling”from vysion import clientfrom requests.exceptions import RequestException
c = client.Client(api_key='your_api_key_here')
def safe_api_call(func, *args, **kwargs): """Wrapper for safe API calls with error handling.""" try: result = func(*args, **kwargs)
# Check for API errors if result.get('error'): print(f"API Error: {result['error']['message']}") return None
return result
except RequestException as e: print(f"Network Error: {e}") return None except Exception as e: print(f"Unexpected Error: {e}") return None
# Usageresult = safe_api_call(c.search_documents, query='lockbit')if result: print(f"Found {result['data']['total']} documents")
Data Export
Section titled “Data Export”import jsonimport csvfrom vysion import client
c = client.Client(api_key='your_api_key_here')
def export_victims_to_csv(query, filename): """Export ransomware victims to CSV file.""" victims = c.search_ransomware_victims(query=query, page_size=1000)
with open(filename, 'w', newline='', encoding='utf-8') as csvfile: fieldnames = ['companyName', 'country', 'industry', 'ransomwareGroup', 'detectionDate'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader() for victim in victims['data']['hits']: writer.writerow({ 'companyName': victim.get('companyName', ''), 'country': victim.get('country', ''), 'industry': victim.get('industry', ''), 'ransomwareGroup': victim.get('ransomwareGroup', ''), 'detectionDate': victim.get('detectionDate', '') })
def export_to_json(data, filename): """Export data to JSON file.""" with open(filename, 'w', encoding='utf-8') as jsonfile: json.dump(data, jsonfile, indent=2, ensure_ascii=False)
# Usageexport_victims_to_csv('healthcare', 'healthcare_victims.csv')
Monitoring and Alerting
Section titled “Monitoring and Alerting”import timefrom datetime import datetime, timedeltafrom vysion import client
c = client.Client(api_key='your_api_key_here')
def monitor_new_victims(keywords, check_interval=3600): """Monitor for new ransomware victims containing keywords.""" last_check = datetime.now() - timedelta(hours=24)
while True: try: # Check for new victims since last check result = c.search_ransomware_victims( query=' OR '.join(keywords), gte=last_check.isoformat() )
new_victims = result['data']['hits']
if new_victims: print(f"ALERT: {len(new_victims)} new victims found!") for victim in new_victims: print(f"- {victim['companyName']} ({victim['ransomwareGroup']})")
# Send alerts (implement your notification logic here) send_alert(new_victims)
last_check = datetime.now() time.sleep(check_interval)
except Exception as e: print(f"Monitoring error: {e}") time.sleep(60) # Wait before retrying
def send_alert(victims): """Implement your alerting logic here.""" # Email, Slack, webhook, etc. pass
# Usagekeywords = ['healthcare', 'hospital', 'medical', 'clinic']monitor_new_victims(keywords)
Configuration Options
Section titled “Configuration Options”Client Configuration
Section titled “Client Configuration”from vysion import client
# Full configurationc = client.Client( api_key='your_api_key_here', base_url='https://api.vysion.ai', # Custom base URL if needed timeout=30, # Request timeout in seconds retries=3, # Number of retry attempts backoff_factor=0.3 # Backoff factor for retries)
Environment Variables
Section titled “Environment Variables”The library supports these environment variables:
Variable | Description | Default |
---|---|---|
VYSION_API_KEY | Your API key | None |
VYSION_BASE_URL | API base URL | https://api.vysion.ai |
VYSION_TIMEOUT | Request timeout | 30 |
Performance Tips
Section titled “Performance Tips”1. Use Appropriate Page Sizes
Section titled “1. Use Appropriate Page Sizes”# For bulk processing, use larger page sizesresult = c.search_documents(query='ransomware', page_size=1000)
# For real-time applications, use smaller page sizesresult = c.search_documents(query='ransomware', page_size=10)
2. Implement Caching
Section titled “2. Implement Caching”import functoolsimport time
@functools.lru_cache(maxsize=128)def cached_country_stats(countries, timestamp): """Cache country stats for 1 hour.""" return c.ransomware_countries_stats(countries)
# Usage with hourly cachecurrent_hour = int(time.time() // 3600)stats = cached_country_stats('US,UK,DE', current_hour)
3. Use Specific Queries
Section titled “3. Use Specific Queries”# Less efficient - broad searchresult = c.search_documents(query='data')
# More efficient - specific searchresult = c.search_documents(query='healthcare AND lockbit')
Troubleshooting
Section titled “Troubleshooting”Common Issues
Section titled “Common Issues”-
Authentication Errors
# Check if your API key is validtry:result = c.search_documents(query='test', page_size=1)print("Authentication successful")except Exception as e:print(f"Authentication failed: {e}") -
Rate Limiting
import timedef rate_limited_call(func, *args, **kwargs):while True:try:return func(*args, **kwargs)except Exception as e:if 'rate limit' in str(e).lower():print("Rate limited, waiting...")time.sleep(60)else:raise e -
Network Issues
import requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retry# Configure retry strategysession = requests.Session()retry_strategy = Retry(total=3,backoff_factor=1,status_forcelist=[429, 500, 502, 503, 504])adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)
Debug Mode
Section titled “Debug Mode”Enable debug mode to see detailed request/response information:
import loggingfrom vysion import client
# Enable debug logginglogging.basicConfig(level=logging.DEBUG)
c = client.Client(api_key='your_api_key_here')result = c.search_documents(query='test')
Next Steps
Section titled “Next Steps”- Explore the API Reference for detailed endpoint documentation
- Check out MISP Integration for threat intelligence platform integration
- Review Rate Limiting best practices
- Join our community forums for support and discussions