返回

Kubernetes 日志流中断后如何持续读取?

python

从 Kubernetes 日志流的中断处恢复读取

问题:

在 Kubernetes 中使用日志流 URI 访问 Pod 日志时,如何从流中断处重新请求,直到 Pod 结束?

解决方案:

使用 Range

通过在请求头中传递 headers['Range'] = f'bytes={last_byte + 1}-',可以从上次读取的字节位置开始恢复请求。其中,last_byte 是上次读取的最后一个字节的序号。

使用 Requests 库的 iter_lines() 方法

Requests 库中,iter_lines() 方法可用于迭代流式响应中的行。通过持续调用此方法,可以直到流中断,然后使用 try...except 块捕获异常并重新建立连接。

代码示例:

import requests
import time

def stream_logs(log_streaming_endpoint, headers):
    while True:
        try:
            response = requests.get(log_streaming_endpoint, headers=headers, stream=True)
            response_log_streaming_lines = response.iter_lines(chunk_size=1024)
            for line in response_log_streaming_lines:
                log_line = line.decode("utf-8")
                print(log_line)
        except requests.exceptions.RequestException as e:
            print("Error fetching logs:", e)

        # 检查流是否中断
        if not response.ok:
            break
        else:
            # 如果响应成功,清除 'Range' 头
            content_range = response.headers.get('Content-Range')
            if content_range:
                last_byte = int(content_range.split('/')[-1])
                # 设置 'Range' 头以从接收的最后一个字节恢复流
                headers['Range'] = f'bytes={last_byte + 1}-'
            else:
                # 如果 'Content-Range' 头不存在,则从头开始重试
                try:
                    del headers['Range']
                except KeyError:
                    pass  # 'Range' 头不存在,无需删除

            print("Content-Range 头未找到。从头开始重试。")

        time.sleep(1)

# 使用示例
log_streaming_endpoint = "https://example.com/api/v1/logs/pod/my-pod"
headers = {'Authorization': 'Bearer 12345'}

stream_logs(log_streaming_endpoint, headers)

注意:

  • 在使用 Range 头时,服务器需要支持 HTTP Range 请求。
  • 如果 Content-Range 头不可用,可以从头开始重新请求日志流。

结论:

通过使用 Range 头或 Requests 库的 iter_lines() 方法,我们可以从 Kubernetes 日志流中断处恢复读取,直到 Pod 结束。这有助于保持日志流的持续性,并防止丢失重要的信息。

常见问题解答:

1. 什么是 Range 头?

Range 头是一个 HTTP 请求头,用于请求服务器响应消息体的指定字节范围。

2. iter_lines() 方法如何工作?

iter_lines() 方法将流式响应解析为文本行,并返回一个迭代器对象。该迭代器在流式响应中生成行,直到流中断或达到文件末尾。

3. 如何判断日志流是否中断?

requests.get() 请求引发 requests.exceptions.RequestException 异常时,表示日志流已中断。

4. 为什么在 Content-Range 头不存在时需要从头开始重试?

如果 Content-Range 头不存在,则服务器无法确定响应的字节范围。因此,需要从头开始重新请求日志流以获取完整的日志。

5. 如何确保日志流的持续性?

通过定期重新建立日志流连接,并从上次读取的字节处恢复请求,可以确保日志流的持续性。