Asio扩展: 使用io_uring处理非标准异步系统调用
2025-01-05 02:36:35
Asio 与 io_uring 的功能扩展
Linux 内核的 io_uring
为异步 I/O 提供了高性能的解决方案,但并非所有系统调用都在 Asio 的默认异步操作范围内。本篇文章讨论了如何在 Asio 环境下使用 io_uring
处理这些非标准的异步系统调用,例如 mkdir
、rename
或 setxattr
等操作。
访问 Asio 的 io_uring 实例
首先,我们考虑是否可以访问 Asio 内部使用的 io_uring
实例。理论上,asio::stream_file::get_executor()
有可能提供关于 io_uring
实例的信息。 但经过深入研究,我们发现Asio 并没有直接暴露 io_uring
的底层接口。这样做主要考虑到接口的稳定性和抽象性。 直接操作底层的io_uring
API可能会打破 Asio 的异步执行模型,并使应用程序变得脆弱。
因此,直接访问或修改 Asio 的 io_uring
实例并不是一个可取的做法。
使用单独的 io_uring 实例进行集成
如果 Asio 的 io_uring
实例无法访问,一个合理的替代方案是创建并管理我们自己的 io_uring
实例。这种方法允许在 Asio 环境中实现对那些非标准系统调用的异步处理, 关键在于如何协调这个单独的 io_uring
实例与 Asio 的异步模型。
使用独立的io_uring
和 eventfd
并与 asio::posix::descriptor
配合是一种有效的集成方案。 流程如下:
- 初始化: 创建
io_uring
实例和用于唤醒 Asio 事件循环的eventfd
。 - 异步请求准备: 将
io_uring
的系统调用操作通过io_uring_prep_*
系列函数写入 Submission Queue Entry (SQE)。 - SQE 提交: 将 SQE 提交给
io_uring
,并让其异步执行。 - 关联上下文: 需要记录挂起的协程和
cqe->user_data
之间的关系,方便后续处理结果时可以恢复正确的协程。 - 事件监听: 通过
asio::posix::descriptor
监听eventfd
的可读事件。io_uring
执行完成后将通过eventfd
通知我们有新的 Completion Queue Entry (CQE) 产生。 - CQE处理: 当
eventfd
有可读事件时,处理相应的 CQE 并恢复对应的协程。
#include <asio/io_context.hpp>
#include <asio/awaitable.hpp>
#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/signal_set.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/posix/descriptor.hpp>
#include <asio/this_coro.hpp>
#include <asio/buffer.hpp>
#include <fcntl.h>
#include <liburing.h>
#include <sys/eventfd.h>
#include <stdexcept>
#include <map>
#include <format>
#include <iostream>
using tcp = asio::ip::tcp;
class io_uring_wrapper {
public:
io_uring_wrapper() {
int ret;
ring = new ::io_uring{};
if (::io_uring_queue_init(32, ring, 0) < 0){
throw std::runtime_error("Failed to initialize io_uring");
}
event_fd = eventfd(0, EFD_NONBLOCK);
if (event_fd < 0) {
::io_uring_queue_exit(ring);
delete ring;
throw std::runtime_error("Failed to create eventfd");
}
}
~io_uring_wrapper() {
if(ring != nullptr){
::io_uring_queue_exit(ring);
delete ring;
}
if(event_fd > 0){
::close(event_fd);
}
}
::io_uring *get_ring(){ return ring;}
int get_eventfd(){return event_fd;}
template <typename Awaitable>
void add_callback(uint64_t id, Awaitable awaitable) {
callbacks[id] = std::move(awaitable);
}
template<typename CompletionToken>
auto await_cqe(CompletionToken&& token){
struct awaiter {
using await_result = typename std::remove_cvref_t<CompletionToken>::template rebind<void>::other_result_type;
awaiter(io_uring_wrapper &ring, uint64_t user_data) :
ring_ref_(ring), id_(user_data) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
ring_ref_.add_callback(id_, h);
auto *ring = ring_ref_.get_ring();
::io_uring_cqe *cqe = nullptr;
int count = 0;
::io_uring_peek_cqe(ring, &cqe);
if (cqe != nullptr && (uint64_t)cqe->user_data == id_) {
if(count++> 1){
} else{
// already ready
h.resume();
}
} else{
return ;
}
}
await_result await_resume() {
auto *ring = ring_ref_.get_ring();
::io_uring_cqe *cqe;
::io_uring_wait_cqe(ring, &cqe);
int res = cqe->res;
::io_uring_cqe_seen(ring, cqe);
ring_ref_.remove_callback(id_);
return res;
}
io_uring_wrapper& ring_ref_;
uint64_t id_;
};
return awaiter{*this,std::rand()};
}
private:
void remove_callback(uint64_t id) {
callbacks.erase(id);
}
::io_uring *ring;
int event_fd;
std::map<uint64_t, std::coroutine_handle<>> callbacks;
};
asio::awaitable<void> handle_mkdir( io_uring_wrapper& io_uring_ring) {
int ret;
// get sqe and prepare the op.
auto *ring = io_uring_ring.get_ring();
::io_uring_sqe *sqe = ::io_uring_get_sqe(ring);
if (sqe == nullptr){
std::cout<<"No free SQE entries available";
return ;
}
::io_uring_prep_mkdirat(sqe, AT_FDCWD, "/tmp/mytestdir", 0755);
// set the user_data, a.k.a identifier for this operation
uint64_t operation_id = std::rand();
::io_uring_sqe_set_data(sqe, (void*)operation_id);
// submit to kernel to be processed asynchronously.
ret = ::io_uring_submit(ring);
if (ret < 0) {
throw std::runtime_error(std::format("failed to submit to ring {}, operation id ={}",ret ,operation_id ));
}
// block and process result
int result = co_await io_uring_ring.await_cqe(asio::use_awaitable);
std::cout<<"result for operation: "<<operation_id<< " result: " <<result<< "\n";
co_return;
}
asio::awaitable<void> handler(tcp::socket s, io_uring_wrapper & io_uring_ring)
{
auto exec = co_await asio::this_coro::executor;
try {
co_await handle_mkdir(io_uring_ring);
}catch(const std::exception &ex)
{
std::cerr<< "handler ex" << ex.what()<<std::endl;
}
auto msg = "task finished";
co_await asio::async_write(s, asio::buffer(msg), asio::use_awaitable);
s.close();
}
asio::awaitable<void> listener( io_uring_wrapper &io_uring_ring )
{
auto executor = co_await asio::this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v6(), 8080});
asio::posix::descriptor eventfd_descriptor{executor, io_uring_ring.get_eventfd()};
std::cout<<"io_uring listener setup\n";
for (;;)
{
// use select system call to avoid constant polling to event_fd (via level triggered EPOLL_CTL_ADD).
co_await eventfd_descriptor.async_wait(asio::posix::descriptor::wait_type::read,asio::use_awaitable);
uint64_t count;
auto r =::read( io_uring_ring.get_eventfd(), &count, sizeof(count)); // consume event count.
if( r < 0) {
} else{
auto socket = co_await acceptor.async_accept(asio::use_awaitable);
asio::co_spawn(executor, handler(std::move(socket), io_uring_ring), asio::detached);
}
}
}
int main()
{
io_uring_wrapper io_uring_ring{};
asio::io_context ctx(1);
asio::signal_set signals(ctx, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ctx.stop(); });
asio::co_spawn(ctx, listener(io_uring_ring), asio::detached);
ctx.run();
return 0;
}
这个代码示例演示了如何利用单独的 io_uring
实例,并将其与 Asio 的事件循环集成。
核心概念在于将 io_uring
的 Completion Event (CQE) 通过 eventfd
通知 Asio 事件循环。
总结
整合 io_uring
与 Asio 能够有效地提升非标准异步系统调用的性能。 选择直接操作底层 io_uring
或者维护单独实例的关键在于应用需求、团队经验和开发效率间的平衡。 上述代码示例清晰展示了如何创建和管理独立的 io_uring
实例, 进而与 Asio 环境有效集成。通过理解和实践本文的技巧,开发者可充分利用 io_uring
的优势,拓展 Asio 的异步 I/O 功能,满足更广泛的应用场景需求。