TCP Server in Zig - Part 5a - Poll
One of the reasons we introduced multithreading was to get around the fact that our read and, to a lesser extent, accept and write, block. In our initial single-threaded implementation, rather than pushing our server to its limits, we spent a lot of time idle, waiting for data to come in. Multithreading helped to unblock the main thread so that new connections could be accepted (as long as we had enough workers to handle them) while existing connections were being processed. But threads are relatively heavyweight constructs, and it isn't particularly efficient to spawn them only to have them sit blocked waiting for data.
There are two complementary parts to improving our design: non-blocking I/O and event notification.
Non-Blocking I/O
It's possible to put a socket in non-blocking mode. When enabled, functions which normally block, such as accept, read or write, will return error.WouldBlock (or EAGAIN in C) rather than blocking. As we're about to see, it's hard to take advantage of this on its own, but we're looking at it first to get a feel for it. Consider what happens if we go back to one of our earlier single-threaded implementations and enable non-blocking sockets (two lines have been changed, both commented):
const std = @import("std");
const net = std.net;
const posix = std.posix;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
// ADDED: ` | posix.SOCK.NONBLOCK`
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
var buf: [128]u8 = undefined;
while (true) {
// Replaced: `0` with `posix.SOCK.NONBLOCK`
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
std.debug.print("error accept: {}\n", .{err});
continue;
};
defer posix.close(socket);
const stream = std.net.Stream{.handle = socket};
const read = try stream.read(&buf);
if (read == 0) {
continue;
}
try stream.writeAll(buf[0..read]);
}
}
When we create our listening socket using posix.socket, we now set the SOCK.NONBLOCK flag. Similarly, when we call accept, we now pass that same flag, SOCK.NONBLOCK, as the fourth parameter. The first usage puts our listening socket in non-blocking mode (we'll see what that accomplishes shortly). The second usage puts a newly connected socket in non-blocking mode. This second usage is a special case. There are actually two accept system calls: accept and accept4. The first one, accept, only takes 3 parameters, whereas accept4 takes a 4th parameter. That 4th parameter is for any flags, like SOCK.NONBLOCK, that we want to apply to the connected socket. If Zig's posix.accept detects that accept4 is available, it uses it; otherwise it calls accept and then calls fcntl to set the appropriate flags. accept4 does what Part 3 talked about (minimizing system calls) by combining accept with fcntl.
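To make that fallback concrete, here's a minimal sketch of "call fcntl to set the flag" done by hand. It assumes Zig 0.13's std.posix on Linux; setNonBlocking is a hypothetical helper (not a std function), and a pipe stands in for a socket so the sketch runs without any networking. Once the read end is non-blocking, reading from the still-empty pipe fails with error.WouldBlock instead of hanging:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper (not part of std): turn on O_NONBLOCK for an existing
// file descriptor with two fcntl calls, roughly what has to happen after a
// plain accept when accept4 isn't available.
fn setNonBlocking(fd: posix.fd_t) !void {
    const flags = try posix.fcntl(fd, posix.F.GETFL, 0);
    // posix.O is a packed struct in Zig 0.13, so derive the NONBLOCK bit from it
    const nonblock: usize = @as(usize, 1) << @bitOffsetOf(posix.O, "NONBLOCK");
    _ = try posix.fcntl(fd, posix.F.SETFL, flags | nonblock);
}

pub fn main() !void {
    // a pipe stands in for a connected socket: fds[0] is the read end
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    try setNonBlocking(fds[0]);

    var buf: [16]u8 = undefined;
    // nothing has been written, so rather than blocking, read fails immediately
    if (posix.read(fds[0], &buf)) |n| {
        std.debug.print("read {d} bytes\n", .{n});
    } else |err| {
        std.debug.print("read failed with: {}\n", .{err});
    }
}
```

Keeping the write end open matters here: if it were closed, read would return 0 (end of file) rather than error.WouldBlock.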
If you try to run the above code, you should get a rapid and endless stream of:
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
We said that SOCK.NONBLOCK makes functions like accept return an error instead of blocking. Given that our accept looks like:
while (true) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
std.debug.print("error accept: {}\n", .{err});
continue;
};
// ...
}
The output makes sense. There's no obvious solution either. We could sleep whenever accept returns error.WouldBlock, but that would be worse than leaving the socket in blocking mode: at least in blocking mode we're woken up as soon as there's a connection waiting to be accepted.
If we keep our listening socket in blocking mode but put connected sockets in non-blocking mode, we'll face the same problem. When our read returns error.WouldBlock, what can we realistically do? All of our implementations so far depend on one thread processing one connection. We need to fundamentally rethink our approach.
Polling
We need to break out of the one-thread-per-connection pattern, which is something we can only do now that we've discovered non-blocking I/O. We probably need to start organizing our code a little better, but you can start thinking about having an array of sockets that we can loop through and try to read from. As an incomplete prototype, something like:
for (sockets) |s| {
const n = posix.read(s, &buf) catch |err| {
switch (err) {
// not ready yet, we'll try this socket again later
error.WouldBlock => {},
else => {
posix.close(s);
// TODO: remove the socket from our array
},
}
// go to the next socket
continue;
};
if (n == 0) {
// the other side closed the connection
posix.close(s);
// TODO: remove the socket from our array
continue;
}
process(buf[0..n]);
}
This is possible because posix.read won't block when no data is available. If error.WouldBlock is returned, we can skip to the next socket in our list. There are at least two major problems with this prototype. The first is that it will result in a tight loop when no socket is ready. We previously talked about the importance of minimizing system calls, but the above code would result in a massive number of calls to read as we constantly poll each socket, hoping that one has data.
A larger issue is the need to associate state with each connection. This was trivial in our thread-per-connection model, but gets more complicated now that we need to track multiple connections. For example, since a read is likely to return less than, or more than, a single "message" (see Part 2), it seems unlikely that we can share a buffer across all sockets. This is something that we started solving when we introduced a Client. Our prototype can be improved if we think in terms of clients rather than sockets:
for (clients) |*client| {
while (true) {
const message = client.readMessage() catch {
client.close();
// TODO: remove client from our clients array;
break;
} orelse break; // no message (either not enough bytes, or WouldBlock)
client.process(message) catch {
client.close();
// TODO: remove client from our clients array;
break;
};
}
}
We're getting further and further away from working code, but the key change here is that a Client encapsulates the state necessary for reading and (eventually) writing messages. Our fictional readMessage could fail, but it could also return null to indicate that there isn't enough data to form a complete message yet, either because we don't have all the bytes or because posix.read returned error.WouldBlock. If you're wondering why we have an inner loop, recall that we're not just concerned about reading less than a whole message, but also about reading more than a single message. It's possible that readMessage, which would eventually call posix.read, fills our buffer with multiple messages. We need to process them all.
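As a rough illustration of why that inner loop matters, here's a sketch of pulling complete messages out of a buffer. It assumes the 4-byte little-endian length prefix used in earlier parts; nextMessage is a hypothetical stand-in for the Reader's logic, not its actual implementation. A single read can leave two full messages and the start of a third in the buffer, so we keep calling it until it returns null:

```zig
const std = @import("std");

// Hypothetical helper: return the next complete message in buf[start.*..],
// assuming each message is a 4-byte little-endian length prefix followed by
// that many bytes, and advance start.* past it. Returns null when the
// remaining bytes don't (yet) form a whole message.
fn nextMessage(buf: []const u8, start: *usize) ?[]const u8 {
    const unprocessed = buf[start.*..];
    if (unprocessed.len < 4) return null;
    const len = std.mem.readInt(u32, unprocessed[0..4], .little);
    // widen before adding so a huge prefix can't overflow a u32
    const total = @as(usize, len) + 4;
    if (unprocessed.len < total) return null;
    start.* += total;
    return unprocessed[4..total];
}

pub fn main() void {
    // what a single read might have produced: "hi", "abc", then the first
    // 3 bytes of a third message's length prefix
    const received = [_]u8{ 2, 0, 0, 0, 'h', 'i', 3, 0, 0, 0, 'a', 'b', 'c', 5, 0, 0 };
    var start: usize = 0;
    // the inner loop: keep going until there's no complete message left
    while (nextMessage(&received, &start)) |msg| {
        std.debug.print("message: {s}\n", .{msg});
    }
    std.debug.print("{d} leftover byte(s) wait for the next read\n", .{received.len - start});
}
```

The leftover bytes stay in the buffer; a real reader would append the next read after them rather than overwrite them.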
So far, this is admittedly very abstract, but remember how, in Part 3, we created a stateful, message-aware Reader. Our client.readMessage can be a wrapper around reader.readMessage which translates error.WouldBlock into null:
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
While this seems promising, constantly looping through our clients, hoping that one or more has data, isn't going to be efficient.
poll
The poll system call lets us register file descriptors (like sockets) with the operating system and be notified when certain events, like reading or writing, can be done without blocking. For example, we can give poll an array of sockets and it'll block until one is ready to be read. It's a simple API, a single function, but it's a change in how we think about serving clients. We're going to start by looking at a basic implementation which is unconcerned with things like message boundaries. Our goal, for now, is to get familiar with poll's API and this new way of working with sockets.
const std = @import("std");
const net = std.net;
const posix = std.posix;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// Our server can support 4095 clients. Wait, shouldn't that be 4096? No
// One of the polling slots (the first one) is reserved for our listening
// socket.
var polls: [4096]posix.pollfd = undefined;
polls[0] = .{
.fd = listener,
.events = posix.POLL.IN,
.revents = 0,
};
var poll_count: usize = 1;
while (true) {
// polls is the total number of connections we can monitor, but only
// polls[0..poll_count] (the listening socket + the currently connected
// clients) is in use (remember, the upper bound is exclusive)
var active = polls[0..poll_count];
// 2nd argument is the timeout, -1 is infinity
_ = try posix.poll(active, -1);
// Active[0] is _always_ the listening socket. When this socket is ready
// we can accept. Putting it outside the following while loop means that
// we don't have to check if this is the listening socket on each
// iteration
if (active[0].revents != 0) {
// The listening socket is ready, accept!
// Notice that we pass SOCK.NONBLOCK to accept, placing the new client
// socket in non-blocking mode. Also, for now, for simplicity,
// we're not capturing the client address (the two null arguments).
const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
// Add this new client socket to our polls array for monitoring
polls[poll_count] = .{
.fd = socket,
// This will be SET by posix.poll to tell us what event is ready
// (or it will stay 0 if this socket isn't ready)
.revents = 0,
// We want to be notified about the POLL.IN event
// (i.e. can read without blocking)
.events = posix.POLL.IN,
};
// increment the number of active connections we're monitoring
// this can overflow our 4096 polls array. TODO: fix that!
poll_count += 1;
// grow `active` to include the new socket. Its revents is 0, so the
// loop below skips it, but it has to be inside `active` so that
// removing a client swaps in the true last entry
active = polls[0..poll_count];
}
var i: usize = 1;
while (i < active.len) {
const polled = active[i];
const revents = polled.revents;
if (revents == 0) {
// This socket isn't ready, go to the next one
i += 1;
continue;
}
var closed = false;
// the socket is ready to be read
if (revents & posix.POLL.IN == posix.POLL.IN) {
var buf: [4096]u8 = undefined;
const read = posix.read(polled.fd, &buf) catch 0;
if (read == 0) {
// probably closed on the other side
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{polled.fd, buf[0..read]});
}
}
// either the read failed, or we're being notified through poll
// that the socket is closed
if (closed or (revents & posix.POLL.HUP == posix.POLL.HUP)) {
posix.close(polled.fd);
// We use a simple trick to remove it: we swap it with the last
// item in our array, then "shrink" our array by 1
const last_index = active.len - 1;
active[i] = active[last_index];
active = active[0..last_index];
poll_count -= 1;
// don't increment `i` because we swapped out the removed item
// and shrank the array
} else {
// not closed, go to the next socket
i += 1;
}
}
}
}
The above is a working example. You can run it and connect up to 4095 clients; any more and it'll crash, something we can and will eventually fix. The above code is documented, but there are a number of things to go over. The posix.pollfd structure has three fields:
- fd: The file descriptor that we're polling.
- events: A bitwise list of events we care about. For now that's only POLL.IN but later, when we look at writing, it'll be POLL.IN | POLL.OUT.
- revents: The ready events set by the poll system call. This tells us which, if any, events are ready.
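Before going further, it can help to see events and revents in isolation. This sketch assumes Zig 0.13 on Linux and uses a pipe as a stand-in for a client socket; pollDemo is a hypothetical name. A timeout of 0 makes poll return immediately, so it reports nothing ready while the pipe is empty and one ready entry, with POLL.IN set in revents, once something has been written:

```zig
const std = @import("std");
const posix = std.posix;

const PollDemo = struct { before: usize, after: usize, readable: bool };

// Polls the read end of a pipe (a stand-in for a client socket) with a 0ms
// timeout, so poll never blocks: once while empty, once after a write.
fn pollDemo() !PollDemo {
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    // one entry: "tell me when fds[0] can be read without blocking"
    var polls = [_]posix.pollfd{.{ .fd = fds[0], .events = posix.POLL.IN, .revents = 0 }};

    // nothing written yet, so no entry is ready and poll returns 0
    const before = try posix.poll(&polls, 0);

    _ = try posix.write(fds[1], "hello");

    // poll rewrites revents on every call; now POLL.IN should be set
    const after = try posix.poll(&polls, 0);
    return .{
        .before = before,
        .after = after,
        .readable = polls[0].revents & posix.POLL.IN == posix.POLL.IN,
    };
}

pub fn main() !void {
    const r = try pollDemo();
    std.debug.print("before: {d}, after: {d}, readable: {}\n", .{ r.before, r.after, r.readable });
}
```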
The first thing we do is set up a pollfd for our listening socket, registering our interest in POLL.IN. For a listening socket, POLL.IN indicates that we can accept without blocking. We set and keep this at index zero throughout the lifetime of our program. When posix.poll returns, it means that at least one of the monitored file descriptors is ready. We iterate through them all, looking for any where revents != 0. We special-case our listening socket, always at active[0], since we need to call accept and process the new connection. As an optimization, we've moved that out of our loop, which means we don't have to check if (i == 0) on each iteration.
We could have used a std.ArrayList(posix.pollfd) to make adding and removing entries a little easier but, at least for the purpose of learning, I prefer the explicit code for handling new connections and disconnects.
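For comparison, here's roughly what the ArrayList route could look like, assuming Zig 0.13 (where std.ArrayList(T).init takes an allocator; later Zig versions changed this API). The fd numbers are made up and nothing is actually polled; the point is that swapRemove does the same swap-with-the-last-item trick we wrote by hand, and that polls.items is a plain []posix.pollfd we could pass straight to posix.poll:

```zig
const std = @import("std");
const posix = std.posix;

// Assumes Zig 0.13. The fds below are made-up numbers; nothing is polled.
fn newPollfd(fd: posix.fd_t) posix.pollfd {
    return .{ .fd = fd, .events = posix.POLL.IN, .revents = 0 };
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var polls = std.ArrayList(posix.pollfd).init(gpa.allocator());
    defer polls.deinit();

    // index 0 would be the listening socket, the rest are clients
    for ([_]posix.fd_t{ 3, 4, 5, 6 }) |fd| try polls.append(newPollfd(fd));

    // "disconnect" the client at index 1 (fd 4): swapRemove moves the last
    // entry (fd 6) into its slot and shrinks the list by one, in O(1)
    const removed = polls.swapRemove(1);
    std.debug.print("removed fd {d}, remaining:", .{removed.fd});
    for (polls.items) |p| std.debug.print(" {d}", .{p.fd});
    std.debug.print("\n", .{});

    // polls.items is a []posix.pollfd, so this is what we'd hand to posix.poll
}
```

The trade-off is that the list can grow (and allocate) on append, whereas the fixed array forces us to decide up front what happens when it's full.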
You might be wondering why we check if revents has the POLL.HUP flag, even though we didn't register our interest in POLL.HUP. POLL.HUP is always monitored, even if we don't explicitly ask for it. We check POLL.IN first, but we could check POLL.HUP first instead. For example, an HTTP server might prefer checking POLL.HUP first and removing disconnected clients, ignoring any pending data they've sent. When POLL.IN is set, you might be wondering if read can fail and/or return zero bytes. The simple answer is: I'm not sure. It would certainly be possible for read to return error.WouldBlock if another thread called read on the same socket, draining it. And we could get zero bytes if the buffer passed into read was zero-length. In this single-threaded example, with a fixed-length buffer, neither of those cases is possible. (Zero bytes is, however, what read returns once the other side has closed the connection, which poll reports as readable; that's why the code above treats 0 as a disconnect.) But I think it's better to be safe than sorry, and we should always assume read can return zero bytes or an error.
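One way to act on that advice is to funnel every read result through a single decision. This is a sketch with hypothetical names (ReadOutcome, classify), assuming Zig 0.13's posix.ReadError: a positive count is data, error.WouldBlock means "not actually ready, try again after the next poll", and zero bytes or any other error means the connection should be closed:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical: the possible outcomes of reading a socket poll said was readable
const ReadOutcome = union(enum) {
    data: usize, // n > 0 bytes were read: process them
    not_ready, // error.WouldBlock: not readable after all, wait for the next poll
    closed, // 0 bytes: treat as a disconnect
    failed: posix.ReadError, // any other error: close the socket too
};

const ReadResult = posix.ReadError!usize;

fn classify(result: ReadResult) ReadOutcome {
    const n = result catch |err| switch (err) {
        error.WouldBlock => return .not_ready,
        else => return .{ .failed = err },
    };
    if (n == 0) return .closed;
    return .{ .data = n };
}

pub fn main() void {
    // stand-ins for what posix.read(polled.fd, &buf) might return
    for ([_]ReadResult{ 5, 0, error.WouldBlock }) |result| {
        switch (classify(result)) {
            .data => |n| std.debug.print("process {d} bytes\n", .{n}),
            .not_ready => std.debug.print("skip, try again after the next poll\n", .{}),
            .closed, .failed => std.debug.print("close and remove the socket\n", .{}),
        }
    }
}
```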
O(N)
The poll call unblocks when at least one of the monitored file descriptors is ready. But the only way to know which ones are ready is to iterate through all of them, checking if revents != 0. Even with a modest upper limit of 4K clients, this isn't ideal, and there isn't much we can do about it. However, poll does return the number of entries that have a non-zero revents. So while our code will still be O(N), we can at least stop iterating once we've processed that many ready sockets:
while (true) {
var active = polls[0..poll_count];
// we no longer ignore the return value
var pending = try posix.poll(active, -1);
if (active[0].revents != 0) {
// previous code to accept the socket and add it to polls is unchanged
// (but I removed it to keep this snippet small)
// add this:
pending -= 1;
}
// Next two lines are changed
var i: usize = 1;
while (pending > 0) {
const polled = active[i];
const revent = polled.revents;
if (revent == 0) {
i += 1;
continue;
}
// add this
// we've processed one of the pending notifications
pending -= 1;
// the rest is the same
// ....
}
}
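Here's that early-exit logic pulled out on its own, as a sketch with a hypothetical scanReady function operating on made-up pollfd entries (no real sockets involved). Besides counting down pending, it also stops at the end of the slice, a cheap guard in case the count and the entries ever disagree:

```zig
const std = @import("std");
const posix = std.posix;

const Scan = struct { found: usize, examined: usize };

// Hypothetical: record the indexes of entries with a non-zero revents into
// `out`, stopping as soon as `pending_in` (poll's return value) have been seen.
fn scanReady(polls: []const posix.pollfd, pending_in: usize, out: []usize) Scan {
    var pending = pending_in;
    var found: usize = 0;
    var i: usize = 0;
    while (pending > 0 and i < polls.len) : (i += 1) {
        if (polls[i].revents == 0) continue;
        out[found] = i;
        found += 1;
        pending -= 1;
    }
    return .{ .found = found, .examined = i };
}

pub fn main() void {
    // 8 made-up entries; pretend poll returned 2 with entries 1 and 3 ready
    var polls: [8]posix.pollfd = undefined;
    for (&polls, 0..) |*p, idx| {
        p.* = .{ .fd = @intCast(idx), .events = posix.POLL.IN, .revents = 0 };
    }
    polls[1].revents = posix.POLL.IN;
    polls[3].revents = posix.POLL.IN;

    var out: [8]usize = undefined;
    const scan = scanReady(&polls, 2, &out);
    std.debug.print("ready: {any}, examined {d} of {d}\n", .{ out[0..scan.found], scan.examined, polls.len });
}
```

Entries 4 through 7 are never looked at. The worst case is unchanged (the last entry might be the ready one), which is why this is only a mitigation.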
Both epoll and kqueue, platform-specific alternatives to poll, elegantly solve this problem. We'll cover both those APIs in following parts.
Level Triggered
When the listening socket is ready to accept a new connection, that is, when polls[0].revents != 0, we call accept once. But because our socket is in non-blocking mode, we could instead keep calling it until we receive an error.WouldBlock error:
if (active[0].revents != 0) {
while (true) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => break,
else => return err,
};
polls[poll_count] = .{
.fd = socket,
.revents = 0,
.events = posix.POLL.IN,
};
poll_count += 1;
}
}
poll tells us that the listening socket can accept without blocking (i.e. that it's ready). But it doesn't tell us how many pending connections are waiting to be accepted. Both approaches, looping and not looping, work because poll is always level-triggered. This means that poll continues to notify us for as long as the socket is ready. This is different from edge-triggered notification, which only notifies us when the state changes (we'll learn more about this in future parts).
Put differently, if there are four connections waiting to be accepted when poll returns, but we only accept one, the next call to poll will re-notify us that our listening socket is ready because of the three still-pending connections.
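We can observe level-triggered behavior without any networking. In this sketch (assuming Zig 0.13 on Linux, with a hypothetical levelTriggered function), a pipe plays the listening socket and unread bytes play pending connections: after writing two bytes and reading only one, poll still reports the pipe as ready, and only once both bytes have been consumed does it report nothing:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical demo: a pipe stands in for the listening socket, and unread
// bytes stand in for connections waiting to be accepted. Returns poll's
// result (with a 0ms timeout, so it never blocks) at three points.
fn levelTriggered() ![3]usize {
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    var polls = [_]posix.pollfd{.{ .fd = fds[0], .events = posix.POLL.IN, .revents = 0 }};
    var one: [1]u8 = undefined;
    var results: [3]usize = undefined;

    _ = try posix.write(fds[1], "ab"); // two things pending
    results[0] = try posix.poll(&polls, 0); // ready
    _ = try posix.read(fds[0], &one); // handle only one of them
    results[1] = try posix.poll(&polls, 0); // still ready: one is left
    _ = try posix.read(fds[0], &one); // handle the other
    results[2] = try posix.poll(&polls, 0); // nothing pending anymore
    return results;
}

pub fn main() !void {
    std.debug.print("{any}\n", .{try levelTriggered()});
}
```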
Stateful Reads
In the above code, when a socket is ready, we simply print whatever bytes we were able to read. But as we've discussed a few times, TCP deals with streams of bytes and isn't "message"-aware. We need to add more state so that we can handle a read returning only part of a message or more than a single message. We already saw a pseudo-implementation of this above, when we looked at non-blocking I/O. To use that same approach, we have a minor problem: poll doesn't allow us to associate arbitrary data with a pollfd, so we need to create and manage this association ourselves.
I think it's time that we introduced a Server struct to help keep our code tidy. First, its fields and its init and deinit functions. We'll see a full Client implementation shortly, but for now, it's just a thin wrapper around the Reader from previous parts.
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;
// We're going to start logging errors to a scope logger
const log = std.log.scoped(.tcp_demo);
const Server = struct {
// Our Clients need an allocator to create their read buffers
allocator: Allocator,
// The number of clients we currently have connected
connected: usize,
// polls[0] is always our listening socket
polls: []posix.pollfd,
// list of clients, only client[0..connected] are valid
clients: []Client,
// This is always polls[1..] and it's used so that we can manipulate
// clients and client_polls together. Necessary because polls[0] is the
// listening socket, and we don't ever touch that.
client_polls: []posix.pollfd,
fn init(allocator: Allocator, max: usize) !Server {
// + 1 for the listening socket
const polls = try allocator.alloc(posix.pollfd, max + 1);
errdefer allocator.free(polls);
const clients = try allocator.alloc(Client, max);
errdefer allocator.free(clients);
return .{
.polls = polls,
.clients = clients,
.client_polls = polls[1..],
.connected = 0,
.allocator = allocator,
};
}
fn deinit(self: *Server) void {
// TODO: Close connected sockets? We'll talk about shutdowns in
// a future part.
self.allocator.free(self.polls);
self.allocator.free(self.clients);
}
};
The first client that connects will obviously be at clients[0], but it'll be at polls[1] because our listening socket is always at polls[0]. This isn't a requirement, it's merely our own convention. But given that we want to poll both our listening socket and connected sockets, it's an efficient way to do things. However, it means that clients[N] corresponds to polls[N+1]. This +1 offset is error-prone and annoying, so we also create client_polls, which is set to polls[1..]. Now when clients connect and disconnect, we just touch clients and client_polls, which both share the same offset.
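The "same offset" trick works because client_polls isn't a copy: it's a second slice over the same memory as polls. A small sketch (Zig 0.13 assumed, made-up fd value) showing that writing client_polls[0] is writing polls[1]:

```zig
const std = @import("std");
const posix = std.posix;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // like Server.init with max = 3: one extra slot for the listening socket
    const polls = try allocator.alloc(posix.pollfd, 3 + 1);
    defer allocator.free(polls);
    // a view into the same memory, not a copy
    const client_polls = polls[1..];

    // writing "client 0's" entry writes polls[1]
    client_polls[0] = .{ .fd = 7, .events = posix.POLL.IN, .revents = 0 };
    std.debug.print("polls[1].fd = {d}, same address: {}\n", .{ polls[1].fd, &client_polls[0] == &polls[1] });
}
```

Because it's only a view, client_polls never needs to be kept in sync with polls; there's nothing to sync.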
Next we can add a run function. This is almost the same code as our first poll example, but I've extracted some of the functionality into separate functions (which we'll see next):
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// polls[0] is always our listening socket, and we want to know when
// it's ready to accept (POLL.IN)
self.polls[0] = .{
.fd = listener,
.revents = 0,
.events = posix.POLL.IN,
};
while (true) {
// Oops, still have a +1 here, since we want to poll all our connected
// clients, plus our listening socket
_ = try posix.poll(self.polls[0..self.connected + 1], -1);
if (self.polls[0].revents != 0) {
// listening socket is ready
self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
}
var i: usize = 0;
while (i < self.connected) {
const revents = self.client_polls[i].revents;
if (revents == 0) {
// this socket isn't ready, move on to the next one
i += 1;
continue;
}
const client = &self.clients[i];
if (revents & posix.POLL.IN == posix.POLL.IN) {
// this socket is ready to be read
while (true) {
const msg = client.readMessage() catch {
// we don't increment `i` when we remove the client
// because removeClient does a swap and puts the last
// client at position i
self.removeClient(i);
break;
} orelse {
// no more messages, but this client is still connected
i += 1;
break;
};
std.debug.print("got: {s}\n", .{msg});
}
} else {
// revents is non-zero but POLL.IN isn't set (e.g. POLL.HUP or
// POLL.ERR on their own): treat it as a disconnect. Without this,
// `i` would never advance and we'd loop forever.
self.removeClient(i);
}
}
}
}
The goal here is that the client at clients[N] is being monitored by polls[N+1]. But, to avoid that nasty +1, we use client_polls, which is a slice of polls: polls[1..]. This means that when client_polls[N] is ready, we can access the corresponding client at clients[N].
I'm not sure I love the idea of extracting accept and removeClient into their own functions. After all, they're only called from a single place, and I like being able to read code without having to chase after it. But ask me another day and I'll give you another answer:
fn accept(self: *Server, listener: posix.socket_t) !void {
while (true) {
// we'll continue to accept until we get error.WouldBlock
// or until our program crashes because we overflow self.clients and self.polls
// (we really should fix that!)
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
const client = Client.init(self.allocator, socket, address) catch |err| {
posix.close(socket);
log.err("failed to initialize client: {}", .{err});
return;
};
const connected = self.connected;
self.clients[connected] = client;
self.client_polls[connected] = .{
.fd = socket,
.revents = 0,
.events = posix.POLL.IN,
};
self.connected = connected + 1;
}
}
fn removeClient(self: *Server, at: usize) void {
const client = self.clients[at];
posix.close(client.socket);
client.deinit(self.allocator);
// Swap the client we're removing with the last one
// So that when we set connected -= 1, it'll effectively "remove"
// the client from our slices.
const last_index = self.connected - 1;
self.clients[at] = self.clients[last_index];
self.client_polls[at] = self.client_polls[last_index];
self.connected = last_index;
}
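The invariant removeClient relies on is that clients[0..connected] and client_polls[0..connected] stay aligned, and that polls[0] is never touched. Here's a sketch that checks this, with a hypothetical FakeClient standing in for Client (so there's no socket to close or Reader to free) and a hypothetical removeAt whose swap logic mirrors removeClient:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical stand-in for Client: only the socket matters here.
const FakeClient = struct { socket: posix.fd_t };

// Same swap as removeClient, minus closing and deinit: move the last client
// (and its pollfd) into slot `at`, then shrink the connected count.
fn removeAt(clients: []FakeClient, client_polls: []posix.pollfd, connected: *usize, at: usize) void {
    const last_index = connected.* - 1;
    clients[at] = clients[last_index];
    client_polls[at] = client_polls[last_index];
    connected.* = last_index;
}

pub fn main() void {
    var polls: [4]posix.pollfd = undefined;
    polls[0] = .{ .fd = 100, .events = posix.POLL.IN, .revents = 0 }; // "listener"
    const client_polls = polls[1..];
    var clients: [3]FakeClient = undefined;
    var connected: usize = 0;

    // three made-up clients, added the same way Server.accept does
    for ([_]posix.fd_t{ 5, 6, 7 }) |fd| {
        clients[connected] = .{ .socket = fd };
        client_polls[connected] = .{ .fd = fd, .events = posix.POLL.IN, .revents = 0 };
        connected += 1;
    }

    removeAt(&clients, client_polls, &connected, 0); // the client with fd 5 leaves
    for (clients[0..connected], client_polls[0..connected]) |c, p| {
        std.debug.print("client socket {d} <-> pollfd {d}\n", .{ c.socket, p.fd });
    }
}
```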
It seems like a lot more code, but the approach is the same as in our initial poll example. Rather than having two slices, clients and client_polls, which are linked by index, we could have used a std.AutoHashMap(posix.socket_t, Client). The code would then have looked something like:
for (self.polls[1..self.connected + 1]) |p| {
const revents = p.revents;
if (revents == 0) {
// same as before
continue;
}
const client = self.clients.getPtr(p.fd) orelse unreachable;
// Like before, we now have a *Client for the socket which is ready
}
Other parts of the code would also have been simplified - hashmaps have a way of doing that. But it isn't how I'd do it - because it would probably be less efficient - and this series isn't about taking the easy path.
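For the curious, here's a sketch of the hashmap alternative, assuming Zig 0.13's std.AutoHashMap and a hypothetical ConnState in place of Client. Note that it only changes how we find the state for a ready fd; the pollfd array passed to poll still has to be maintained, swap-removal and all:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical per-connection state; a real Client would hold a Reader etc.
const ConnState = struct { messages: usize = 0 };

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var clients = std.AutoHashMap(posix.socket_t, ConnState).init(gpa.allocator());
    defer clients.deinit();

    // on accept: key the state by the socket (made-up fd numbers here)
    try clients.put(5, .{});
    try clients.put(6, .{});

    // when poll reports fd 6 as ready: look up its state by fd
    const state = clients.getPtr(6) orelse unreachable;
    state.messages += 1;

    // on disconnect: no swapping needed on this side, just remove the key
    _ = clients.remove(5);
    std.debug.print("{d} client(s), fd 6 has {d} message(s)\n", .{ clients.count(), clients.get(6).?.messages });
}
```

Every lookup now hashes the fd, whereas the parallel-slices version gets to the Client with plain indexing.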
Conclusion
Although the poll system call is simple, we had to make substantial changes to our code, and our mindset, to accommodate this different way of interacting with sockets. State becomes both more critical and more challenging to maintain compared to previous approaches where we had a thread per connection.
The next part will continue to expand on the above, adding writes, timeouts and connection limits, and looking at how we can combine what we've learnt here with our previous experience with multithreading. Future parts will then look at platform-specific alternatives to poll: epoll for Linux and kqueue for BSD/macOS. These are not only faster and more scalable than poll, but also offer additional features. However, fundamentally, most of what we're learning here will translate directly to those APIs. Although it's relatively simple to build a system that supports both platform-specific APIs, poll has the significant benefit of being cross-platform. If you don't need the extra performance/scale or features of the platform-specific APIs, I suggest you stick with poll.
As a quick aside, there's also the select system call, which is older and more limited than poll. Unless you're targeting a very old platform, you should always use poll instead of select. But you will see "select" mentioned/referenced now and again, so it's good to at least be aware of it.
Appendix A - Code
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_demo);
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var server = try Server.init(allocator, 4096);
defer server.deinit();
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
try server.run(address);
}
const Server = struct {
// creates our polls and clients slices and is passed to Client.init
// for it to create our read buffer.
allocator: Allocator,
// The number of clients we currently have connected
connected: usize,
// polls[0] is always our listening socket
polls: []posix.pollfd,
// list of clients, only client[0..connected] are valid
clients: []Client,
// This is always polls[1..] and it's used so that we can manipulate
// clients and client_polls together. Necessary because polls[0] is the
// listening socket, and we don't ever touch that.
client_polls: []posix.pollfd,
fn init(allocator: Allocator, max: usize) !Server {
// + 1 for the listening socket
const polls = try allocator.alloc(posix.pollfd, max + 1);
errdefer allocator.free(polls);
const clients = try allocator.alloc(Client, max);
errdefer allocator.free(clients);
return .{
.polls = polls,
.clients = clients,
.client_polls = polls[1..],
.connected = 0,
.allocator = allocator,
};
}
fn deinit(self: *Server) void {
// TODO: Close connected sockets? We'll talk about shutdowns in
// a future part.
self.allocator.free(self.polls);
self.allocator.free(self.clients);
}
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// polls[0] is always our listening socket, and we want to know when
// it's ready to accept (POLL.IN)
self.polls[0] = .{
.fd = listener,
.revents = 0,
.events = posix.POLL.IN,
};
while (true) {
// Oops, still have a +1 here, since we want to poll all our connected
// clients, plus our listening socket
_ = try posix.poll(self.polls[0..self.connected + 1], -1);
if (self.polls[0].revents != 0) {
// listening socket is ready
self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
}
var i: usize = 0;
while (i < self.connected) {
const revents = self.client_polls[i].revents;
if (revents == 0) {
// this socket isn't ready, move on to the next one
i += 1;
continue;
}
const client = &self.clients[i];
if (revents & posix.POLL.IN == posix.POLL.IN) {
// this socket is ready to be read
while (true) {
const msg = client.readMessage() catch {
// we don't increment `i` when we remove the client
// because removeClient does a swap and puts the last
// client at position i
self.removeClient(i);
break;
} orelse {
// no more messages, but this client is still connected
i += 1;
break;
};
std.debug.print("got: {s}\n", .{msg});
}
} else {
// revents is non-zero but POLL.IN isn't set (e.g. POLL.HUP or
// POLL.ERR on their own): treat it as a disconnect. Without this,
// `i` would never advance and we'd loop forever.
self.removeClient(i);
}
}
}
}
fn accept(self: *Server, listener: posix.socket_t) !void {
while (true) {
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
const client = Client.init(self.allocator, socket, address) catch |err| {
posix.close(socket);
log.err("failed to initialize client: {}", .{err});
return;
};
const connected = self.connected;
self.clients[connected] = client;
self.client_polls[connected] = .{
.fd = socket,
.revents = 0,
.events = posix.POLL.IN,
};
self.connected = connected + 1;
}
}
fn removeClient(self: *Server, at: usize) void {
const client = self.clients[at];
posix.close(client.socket);
client.deinit(self.allocator);
// Swap the client we're removing with the last one
// So that when we set connected -= 1, it'll effectively "remove"
// the client from our slices.
const last_index = self.connected - 1;
self.clients[at] = self.clients[last_index];
self.client_polls[at] = self.client_polls[last_index];
self.connected = last_index;
}
};
const Client = struct {
reader: Reader,
socket: posix.socket_t,
address: std.net.Address,
fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
const reader = try Reader.init(allocator, 4096);
errdefer reader.deinit(allocator);
return .{
.reader = reader,
.socket = socket,
.address = address,
};
}
fn deinit(self: *const Client, allocator: Allocator) void {
self.reader.deinit(allocator);
}
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
};
const Reader = struct {
buf: []u8,
pos: usize = 0,
start: usize = 0,
fn init(allocator: Allocator, size: usize) !Reader {
const buf = try allocator.alloc(u8, size);
return .{
.pos = 0,
.start = 0,
.buf = buf,
};
}
fn deinit(self: *const Reader, allocator: Allocator) void {
allocator.free(self.buf);
}
fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
const buf = self.buf;
while (true) {
if (try self.bufferedMessage()) |msg| {
return msg;
}
const pos = self.pos;
const n = try posix.read(socket, buf[pos..]);
if (n == 0) {
return error.Closed;
}
self.pos = pos + n;
}
}
fn bufferedMessage(self: *Reader) !?[]u8 {
const buf = self.buf;
const pos = self.pos;
const start = self.start;
std.debug.assert(pos >= start);
const unprocessed = buf[start..pos];
if (unprocessed.len < 4) {
// make sure there's room (from start) for at least the 4-byte prefix
self.ensureSpace(4) catch unreachable;
return null;
}
const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
// the length of our message + the length of our prefix
// (widened to usize so a huge length prefix can't overflow a u32)
const total_len = @as(usize, message_len) + 4;
if (unprocessed.len < total_len) {
try self.ensureSpace(total_len);
return null;
}
self.start += total_len;
return unprocessed[4..total_len];
}
fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
const buf = self.buf;
if (buf.len < space) {
return error.BufferTooSmall;
}
const start = self.start;
const spare = buf.len - start;
if (spare >= space) {
return;
}
const unprocessed = buf[start..self.pos];
std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
self.start = 0;
self.pos = unprocessed.len;
}
};