TCP Server in Zig - Part 7 - Kqueue
Oct 27, 2024
kqueue is a BSD/MacOS alternative to poll. In most ways, kqueue is similar to the Linux-specific epoll, which is itself an important, if incremental, upgrade to poll. Because kqueue has a single function, it superficially looks like poll. But, as we'll soon see, that single function can behave in two different ways, making its API, and its integration into our code, very similar to epoll.
Because kqueue is rather similar to epoll, this part is shorter: it assumes that you're familiar with topics discussed in part 6, such as edge-triggering and @intFromPtr / @ptrFromInt.
Where epoll has one function to modify the epoll file descriptor (epoll_ctl) and one to wait for notifications (epoll_wait), kqueue uses a single function for both purposes: kevent. Depending on the values passed to it, kevent modifies the kqueue instance, waits for notifications, or does both in a single call. It can therefore act like either of the epoll functions or combine them. The kevent function takes 4 parameters:
- The kqueue file descriptor, which is the kqueue instance that we're modifying and/or waiting on. Created using posix.kqueue.
- A list of posix.Kevent that represents notifications we want to add/change/delete. Known as the changelist. Can be empty.
- A list of posix.Kevent that kevent fills with ready notifications. Known as the eventlist. Can be empty.
- A timeout as a posix.timespec. Can be null, in which case kevent waits indefinitely.
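The timeout is where kevent departs from epoll_wait: there's no -1 sentinel. A null timeout blocks until something is ready, and a zero-valued timespec turns the call into a non-blocking poll. Here is a minimal sketch of converting an epoll-style millisecond timeout into that argument. Timeout and timeoutFromMs are hypothetical names; Timeout is only a stand-in for posix.timespec so the snippet runs on any OS.

```zig
const std = @import("std");

// Hypothetical stand-in for posix.timespec, so the sketch runs on any OS.
const Timeout = struct {
    sec: i64,
    nsec: i64,
};

// Convert an epoll-style millisecond timeout into kevent's timeout argument:
//   negative -> null, block until a notification is ready
//   0        -> zero timespec, poll and return immediately
//   positive -> whole seconds plus the leftover milliseconds as nanoseconds
fn timeoutFromMs(ms: i32) ?Timeout {
    if (ms < 0) return null;
    return Timeout{
        .sec = @divTrunc(ms, 1000),
        .nsec = @as(i64, @mod(ms, 1000)) * std.time.ns_per_ms,
    };
}

pub fn main() void {
    const samples = [_]i32{ -1, 0, 1500, 60_000 };
    for (samples) |ms| {
        if (timeoutFromMs(ms)) |t| {
            std.debug.print("{d}ms -> sec={d} nsec={d}\n", .{ ms, t.sec, t.nsec });
        } else {
            std.debug.print("{d}ms -> null (wait indefinitely)\n", .{ms});
        }
    }
}
```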
The key to understanding this API is knowing that when the eventlist is empty, kevent applies the changelist and returns immediately, without waiting. Thus, calling kevent with an empty eventlist is like calling epoll_ctl. Therefore, like epoll and unlike poll, as long as we have the kqueue instance, we can easily add, remove and change monitors.
The kqueue API has one advantage over epoll: we can apply modifications in bulk. Where epoll_ctl takes a single epoll_event, kevent takes an array of Kevent. In other words, with kqueue it should be possible to make fewer system calls.
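To see how batching reduces the number of system calls, here is a sketch of a staging approach. It assumes a hypothetical Change type in place of posix.Kevent and a counter in place of the real kevent call. Changes accumulate in a fixed-size array, a full array is submitted with one call, and whatever is left goes out with the next wait.

```zig
const std = @import("std");

// Hypothetical stand-in for posix.Kevent; only the key fields are kept.
const Change = struct {
    ident: usize,
    filter: i16,
};

// Sketch of "stage now, submit later". Instead of calling kevent, each
// submission just bumps `syscalls`, so the batching can be observed anywhere.
fn ChangeBuffer(comptime capacity: usize) type {
    return struct {
        list: [capacity]Change = undefined,
        len: usize = 0,
        syscalls: usize = 0,

        const Self = @This();

        // A full buffer is submitted on its own; a real version would call
        // posix.kevent(kfd, self.list[0..self.len], &.{}, null) here.
        fn flush(self: *Self) void {
            if (self.len == 0) return;
            self.syscalls += 1;
            self.len = 0;
        }

        fn queue(self: *Self, change: Change) void {
            if (self.len == capacity) self.flush();
            self.list[self.len] = change;
            self.len += 1;
        }

        // Waiting submits whatever is staged in the same call, so the
        // leftover changes cost no extra system call.
        fn wait(self: *Self) void {
            self.syscalls += 1;
            self.len = 0;
        }
    };
}

pub fn main() void {
    var buf = ChangeBuffer(16){};
    for (0..40) |i| {
        buf.queue(.{ .ident = i, .filter = -1 });
    }
    buf.wait();
    std.debug.print("40 changes + 1 wait -> {d} calls (vs 41 unbatched)\n", .{buf.syscalls});
}
```

Queueing 40 changes with room for 16 costs two flushes plus the final wait: 3 calls instead of the 41 that one kevent per change (plus the wait) would need.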
This is a working example (on BSD / MacOS). To keep it simple and similar to our first epoll sample, we don't use the bulk-modification capabilities of the API and instead add one event at a time (the final example does add them in bulk):
const std = @import("std");
const net = std.net;
const posix = std.posix;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const kfd = try posix.kqueue();
defer posix.close(kfd);
{
_ = try posix.kevent(kfd, &.{.{
.ident = @intCast(listener),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ADD,
.fflags = 0,
.data = 0,
.udata = @intCast(listener),
}}, &.{}, null);
}
var ready_list: [128]posix.Kevent = undefined;
while (true) {
const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
for (ready_list[0..ready_count]) |ready| {
const ready_socket: i32 = @intCast(ready.udata);
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
_ = try posix.kevent(kfd, &.{.{
.ident = @intCast(client_socket),
.flags = posix.system.EV.ADD,
.filter = posix.system.EVFILT.READ,
.fflags = 0,
.data = 0,
.udata = @intCast(client_socket),
}}, &.{}, null);
} else {
var closed = false;
var buf: [4096]u8 = undefined;
const read = posix.read(ready_socket, &buf) catch 0;
if (read == 0) {
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
}
if (closed) {
posix.close(ready_socket);
}
}
}
}
}
As with epoll, we can attach arbitrary information via the udata field. Above we're using the file descriptor, but in a more complete example we'd likely use @intFromPtr to get a usize representation of an application-specific "Client" struct. The Kevent struct has two additional fields: fflags and data. These hold filter-specific flags and data. For sockets, where we're only interested in the READ and WRITE filters, both should be set to zero. In a future part, we'll see a brief example of a different filter which does leverage the fflags field.
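A minimal sketch of that round trip, assuming a hypothetical Client struct: the pointer is stored as its integer address with @intFromPtr and recovered with @ptrFromInt. Reserving 0 for "not a client" (the listening socket) mirrors what the full example at the end does, and it's safe because a valid Zig pointer is never 0.

```zig
const std = @import("std");

// Hypothetical application type standing in for a real Client.
const Client = struct {
    id: u32,
};

// udata is a usize, so a pointer is stored as its integer address.
fn toUdata(client: *const Client) usize {
    return @intFromPtr(client);
}

// Reverse the conversion when an event comes back. 0 is reserved for
// "not a client" (e.g. the listening socket); a valid pointer is never 0.
fn fromUdata(udata: usize) ?*const Client {
    if (udata == 0) return null;
    const client: *const Client = @ptrFromInt(udata);
    return client;
}

pub fn main() void {
    const client = Client{ .id = 42 };
    const udata = toUdata(&client);
    if (fromUdata(udata)) |c| {
        std.debug.print("udata 0x{x} -> client {d}\n", .{ udata, c.id });
    }
    if (fromUdata(0) == null) {
        std.debug.print("udata 0 -> listener\n", .{});
    }
}
```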
With epoll, the monitors we add or modify are identified by the 3rd parameter we pass to epoll_ctl. In all the code we've seen so far, that was either the listening socket or a client socket, but more generally, it's the file descriptor to monitor. With kqueue, the identifier is the combination of the ident and filter fields. With epoll, we toggled a client from read-mode to write-mode by modifying the existing notifier (identified by the socket) with the CTL_MOD operation. In kqueue, we'd need to delete the read monitor and then add a write monitor. Or, and this is what we do in the full example given at the end, we add both a read and a write monitor, but disable the write monitor. We can then toggle the mode by disabling the active one and enabling the disabled one:
Read Mode:
key=(ident: socket1, filter: read), enabled=true
key=(ident: socket1, filter: write), enabled=false
Write Mode:
key=(ident: socket1, filter: read), enabled=false
key=(ident: socket1, filter: write), enabled=true
This also means that filter isn't a bitwise flag. To check if the socket is ready for reading, we just have to compare ready.filter == posix.system.EVFILT.READ.
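To make the (ident, filter) key concrete, here is a small in-memory model; it makes no real kqueue calls. Each registration is a hypothetical Monitor, and switching modes enables one entry and disables the other, just like the read/write table above. The filter constants use the BSD values of EVFILT_READ (-1) and EVFILT_WRITE (-2), which also shows why a bitwise test can't work: -2 & -1 is -2, which is non-zero.

```zig
const std = @import("std");

// BSD values of EVFILT_READ and EVFILT_WRITE. Filters are small negative
// integers rather than bits, so compare them with == (or switch), never &.
const EVFILT_READ: i16 = -1;
const EVFILT_WRITE: i16 = -2;

// Hypothetical in-memory model of kqueue registrations (not real kqueue
// calls): an entry is keyed by (ident, filter) and is enabled or disabled.
const Monitor = struct {
    ident: usize,
    filter: i16,
    enabled: bool,
};

// Toggle a socket's mode: enable the entry for `mode` and disable the other
// entries for the same ident, like pairing EV.ENABLE with EV.DISABLE.
fn setMode(monitors: []Monitor, ident: usize, mode: i16) void {
    for (monitors) |*m| {
        if (m.ident == ident) m.enabled = (m.filter == mode);
    }
}

fn isEnabled(monitors: []const Monitor, ident: usize, filter: i16) bool {
    for (monitors) |m| {
        if (m.ident == ident and m.filter == filter) return m.enabled;
    }
    return false;
}

pub fn main() void {
    // socket 5 starts in read mode: the write entry exists but is disabled
    var monitors = [_]Monitor{
        .{ .ident = 5, .filter = EVFILT_READ, .enabled = true },
        .{ .ident = 5, .filter = EVFILT_WRITE, .enabled = false },
    };
    setMode(&monitors, 5, EVFILT_WRITE);
    std.debug.print("read enabled: {any}, write enabled: {any}\n", .{
        isEnabled(&monitors, 5, EVFILT_READ),
        isEnabled(&monitors, 5, EVFILT_WRITE),
    });
    // a bitwise test would misreport a WRITE event as READ: -2 & -1 == -2
    std.debug.print("EVFILT_WRITE & EVFILT_READ = {d}\n", .{EVFILT_WRITE & EVFILT_READ});
}
```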
In addition to the EVFILT.READ and EVFILT.WRITE filters and the EV.ADD flag, we can also set the EV.ONESHOT, EV.DISPATCH and EV.CLEAR flags.
EV.ONESHOT removes the notification after readiness has been reported, making it one-time-only. The notification has to be re-added using the EV.ADD flag.
EV.DISPATCH is similar but, rather than removing the notification, it disables it (thus, EV.DISPATCH is like EPOLL.ONESHOT). To re-arm the notification, pass EV.ENABLE or EV.ADD ("adding" an already added entry, whether it's disabled or not, does not create a duplicate, and will re-enable it if disabled). The difference between removing (ONESHOT) and disabling (DISPATCH) is that disabling and re-enabling is faster but takes a bit more memory, since the internal structure is kept. If you intend to frequently re-arm the notification, EV.DISPATCH might be a better choice.
EV.CLEAR is similar to EPOLL.ET, causing kevent to signal state change rather than readiness.
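The practical difference between ONESHOT and DISPATCH is what's left of the registration after it fires. The hypothetical Registration type below models only that bookkeeping, following the behavior described above: ONESHOT deletes the entry, so only EV.ADD brings it back (kevent rejects EV.ENABLE on a deleted entry with ENOENT), while DISPATCH merely disables it, so either EV.ENABLE or EV.ADD re-arms it. It's a sketch of the rules, not the kernel's implementation.

```zig
const std = @import("std");

const Mode = enum { level, oneshot, dispatch, clear };

// Hypothetical model of a registration's bookkeeping, following the
// ONESHOT / DISPATCH behavior described above; not the kernel's code.
const Registration = struct {
    mode: Mode,
    present: bool = true, // does the kqueue still know about this entry?
    enabled: bool = true, // will it report readiness?

    // What happens once the entry has reported an event.
    fn delivered(self: *Registration) void {
        switch (self.mode) {
            .oneshot => self.present = false, // EV.ONESHOT: entry is deleted
            .dispatch => self.enabled = false, // EV.DISPATCH: entry is disabled
            .level, .clear => {}, // stays armed
        }
    }

    // EV.ADD: re-creates a deleted entry or re-enables a disabled one.
    fn add(self: *Registration) void {
        self.present = true;
        self.enabled = true;
    }

    // EV.ENABLE: only valid for an entry that still exists (kevent reports
    // ENOENT otherwise, modelled here as error.NotFound).
    fn enable(self: *Registration) error{NotFound}!void {
        if (!self.present) return error.NotFound;
        self.enabled = true;
    }

    fn armed(self: Registration) bool {
        return self.present and self.enabled;
    }
};

pub fn main() void {
    var once = Registration{ .mode = .oneshot };
    var disp = Registration{ .mode = .dispatch };
    once.delivered();
    disp.delivered();
    std.debug.print("after firing: oneshot armed={any}, dispatch armed={any}\n", .{ once.armed(), disp.armed() });

    // DISPATCH can be re-armed with EV.ENABLE; ONESHOT needs EV.ADD
    disp.enable() catch unreachable;
    once.enable() catch |err| std.debug.print("oneshot enable: {s}\n", .{@errorName(err)});
    once.add();
    std.debug.print("re-armed: oneshot={any}, dispatch={any}\n", .{ once.armed(), disp.armed() });
}
```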
As before, we can take our above code and strip out the "read" handling to see the various behaviors. Only the end of the code was changed:
const std = @import("std");
const net = std.net;
const posix = std.posix;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const kfd = try posix.kqueue();
defer posix.close(kfd);
{
_ = try posix.kevent(kfd, &.{.{
.ident = @intCast(listener),
.flags = posix.system.EV.ADD,
.filter = posix.system.EVFILT.READ,
.fflags = 0,
.data = 0,
.udata = @intCast(listener),
}}, &.{}, null);
}
var ready_list: [128]posix.Kevent = undefined;
while (true) {
const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
for (ready_list[0..ready_count]) |ready| {
const ready_socket: i32 = @intCast(ready.udata);
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
_ = try posix.kevent(kfd, &.{.{
.ident = @intCast(client_socket),
.flags = posix.system.EV.ADD,
.filter = posix.system.EVFILT.READ,
.fflags = 0,
.data = 0,
.udata = @intCast(client_socket),
}}, &.{}, null);
} else {
std.debug.print(".", .{});
}
}
}
}
If you connect to the above and send a message, your screen will get flooded with dots (.) as kevent continuously notifies us about the socket's readiness (since we never read from it). Changing the flags for the added client socket from posix.system.EV.ADD to one of:
posix.system.EV.ADD | posix.system.EV.ONESHOT,
posix.system.EV.ADD | posix.system.EV.DISPATCH, or
posix.system.EV.ADD | posix.system.EV.CLEAR.
will show how each behaves. For the first two, ONESHOT and DISPATCH, no matter how much data we send, we'll only ever get a single notification; we'd need to re-add or re-enable (aka, re-arm) the notification. For CLEAR, we'll get a single notification each time new data arrives.
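The experiment can also be reduced to a tiny simulation. This is a simplified model, not kqueue itself: countNotifications (a hypothetical helper) replays a sequence of kevent calls, where arrivals[i] says whether new data showed up before the i-th call, and the application never reads.

```zig
const std = @import("std");

const Mode = enum { level, oneshot, dispatch, clear };

// Simplified, hypothetical simulation of the experiment above (not kqueue
// itself). arrivals[i] is true if new data arrives before the i-th kevent
// call; the application never reads, so once data has arrived the socket
// stays readable. Returns how many of those calls report the socket.
fn countNotifications(mode: Mode, arrivals: []const bool) usize {
    var readable = false;
    var armed = true; // ONESHOT/DISPATCH disarm after the first report
    var notified: usize = 0;
    for (arrivals) |new_data| {
        if (new_data) readable = true;
        const fire = switch (mode) {
            // plain EV.ADD: reported on every call while data is unread
            .level => readable,
            // reported once, then deleted (ONESHOT) or disabled (DISPATCH)
            .oneshot, .dispatch => armed and readable,
            // EV.CLEAR: reported only when the state changed (new data)
            .clear => new_data,
        };
        if (fire) {
            notified += 1;
            if (mode == .oneshot or mode == .dispatch) armed = false;
        }
    }
    return notified;
}

pub fn main() void {
    // new data before the 1st and 4th of five kevent calls
    const arrivals = [_]bool{ true, false, false, true, false };
    const modes = [_]Mode{ .level, .oneshot, .dispatch, .clear };
    for (modes) |mode| {
        std.debug.print("{s}: {d}\n", .{ @tagName(mode), countNotifications(mode, &arrivals) });
    }
}
```

With data arriving before the first and fourth of five calls, the model reports 5 notifications for plain EV.ADD, 1 for ONESHOT and DISPATCH, and 2 for CLEAR.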
Although kqueue and epoll are platform-specific, they're quite similar, allowing us to create a simple abstraction to target either platform - the topic of our next part. Furthermore, their similarity has the benefit of resulting in a rather short post!
A more complete example is included below, with our Server, Client and write handling. Here you'll see the udata field used to store a pointer to a Client (via @intFromPtr and @ptrFromInt).
This code leverages the bulk-modification capabilities of kevent. When we add or modify a notification, we "stage" the change in the KQueue's change_list array. Only when the change_list is full or KQueue.wait is called do we apply the changes. In the latter case, applying the changes and waiting for readiness is done in a single system call. All of this is a simple but effective way to reduce the number of system calls we must make.
const std = @import("std");
const net = std.net;
const posix = std.posix;
const system = std.posix.system;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_demo);
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var server = try Server.init(allocator, 4096);
defer server.deinit();
const address = try std.net.Address.parseIp("0.0.0.0", 5882);
try server.run(address);
std.debug.print("STOPPED\n", .{});
}
const READ_TIMEOUT_MS = 60_000;
const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;
const Server = struct {
max: usize,
loop: KQueue,
allocator: Allocator,
connected: usize,
read_timeout_list: ClientList,
client_pool: std.heap.MemoryPool(Client),
client_node_pool: std.heap.MemoryPool(ClientList.Node),
fn init(allocator: Allocator, max: usize) !Server {
const loop = try KQueue.init();
errdefer loop.deinit();
return .{
.max = max,
.loop = loop,
.connected = 0,
.allocator = allocator,
.read_timeout_list = .{},
.client_pool = std.heap.MemoryPool(Client).init(allocator),
.client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
};
}
fn deinit(self: *Server) void {
self.loop.deinit();
self.client_pool.deinit();
self.client_node_pool.deinit();
}
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// const: we modify the list through the pointer, never the pointer itself
const read_timeout_list = &self.read_timeout_list;
try self.loop.addListener(listener);
// true while the listener's READ filter is disabled because we're at max
var listener_paused = false;
while (true) {
const next_timeout = self.enforceTimeout();
const ready_events = try self.loop.wait(next_timeout);
for (ready_events) |ready| {
// A staged change that failed (e.g. for a socket we've since closed) comes
// back as an event with EV.ERROR set and the errno in data. It isn't
// readiness, and its udata may point to a Client we've already freed.
if ((ready.flags & system.EV.ERROR) != 0) {
continue;
}
switch (ready.udata) {
0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
else => |nptr| {
const filter = ready.filter;
const client: *Client = @ptrFromInt(nptr);
if (filter == system.EVFILT.READ) {
while (true) {
const msg = client.readMessage() catch {
self.closeClient(client);
break;
} orelse break;
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
read_timeout_list.remove(client.read_timeout_node);
read_timeout_list.append(client.read_timeout_node);
client.writeMessage(msg) catch {
self.closeClient(client);
break;
};
}
} else if (filter == system.EVFILT.WRITE) {
client.write() catch self.closeClient(client);
}
}
}
}
// Stop accepting while we're at max and resume once a client disconnects.
// EV.ADD on the existing, disabled entry re-enables it.
const full = self.connected == self.max;
if (full and !listener_paused) {
try self.loop.removeListener(listener);
listener_paused = true;
} else if (!full and listener_paused) {
try self.loop.addListener(listener);
listener_paused = false;
}
}
}
fn enforceTimeout(self: *Server) i32 {
const now = std.time.milliTimestamp();
var node = self.read_timeout_list.first;
while (node) |n| {
const client = n.data;
const diff = client.read_timeout - now;
if (diff > 0) {
return @intCast(diff);
}
posix.shutdown(client.socket, .recv) catch {};
node = n.next;
} else {
return -1;
}
}
fn accept(self: *Server, listener: posix.socket_t) !void {
// accept at most as many clients as we have room for; run() pauses the
// listener once we're at max
const space = self.max - self.connected;
for (0..space) |_| {
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
errdefer posix.close(socket);
const client = try self.client_pool.create();
errdefer self.client_pool.destroy(client);
client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
// a normal return doesn't run the errdefers above, so clean up here
posix.close(socket);
self.client_pool.destroy(client);
log.err("failed to initialize client: {}", .{err});
return;
};
errdefer client.deinit(self.allocator);
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
client.read_timeout_node = try self.client_node_pool.create();
errdefer self.client_node_pool.destroy(client.read_timeout_node);
client.read_timeout_node.* = .{
.next = null,
.prev = null,
.data = client,
};
self.read_timeout_list.append(client.read_timeout_node);
errdefer self.read_timeout_list.remove(client.read_timeout_node);
try self.loop.newClient(client);
self.connected += 1;
}
}
fn closeClient(self: *Server, client: *Client) void {
self.read_timeout_list.remove(client.read_timeout_node);
posix.close(client.socket);
self.client_node_pool.destroy(client.read_timeout_node);
client.deinit(self.allocator);
self.client_pool.destroy(client);
// frees a slot; run() re-enables the listener if it had been paused
self.connected -= 1;
}
};
const Client = struct {
loop: *KQueue,
socket: posix.socket_t,
address: std.net.Address,
reader: Reader,
to_write: []u8,
write_buf: []u8,
read_timeout: i64,
read_timeout_node: *ClientNode,
fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *KQueue) !Client {
const reader = try Reader.init(allocator, 4096);
errdefer reader.deinit(allocator);
const write_buf = try allocator.alloc(u8, 4096);
errdefer allocator.free(write_buf);
return .{
.loop = loop,
.reader = reader,
.socket = socket,
.address = address,
.to_write = &.{},
.write_buf = write_buf,
.read_timeout = 0,
.read_timeout_node = undefined,
};
}
fn deinit(self: *const Client, allocator: Allocator) void {
self.reader.deinit(allocator);
allocator.free(self.write_buf);
}
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
fn writeMessage(self: *Client, msg: []const u8) !void {
if (self.to_write.len > 0) {
return error.PendingMessage;
}
if (msg.len + 4 > self.write_buf.len) {
return error.MessageTooLarge;
}
std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
const end = msg.len + 4;
@memcpy(self.write_buf[4..end], msg);
self.to_write = self.write_buf[0..end];
return self.write();
}
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return self.loop.writeMode(self),
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
return self.loop.readMode(self);
}
}
};
const Reader = struct {
buf: []u8,
pos: usize = 0,
start: usize = 0,
fn init(allocator: Allocator, size: usize) !Reader {
const buf = try allocator.alloc(u8, size);
return .{
.pos = 0,
.start = 0,
.buf = buf,
};
}
fn deinit(self: *const Reader, allocator: Allocator) void {
allocator.free(self.buf);
}
fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
const buf = self.buf;
while (true) {
if (try self.bufferedMessage()) |msg| {
return msg;
}
const pos = self.pos;
const n = try posix.read(socket, buf[pos..]);
if (n == 0) {
return error.Closed;
}
self.pos = pos + n;
}
}
fn bufferedMessage(self: *Reader) !?[]u8 {
const buf = self.buf;
const pos = self.pos;
const start = self.start;
std.debug.assert(pos >= start);
const unprocessed = buf[start..pos];
if (unprocessed.len < 4) {
self.ensureSpace(4 - unprocessed.len) catch unreachable;
return null;
}
const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
const total_len = message_len + 4;
if (unprocessed.len < total_len) {
try self.ensureSpace(total_len);
return null;
}
self.start += total_len;
return unprocessed[4..total_len];
}
fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
const buf = self.buf;
if (buf.len < space) {
return error.BufferTooSmall;
}
const start = self.start;
const spare = buf.len - start;
if (spare >= space) {
return;
}
const unprocessed = buf[start..self.pos];
std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
self.start = 0;
self.pos = unprocessed.len;
}
};
const KQueue = struct {
kfd: posix.fd_t,
event_list: [128]system.Kevent = undefined,
change_list: [16]system.Kevent = undefined,
change_count: usize = 0,
fn init() !KQueue {
const kfd = try posix.kqueue();
return .{.kfd = kfd};
}
fn deinit(self: KQueue) void {
posix.close(self.kfd);
}
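fn wait(self: *KQueue, timeout_ms: i32) ![]system.Kevent {
const timeout = posix.timespec{
.sec = @intCast(@divTrunc(timeout_ms, 1000)),
.nsec = @intCast(@mod(timeout_ms, 1000) * 1000000),
};
// enforceTimeout returns -1 when nothing can time out. Unlike epoll_wait, kevent
// has no -1 sentinel: a null timeout is what makes it wait indefinitely.
const timeout_ptr: ?*const posix.timespec = if (timeout_ms < 0) null else &timeout;
const count = try posix.kevent(self.kfd, self.change_list[0..self.change_count], &self.event_list, timeout_ptr);
self.change_count = 0;
return self.event_list[0..count];
}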
fn addListener(self: *KQueue, listener: posix.socket_t) !void {
try self.queueChange(.{
.ident = @intCast(listener),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ADD,
.fflags = 0,
.data = 0,
.udata = 0,
});
}
fn removeListener(self: *KQueue, listener: posix.socket_t) !void {
try self.queueChange(.{
.ident = @intCast(listener),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
}
fn newClient(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ADD,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.WRITE,
.flags = posix.system.EV.ADD | posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn readMode(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.WRITE,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ENABLE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn writeMode(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.flags = posix.system.EV.ENABLE,
.filter = posix.system.EVFILT.WRITE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn queueChange(self: *KQueue, event: system.Kevent) !void {
var count = self.change_count;
if (count == self.change_list.len) {
_ = try posix.kevent(self.kfd, &self.change_list, &.{}, null);
count = 0;
}
self.change_list[count] = event;
self.change_count = count + 1;
}
};