返回

解决 VirusTotal API 请求超时问题:Python 深度分析与解决方案

python

解决VirusTotal API请求超时问题

当使用 Python 的 requests 库访问 VirusTotal API 时,遇到 ReadTimeout 异常是一个常见问题。这个问题通常表现为程序在一段时间后抛出 requests.exceptions.ReadTimeout 错误,提示读取操作超时。本文将深入分析此问题的原因,并提供多种解决方案。

问题分析

ReadTimeout 异常通常表示客户端无法在预期时间内从服务器接收到数据。此问题可能由多种因素引起:

  • 网络连接不稳定: 客户端与 VirusTotal 服务器之间的网络连接可能存在延迟或中断。
  • VirusTotal 服务器负载过高: VirusTotal 服务器可能因为请求量过大而响应缓慢。
  • 请求超时设置不合理: requests 库的默认超时设置可能过短,无法满足 VirusTotal API 的响应时间需求。
  • 防火墙或代理配置问题: 防火墙或代理服务器可能会干扰客户端与 VirusTotal 服务器之间的通信。
  • DNS 解析问题: 域名解析失败或解析到错误的 IP 地址可能导致连接超时。
  • API调用频率过快: 超过VirusTotal API速率限制,导致服务器拒绝服务或响应缓慢。

解决方案

以下是解决 ReadTimeout 异常的几种常见方法:

1. 增大请求超时时间

requests 库允许你自定义请求超时时间。通过增大超时时间,可以给服务器更多时间来响应请求。

原理: 增加超时时间可以应对网络延迟或服务器响应缓慢的情况,但过长的超时时间也可能导致程序阻塞。

代码示例:

import requests

def get_ip_report_vt(ip):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    headers = {
        "x-apikey": "YOUR_API_KEY"
    }
    try:
        response = requests.get(url, headers=headers, timeout=30)  # 设置超时时间为30秒
        response.raise_for_status()  # 检查 HTTP 状态码
        return response.json()
    except requests.exceptions.ReadTimeout:
        print(f"VirusTotal API request timed out for IP: {ip}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while accessing VirusTotal API for IP {ip}: {e}")
        return None

操作步骤:

  1. requests.get() 函数中添加 timeout 参数,并设置一个合理的超时时间值(例如,30秒)。
  2. 运行程序,观察是否仍然出现超时错误。

安全建议:

  • 避免设置过大的超时时间,以免程序长时间阻塞。建议根据实际情况和 VirusTotal API 的响应时间进行调整。
  • 添加错误处理机制,捕获 ReadTimeout 异常,并采取相应的处理措施(例如,重试请求、记录日志)。

2. 使用会话(Session)对象

requests 库的 Session 对象可以在多个请求之间保持持久连接。这可以减少建立连接的开销,提高请求效率。

原理: Session 对象复用 TCP 连接,减少了每次请求都需要重新建立连接的时间,从而降低超时风险。

代码示例:

import requests

def get_ip_report_vt(ip, session):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    try:
        response = session.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.ReadTimeout:
        print(f"VirusTotal API request timed out for IP: {ip}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while accessing VirusTotal API for IP {ip}: {e}")
        return None

# 使用 Session 对象发送请求
session = requests.Session()
session.headers.update({"x-apikey": "YOUR_API_KEY"})

# ... 在 process_ip_list 函数中使用 get_ip_report_vt(ip, session) ...

操作步骤:

  1. 创建一个 requests.Session 对象。
  2. 将 API 密钥添加到 Session 对象的请求头中。
  3. 在发送请求时,使用 Session 对象的 get() 方法。

3. 优化API调用频率和请求批次

VirusTotal API 对调用频率有限制。为了避免触发速率限制并引发超时,建议控制请求频率并在请求之间添加延迟。另外,VirusTotal API 支持批量查询IP地址,建议尽量利用这个功能。

原理: 降低请求频率并进行批量查询可以显著降低服务器负载,同时加快数据获取效率。

代码示例:

import requests
import time
import csv

# ...其他代码不变...

def process_ip_list_batch(ip_list, chunk_size=25):  # 每25个IP进行一次批量查询
    results = {}
    for i in range(0, len(ip_list), chunk_size):
        chunk = ip_list[i:i + chunk_size]
        results.update(get_ips_report_vt(chunk))
        time.sleep(15)  # 每次批量查询后等待15秒
    return results

def get_ips_report_vt(ips,session):
    global vt_key_index
    api_key, vt_key_index = get_next_api_key(VIRUSTOTAL_API_KEYS, vt_key_index)
    url = "https://www.virustotal.com/api/v3/ip_addresses"
    headers = {"x-apikey": api_key, "Content-Type": "application/json"}  # 添加 Content-Type 请求头
    params = {"ip": ",".join(ips)}
    try:
        response = session.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()  # Raises an error for HTTP codes 4xx/5xx
        data = response.json()
        result = {}
        for item in data['data']:
            result[item['id']] = (item['attributes']['reputation'],item['attributes']['country'])
        return result
    except requests.exceptions.RequestException as e:
        print(f"VirusTotal Error for IPs {ips}: {e}")
        return {}

def process_ip_list(input_csv, output_csv):
    with open(input_csv, 'r') as infile, open(output_csv, 'w', newline='') as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)
        writer.writerow(['IP', 'VT Score', 'VT Country', 'AbuseIPDB Score', 'AbuseIPDB Country'])
        next(reader)  # Skip header
        ip_list = [row[0] for row in reader]
    # 使用 Session 对象发送请求
    session = requests.Session()
    results= process_ip_list_batch(ip_list) # 批量查询IP地址

    with open(output_csv, 'w', newline='') as outfile:
         writer = csv.writer(outfile)
         writer.writerow(['IP', 'VT Score', 'VT Country']) # 重新写入标题行和数据
         for ip ,(vt_score, vt_country) in results.items():
            abuse_score, abuse_country = get_ip_report_abuseipdb(ip)
            writer.writerow([ip, vt_score, vt_country, abuse_score, abuse_country])

# Example usage
process_ip_list('ip_list.csv', 'ip_report_list.csv')

操作步骤:

  1. 调整chunk_size参数,控制批量查询的大小。
  2. 添加适当的time.sleep()调用,控制请求频率。
  3. 调整为批量API。
  4. 运行代码,并监控运行状态,适当调整 chunk_sizetime.sleep() 的值。

安全建议:

  • 详细阅读VirusTotal API文档,了解其速率限制,确保代码合规。
  • 动态调整请求频率,根据API响应情况进行自适应限速。
  • 当触发速率限制时,增加重试之间的延迟时间,或使用指数退避算法。

4. 使用代理服务器

如果网络连接存在问题,可以尝试使用代理服务器来访问 VirusTotal API。

原理: 代理服务器可以转发客户端请求,绕过网络限制或提高网络速度。

代码示例:
(注意:本示例使用了环境变量 HTTP_PROXYHTTPS_PROXY 来设置代理服务器。 你也可以直接在代码中指定代理服务器地址。)

import requests
import os

def get_ip_report_vt(ip):