From 0bf25b991f3475b92374f1076c3117a0a39ad78d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20M=C3=A5rdbrink?= Date: Thu, 13 Nov 2025 11:18:13 +0100 Subject: [PATCH] Replace work queue with Odin core thread pool --- http/marshaller.odin | 3 +- router.odin | 2 +- server.odin | 85 ++++++++++++++++++++++---------------------- work_queue.odin | 77 --------------------------------------- 4 files changed, 44 insertions(+), 123 deletions(-) delete mode 100644 work_queue.odin diff --git a/http/marshaller.odin b/http/marshaller.odin index eb80d82..c3e48aa 100644 --- a/http/marshaller.odin +++ b/http/marshaller.odin @@ -1,4 +1,3 @@ -#+private package package http import "core:bufio" @@ -56,7 +55,7 @@ status_text :: proc(status: Status) -> string { unmarshall_request_line :: proc( request: ^Request, reader: ^bufio.Reader, - allocator := context.temp_allocator + allocator := context.temp_allocator, ) -> RequestError { request_line, io_err := bufio.reader_read_slice(reader, '\n') if io_err != nil do return .InvalidRequestLine diff --git a/router.odin b/router.odin index 045b2c5..ea90b0e 100644 --- a/router.odin +++ b/router.odin @@ -182,7 +182,7 @@ router_remove_route :: proc( if method not_in current_node.handlers { return false } - + delete_key(¤t_node.handlers, method) return true } diff --git a/server.odin b/server.odin index 4d3236e..b81d535 100644 --- a/server.odin +++ b/server.odin @@ -25,9 +25,7 @@ Server :: struct($Error_Type: typeid) { ), shutdown: bool, router: Router(Error_Type), - work_queue: WorkQueue, - worker_count: int, - worker_threads: [dynamic]^thread.Thread, + thread_pool: thread.Pool, } @(private) @@ -66,9 +64,6 @@ server_init :: proc( endpoint: net.Endpoint, allocator := context.allocator, ) { - WORKER_COUNT :: 8 - QUEUE_CAPACITY :: 252 - server^ = Server(Error_Type) { endpoint = endpoint, not_found_handler = proc( @@ -86,20 +81,20 @@ server_init :: proc( return http_response, nil }, shutdown = false, - worker_count = WORKER_COUNT, - worker_threads = make([dynamic]^thread.Thread, 0, WORKER_COUNT, allocator), } - work_queue_init(&server.work_queue, QUEUE_CAPACITY, allocator) + THREAD_COUNT :: 8 + thread.pool_init(&server.thread_pool, allocator, THREAD_COUNT) + thread.pool_start(&server.thread_pool) router_init(&server.router, allocator) } server_destroy :: proc(server: ^Server($Error_Type)) { - for t in server.worker_threads { - thread.destroy(t) + if sync.atomic_load(&server.thread_pool.is_running) { + thread.pool_join(&server.thread_pool) } - delete(server.worker_threads) - work_queue_destroy(&server.work_queue) + + thread.pool_destroy(&server.thread_pool) router_destroy(&server.router) } @@ -184,16 +179,39 @@ serve :: proc(server: ^Server($Error_Type), client_socket: net.TCP_Socket) { net.send_tcp(client_socket, marshalled_response) } -@(private) -worker_thread :: proc(server: ^Server($Error_Type)) { - for { - socket, ok := work_queue_dequeue(&server.work_queue) - if !ok do break +RequestThreadData :: struct($Error_Type: typeid) { + server: ^Server(Error_Type), + client_socket: net.TCP_Socket, +} - serve(server, socket) - net.close(socket) - free_all(context.temp_allocator) +@(private) +request_thread_task :: proc(t: thread.Task) { + task_data := cast(^RequestThreadData(any))t.data + defer free(task_data, t.allocator) + + serve(task_data.server, task_data.client_socket) + + net.close(task_data.client_socket) + free_all(context.temp_allocator) +} + +@(private) +spawn_request_thread :: proc( + server: ^Server($Error_Type), + client_socket: net.TCP_Socket, +) { + task_data := new(RequestThreadData(Error_Type)) + task_data^ = RequestThreadData(Error_Type) { + server = server, + client_socket = client_socket, } + + thread.pool_add_task( + &server.thread_pool, + context.allocator, + request_thread_task, + rawptr(task_data), + ) } server_listen_and_serve :: proc(server: ^Server($Error_Type)) { @@ -201,18 +219,6 @@ server_listen_and_serve :: proc(server: ^Server($Error_Type)) { log.assert(net_err == nil, "Couldn't create TCP socket") defer net.close(server_socket) - for i in 0 ..< server.worker_count { - t := thread.create(proc(t: ^thread.Thread) { - server := cast(^Server(Error_Type))t.data - worker_thread(server) - }) - log.assertf(t != nil, "Thread creation failed") - - t.data = rawptr(server) - thread.start(t) - append(&server.worker_threads, t) - } - for !sync.atomic_load(&server.shutdown) { client_socket, _, net_err := net.accept_tcp(server_socket) if net_err != nil { @@ -220,16 +226,9 @@ server_listen_and_serve :: proc(server: ^Server($Error_Type)) { continue } - ok := work_queue_enqueue(&server.work_queue, client_socket) - if !ok { - log.warn("Work queue full, rejecting connection") - net.close(client_socket) - } + spawn_request_thread(server, client_socket) } - work_queue_shutdown(&server.work_queue) - - for t in server.worker_threads { - thread.join(t) - } + thread.pool_finish(&server.thread_pool) + thread.pool_join(&server.thread_pool) } diff --git a/work_queue.odin b/work_queue.odin deleted file mode 100644 index 293b4e8..0000000 --- a/work_queue.odin +++ /dev/null @@ -1,77 +0,0 @@ -#+private package -package fjord - -import "base:runtime" -import "core:net" -import "core:sync" - -WorkQueue :: struct { - items: [dynamic]net.TCP_Socket, - capacity: int, - mutex: sync.Mutex, - not_empty: sync.Cond, - shutdown: bool, - allocator: runtime.Allocator, -} - -work_queue_init :: proc( - work_queue: ^WorkQueue, - capacity: int, - allocator := context.allocator, -) { - work_queue^ = { - items = make([dynamic]net.TCP_Socket, 0, capacity, allocator), - capacity = capacity, - shutdown = false, - allocator = allocator, - } -} - -work_queue_destroy :: proc(work_queue: ^WorkQueue) { - sync.mutex_lock(&work_queue.mutex) - for socket in work_queue.items { - net.close(socket) - } - sync.mutex_unlock(&work_queue.mutex) - - delete(work_queue.items) -} - -work_queue_enqueue :: proc(work_queue: ^WorkQueue, socket: net.TCP_Socket) -> bool { - sync.mutex_lock(&work_queue.mutex) - defer sync.mutex_unlock(&work_queue.mutex) - - if work_queue.shutdown do return false - - if len(work_queue.items) >= work_queue.capacity { - return false - } - - append(&work_queue.items, socket) - sync.cond_signal(&work_queue.not_empty) - return true -} - -work_queue_dequeue :: proc(work_queue: ^WorkQueue) -> (socket: net.TCP_Socket, ok: bool) { - sync.mutex_lock(&work_queue.mutex) - defer sync.mutex_unlock(&work_queue.mutex) - - for len(work_queue.items) == 0 && !work_queue.shutdown { - sync.cond_wait(&work_queue.not_empty, &work_queue.mutex) - } - - if work_queue.shutdown && len(work_queue.items) == 0 { - return socket, false - } - - socket = pop_front(&work_queue.items) - return socket, true -} - -work_queue_shutdown :: proc(work_queue: ^WorkQueue) { - sync.mutex_lock(&work_queue.mutex) - defer sync.mutex_unlock(&work_queue.mutex) - - work_queue.shutdown = true - sync.cond_broadcast(&work_queue.not_empty) -}