From 79e4c393e9b23574aa1d59ce2ccafbc60643a49a Mon Sep 17 00:00:00 2001 From: DanielGavin Date: Sat, 5 Dec 2020 21:00:49 +0100 Subject: requests are now made into tasks --- src/common/config.odin | 1 - src/common/pool.odin | 162 ++++++++++++++++++++++++++++++++++++++ src/index/background.odin | 7 ++ src/main.odin | 16 ++-- src/server/analysis.odin | 1 - src/server/documents.odin | 23 +++++- src/server/log.odin | 2 + src/server/requests.odin | 196 +++++++++++++++++++++++++++++++--------------- src/server/types.odin | 1 + src/server/writer.odin | 11 ++- 10 files changed, 346 insertions(+), 74 deletions(-) create mode 100644 src/common/pool.odin create mode 100644 src/index/background.odin (limited to 'src') diff --git a/src/common/config.odin b/src/common/config.odin index 281bec9..d9131d6 100644 --- a/src/common/config.odin +++ b/src/common/config.odin @@ -6,7 +6,6 @@ Config :: struct { hover_support_md: bool, signature_offset_support: bool, collections: map [string] string, - thread_pool_count: int, running: bool, }; diff --git a/src/common/pool.odin b/src/common/pool.odin new file mode 100644 index 0000000..0bccb2d --- /dev/null +++ b/src/common/pool.odin @@ -0,0 +1,162 @@ +package common + +import "intrinsics" +import "core:sync" +import "core:mem" +import "core:thread" + +Task_Status :: enum i32 { + Ready, + Busy, + Waiting, + Term, +} + +Task_Proc :: #type proc(task: ^Task); + +Task :: struct { + procedure: Task_Proc, + data: rawptr, + user_index: int, +} + +Task_Id :: distinct i32; +INVALID_TASK_ID :: Task_Id(-1); + + +Pool :: struct { + allocator: mem.Allocator, + mutex: sync.Mutex, + sem_available: sync.Semaphore, + processing_task_count: int, // atomic + is_running: bool, + + threads: []^thread.Thread, + + tasks: [dynamic]Task, +} + +pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) { + worker_thread_internal :: proc(t: ^thread.Thread) { + pool := (^Pool)(t.data); + + temp_allocator: Scratch_Allocator; + + scratch_allocator_init(&temp_allocator, mem.megabytes(1)); + + context.temp_allocator = scratch_allocator(&temp_allocator); + + for pool.is_running { + sync.semaphore_wait_for(&pool.sem_available); + + if task, ok := pool_try_and_pop_task(pool); ok { + pool_do_work(pool, &task); + } + + free_all(context.temp_allocator); + } + + scratch_allocator_destroy(&temp_allocator); + + sync.semaphore_post(&pool.sem_available, 1); + } + + + context.allocator = allocator; + pool.allocator = allocator; + pool.tasks = make([dynamic]Task); + pool.threads = make([]^thread.Thread, thread_count); + + sync.mutex_init(&pool.mutex); + sync.semaphore_init(&pool.sem_available); + pool.is_running = true; + + for _, i in pool.threads { + t := thread.create(worker_thread_internal); + t.user_index = i; + t.data = pool; + pool.threads[i] = t; + } +} + +pool_destroy :: proc(pool: ^Pool) { + delete(pool.tasks); + + for t in &pool.threads { + thread.destroy(t); + } + + delete(pool.threads, pool.allocator); + + sync.mutex_destroy(&pool.mutex); + sync.semaphore_destroy(&pool.sem_available); +} + +pool_start :: proc(pool: ^Pool) { + for t in pool.threads { + thread.start(t); + } +} + +pool_join :: proc(pool: ^Pool) { + pool.is_running = false; + + sync.semaphore_post(&pool.sem_available, len(pool.threads)); + + thread.yield(); + + for t in pool.threads { + thread.join(t); + } +} + +pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) { + sync.mutex_lock(&pool.mutex); + defer sync.mutex_unlock(&pool.mutex); + + task: Task; + task.procedure = procedure; + task.data = data; + task.user_index = user_index; + + append(&pool.tasks, task); + sync.semaphore_post(&pool.sem_available, 1); +} + +pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) { + if sync.mutex_try_lock(&pool.mutex) { + if len(pool.tasks) != 0 { + intrinsics.atomic_add(&pool.processing_task_count, 1); + task = pop_front(&pool.tasks); + got_task = true; + } + sync.mutex_unlock(&pool.mutex); + } + return; +} + + +pool_do_work :: proc(pool: ^Pool, task: ^Task) { + task.procedure(task); + intrinsics.atomic_sub(&pool.processing_task_count, 1); +} + + +pool_wait_and_process :: proc(pool: ^Pool) { + for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 { + if task, ok := pool_try_and_pop_task(pool); ok { + pool_do_work(pool, &task); + } + + // Safety kick + if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 { + sync.mutex_lock(&pool.mutex); + sync.semaphore_post(&pool.sem_available, len(pool.tasks)); + sync.mutex_unlock(&pool.mutex); + } + + thread.yield(); + } + + pool_join(pool); +} diff --git a/src/index/background.odin b/src/index/background.odin new file mode 100644 index 0000000..eb9a3dd --- /dev/null +++ b/src/index/background.odin @@ -0,0 +1,7 @@ +package index + +/* + Background thread that runs and ensures that the dynamic indexer is not stale +*/ + + diff --git a/src/main.odin b/src/main.odin index 038a99f..b326859 100644 --- a/src/main.odin +++ b/src/main.odin @@ -18,12 +18,14 @@ import "shared:common" os_read :: proc(handle: rawptr, data: [] byte) -> (int, int) { - return os.read(cast(os.Handle)handle, data); + a, b := os.read(cast(os.Handle)handle, data); + return a, cast(int)b; } os_write :: proc(handle: rawptr, data: [] byte) -> (int, int) { - return os.write(cast(os.Handle)handle, data); + a, b := os.write(cast(os.Handle)handle, data); + return a, cast(int)b; } //Note(Daniel, Should look into handling errors without crashing from parsing) @@ -40,7 +42,6 @@ run :: proc(reader: ^server.Reader, writer: ^server.Writer) { log.info("Starting Odin Language Server"); - thread.pool_init(&server.pool, config.thread_pool_count); config.running = true; @@ -83,7 +84,8 @@ run :: proc(reader: ^server.Reader, writer: ^server.Writer) { index.free_static_index(); - thread.pool_destroy(&server.pool); + common.pool_wait_and_process(&server.pool); + common.pool_destroy(&server.pool); //common.memleak_dump(tracking_allocator, common.log_dump, nil); @@ -102,10 +104,14 @@ main :: proc() { init_global_temporary_allocator(mem.megabytes(200)); + context.logger = log.Logger{nil, nil, log.Level.Debug, nil}; //have to set the procedure to nil to avoid calling tprintf... + //fd, err := os.open("C:/Users/danie/OneDrive/Desktop/Computer_Science/ols/log.txt", os.O_RDWR|os.O_CREATE|os.O_TRUNC ); //context.logger = log.create_file_logger(fd); - context.logger = server.create_lsp_logger(&writer); + //useless now with multithreading because the tprintf is called before passing the string to proc..................... + //probably make my own logging system. + //context.logger = server.create_lsp_logger(&writer); run(&reader, &writer); } diff --git a/src/server/analysis.odin b/src/server/analysis.odin index b84379f..82f415b 100644 --- a/src/server/analysis.odin +++ b/src/server/analysis.odin @@ -663,7 +663,6 @@ resolve_type_identifier :: proc(ast_context: ^AstContext, node: ast.Ident) -> (i } } - //note(Daniel, if global and local ends up being 100% same just make a function that takes the map) if local, ok := ast_context.locals[node.name]; ast_context.use_locals && ok { diff --git a/src/server/documents.odin b/src/server/documents.odin index 5a0261c..dedc74a 100644 --- a/src/server/documents.odin +++ b/src/server/documents.odin @@ -10,6 +10,8 @@ import "core:odin/tokenizer" import "core:path" import "core:mem" +import "intrinsics" + import "shared:common" ParserError :: struct { @@ -36,6 +38,7 @@ Document :: struct { imports: [] Package, package_name: string, allocator: ^common.Scratch_Allocator, //because does not support freeing I use arena allocators for each document + operating_on: int, //atomic }; DocumentStorage :: struct { @@ -75,7 +78,23 @@ document_get :: proc(uri_string: string) -> ^Document { return nil; } - return &document_storage.documents[uri.path]; + document := &document_storage.documents[uri.path]; + + if document == nil { + return nil; + } + + intrinsics.atomic_add(&document.operating_on, 1); + + return document; +} + +document_release :: proc(document: ^Document) { + + if document != nil { + intrinsics.atomic_sub(&document.operating_on, 1); + } + } /* @@ -126,7 +145,7 @@ document_open :: proc(uri_string: string, text: string, config: ^common.Config, return err; } - document_storage.documents[uri.path] = document; + document_storage.documents[strings.clone(uri.path)] = document; } diff --git a/src/server/log.odin b/src/server/log.odin index 59a98c7..b7b5466 100644 --- a/src/server/log.odin +++ b/src/server/log.odin @@ -5,6 +5,7 @@ import "core:strings"; import "core:os"; import "core:time"; import "core:log"; +import "core:sync" Default_Console_Logger_Opts :: log.Options{ @@ -31,6 +32,7 @@ destroy_lsp_logger :: proc(log: ^log.Logger) { } lsp_logger_proc :: proc(logger_data: rawptr, level: log.Level, text: string, options: log.Options, location := #caller_location) { + data := cast(^Lsp_Logger_Data)logger_data; backing: [1024]byte; //NOTE(Hoej): 1024 might be too much for a header backing, unless somebody has really long paths. diff --git a/src/server/requests.odin b/src/server/requests.odin index 96146a1..0a05bfc 100644 --- a/src/server/requests.odin +++ b/src/server/requests.odin @@ -11,6 +11,8 @@ import "core:encoding/json" import "core:path" import "core:runtime" import "core:thread" +import "core:sync" +import "intrinsics" import "shared:common" import "shared:index" @@ -39,6 +41,7 @@ RequestType :: enum { RequestInfo :: struct { params: json.Value, + document: ^Document, id: RequestId, config: ^common.Config, writer: ^Writer, @@ -46,10 +49,10 @@ RequestInfo :: struct { }; -pool: thread.Pool; +pool: common.Pool; -get_request_info :: proc(task: ^thread.Task) -> ^RequestInfo { +get_request_info :: proc(task: ^common.Task) -> ^RequestInfo { return cast(^RequestInfo)task.data; } @@ -156,7 +159,7 @@ read_and_parse_body :: proc(reader: ^Reader, header: Header) -> (json.Value, boo err: json.Error; - value, err = json.parse(data = data, allocator = context.temp_allocator, parse_integers = true); + value, err = json.parse(data = data, allocator = context.allocator, parse_integers = true); if(err != json.Error.None) { log.error("Failed to parse body"); @@ -245,7 +248,7 @@ handle_request :: proc(request: json.Value, config: ^common.Config, writer: ^Wri info.config = config; info.writer = writer; - task_proc: thread.Task_Proc; + task_proc: common.Task_Proc; switch request_type { case .Initialize: @@ -278,12 +281,67 @@ handle_request :: proc(request: json.Value, config: ^common.Config, writer: ^Wri task_proc = request_semantic_token_range; } - task := thread.Task { + task := common.Task { data = info, procedure = task_proc, }; - task_proc(&task); + #partial switch request_type { + case .Initialize, .Initialized: + task_proc(&task); + case .Completion, .Definition: + + uri := root["params"].value.(json.Object)["textDocument"].value.(json.Object)["uri"].value.(json.String); + + document := document_get(uri); + + if document == nil { + handle_error(.InternalError, id, writer); + return false; + } + + info.document = document; + + task_proc(&task); + + case .DidClose, .DidChange, .DidOpen: + + uri := root["params"].value.(json.Object)["textDocument"].value.(json.Object)["uri"].value.(json.String); + + document := document_get(uri); + + if document != nil { + + for intrinsics.atomic_load(&document.operating_on) > 1 { + if task, ok := common.pool_try_and_pop_task(&pool); ok { + common.pool_do_work(&pool, &task); + } + } + + } + + task_proc(&task); + + document_release(document); + case .Shutdown, .Exit: + task_proc(&task); + case .SignatureHelp, .SemanticTokensFull, .SemanticTokensRange, .DocumentSymbol: + + uri := root["params"].value.(json.Object)["textDocument"].value.(json.Object)["uri"].value.(json.String); + + document := document_get(uri); + + if document == nil { + handle_error(.InternalError, id, writer); + return false; + } + + info.document = document; + + common.pool_add_task(&pool, task_proc, info); + case: + common.pool_add_task(&pool, task_proc, info); + } } @@ -291,12 +349,14 @@ handle_request :: proc(request: json.Value, config: ^common.Config, writer: ^Wri return true; } -request_initialize :: proc(task: ^thread.Task) { - +request_initialize :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -317,6 +377,8 @@ request_initialize :: proc(task: ^thread.Task) { append(&config.workspace_folders, s); } + thread_count := 2; + //right now just look at the first workspace - TODO(daniel, add multiple workspace support) if uri, ok := common.parse_uri(config.workspace_folders[0].uri, context.temp_allocator); ok { @@ -330,6 +392,8 @@ request_initialize :: proc(task: ^thread.Task) { if unmarshal(value, ols_config, context.temp_allocator) == .None { + thread_count = ols_config.thread_pool_count; + for p in ols_config.collections { config.collections[strings.clone(p.name)] = strings.clone(strings.to_lower(p.path, context.temp_allocator)); } @@ -342,6 +406,9 @@ request_initialize :: proc(task: ^thread.Task) { } + common.pool_init(&pool, thread_count); + common.pool_start(&pool); + for format in initialize_params.capabilities.textDocument.hover.contentFormat { if format == .Markdown { config.hover_support_md = true; @@ -407,30 +474,40 @@ request_initialize :: proc(task: ^thread.Task) { log.info("Finished indexing"); } -request_initialized :: proc(task: ^thread.Task) { - +request_initialized :: proc(task: ^common.Task) { + info := get_request_info(task); + free(info); } -request_shutdown :: proc(task: ^thread.Task) { +request_shutdown :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer json.destroy_value(params); + defer free(info); + response := make_response_message( params = nil, id = id, ); send_response(response, writer); + + } -request_definition :: proc(task: ^thread.Task) { +request_definition :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer document_release(document); + defer free(info); + defer json.destroy_value(params); + params_object, ok := params.value.(json.Object); if !ok { @@ -445,13 +522,6 @@ request_definition :: proc(task: ^thread.Task) { return; } - document := document_get(definition_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - location, ok2 := get_definition_location(document, definition_params.position); if !ok2 { @@ -467,12 +537,16 @@ request_definition :: proc(task: ^thread.Task) { } -request_completion :: proc(task: ^thread.Task) { +request_completion :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer document_release(document); + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -489,13 +563,6 @@ request_completion :: proc(task: ^thread.Task) { return; } - document := document_get(completition_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - list: CompletionList; list, ok = get_completion_list(document, completition_params.position); @@ -512,12 +579,16 @@ request_completion :: proc(task: ^thread.Task) { send_response(response, writer); } -request_signature_help :: proc(task: ^thread.Task) { +request_signature_help :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer document_release(document); + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -532,13 +603,6 @@ request_signature_help :: proc(task: ^thread.Task) { return; } - document := document_get(signature_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - help: SignatureHelp; help, ok = get_signature_information(document, signature_params.position); @@ -550,18 +614,21 @@ request_signature_help :: proc(task: ^thread.Task) { send_response(response, writer); } -notification_exit :: proc(task: ^thread.Task) { +notification_exit :: proc(task: ^common.Task) { info := get_request_info(task); using info; config.running = false; } -notification_did_open :: proc(task: ^thread.Task) { +notification_did_open :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -583,12 +650,15 @@ notification_did_open :: proc(task: ^thread.Task) { } } -notification_did_change :: proc(task: ^thread.Task) { +notification_did_change :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -607,12 +677,15 @@ notification_did_change :: proc(task: ^thread.Task) { document_apply_changes(change_params.textDocument.uri, change_params.contentChanges, config, writer); } -notification_did_close :: proc(task: ^thread.Task) { +notification_did_close :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -633,17 +706,25 @@ notification_did_close :: proc(task: ^thread.Task) { } } -notification_did_save :: proc(task: ^thread.Task) { +notification_did_save :: proc(task: ^common.Task) { + info := get_request_info(task); + using info; + json.destroy_value(params); + free(info); } -request_semantic_token_full :: proc(task: ^thread.Task) { +request_semantic_token_full :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer document_release(document); + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -658,13 +739,6 @@ request_semantic_token_full :: proc(task: ^thread.Task) { return; } - document := document_get(semantic_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - range := common.Range { start = common.Position { line = 0, @@ -686,7 +760,7 @@ request_semantic_token_full :: proc(task: ^thread.Task) { send_response(response, writer); } -request_semantic_token_range :: proc(task: ^thread.Task) { +request_semantic_token_range :: proc(task: ^common.Task) { info := get_request_info(task); @@ -694,6 +768,10 @@ request_semantic_token_range :: proc(task: ^thread.Task) { params_object, ok := params.value.(json.Object); + defer document_release(document); + defer json.destroy_value(params); + defer free(info); + if !ok { handle_error(.ParseError, id, writer); return; @@ -706,13 +784,6 @@ request_semantic_token_range :: proc(task: ^thread.Task) { return; } - document := document_get(semantic_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - //symbols: SemanticTokens; symbols := get_semantic_tokens(document, semantic_params.range); @@ -724,12 +795,16 @@ request_semantic_token_range :: proc(task: ^thread.Task) { send_response(response, writer); } -request_document_symbols :: proc(task: ^thread.Task) { +request_document_symbols :: proc(task: ^common.Task) { info := get_request_info(task); using info; + defer document_release(document); + defer json.destroy_value(params); + defer free(info); + params_object, ok := params.value.(json.Object); if !ok { @@ -744,13 +819,6 @@ request_document_symbols :: proc(task: ^thread.Task) { return; } - document := document_get(symbol_params.textDocument.uri); - - if document == nil { - handle_error(.InternalError, id, writer); - return; - } - symbols := get_document_symbols(document); response := make_response_message( diff --git a/src/server/types.odin b/src/server/types.odin index 0443cc4..fbd0341 100644 --- a/src/server/types.odin +++ b/src/server/types.odin @@ -274,6 +274,7 @@ ParameterInformation :: struct { OlsConfig :: struct { collections: [dynamic] OlsConfigCollection, + thread_pool_count: int, }; OlsConfigCollection :: struct { diff --git a/src/server/writer.odin b/src/server/writer.odin index 3c82451..a03313a 100644 --- a/src/server/writer.odin +++ b/src/server/writer.odin @@ -4,19 +4,28 @@ import "core:os" import "core:mem" import "core:fmt" import "core:strings" +import "core:sync" WriterFn :: proc(rawptr, [] byte) -> (int, int); Writer :: struct { writer_fn: WriterFn, writer_context: rawptr, + writer_mutex: sync.Mutex, }; make_writer :: proc(writer_fn: WriterFn, writer_context: rawptr) -> Writer { - return Writer { writer_context = writer_context, writer_fn = writer_fn }; + writer := Writer { writer_context = writer_context, writer_fn = writer_fn }; + sync.mutex_init(&writer.writer_mutex); + return writer; } write_sized :: proc(writer: ^Writer, data: []byte) -> bool { + + sync.mutex_lock(&writer.writer_mutex); + defer sync.mutex_unlock(&writer.writer_mutex); + + written, err := writer.writer_fn(writer.writer_context, data); if(err != 0 || written != len(data)) { -- cgit v1.2.3