diff --git a/README.md b/README.md index 93ab2ac..27be18d 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ main :: proc() { fjord.server_add_route(&server, .GET, {"users", ":id", "posts", ":post_id"}, user_post_handler) log.infof("Server listening on http://127.0.0.1:%d", endpoint.port) - fjord.listen_and_serve(&server) + fjord.server_listen_and_serve(&server) } ``` diff --git a/fjord_test.odin b/fjord_test.odin index f066c2f..f76e157 100644 --- a/fjord_test.odin +++ b/fjord_test.odin @@ -140,7 +140,7 @@ start_concurrent_server :: proc( server_thread_proc :: proc(t: ^thread.Thread) { d := (^ServerThreadData)(t.data) - listen_and_serve(d.server) + server_listen_and_serve(d.server) } d := new(ServerThreadData, context.temp_allocator) @@ -201,7 +201,6 @@ assert_endpoint_response :: proc(url: string, expected_response: string) { ) } - @(test) test_server_general_ok :: proc(t: ^testing.T) { context.logger = log.create_console_logger(.Info) @@ -232,18 +231,22 @@ test_server_general_ok :: proc(t: ^testing.T) { t := start_concurrent_server(&server) - assert_endpoint_response( - "http://127.0.0.1:8080/hello/world", - "
Hello firstworld
", - ) - assert_endpoint_response( - "http://127.0.0.1:8080/hello/lonely%20world/only", - "
Hello secondlonely%20world
", - ) - assert_endpoint_response( - "http://127.0.0.1:8080/hello/world/and/worlds%20friend", - "
Hello thirdworldandworlds%20friend
", - ) + // Try with some load + ITERATIONS :: 100 + for i in 0 ..< ITERATIONS { + assert_endpoint_response( + "http://127.0.0.1:8080/hello/world", + "
Hello firstworld
", + ) + assert_endpoint_response( + "http://127.0.0.1:8080/hello/lonely%20world/only", + "
Hello secondlonely%20world
", + ) + assert_endpoint_response( + "http://127.0.0.1:8080/hello/world/and/worlds%20friend", + "
Hello thirdworldandworlds%20friend
", + ) + } shutdown_concurrent_server(&server, t) diff --git a/http/marshaller.odin b/http/marshaller.odin index 973dfed..eb80d82 100644 --- a/http/marshaller.odin +++ b/http/marshaller.odin @@ -1,3 +1,4 @@ +#+private package package http import "core:bufio" diff --git a/server.odin b/server.odin index baa5b0e..4d3236e 100644 --- a/server.odin +++ b/server.odin @@ -1,5 +1,6 @@ package fjord +import "base:runtime" import "core:bufio" import "core:bytes" import "core:io" @@ -9,6 +10,7 @@ import "core:path/slashpath" import "core:strconv" import "core:strings" import "core:sync" +import "core:thread" import "core:unicode" import http "http" @@ -21,8 +23,11 @@ Server :: struct($Error_Type: typeid) { http.Response, Error_Type, ), - router: Router(Error_Type), shutdown: bool, + router: Router(Error_Type), + work_queue: WorkQueue, + worker_count: int, + worker_threads: [dynamic]^thread.Thread, } @(private) @@ -61,6 +66,9 @@ server_init :: proc( endpoint: net.Endpoint, allocator := context.allocator, ) { + WORKER_COUNT :: 8 + QUEUE_CAPACITY :: 252 + server^ = Server(Error_Type) { endpoint = endpoint, not_found_handler = proc( @@ -78,11 +86,20 @@ server_init :: proc( return http_response, nil }, shutdown = false, + worker_count = WORKER_COUNT, + worker_threads = make([dynamic]^thread.Thread, 0, WORKER_COUNT, allocator), } + + work_queue_init(&server.work_queue, QUEUE_CAPACITY, allocator) router_init(&server.router, allocator) } server_destroy :: proc(server: ^Server($Error_Type)) { + for t in server.worker_threads { + thread.destroy(t) + } + delete(server.worker_threads) + work_queue_destroy(&server.work_queue) router_destroy(&server.router) } @@ -122,76 +139,97 @@ server_shutdown :: proc(server: ^Server($Error_Type)) { sync.atomic_store(&server.shutdown, true) } -listen_and_serve :: proc(server: ^Server($Error_Type)) { +@(private) +serve :: proc(server: ^Server($Error_Type), client_socket: net.TCP_Socket) { + data, read_err := read_connection(client_socket, context.temp_allocator) + if read_err != nil { + log.warnf("Failed to read request, reason: %s", read_err) + return + } + + http_request, unmarshall_err := http.unmarshall_request( + data, + context.temp_allocator, + ) + if unmarshall_err != nil { + log.warnf("Failed to unmarshall request, reason: %s", unmarshall_err) + return + } + + handler: #type proc(request: ^http.Request) -> (http.Response, Error_Type) + ok: bool + + // todo: should sanitize better + path := strings.split(http_request.path, "/", context.temp_allocator)[1:] + + handler, http_request.path_variables, ok = router_lookup( + &server.router, + http_request.method, + path, + context.temp_allocator, + ) + if !ok do handler = server.not_found_handler + + http_response, handler_err := handler(&http_request) + + if handler_err != nil { + log.warnf("Handler failed with error: %s", handler_err) + return + } + + marshalled_response := http.marshall_response( + &http_response, + context.temp_allocator, + ) + net.send_tcp(client_socket, marshalled_response) +} + +@(private) +worker_thread :: proc(server: ^Server($Error_Type)) { + for { + socket, ok := work_queue_dequeue(&server.work_queue) + if !ok do break + + serve(server, socket) + net.close(socket) + free_all(context.temp_allocator) + } +} + +server_listen_and_serve :: proc(server: ^Server($Error_Type)) { server_socket, net_err := net.listen_tcp(server.endpoint) log.assert(net_err == nil, "Couldn't create TCP socket") defer net.close(server_socket) + for i in 0 ..< server.worker_count { + t := thread.create(proc(t: ^thread.Thread) { + server := cast(^Server(Error_Type))t.data + worker_thread(server) + }) + log.assertf(t != nil, "Thread creation failed") + + t.data = rawptr(server) + thread.start(t) + append(&server.worker_threads, t) + } + for !sync.atomic_load(&server.shutdown) { - client_socket, source, net_err := net.accept_tcp(server_socket) + client_socket, _, net_err := net.accept_tcp(server_socket) if net_err != nil { log.warnf("Failed to accept TCP connection, reason: %s", net_err) continue } - defer net.close(client_socket) - data, read_err := read_connection( - client_socket, - context.temp_allocator, - ) - if read_err != nil { - log.warnf("Failed to read request, reason: %s", read_err) - continue + ok := work_queue_enqueue(&server.work_queue, client_socket) + if !ok { + log.warn("Work queue full, rejecting connection") + net.close(client_socket) } + } - http_request, unmarshall_err := http.unmarshall_request( - data, - context.temp_allocator, - ) - if unmarshall_err != nil { - log.warnf( - "Failed to unmarshall request, reason: %s", - unmarshall_err, - ) - continue - } + work_queue_shutdown(&server.work_queue) - handler: #type proc( - request: ^http.Request, - ) -> ( - http.Response, - Error_Type, - ) - ok: bool - - // todo: should sanitize better - path := strings.split( - http_request.path, - "/", - context.temp_allocator, - )[1:] - - handler, http_request.path_variables, ok = router_lookup( - &server.router, - http_request.method, - path, - context.temp_allocator, - ) - if !ok do handler = server.not_found_handler - - http_response, handler_err := handler(&http_request) - - if handler_err != nil { - log.warnf("Handler failed with error: %s", handler_err) - continue - } - - marshalled_response := http.marshall_response( - &http_response, - context.temp_allocator, - ) - net.send_tcp(client_socket, marshalled_response) - - free_all(context.temp_allocator) + for t in server.worker_threads { + thread.join(t) } } diff --git a/work_queue.odin b/work_queue.odin new file mode 100644 index 0000000..293b4e8 --- /dev/null +++ b/work_queue.odin @@ -0,0 +1,77 @@ +#+private package +package fjord + +import "base:runtime" +import "core:net" +import "core:sync" + +WorkQueue :: struct { + items: [dynamic]net.TCP_Socket, + capacity: int, + mutex: sync.Mutex, + not_empty: sync.Cond, + shutdown: bool, + allocator: runtime.Allocator, +} + +work_queue_init :: proc( + work_queue: ^WorkQueue, + capacity: int, + allocator := context.allocator, +) { + work_queue^ = { + items = make([dynamic]net.TCP_Socket, 0, capacity, allocator), + capacity = capacity, + shutdown = false, + allocator = allocator, + } +} + +work_queue_destroy :: proc(work_queue: ^WorkQueue) { + sync.mutex_lock(&work_queue.mutex) + for socket in work_queue.items { + net.close(socket) + } + sync.mutex_unlock(&work_queue.mutex) + + delete(work_queue.items) +} + +work_queue_enqueue :: proc(work_queue: ^WorkQueue, socket: net.TCP_Socket) -> bool { + sync.mutex_lock(&work_queue.mutex) + defer sync.mutex_unlock(&work_queue.mutex) + + if work_queue.shutdown do return false + + if len(work_queue.items) >= work_queue.capacity { + return false + } + + append(&work_queue.items, socket) + sync.cond_signal(&work_queue.not_empty) + return true +} + +work_queue_dequeue :: proc(work_queue: ^WorkQueue) -> (socket: net.TCP_Socket, ok: bool) { + sync.mutex_lock(&work_queue.mutex) + defer sync.mutex_unlock(&work_queue.mutex) + + for len(work_queue.items) == 0 && !work_queue.shutdown { + sync.cond_wait(&work_queue.not_empty, &work_queue.mutex) + } + + if work_queue.shutdown && len(work_queue.items) == 0 { + return socket, false + } + + socket = pop_front(&work_queue.items) + return socket, true +} + +work_queue_shutdown :: proc(work_queue: ^WorkQueue) { + sync.mutex_lock(&work_queue.mutex) + defer sync.mutex_unlock(&work_queue.mutex) + + work_queue.shutdown = true + sync.cond_broadcast(&work_queue.not_empty) +}