返回

Asio扩展: 使用io_uring处理非标准异步系统调用

Linux

Asio 与 io_uring 的功能扩展

Linux 内核的 io_uring 为异步 I/O 提供了高性能的解决方案,但并非所有系统调用都在 Asio 的默认异步操作范围内。本篇文章讨论了如何在 Asio 环境下使用 io_uring 处理这些非标准的异步系统调用,例如 mkdirrenamesetxattr 等操作。

访问 Asio 的 io_uring 实例

首先,我们考虑是否可以访问 Asio 内部使用的 io_uring 实例。理论上,asio::stream_file::get_executor() 有可能提供关于 io_uring 实例的信息。 但经过深入研究,我们发现Asio 并没有直接暴露 io_uring 的底层接口。这样做主要考虑到接口的稳定性和抽象性。 直接操作底层的io_uring API可能会打破 Asio 的异步执行模型,并使应用程序变得脆弱。

因此,直接访问或修改 Asio 的 io_uring 实例并不是一个可取的做法。

使用单独的 io_uring 实例进行集成

如果 Asio 的 io_uring 实例无法访问,一个合理的替代方案是创建并管理我们自己的 io_uring 实例。这种方法允许在 Asio 环境中实现对那些非标准系统调用的异步处理, 关键在于如何协调这个单独的 io_uring 实例与 Asio 的异步模型。

使用独立的io_uringeventfd 并与 asio::posix::descriptor 配合是一种有效的集成方案。 流程如下:

  1. 初始化: 创建 io_uring 实例和用于唤醒 Asio 事件循环的 eventfd
  2. 异步请求准备:io_uring 的系统调用操作通过 io_uring_prep_* 系列函数写入 Submission Queue Entry (SQE)。
  3. SQE 提交: 将 SQE 提交给io_uring,并让其异步执行。
  4. 关联上下文: 需要记录挂起的协程和cqe->user_data之间的关系,方便后续处理结果时可以恢复正确的协程。
  5. 事件监听: 通过 asio::posix::descriptor 监听 eventfd 的可读事件。io_uring 执行完成后将通过 eventfd 通知我们有新的 Completion Queue Entry (CQE) 产生。
  6. CQE处理:eventfd 有可读事件时,处理相应的 CQE 并恢复对应的协程。
#include <asio/io_context.hpp>
#include <asio/awaitable.hpp>
#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/signal_set.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/posix/descriptor.hpp>
#include <asio/this_coro.hpp>
#include <asio/buffer.hpp>


#include <fcntl.h>
#include <liburing.h>
#include <sys/eventfd.h>
#include <stdexcept>
#include <map>

#include <format>
#include <iostream>

using tcp = asio::ip::tcp;


class io_uring_wrapper {
public:
    io_uring_wrapper() {
        int ret;

        ring = new ::io_uring{};
        if (::io_uring_queue_init(32, ring, 0) < 0){
            throw std::runtime_error("Failed to initialize io_uring");
        }
        
        event_fd = eventfd(0, EFD_NONBLOCK);
        if (event_fd < 0) {
          ::io_uring_queue_exit(ring);
          delete ring;
           throw std::runtime_error("Failed to create eventfd");
        }


    }
    ~io_uring_wrapper() {
          if(ring != nullptr){
           ::io_uring_queue_exit(ring);
            delete ring;
         }
          if(event_fd > 0){
              ::close(event_fd);
          }

    }

    ::io_uring *get_ring(){ return ring;}
    int get_eventfd(){return event_fd;}

    template <typename Awaitable>
    void add_callback(uint64_t id, Awaitable awaitable) {
       callbacks[id] = std::move(awaitable);
    }
    
     template<typename CompletionToken>
    auto await_cqe(CompletionToken&& token){

        struct awaiter {
            using await_result = typename std::remove_cvref_t<CompletionToken>::template rebind<void>::other_result_type;
            awaiter(io_uring_wrapper &ring, uint64_t user_data) :
                    ring_ref_(ring), id_(user_data)  {}
        
                bool await_ready() const { return false; }
           
               void await_suspend(std::coroutine_handle<> h) {
                   ring_ref_.add_callback(id_,  h);

                  auto *ring = ring_ref_.get_ring();
                  ::io_uring_cqe *cqe = nullptr;

                 
                    int count = 0;
                      ::io_uring_peek_cqe(ring, &cqe);
                       
                     if (cqe != nullptr && (uint64_t)cqe->user_data == id_) {
                         
                       
                           if(count++> 1){
                            
                           } else{
                              // already ready 
                                h.resume();
                            
                             }


                        } else{

                           
                          return ; 
                       }

                     
           }

          await_result await_resume()  {
             auto *ring = ring_ref_.get_ring();
               ::io_uring_cqe *cqe;
               
            ::io_uring_wait_cqe(ring, &cqe);


                 int res =  cqe->res;

                 ::io_uring_cqe_seen(ring, cqe);


                   ring_ref_.remove_callback(id_);

                   return  res;
        }


            io_uring_wrapper& ring_ref_;
           uint64_t  id_;


         };

          return awaiter{*this,std::rand()};
    }
private:
    
    void remove_callback(uint64_t id) {
         callbacks.erase(id);
     }

    ::io_uring *ring;
    int event_fd;
    std::map<uint64_t, std::coroutine_handle<>> callbacks;
};



asio::awaitable<void> handle_mkdir( io_uring_wrapper& io_uring_ring) {
        int ret;
    // get sqe and prepare the op.

        auto *ring = io_uring_ring.get_ring();

         ::io_uring_sqe *sqe =  ::io_uring_get_sqe(ring);


        if (sqe == nullptr){
          std::cout<<"No free SQE entries available";
           return ;
        }
      
         ::io_uring_prep_mkdirat(sqe, AT_FDCWD, "/tmp/mytestdir", 0755);


        // set the user_data, a.k.a identifier for this operation
           uint64_t operation_id = std::rand();
            ::io_uring_sqe_set_data(sqe, (void*)operation_id);


    //  submit to kernel to be processed asynchronously.
      ret =  ::io_uring_submit(ring);
     
        if (ret < 0) {

         throw std::runtime_error(std::format("failed to submit to ring {}, operation id ={}",ret ,operation_id ));
    }


    // block and process result
    int result = co_await io_uring_ring.await_cqe(asio::use_awaitable);

       std::cout<<"result for operation: "<<operation_id<< " result: " <<result<< "\n";
   
   
   co_return;


}



asio::awaitable<void> handler(tcp::socket s,  io_uring_wrapper & io_uring_ring)
{
    auto exec = co_await asio::this_coro::executor;
    try {
           co_await handle_mkdir(io_uring_ring);
      
    }catch(const std::exception &ex)
    {
         std::cerr<< "handler ex" << ex.what()<<std::endl;
    }


  auto msg = "task finished";
    co_await asio::async_write(s, asio::buffer(msg), asio::use_awaitable);
    s.close();


    
}

asio::awaitable<void> listener( io_uring_wrapper &io_uring_ring )
{

    auto executor = co_await asio::this_coro::executor;
    tcp::acceptor acceptor(executor, {tcp::v6(), 8080});

   
       asio::posix::descriptor eventfd_descriptor{executor, io_uring_ring.get_eventfd()};

        std::cout<<"io_uring listener setup\n";


    for (;;)
    {

       
       // use select system call to avoid constant polling to event_fd (via level triggered EPOLL_CTL_ADD).
           co_await  eventfd_descriptor.async_wait(asio::posix::descriptor::wait_type::read,asio::use_awaitable);

        uint64_t count;
        auto r =::read( io_uring_ring.get_eventfd(), &count, sizeof(count)); // consume event count.

          
        if( r < 0) {
           
        } else{


              auto socket = co_await acceptor.async_accept(asio::use_awaitable);
              asio::co_spawn(executor, handler(std::move(socket), io_uring_ring), asio::detached);
             
           }
      
    }
}

int main()
{
   
    io_uring_wrapper  io_uring_ring{};
    asio::io_context ctx(1);
    asio::signal_set signals(ctx, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto){ ctx.stop(); });


    asio::co_spawn(ctx, listener(io_uring_ring), asio::detached);

   
    ctx.run();
    
    return 0;
}

这个代码示例演示了如何利用单独的 io_uring 实例,并将其与 Asio 的事件循环集成。

核心概念在于将 io_uring 的 Completion Event (CQE) 通过 eventfd 通知 Asio 事件循环。

总结

整合 io_uring 与 Asio 能够有效地提升非标准异步系统调用的性能。 选择直接操作底层 io_uring 或者维护单独实例的关键在于应用需求、团队经验和开发效率间的平衡。 上述代码示例清晰展示了如何创建和管理独立的 io_uring 实例, 进而与 Asio 环境有效集成。通过理解和实践本文的技巧,开发者可充分利用 io_uring 的优势,拓展 Asio 的异步 I/O 功能,满足更广泛的应用场景需求。