返回

Unix Domain Socket SetDeadline 不生效?原因及解决

Linux

Unix Domain Socket 中 SetDeadline 不生效?深究原因和解决办法

最近在测试 unixpacket 类型的 Unix Domain Socket (UDS) 的行为时,碰上了一个怪事:设置了读取的截止时间(SetDeadline),但是超时后读取操作仍然阻塞,并没有如预期般返回错误。 这到底咋回事?

问题复现

简单来说,客户端会发送一条消息,然后休眠 60 秒后关闭连接;服务端则尝试读取两次消息。 第一次读取成功,第二次读取按理说应该阻塞住。 我加了 SetDeadline,期望第二次读取在超时后会失败。 然而,测试结果是:第二次读取超时后仍然阻塞,并没有失败。

以下是复现问题的代码:

客户端 (client.go):

package main

import (
	"fmt"
	"net"
	"strings"
	"time"
)

func main() {
	addr := "/tmp/uds_socket"
	client, err := net.Dial("unixpacket", addr)
	if err != nil {
		fmt.Println("Dial error:", err)
		return
	}
	defer client.Close()

	message := strings.Repeat("A", 100)
	_, err = client.Write([]byte(message))
	if err != nil {
		fmt.Println("Write error:", err)
		return
	}

	time.Sleep(time.Second * 60)
}

服务端 (server.go):

package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

func main() {
	addr := "/tmp/uds_socket"
	os.Remove(addr)

	server, err := net.Listen("unixpacket", addr)
	if err != nil {
		fmt.Println("Listen error:", err)
		return
	}
	defer server.Close()

	conn, err := server.Accept()
	if err != nil {
		fmt.Println("Accept error:", err)
		return
	}
	defer conn.Close()

	var buf []byte

	fmt.Println("now:", time.Now().String())
	deadline := time.Now().Add(time.Second * 2)
	err = conn.SetDeadline(deadline)
	if err != nil {
		fmt.Println("Error setting read deadline:", err)
		return
	}
	fmt.Println("setting read deadline:", deadline.String())

	temp := make([]byte, 1000)
	n, err := conn.Read(temp)
	if err != nil {
		fmt.Println("Read error:", err)
		return // 添加return,防止后续继续Read操作
	}
	fmt.Println("Read size:", n)
	buf = append(buf, temp[:n]...)

	fmt.Println("now:", time.Now().String())
	deadline = time.Now().Add(time.Second * 2)
	err = conn.SetDeadline(deadline)
	if err != nil {
		fmt.Println("Error setting read deadline:", err)
		return
	}
	fmt.Println("setting read deadline:", deadline.String())

	temp = make([]byte, 1000)
	n, err = conn.Read(temp)
	if err != nil {
		fmt.Println("Read error:", err)
		return // 添加return,防止继续执行
	}
	fmt.Println("Read size:", n)
	buf = append(buf, temp[:n]...)

	fmt.Println("Received message:", string(buf))
}

运行步骤:

  1. 先运行服务端 go run server.go
  2. 再运行客户端 go run client.go

你会发现,服务端的第二个 Read 操作在超过 2 秒后,仍然在阻塞,而不是返回一个超时的错误。

问题原因

问题的根源在于对 unixpacketSetDeadline 的理解有偏差。 SetDeadline 设置的是 I/O 操作的截止时间,但对于 unixpacket 类型的 UDS,它的行为有些特殊。

  • unixpacket 的特性: unixpacket 是一种面向消息的、可靠的协议。这意味着数据是以消息为单位进行传输的,并且保证消息的完整性和可靠性。 当客户端发送一条消息后,即使客户端不关闭连接,服务端也能完整地读取到这条消息。 如果服务端尝试再次读取,而此时没有新的消息到达,Read 操作就会阻塞。
  • SetDeadline 的作用: SetDeadline 设置的是 I/O 操作(如 ReadWrite)的截止时间。如果在这个时间内没有完成 I/O 操作,就会返回一个超时的错误 (net.Error 接口的 Timeout() 方法返回 true)。
  • 核心问题: 在使用unixpacket时,只要客户端发送了一条消息,服务端的第一次Read调用就一定会成功(假设缓冲区足够大)。 而设置SetDeadline主要影响 等待数据到达 这个过程。第一次Read, 数据已就绪。第二次Read时,因为对端根本没发送新消息,Read 操作就阻塞在 等待新消息到达 的阶段。 虽然设置了SetDeadline,但是unixpacket没有发生新的I/O事件 ,内核不会去检查Deadline, 因此并不会触发超时。

一句话概括:SetDeadline 对已存在的、未读取的 unixpacket 消息无效。它只对等待新 I/O 事件有效。

解决方案

针对这个问题,有几种不同的解决思路。

方案一:使用 SetReadDeadline

SetReadDeadline 只设置读取操作的截止时间,相比 SetDeadline 更精确。 多数情况下推荐直接使用SetReadDeadline.

修改 server.go 代码:

// ... 其他代码 ...

fmt.Println("now:", time.Now().String())
deadline := time.Now().Add(time.Second * 2)
err = conn.SetReadDeadline(deadline) // 只设置 Read 的 Deadline
if err != nil {
	fmt.Println("Error setting read deadline:", err)
	return
}
fmt.Println("setting read deadline:", deadline.String())

// ... 其他代码 ...

fmt.Println("now:", time.Now().String())
deadline = time.Now().Add(time.Second * 2)
err = conn.SetReadDeadline(deadline) // 只设置 Read 的 Deadline
if err != nil {
	fmt.Println("Error setting read deadline:", err)
	return
}
fmt.Println("setting read deadline:", deadline.String())
// ... 其他代码 ...

尽管做了以上更改,你会发现问题依然存在,这是因为核心问题依然没有解决,虽然变得更精准,但是依然是在没有I/O事件的时候触发的。

方案二:客户端发送 EOF 信号 (不推荐)

unixpacket 中,没有像 TCP 那样的半关闭状态。客户端可以通过 Shutdown 方法发送 EOF 信号,但仅对 SOCK_STREAM 有效, 对SOCK_SEQPACKET(即unixpacket)不起作用。因此这种方法不可靠。

方案三:自定义应用层协议

更可靠的方法是,在应用层定义一个简单的协议。 例如:

  1. 长度前缀: 客户端在发送消息前,先发送一个表示消息长度的固定字节数(如 4 字节的整数)。服务端先读取这个长度,再根据长度读取实际的消息内容。
  2. 特殊结束符: 客户端在发送完所有消息后,发送一个特殊的结束符(如一个空消息或一个特定标记)。服务端读取到这个结束符后,就知道没有更多数据了。
  3. 心跳包: 客户端定时发送心跳包。 服务端如果在一定时间内没有收到心跳包,就认为连接断开。

示例(长度前缀):

客户端 (client.go):

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"strings"
	"time"
)

func main() {
	addr := "/tmp/uds_socket"
	client, err := net.Dial("unixpacket", addr)
	if err != nil {
		fmt.Println("Dial error:", err)
		return
	}
	defer client.Close()

	message := strings.Repeat("A", 100)
	sendWithLength(client, []byte(message)) // 发送带长度的消息

    // 模拟发送第二条消息
    time.Sleep(2*time.Second) //此处睡眠时间短于服务端的SetReadDeadLine, 用来模拟网络延时到达的情况。
    sendWithLength(client, []byte("Hello"))

	time.Sleep(time.Second * 60)

}

func sendWithLength(conn net.Conn, data []byte) {
	length := len(data)
	lengthBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(lengthBytes, uint32(length)) // 使用大端序

	_, err := conn.Write(lengthBytes) // 先发送长度
	if err != nil {
		fmt.Println("Write length error:", err)
		return
	}
	_, err = conn.Write(data) // 再发送数据
	if err != nil {
		fmt.Println("Write data error:", err)
		return
	}
}

服务端 (server.go):

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"os"
	"time"
)

func main() {
	addr := "/tmp/uds_socket"
	os.Remove(addr)

	server, err := net.Listen("unixpacket", addr)
	if err != nil {
		fmt.Println("Listen error:", err)
		return
	}
	defer server.Close()

	conn, err := server.Accept()
	if err != nil {
		fmt.Println("Accept error:", err)
		return
	}
	defer conn.Close()
  buf := make([]byte,0)
	for {
		fmt.Println("now:", time.Now().String())
		deadline := time.Now().Add(time.Second * 5) //稍微设置长一点,允许数据延时
		err = conn.SetReadDeadline(deadline) // 使用 SetReadDeadline
		if err != nil {
			fmt.Println("Error setting read deadline:", err)
			return
		}
		fmt.Println("setting read deadline:", deadline.String())

		data, err := readWithLength(conn)

		if err != nil {
             nErr,ok := err.(net.Error)
            if ok && nErr.Timeout()  {
                fmt.Println("Read Timeout:", err)
            }else{
                fmt.Println("Read error:", err)
            }

			return
		}
        buf = append(buf, data...)
		fmt.Println("Received message:", string(data))
	}
  fmt.Println("All Received message:", string(buf))

}

func readWithLength(conn net.Conn) ([]byte, error) {
	lengthBytes := make([]byte, 4)
	_, err := conn.Read(lengthBytes) // 先读取长度
	if err != nil {

		return nil, err
	}
	length := binary.BigEndian.Uint32(lengthBytes) // 使用大端序

	data := make([]byte, length)
	_, err = conn.Read(data) // 再读取数据
	if err != nil {

        return nil,err
	}
	return data, nil
}

改进后的代码分析:

  • 客户端 : sendWithLength 函数先发送消息长度(4 字节,大端序),然后发送消息内容。
  • 服务端 : readWithLength 函数先读取 4 字节的长度,然后根据长度读取消息内容。主循环中,设置了 SetReadDeadline, 一旦超时或者发生其他读取错误,循环退出。这样避免了永久阻塞。
  • 通过客户端模拟了第二条消息的延时发送, 来验证SetReadDeadLine的作用。

方案四:使用 selectpoll (进阶)

如果你对性能有更高的要求,或者需要同时处理多个连接,可以使用 selectpoll 系统调用。 它们可以监控多个文件符(包括 socket)的可读、可写状态。

Go 语言的 net 包没有直接暴露 selectpoll 的接口,但你可以通过 syscall 包来使用它们。 不过,这种方法比较底层,代码会更复杂,出错的概率更高。 通常不建议普通开发者操作,容易写出BUG

强烈建议不要随意使用,贴一个利用反射强行操作底层FD的方法。 (仅供展示)

package main

import (
	"fmt"
	"net"
	"os"
	"reflect"
	"syscall"
	"time"
	"unsafe"
)

func main() {
	addr := "/tmp/uds_socket"
	os.Remove(addr)

	server, err := net.Listen("unixpacket", addr)
	if err != nil {
		fmt.Println("Listen error:", err)
		return
	}
	defer server.Close()

	conn, err := server.Accept()
	if err != nil {
		fmt.Println("Accept error:", err)
		return
	}
	defer conn.Close()

	// 通过反射获取 fd
	connVal := reflect.ValueOf(conn)
	netConnVal := reflect.Indirect(connVal).FieldByName("conn")
	fdVal := reflect.Indirect(netConnVal).FieldByName("fd")
	sysfd := int(fdVal.UnsafeAddr())

    buf := make([]byte,0)

	for {

        fmt.Println("Now",time.Now())

		readSet := &syscall.FdSet{}
		FD_ZERO(readSet)
		FD_SET(readSet, sysfd)

		timeout := &syscall.Timeval{Sec: 2, Usec: 0} // 设置超时时间为2秒
        //核心: 通过select 监控底层fd 可读性, 2s 超时
		n, err := syscall.Select(sysfd+1, readSet, nil, nil, timeout)

		if err != nil {

			fmt.Println("Select error:", err)
            if err == syscall.EINTR { //系统中断,通常不用管,继续就好
               continue
            }
			return
		}

		if n == 0 {
			fmt.Println("Timeout!")
            return //如果超时,直接结束.
		}

        //走到这,代表fd有数据可读,再去尝试Read。

		if FD_ISSET(readSet, sysfd) {
			temp := make([]byte, 1000)
			n, err := conn.Read(temp)
			if err != nil {

				fmt.Println("Read error:", err)
				return
			}

                if n>0 {
                   buf = append(buf, temp[:n]...)
                   fmt.Println("Read Size",n)

				   fmt.Println("Received message:", string(temp[:n]))
                 }else {
                fmt.Println("Peer Closed")
                   return // 对端关闭连接。 此时n == 0
                 }


		}
	}
     fmt.Println("All Received message:", string(buf))
}

func FD_ZERO(set *syscall.FdSet) {
    set.Bits = [16]int64{}

}
func FD_SET(set *syscall.FdSet, fd int){
	set.Bits[fd/64] |= (1 << (uint(fd) % 64))
}

func FD_ISSET(set *syscall.FdSet, fd int) bool{

  return set.Bits[fd/64]&(1<<(uint(fd)%64)) != 0
}

代码解释

  • syscall.Select 的参数:
    • 第一个参数是最大的文件符值加 1。
    • 第二个参数是可读文件描述符集合。
    • 第三个参数是可写文件描述符集合。
    • 第四个参数是异常文件描述符集合。
    • 第五个参数是超时时间。
  • 通过反射拿到了net.Conn底层的fd
  • 因为使用了反射和非公开API, 程序的兼容性和可移植性大大降低了。

注意:强烈建议只用于学习和理解底层原理,切勿用于生产环境!

总结

unixpacket 类型的 UDS 的 SetDeadline 行为与 TCP socket 有所不同。 SetDeadline对已存在, 但未读取的消息是无效的,它只能作用于新的I/O事件。 要想实现超时控制,应该根据情况,优先使用SetReadDeadline , 如果要解决这类阻塞问题,通常需要在应用层自定义协议(如长度前缀、特殊结束符),或者在万不得已时使用底层的 select/poll (通常不推荐)。选择哪种方案,取决于你的具体需求和对代码复杂度的容忍度。