Make thread count adjustable

This commit is contained in:
Hugo Mårdbrink 2025-11-13 16:40:18 +01:00
parent 0bf25b991f
commit 595872f81c
3 changed files with 26 additions and 17 deletions

View file

@ -93,7 +93,7 @@ main :: proc() {
``` ```
## Dev ## Dev
Run the test suite to check functionality, curl is a required dependancy for the test suite to work. Run the test suite to check functionality, curl is a required dependency for the test suite to work.
```bash ```bash
odin test . odin test .
``` ```

View file

@ -20,7 +20,7 @@ first_handler :: proc(request: ^http.Request) -> (http.Response, Error) {
entity := request.path_variables["entity"] entity := request.path_variables["entity"]
body := strings.concatenate( body := strings.concatenate(
{"<div>Hello first", entity, "</div>"}, {"<div>Hello first ", entity, "</div>"},
context.temp_allocator, context.temp_allocator,
) )
response := http.make_response( response := http.make_response(
@ -37,7 +37,7 @@ second_handler :: proc(request: ^http.Request) -> (http.Response, Error) {
entity := request.path_variables["entity"] entity := request.path_variables["entity"]
body := strings.concatenate( body := strings.concatenate(
{"<div>Hello second", entity, "</div>"}, {"<div>Hello second ", entity, "</div>"},
context.temp_allocator, context.temp_allocator,
) )
response := http.make_response( response := http.make_response(
@ -55,7 +55,7 @@ third_handler :: proc(request: ^http.Request) -> (http.Response, Error) {
other_entity := request.path_variables["other_entity"] other_entity := request.path_variables["other_entity"]
body := strings.concatenate( body := strings.concatenate(
{"<div>Hello third", entity, "and", other_entity, "</div>"}, {"<div>Hello third ", entity, " and ", other_entity, "</div>"},
context.temp_allocator, context.temp_allocator,
) )
response := http.make_response( response := http.make_response(
@ -77,9 +77,10 @@ test_router_ok :: proc(t: ^testing.T) {
address = net.IP4_Address{127, 0, 0, 1}, address = net.IP4_Address{127, 0, 0, 1},
port = 8080, port = 8080,
} }
THREAD_COUNT :: 8
server: Server(Error) server: Server(Error)
server_init(&server, endpoint, context.allocator) server_init(&server, endpoint, THREAD_COUNT, context.allocator)
defer server_destroy(&server) defer server_destroy(&server)
server_add_route(&server, .GET, {"hello", ":entity"}, first_handler) server_add_route(&server, .GET, {"hello", ":entity"}, first_handler)
@ -153,7 +154,7 @@ start_concurrent_server :: proc(
} }
// Give server some time to start // Give server some time to start
time.sleep(100 * time.Millisecond) time.sleep(50 * time.Millisecond)
return d.thread return d.thread
} }
@ -210,9 +211,10 @@ test_server_general_ok :: proc(t: ^testing.T) {
address = net.IP4_Address{127, 0, 0, 1}, address = net.IP4_Address{127, 0, 0, 1},
port = 8080, port = 8080,
} }
THREAD_COUNT :: 8
server: Server(Error) server: Server(Error)
server_init(&server, endpoint, context.allocator) server_init(&server, endpoint, THREAD_COUNT, context.allocator)
defer server_destroy(&server) defer server_destroy(&server)
server_add_route(&server, .GET, {"hello", ":entity"}, first_handler) server_add_route(&server, .GET, {"hello", ":entity"}, first_handler)
@ -236,15 +238,15 @@ test_server_general_ok :: proc(t: ^testing.T) {
for i in 0 ..< ITERATIONS { for i in 0 ..< ITERATIONS {
assert_endpoint_response( assert_endpoint_response(
"http://127.0.0.1:8080/hello/world", "http://127.0.0.1:8080/hello/world",
"<div>Hello firstworld</div>", "<div>Hello first world</div>",
) )
assert_endpoint_response( assert_endpoint_response(
"http://127.0.0.1:8080/hello/lonely%20world/only", "http://127.0.0.1:8080/hello/lonely%20world/only",
"<div>Hello secondlonely%20world</div>", "<div>Hello second lonely%20world</div>",
) )
assert_endpoint_response( assert_endpoint_response(
"http://127.0.0.1:8080/hello/world/and/worlds%20friend", "http://127.0.0.1:8080/hello/world/and/worlds%20friend",
"<div>Hello thirdworldandworlds%20friend</div>", "<div>Hello third world and worlds%20friend</div>",
) )
} }

View file

@ -23,9 +23,11 @@ Server :: struct($Error_Type: typeid) {
http.Response, http.Response,
Error_Type, Error_Type,
), ),
shutdown: bool, running: bool,
router: Router(Error_Type), router: Router(Error_Type),
thread_pool: thread.Pool, thread_pool: thread.Pool,
thread_count: uint,
allocator: runtime.Allocator,
} }
@(private) @(private)
@ -62,6 +64,7 @@ read_connection :: proc(
server_init :: proc( server_init :: proc(
server: ^Server($Error_Type), server: ^Server($Error_Type),
endpoint: net.Endpoint, endpoint: net.Endpoint,
thread_count: uint,
allocator := context.allocator, allocator := context.allocator,
) { ) {
server^ = Server(Error_Type) { server^ = Server(Error_Type) {
@ -80,11 +83,12 @@ server_init :: proc(
) )
return http_response, nil return http_response, nil
}, },
shutdown = false, thread_count = thread_count,
running = false,
allocator = allocator,
} }
THREAD_COUNT :: 8 thread.pool_init(&server.thread_pool, allocator, int(server.thread_count))
thread.pool_init(&server.thread_pool, allocator, THREAD_COUNT)
thread.pool_start(&server.thread_pool) thread.pool_start(&server.thread_pool)
router_init(&server.router, allocator) router_init(&server.router, allocator)
} }
@ -131,7 +135,7 @@ server_remove_route :: proc(
} }
server_shutdown :: proc(server: ^Server($Error_Type)) { server_shutdown :: proc(server: ^Server($Error_Type)) {
sync.atomic_store(&server.shutdown, true) sync.atomic_store(&server.running, false)
} }
@(private) @(private)
@ -192,7 +196,7 @@ request_thread_task :: proc(t: thread.Task) {
serve(task_data.server, task_data.client_socket) serve(task_data.server, task_data.client_socket)
net.close(task_data.client_socket) net.close(task_data.client_socket)
free_all(context.temp_allocator) // context.temp_allocator is freed automatically on thread death.
} }
@(private) @(private)
@ -206,6 +210,7 @@ spawn_request_thread :: proc(
client_socket = client_socket, client_socket = client_socket,
} }
// Not sure about sharing the default non temp allocator, have to think about
thread.pool_add_task( thread.pool_add_task(
&server.thread_pool, &server.thread_pool,
context.allocator, context.allocator,
@ -219,7 +224,9 @@ server_listen_and_serve :: proc(server: ^Server($Error_Type)) {
log.assert(net_err == nil, "Couldn't create TCP socket") log.assert(net_err == nil, "Couldn't create TCP socket")
defer net.close(server_socket) defer net.close(server_socket)
for !sync.atomic_load(&server.shutdown) { sync.atomic_store(&server.running, true)
for sync.atomic_load(&server.running) {
client_socket, _, net_err := net.accept_tcp(server_socket) client_socket, _, net_err := net.accept_tcp(server_socket)
if net_err != nil { if net_err != nil {
log.warnf("Failed to accept TCP connection, reason: %s", net_err) log.warnf("Failed to accept TCP connection, reason: %s", net_err)