Add work queue for requests

Hugo Mårdbrink 2025-11-10 22:11:26 +01:00
parent 83b39b15f6
commit a52c0cc2fd
5 changed files with 193 additions and 74 deletions

@@ -88,7 +88,7 @@ main :: proc() {
     fjord.server_add_route(&server, .GET, {"users", ":id", "posts", ":post_id"}, user_post_handler)

     log.infof("Server listening on http://127.0.0.1:%d", endpoint.port)
-    fjord.listen_and_serve(&server)
+    fjord.server_listen_and_serve(&server)
 }
 ```

@@ -140,7 +140,7 @@ start_concurrent_server :: proc(
     server_thread_proc :: proc(t: ^thread.Thread) {
         d := (^ServerThreadData)(t.data)
-        listen_and_serve(d.server)
+        server_listen_and_serve(d.server)
     }

     d := new(ServerThreadData, context.temp_allocator)
@@ -201,7 +201,6 @@ assert_endpoint_response :: proc(url: string, expected_response: string) {
     )
 }

-
 @(test)
 test_server_general_ok :: proc(t: ^testing.T) {
     context.logger = log.create_console_logger(.Info)
@@ -232,6 +231,9 @@ test_server_general_ok :: proc(t: ^testing.T) {
     t := start_concurrent_server(&server)

+    // Try with some load
+    ITERATIONS :: 100
+    for i in 0 ..< ITERATIONS {
     assert_endpoint_response(
         "http://127.0.0.1:8080/hello/world",
         "<div>Hello firstworld</div>",
@@ -244,6 +246,7 @@ test_server_general_ok :: proc(t: ^testing.T) {
         "http://127.0.0.1:8080/hello/world/and/worlds%20friend",
         "<div>Hello thirdworldandworlds%20friend</div>",
     )
+    }

     shutdown_concurrent_server(&server, t)

@@ -1,3 +1,4 @@
+#+private package
 package http

 import "core:bufio"

@@ -1,5 +1,6 @@
 package fjord

+import "base:runtime"
 import "core:bufio"
 import "core:bytes"
 import "core:io"
@@ -9,6 +10,7 @@ import "core:path/slashpath"
 import "core:strconv"
 import "core:strings"
 import "core:sync"
+import "core:thread"
 import "core:unicode"

 import http "http"
@@ -21,8 +23,11 @@ Server :: struct($Error_Type: typeid) {
         http.Response,
         Error_Type,
     ),
-    router: Router(Error_Type),
     shutdown: bool,
+    router: Router(Error_Type),
+    work_queue: WorkQueue,
+    worker_count: int,
+    worker_threads: [dynamic]^thread.Thread,
 }

 @(private)
@@ -61,6 +66,9 @@ server_init :: proc(
     endpoint: net.Endpoint,
     allocator := context.allocator,
 ) {
+    WORKER_COUNT :: 8
+    QUEUE_CAPACITY :: 252
+
     server^ = Server(Error_Type) {
         endpoint = endpoint,
         not_found_handler = proc(
@@ -78,11 +86,20 @@ server_init :: proc(
             return http_response, nil
         },
         shutdown = false,
+        worker_count = WORKER_COUNT,
+        worker_threads = make([dynamic]^thread.Thread, 0, WORKER_COUNT, allocator),
     }

+    work_queue_init(&server.work_queue, QUEUE_CAPACITY, allocator)
     router_init(&server.router, allocator)
 }

 server_destroy :: proc(server: ^Server($Error_Type)) {
+    for t in server.worker_threads {
+        thread.destroy(t)
+    }
+    delete(server.worker_threads)
+
+    work_queue_destroy(&server.work_queue)
     router_destroy(&server.router)
 }
@@ -122,26 +139,12 @@ server_shutdown :: proc(server: ^Server($Error_Type)) {
     sync.atomic_store(&server.shutdown, true)
 }

-listen_and_serve :: proc(server: ^Server($Error_Type)) {
-    server_socket, net_err := net.listen_tcp(server.endpoint)
-    log.assert(net_err == nil, "Couldn't create TCP socket")
-    defer net.close(server_socket)
-
-    for !sync.atomic_load(&server.shutdown) {
-        client_socket, source, net_err := net.accept_tcp(server_socket)
-        if net_err != nil {
-            log.warnf("Failed to accept TCP connection, reason: %s", net_err)
-            continue
-        }
-        defer net.close(client_socket)
-
-        data, read_err := read_connection(
-            client_socket,
-            context.temp_allocator,
-        )
+@(private)
+serve :: proc(server: ^Server($Error_Type), client_socket: net.TCP_Socket) {
+    data, read_err := read_connection(client_socket, context.temp_allocator)
     if read_err != nil {
         log.warnf("Failed to read request, reason: %s", read_err)
-        continue
+        return
     }

     http_request, unmarshall_err := http.unmarshall_request(
@@ -149,27 +152,15 @@ listen_and_serve :: proc(server: ^Server($Error_Type)) {
         context.temp_allocator,
     )
     if unmarshall_err != nil {
-        log.warnf(
-            "Failed to unmarshall request, reason: %s",
-            unmarshall_err,
-        )
-        continue
+        log.warnf("Failed to unmarshall request, reason: %s", unmarshall_err)
+        return
     }

-    handler: #type proc(
-        request: ^http.Request,
-    ) -> (
-        http.Response,
-        Error_Type,
-    )
+    handler: #type proc(request: ^http.Request) -> (http.Response, Error_Type)
     ok: bool

     // todo: should sanitize better
-    path := strings.split(
-        http_request.path,
-        "/",
-        context.temp_allocator,
-    )[1:]
+    path := strings.split(http_request.path, "/", context.temp_allocator)[1:]

     handler, http_request.path_variables, ok = router_lookup(
         &server.router,
@@ -183,7 +174,7 @@ listen_and_serve :: proc(server: ^Server($Error_Type)) {
     if handler_err != nil {
         log.warnf("Handler failed with error: %s", handler_err)
-        continue
+        return
     }

     marshalled_response := http.marshall_response(
@@ -191,7 +182,54 @@ listen_and_serve :: proc(server: ^Server($Error_Type)) {
         context.temp_allocator,
     )

     net.send_tcp(client_socket, marshalled_response)
+}
+
+@(private)
+worker_thread :: proc(server: ^Server($Error_Type)) {
+    for {
+        socket, ok := work_queue_dequeue(&server.work_queue)
+        if !ok do break
+
+        serve(server, socket)
+        net.close(socket)
         free_all(context.temp_allocator)
     }
 }
+
+server_listen_and_serve :: proc(server: ^Server($Error_Type)) {
+    server_socket, net_err := net.listen_tcp(server.endpoint)
+    log.assert(net_err == nil, "Couldn't create TCP socket")
+    defer net.close(server_socket)
+
+    for i in 0 ..< server.worker_count {
+        t := thread.create(proc(t: ^thread.Thread) {
+            server := cast(^Server(Error_Type))t.data
+            worker_thread(server)
+        })
+        log.assertf(t != nil, "Thread creation failed")
+
+        t.data = rawptr(server)
+        thread.start(t)
+        append(&server.worker_threads, t)
+    }
+
+    for !sync.atomic_load(&server.shutdown) {
+        client_socket, _, net_err := net.accept_tcp(server_socket)
+        if net_err != nil {
+            log.warnf("Failed to accept TCP connection, reason: %s", net_err)
+            continue
+        }
+
+        ok := work_queue_enqueue(&server.work_queue, client_socket)
+        if !ok {
+            log.warn("Work queue full, rejecting connection")
+            net.close(client_socket)
+        }
+    }
+
+    work_queue_shutdown(&server.work_queue)
+    for t in server.worker_threads {
+        thread.join(t)
+    }
+}
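
For context on how the reworked entry point is driven: server_listen_and_serve now spawns the worker pool, then blocks accepting connections and enqueueing them until server_shutdown flips the shutdown flag, after which it shuts the queue down and joins the workers. The sketch below is not part of this commit; it follows the pattern of the test helper above, and the Error enum, the fjord import path, and the server_init argument order are assumptions.

```odin
package main

import "core:log"
import "core:net"
import "core:thread"
import "core:time"

import fjord "../fjord" // assumed import path

// Hypothetical application error type; Server is generic over $Error_Type.
Error :: enum {
    None,
}

main :: proc() {
    context.logger = log.create_console_logger(.Info)

    server: fjord.Server(Error)
    // Argument order assumed from the server_init hunk above (endpoint, then allocator).
    fjord.server_init(&server, net.Endpoint{address = net.IP4_Loopback, port = 8080})
    defer fjord.server_destroy(&server)

    // server_listen_and_serve blocks: it starts the worker pool, then accepts
    // connections and enqueues them, so it runs on its own thread here.
    t := thread.create_and_start_with_poly_data(&server, proc(s: ^fjord.Server(Error)) {
        fjord.server_listen_and_serve(s)
    })

    time.sleep(5 * time.Second) // ... handle traffic ...

    fjord.server_shutdown(&server) // acceptor stops, the queue is shut down, workers are joined
    thread.join(t)
    thread.destroy(t)
}
```

Note that the accept loop only re-checks the shutdown flag between accepts, so shutdown takes effect once the next connection (or accept error) arrives.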

work_queue.odin (new file)

@@ -0,0 +1,77 @@
+#+private package
+package fjord
+
+import "base:runtime"
+import "core:net"
+import "core:sync"
+
+WorkQueue :: struct {
+    items:     [dynamic]net.TCP_Socket,
+    capacity:  int,
+    mutex:     sync.Mutex,
+    not_empty: sync.Cond,
+    shutdown:  bool,
+    allocator: runtime.Allocator,
+}
+
+work_queue_init :: proc(
+    work_queue: ^WorkQueue,
+    capacity: int,
+    allocator := context.allocator,
+) {
+    work_queue^ = {
+        items     = make([dynamic]net.TCP_Socket, 0, capacity, allocator),
+        capacity  = capacity,
+        shutdown  = false,
+        allocator = allocator,
+    }
+}
+
+work_queue_destroy :: proc(work_queue: ^WorkQueue) {
+    sync.mutex_lock(&work_queue.mutex)
+    for socket in work_queue.items {
+        net.close(socket)
+    }
+    sync.mutex_unlock(&work_queue.mutex)
+
+    delete(work_queue.items)
+}
+
+work_queue_enqueue :: proc(work_queue: ^WorkQueue, socket: net.TCP_Socket) -> bool {
+    sync.mutex_lock(&work_queue.mutex)
+    defer sync.mutex_unlock(&work_queue.mutex)
+
+    if work_queue.shutdown do return false
+    if len(work_queue.items) >= work_queue.capacity {
+        return false
+    }
+
+    append(&work_queue.items, socket)
+    sync.cond_signal(&work_queue.not_empty)
+    return true
+}
+
+work_queue_dequeue :: proc(work_queue: ^WorkQueue) -> (socket: net.TCP_Socket, ok: bool) {
+    sync.mutex_lock(&work_queue.mutex)
+    defer sync.mutex_unlock(&work_queue.mutex)
+
+    for len(work_queue.items) == 0 && !work_queue.shutdown {
+        sync.cond_wait(&work_queue.not_empty, &work_queue.mutex)
+    }
+
+    if work_queue.shutdown && len(work_queue.items) == 0 {
+        return socket, false
+    }
+
+    socket = pop_front(&work_queue.items)
+    return socket, true
+}
+
+work_queue_shutdown :: proc(work_queue: ^WorkQueue) {
+    sync.mutex_lock(&work_queue.mutex)
+    defer sync.mutex_unlock(&work_queue.mutex)
+
+    work_queue.shutdown = true
+    sync.cond_broadcast(&work_queue.not_empty)
+}
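
Because the file is marked #+private package, WorkQueue is only reachable from inside the fjord package. As a rough illustration of the intended contract (hypothetical code, not part of this commit, and it would have to live next to work_queue.odin inside the package): consumers block in work_queue_dequeue, producers treat a false return from work_queue_enqueue as back-pressure, and work_queue_shutdown wakes every waiter so dequeue returns ok = false once the queue is drained.

```odin
package fjord

import "core:fmt"
import "core:net"
import "core:thread"

// Hypothetical in-package demo of the WorkQueue lifecycle; socket values are
// placeholder handles, not real connections.
work_queue_demo :: proc() {
    q: WorkQueue
    work_queue_init(&q, 4) // bounded: at most 4 queued sockets
    defer work_queue_destroy(&q)

    // Consumer: blocks in work_queue_dequeue until an item arrives, or until
    // work_queue_shutdown has been called and the queue is empty.
    worker := thread.create_and_start_with_poly_data(&q, proc(q: ^WorkQueue) {
        for {
            socket, ok := work_queue_dequeue(q)
            if !ok do break // shutdown requested and queue drained
            fmt.printf("dequeued socket %v\n", socket)
        }
    })

    // Producer: enqueue returns false when the queue is full or shut down,
    // which server_listen_and_serve treats as "reject the connection".
    for i in 0 ..< 8 {
        if !work_queue_enqueue(&q, net.TCP_Socket(1000 + i)) {
            fmt.println("queue full, would reject connection")
        }
    }

    work_queue_shutdown(&q) // wake all waiters; dequeue reports ok = false once empty
    thread.join(worker)
    thread.destroy(worker)
}
```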