返回
埋头苦干远胜于口头好汉:我的ES死锁问题排查之旅
后端
2023-09-11 02:03:13
## 问题概述
### 业务流程
1. 从Kafka消费数据。
2. 业务解析,组装数据。
3. 将组装好的数据,使用bulkProcessor.add(indexRequest),异步批量提交数据到ES。
### 问题
一段时间以来,我们一直遇到一个奇怪的问题。我们的ES集群偶尔会死锁,导致整个系统瘫痪。经过一番调查,我们发现问题出在我们的数据摄取管道上。
我们使用Kafka来收集数据,然后使用一个Spring Boot应用程序将数据解析成ES可以理解的格式。最后,我们使用bulkProcessor.add(indexRequest)方法将数据批量提交到ES。
## 排查过程
### 初步调查
我们首先查看了ES的日志文件。我们发现,在死锁发生时,ES会抛出以下错误消息:
[2023-03-08 15:30:00,345] ERROR [bulk-thread-1] org.elasticsearch.transport.TransportService - [search-shard-0] failed to send bulk [42] to [search-node-1]
java.lang.IllegalStateException: Bulk shard request is in progress
这条错误消息表明,ES正在尝试发送一个批量请求到一个分片,但该分片目前正在处理另一个批量请求。这导致了死锁。
### 进一步调查
为了进一步调查问题,我们使用了jstack工具来生成ES进程的线程转储。我们发现,有一个线程被阻塞在以下方法上:
org.elasticsearch.transport.TransportService.sendRequest(TransportRequest, Node, TransportRequestOptions)
这个方法用于将请求发送到另一个节点。在我们的案例中,该线程正在尝试将批量请求发送到一个分片,但该分片目前正在处理另一个批量请求。
### 问题根源
经过一番调查,我们发现问题的根源在于我们的代码。我们在代码中使用了bulkProcessor.add(indexRequest)方法来将数据批量提交到ES。这个方法是异步的,这意味着它不会等待ES处理完请求。当ES处理请求时,它会将请求放入一个队列中。如果队列已满,ES就会抛出BulkShardRequestInProgressException异常。
## 解决措施
为了解决这个问题,我们在代码中添加了一个检查,以确保在将请求添加到bulkProcessor之前,队列中没有其他请求。我们还将bulkProcessor的队列大小增加了一倍。
if (bulkProcessor.queuedRequestCount() == 0) {
bulkProcessor.add(indexRequest);
}
bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Do something before the bulk request is executed.
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// Do something after the bulk request is executed.
}
@Override
public void onFailure(long executionId, BulkRequest request, Throwable failure) {
// Do something when a bulk request fails.
}
}
).setBulkActions(5000)
.setBulkSize(25mb)
.setFlushInterval(100ms)
.build();
## 结论
通过对问题的仔细调查和分析,我们最终找到了问题的根源并采取了有效的解决措施。希望这篇文章能帮助其他人避免同样的问题。