返回

埋头苦干远胜于口头好汉:我的ES死锁问题排查之旅

后端







## 问题概述

### 业务流程

1. 从Kafka消费数据。
2. 业务解析,组装数据。
3. 将组装好的数据,使用bulkProcessor.add(indexRequest),异步批量提交数据到ES。

### 问题

一段时间以来,我们一直遇到一个奇怪的问题。我们的ES集群偶尔会死锁,导致整个系统瘫痪。经过一番调查,我们发现问题出在我们的数据摄取管道上。

我们使用Kafka来收集数据,然后使用一个Spring Boot应用程序将数据解析成ES可以理解的格式。最后,我们使用bulkProcessor.add(indexRequest)方法将数据批量提交到ES。

## 排查过程

### 初步调查

我们首先查看了ES的日志文件。我们发现,在死锁发生时,ES会抛出以下错误消息:

[2023-03-08 15:30:00,345] ERROR [bulk-thread-1] org.elasticsearch.transport.TransportService - [search-shard-0] failed to send bulk [42] to [search-node-1]
java.lang.IllegalStateException: Bulk shard request is in progress


这条错误消息表明,ES正在尝试发送一个批量请求到一个分片,但该分片目前正在处理另一个批量请求。这导致了死锁。

### 进一步调查

为了进一步调查问题,我们使用了jstack工具来生成ES进程的线程转储。我们发现,有一个线程被阻塞在以下方法上:

org.elasticsearch.transport.TransportService.sendRequest(TransportRequest, Node, TransportRequestOptions)


这个方法用于将请求发送到另一个节点。在我们的案例中,该线程正在尝试将批量请求发送到一个分片,但该分片目前正在处理另一个批量请求。

### 问题根源

经过一番调查,我们发现问题的根源在于我们的代码。我们在代码中使用了bulkProcessor.add(indexRequest)方法来将数据批量提交到ES。这个方法是异步的,这意味着它不会等待ES处理完请求。当ES处理请求时,它会将请求放入一个队列中。如果队列已满,ES就会抛出BulkShardRequestInProgressException异常。

## 解决措施

为了解决这个问题,我们在代码中添加了一个检查,以确保在将请求添加到bulkProcessor之前,队列中没有其他请求。我们还将bulkProcessor的队列大小增加了一倍。

if (bulkProcessor.queuedRequestCount() == 0) {
bulkProcessor.add(indexRequest);
}


bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Do something before the bulk request is executed.
}

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Do something after the bulk request is executed.
    }

    @Override
    public void onFailure(long executionId, BulkRequest request, Throwable failure) {
        // Do something when a bulk request fails.
    }
}

).setBulkActions(5000)
.setBulkSize(25mb)
.setFlushInterval(100ms)
.build();


## 结论

通过对问题的仔细调查和分析,我们最终找到了问题的根源并采取了有效的解决措施。希望这篇文章能帮助其他人避免同样的问题。