Replace work queue with Odin core thread pool
This commit is contained in:
parent
a52c0cc2fd
commit
0bf25b991f
4 changed files with 44 additions and 123 deletions
|
|
@ -1,4 +1,3 @@
|
|||
#+private package
|
||||
package http
|
||||
|
||||
import "core:bufio"
|
||||
|
|
@ -56,7 +55,7 @@ status_text :: proc(status: Status) -> string {
|
|||
unmarshall_request_line :: proc(
|
||||
request: ^Request,
|
||||
reader: ^bufio.Reader,
|
||||
allocator := context.temp_allocator
|
||||
allocator := context.temp_allocator,
|
||||
) -> RequestError {
|
||||
request_line, io_err := bufio.reader_read_slice(reader, '\n')
|
||||
if io_err != nil do return .InvalidRequestLine
|
||||
|
|
|
|||
83
server.odin
83
server.odin
|
|
@ -25,9 +25,7 @@ Server :: struct($Error_Type: typeid) {
|
|||
),
|
||||
shutdown: bool,
|
||||
router: Router(Error_Type),
|
||||
work_queue: WorkQueue,
|
||||
worker_count: int,
|
||||
worker_threads: [dynamic]^thread.Thread,
|
||||
thread_pool: thread.Pool,
|
||||
}
|
||||
|
||||
@(private)
|
||||
|
|
@ -66,9 +64,6 @@ server_init :: proc(
|
|||
endpoint: net.Endpoint,
|
||||
allocator := context.allocator,
|
||||
) {
|
||||
WORKER_COUNT :: 8
|
||||
QUEUE_CAPACITY :: 252
|
||||
|
||||
server^ = Server(Error_Type) {
|
||||
endpoint = endpoint,
|
||||
not_found_handler = proc(
|
||||
|
|
@ -86,20 +81,20 @@ server_init :: proc(
|
|||
return http_response, nil
|
||||
},
|
||||
shutdown = false,
|
||||
worker_count = WORKER_COUNT,
|
||||
worker_threads = make([dynamic]^thread.Thread, 0, WORKER_COUNT, allocator),
|
||||
}
|
||||
|
||||
work_queue_init(&server.work_queue, QUEUE_CAPACITY, allocator)
|
||||
THREAD_COUNT :: 8
|
||||
thread.pool_init(&server.thread_pool, allocator, THREAD_COUNT)
|
||||
thread.pool_start(&server.thread_pool)
|
||||
router_init(&server.router, allocator)
|
||||
}
|
||||
|
||||
server_destroy :: proc(server: ^Server($Error_Type)) {
|
||||
for t in server.worker_threads {
|
||||
thread.destroy(t)
|
||||
if sync.atomic_load(&server.thread_pool.is_running) {
|
||||
thread.pool_join(&server.thread_pool)
|
||||
}
|
||||
delete(server.worker_threads)
|
||||
work_queue_destroy(&server.work_queue)
|
||||
|
||||
thread.pool_destroy(&server.thread_pool)
|
||||
router_destroy(&server.router)
|
||||
}
|
||||
|
||||
|
|
@ -184,16 +179,39 @@ serve :: proc(server: ^Server($Error_Type), client_socket: net.TCP_Socket) {
|
|||
net.send_tcp(client_socket, marshalled_response)
|
||||
}
|
||||
|
||||
@(private)
|
||||
worker_thread :: proc(server: ^Server($Error_Type)) {
|
||||
for {
|
||||
socket, ok := work_queue_dequeue(&server.work_queue)
|
||||
if !ok do break
|
||||
RequestThreadData :: struct($Error_Type: typeid) {
|
||||
server: ^Server(Error_Type),
|
||||
client_socket: net.TCP_Socket,
|
||||
}
|
||||
|
||||
serve(server, socket)
|
||||
net.close(socket)
|
||||
@(private)
|
||||
request_thread_task :: proc(t: thread.Task) {
|
||||
task_data := cast(^RequestThreadData(any))t.data
|
||||
defer free(task_data, t.allocator)
|
||||
|
||||
serve(task_data.server, task_data.client_socket)
|
||||
|
||||
net.close(task_data.client_socket)
|
||||
free_all(context.temp_allocator)
|
||||
}
|
||||
|
||||
@(private)
|
||||
spawn_request_thread :: proc(
|
||||
server: ^Server($Error_Type),
|
||||
client_socket: net.TCP_Socket,
|
||||
) {
|
||||
task_data := new(RequestThreadData(Error_Type))
|
||||
task_data^ = RequestThreadData(Error_Type) {
|
||||
server = server,
|
||||
client_socket = client_socket,
|
||||
}
|
||||
|
||||
thread.pool_add_task(
|
||||
&server.thread_pool,
|
||||
context.allocator,
|
||||
request_thread_task,
|
||||
rawptr(task_data),
|
||||
)
|
||||
}
|
||||
|
||||
server_listen_and_serve :: proc(server: ^Server($Error_Type)) {
|
||||
|
|
@ -201,18 +219,6 @@ server_listen_and_serve :: proc(server: ^Server($Error_Type)) {
|
|||
log.assert(net_err == nil, "Couldn't create TCP socket")
|
||||
defer net.close(server_socket)
|
||||
|
||||
for i in 0 ..< server.worker_count {
|
||||
t := thread.create(proc(t: ^thread.Thread) {
|
||||
server := cast(^Server(Error_Type))t.data
|
||||
worker_thread(server)
|
||||
})
|
||||
log.assertf(t != nil, "Thread creation failed")
|
||||
|
||||
t.data = rawptr(server)
|
||||
thread.start(t)
|
||||
append(&server.worker_threads, t)
|
||||
}
|
||||
|
||||
for !sync.atomic_load(&server.shutdown) {
|
||||
client_socket, _, net_err := net.accept_tcp(server_socket)
|
||||
if net_err != nil {
|
||||
|
|
@ -220,16 +226,9 @@ server_listen_and_serve :: proc(server: ^Server($Error_Type)) {
|
|||
continue
|
||||
}
|
||||
|
||||
ok := work_queue_enqueue(&server.work_queue, client_socket)
|
||||
if !ok {
|
||||
log.warn("Work queue full, rejecting connection")
|
||||
net.close(client_socket)
|
||||
}
|
||||
spawn_request_thread(server, client_socket)
|
||||
}
|
||||
|
||||
work_queue_shutdown(&server.work_queue)
|
||||
|
||||
for t in server.worker_threads {
|
||||
thread.join(t)
|
||||
}
|
||||
thread.pool_finish(&server.thread_pool)
|
||||
thread.pool_join(&server.thread_pool)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
#+private package
|
||||
package fjord
|
||||
|
||||
import "base:runtime"
|
||||
import "core:net"
|
||||
import "core:sync"
|
||||
|
||||
WorkQueue :: struct {
|
||||
items: [dynamic]net.TCP_Socket,
|
||||
capacity: int,
|
||||
mutex: sync.Mutex,
|
||||
not_empty: sync.Cond,
|
||||
shutdown: bool,
|
||||
allocator: runtime.Allocator,
|
||||
}
|
||||
|
||||
work_queue_init :: proc(
|
||||
work_queue: ^WorkQueue,
|
||||
capacity: int,
|
||||
allocator := context.allocator,
|
||||
) {
|
||||
work_queue^ = {
|
||||
items = make([dynamic]net.TCP_Socket, 0, capacity, allocator),
|
||||
capacity = capacity,
|
||||
shutdown = false,
|
||||
allocator = allocator,
|
||||
}
|
||||
}
|
||||
|
||||
work_queue_destroy :: proc(work_queue: ^WorkQueue) {
|
||||
sync.mutex_lock(&work_queue.mutex)
|
||||
for socket in work_queue.items {
|
||||
net.close(socket)
|
||||
}
|
||||
sync.mutex_unlock(&work_queue.mutex)
|
||||
|
||||
delete(work_queue.items)
|
||||
}
|
||||
|
||||
work_queue_enqueue :: proc(work_queue: ^WorkQueue, socket: net.TCP_Socket) -> bool {
|
||||
sync.mutex_lock(&work_queue.mutex)
|
||||
defer sync.mutex_unlock(&work_queue.mutex)
|
||||
|
||||
if work_queue.shutdown do return false
|
||||
|
||||
if len(work_queue.items) >= work_queue.capacity {
|
||||
return false
|
||||
}
|
||||
|
||||
append(&work_queue.items, socket)
|
||||
sync.cond_signal(&work_queue.not_empty)
|
||||
return true
|
||||
}
|
||||
|
||||
work_queue_dequeue :: proc(work_queue: ^WorkQueue) -> (socket: net.TCP_Socket, ok: bool) {
|
||||
sync.mutex_lock(&work_queue.mutex)
|
||||
defer sync.mutex_unlock(&work_queue.mutex)
|
||||
|
||||
for len(work_queue.items) == 0 && !work_queue.shutdown {
|
||||
sync.cond_wait(&work_queue.not_empty, &work_queue.mutex)
|
||||
}
|
||||
|
||||
if work_queue.shutdown && len(work_queue.items) == 0 {
|
||||
return socket, false
|
||||
}
|
||||
|
||||
socket = pop_front(&work_queue.items)
|
||||
return socket, true
|
||||
}
|
||||
|
||||
work_queue_shutdown :: proc(work_queue: ^WorkQueue) {
|
||||
sync.mutex_lock(&work_queue.mutex)
|
||||
defer sync.mutex_unlock(&work_queue.mutex)
|
||||
|
||||
work_queue.shutdown = true
|
||||
sync.cond_broadcast(&work_queue.not_empty)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue