Unix Domain Socket SetDeadline 不生效?原因及解决
2025-03-13 07:47:57
Unix Domain Socket 中 SetDeadline 不生效?深究原因和解决办法
最近在测试 unixpacket
类型的 Unix Domain Socket (UDS) 的行为时,碰上了一个怪事:设置了读取的截止时间(SetDeadline
),但是超时后读取操作仍然阻塞,并没有如预期般返回错误。 这到底咋回事?
问题复现
简单来说,客户端会发送一条消息,然后休眠 60 秒后关闭连接;服务端则尝试读取两次消息。 第一次读取成功,第二次读取按理说应该阻塞住。 我加了 SetDeadline
,期望第二次读取在超时后会失败。 然而,测试结果是:第二次读取超时后仍然阻塞,并没有失败。
以下是复现问题的代码:
客户端 (client.go):
package main
import (
"fmt"
"net"
"strings"
"time"
)
func main() {
addr := "/tmp/uds_socket"
client, err := net.Dial("unixpacket", addr)
if err != nil {
fmt.Println("Dial error:", err)
return
}
defer client.Close()
message := strings.Repeat("A", 100)
_, err = client.Write([]byte(message))
if err != nil {
fmt.Println("Write error:", err)
return
}
time.Sleep(time.Second * 60)
}
服务端 (server.go):
package main
import (
"fmt"
"net"
"os"
"time"
)
func main() {
addr := "/tmp/uds_socket"
os.Remove(addr)
server, err := net.Listen("unixpacket", addr)
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer server.Close()
conn, err := server.Accept()
if err != nil {
fmt.Println("Accept error:", err)
return
}
defer conn.Close()
var buf []byte
fmt.Println("now:", time.Now().String())
deadline := time.Now().Add(time.Second * 2)
err = conn.SetDeadline(deadline)
if err != nil {
fmt.Println("Error setting read deadline:", err)
return
}
fmt.Println("setting read deadline:", deadline.String())
temp := make([]byte, 1000)
n, err := conn.Read(temp)
if err != nil {
fmt.Println("Read error:", err)
return // 添加return,防止后续继续Read操作
}
fmt.Println("Read size:", n)
buf = append(buf, temp[:n]...)
fmt.Println("now:", time.Now().String())
deadline = time.Now().Add(time.Second * 2)
err = conn.SetDeadline(deadline)
if err != nil {
fmt.Println("Error setting read deadline:", err)
return
}
fmt.Println("setting read deadline:", deadline.String())
temp = make([]byte, 1000)
n, err = conn.Read(temp)
if err != nil {
fmt.Println("Read error:", err)
return // 添加return,防止继续执行
}
fmt.Println("Read size:", n)
buf = append(buf, temp[:n]...)
fmt.Println("Received message:", string(buf))
}
运行步骤:
- 先运行服务端
go run server.go
- 再运行客户端
go run client.go
你会发现,服务端的第二个 Read
操作在超过 2 秒后,仍然在阻塞,而不是返回一个超时的错误。
问题原因
问题的根源在于对 unixpacket
和 SetDeadline
的理解有偏差。 SetDeadline
设置的是 I/O 操作的截止时间,但对于 unixpacket
类型的 UDS,它的行为有些特殊。
unixpacket
的特性:unixpacket
是一种面向消息的、可靠的协议。这意味着数据是以消息为单位进行传输的,并且保证消息的完整性和可靠性。 当客户端发送一条消息后,即使客户端不关闭连接,服务端也能完整地读取到这条消息。 如果服务端尝试再次读取,而此时没有新的消息到达,Read
操作就会阻塞。SetDeadline
的作用:SetDeadline
设置的是 I/O 操作(如Read
、Write
)的截止时间。如果在这个时间内没有完成 I/O 操作,就会返回一个超时的错误 (net.Error
接口的Timeout()
方法返回true
)。- 核心问题: 在使用
unixpacket
时,只要客户端发送了一条消息,服务端的第一次Read
调用就一定会成功(假设缓冲区足够大)。 而设置SetDeadline
主要影响 等待数据到达 这个过程。第一次Read, 数据已就绪。第二次Read时,因为对端根本没发送新消息,Read
操作就阻塞在 等待新消息到达 的阶段。 虽然设置了SetDeadline
,但是unixpacket
没有发生新的I/O事件 ,内核不会去检查Deadline, 因此并不会触发超时。
一句话概括:SetDeadline
对已存在的、未读取的 unixpacket
消息无效。它只对等待新 I/O 事件有效。
解决方案
针对这个问题,有几种不同的解决思路。
方案一:使用 SetReadDeadline
SetReadDeadline
只设置读取操作的截止时间,相比 SetDeadline
更精确。 多数情况下推荐直接使用SetReadDeadline
.
修改 server.go 代码:
// ... 其他代码 ...
fmt.Println("now:", time.Now().String())
deadline := time.Now().Add(time.Second * 2)
err = conn.SetReadDeadline(deadline) // 只设置 Read 的 Deadline
if err != nil {
fmt.Println("Error setting read deadline:", err)
return
}
fmt.Println("setting read deadline:", deadline.String())
// ... 其他代码 ...
fmt.Println("now:", time.Now().String())
deadline = time.Now().Add(time.Second * 2)
err = conn.SetReadDeadline(deadline) // 只设置 Read 的 Deadline
if err != nil {
fmt.Println("Error setting read deadline:", err)
return
}
fmt.Println("setting read deadline:", deadline.String())
// ... 其他代码 ...
尽管做了以上更改,你会发现问题依然存在,这是因为核心问题依然没有解决,虽然变得更精准,但是依然是在没有I/O事件的时候触发的。
方案二:客户端发送 EOF 信号 (不推荐)
在 unixpacket
中,没有像 TCP 那样的半关闭状态。客户端可以通过 Shutdown
方法发送 EOF 信号,但仅对 SOCK_STREAM
有效, 对SOCK_SEQPACKET
(即unixpacket
)不起作用。因此这种方法不可靠。
方案三:自定义应用层协议
更可靠的方法是,在应用层定义一个简单的协议。 例如:
- 长度前缀: 客户端在发送消息前,先发送一个表示消息长度的固定字节数(如 4 字节的整数)。服务端先读取这个长度,再根据长度读取实际的消息内容。
- 特殊结束符: 客户端在发送完所有消息后,发送一个特殊的结束符(如一个空消息或一个特定标记)。服务端读取到这个结束符后,就知道没有更多数据了。
- 心跳包: 客户端定时发送心跳包。 服务端如果在一定时间内没有收到心跳包,就认为连接断开。
示例(长度前缀):
客户端 (client.go):
package main
import (
"encoding/binary"
"fmt"
"net"
"strings"
"time"
)
func main() {
addr := "/tmp/uds_socket"
client, err := net.Dial("unixpacket", addr)
if err != nil {
fmt.Println("Dial error:", err)
return
}
defer client.Close()
message := strings.Repeat("A", 100)
sendWithLength(client, []byte(message)) // 发送带长度的消息
// 模拟发送第二条消息
time.Sleep(2*time.Second) //此处睡眠时间短于服务端的SetReadDeadLine, 用来模拟网络延时到达的情况。
sendWithLength(client, []byte("Hello"))
time.Sleep(time.Second * 60)
}
func sendWithLength(conn net.Conn, data []byte) {
length := len(data)
lengthBytes := make([]byte, 4)
binary.BigEndian.PutUint32(lengthBytes, uint32(length)) // 使用大端序
_, err := conn.Write(lengthBytes) // 先发送长度
if err != nil {
fmt.Println("Write length error:", err)
return
}
_, err = conn.Write(data) // 再发送数据
if err != nil {
fmt.Println("Write data error:", err)
return
}
}
服务端 (server.go):
package main
import (
"encoding/binary"
"fmt"
"net"
"os"
"time"
)
func main() {
addr := "/tmp/uds_socket"
os.Remove(addr)
server, err := net.Listen("unixpacket", addr)
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer server.Close()
conn, err := server.Accept()
if err != nil {
fmt.Println("Accept error:", err)
return
}
defer conn.Close()
buf := make([]byte,0)
for {
fmt.Println("now:", time.Now().String())
deadline := time.Now().Add(time.Second * 5) //稍微设置长一点,允许数据延时
err = conn.SetReadDeadline(deadline) // 使用 SetReadDeadline
if err != nil {
fmt.Println("Error setting read deadline:", err)
return
}
fmt.Println("setting read deadline:", deadline.String())
data, err := readWithLength(conn)
if err != nil {
nErr,ok := err.(net.Error)
if ok && nErr.Timeout() {
fmt.Println("Read Timeout:", err)
}else{
fmt.Println("Read error:", err)
}
return
}
buf = append(buf, data...)
fmt.Println("Received message:", string(data))
}
fmt.Println("All Received message:", string(buf))
}
func readWithLength(conn net.Conn) ([]byte, error) {
lengthBytes := make([]byte, 4)
_, err := conn.Read(lengthBytes) // 先读取长度
if err != nil {
return nil, err
}
length := binary.BigEndian.Uint32(lengthBytes) // 使用大端序
data := make([]byte, length)
_, err = conn.Read(data) // 再读取数据
if err != nil {
return nil,err
}
return data, nil
}
改进后的代码分析:
- 客户端 :
sendWithLength
函数先发送消息长度(4 字节,大端序),然后发送消息内容。 - 服务端 :
readWithLength
函数先读取 4 字节的长度,然后根据长度读取消息内容。主循环中,设置了SetReadDeadline
, 一旦超时或者发生其他读取错误,循环退出。这样避免了永久阻塞。 - 通过客户端模拟了第二条消息的延时发送, 来验证
SetReadDeadLine
的作用。
方案四:使用 select
或 poll
(进阶)
如果你对性能有更高的要求,或者需要同时处理多个连接,可以使用 select
或 poll
系统调用。 它们可以监控多个文件符(包括 socket)的可读、可写状态。
Go 语言的 net
包没有直接暴露 select
或 poll
的接口,但你可以通过 syscall
包来使用它们。 不过,这种方法比较底层,代码会更复杂,出错的概率更高。 通常不建议普通开发者操作,容易写出BUG。
强烈建议不要随意使用,贴一个利用反射强行操作底层FD的方法。 (仅供展示)
package main
import (
"fmt"
"net"
"os"
"reflect"
"syscall"
"time"
"unsafe"
)
func main() {
addr := "/tmp/uds_socket"
os.Remove(addr)
server, err := net.Listen("unixpacket", addr)
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer server.Close()
conn, err := server.Accept()
if err != nil {
fmt.Println("Accept error:", err)
return
}
defer conn.Close()
// 通过反射获取 fd
connVal := reflect.ValueOf(conn)
netConnVal := reflect.Indirect(connVal).FieldByName("conn")
fdVal := reflect.Indirect(netConnVal).FieldByName("fd")
sysfd := int(fdVal.UnsafeAddr())
buf := make([]byte,0)
for {
fmt.Println("Now",time.Now())
readSet := &syscall.FdSet{}
FD_ZERO(readSet)
FD_SET(readSet, sysfd)
timeout := &syscall.Timeval{Sec: 2, Usec: 0} // 设置超时时间为2秒
//核心: 通过select 监控底层fd 可读性, 2s 超时
n, err := syscall.Select(sysfd+1, readSet, nil, nil, timeout)
if err != nil {
fmt.Println("Select error:", err)
if err == syscall.EINTR { //系统中断,通常不用管,继续就好
continue
}
return
}
if n == 0 {
fmt.Println("Timeout!")
return //如果超时,直接结束.
}
//走到这,代表fd有数据可读,再去尝试Read。
if FD_ISSET(readSet, sysfd) {
temp := make([]byte, 1000)
n, err := conn.Read(temp)
if err != nil {
fmt.Println("Read error:", err)
return
}
if n>0 {
buf = append(buf, temp[:n]...)
fmt.Println("Read Size",n)
fmt.Println("Received message:", string(temp[:n]))
}else {
fmt.Println("Peer Closed")
return // 对端关闭连接。 此时n == 0
}
}
}
fmt.Println("All Received message:", string(buf))
}
func FD_ZERO(set *syscall.FdSet) {
set.Bits = [16]int64{}
}
func FD_SET(set *syscall.FdSet, fd int){
set.Bits[fd/64] |= (1 << (uint(fd) % 64))
}
func FD_ISSET(set *syscall.FdSet, fd int) bool{
return set.Bits[fd/64]&(1<<(uint(fd)%64)) != 0
}
代码解释
syscall.Select
的参数:- 第一个参数是最大的文件符值加 1。
- 第二个参数是可读文件描述符集合。
- 第三个参数是可写文件描述符集合。
- 第四个参数是异常文件描述符集合。
- 第五个参数是超时时间。
- 通过反射拿到了
net.Conn
底层的fd
。 - 因为使用了反射和非公开API, 程序的兼容性和可移植性大大降低了。
注意:强烈建议只用于学习和理解底层原理,切勿用于生产环境!
总结
unixpacket
类型的 UDS 的 SetDeadline
行为与 TCP socket 有所不同。 SetDeadline
对已存在, 但未读取的消息是无效的,它只能作用于新的I/O事件。 要想实现超时控制,应该根据情况,优先使用SetReadDeadline
, 如果要解决这类阻塞问题,通常需要在应用层自定义协议(如长度前缀、特殊结束符),或者在万不得已时使用底层的 select
/poll
(通常不推荐)。选择哪种方案,取决于你的具体需求和对代码复杂度的容忍度。