diff --git a/common/proto/search.proto b/common/proto/search.proto
index 86915ce..37aafd2 100644
--- a/common/proto/search.proto
+++ b/common/proto/search.proto
@@ -3,15 +3,27 @@ syntax = "proto3";
 package komp_ac.search;
 
 service Searcher {
-    rpc SearchTable(SearchRequest) returns (SearchResponse);
-    rpc ExactSearchTable(SearchRequest) returns (SearchResponse);
+    rpc Search(SearchRequest) returns (SearchResponse);
+}
+
+enum MatchMode {
+    MATCH_MODE_UNSPECIFIED = 0;
+    MATCH_MODE_FUZZY = 1;
+    MATCH_MODE_EXACT = 2;
+}
+
+message ColumnConstraint {
+    string column = 1;
+    string query = 2;
+    MatchMode mode = 3;
 }
 
 message SearchRequest {
-    optional string table_name = 1;
-    string query = 2;
-    string profile_name = 3;
-    optional string column_name = 4;
+    string profile_name = 1;
+    optional string table_name = 2;
+    string free_query = 3;
+    repeated ColumnConstraint must = 4;
+    optional uint32 limit = 5;
 }
 
 message SearchResponse {
     message Hit {
diff --git a/common/src/proto/descriptor.bin b/common/src/proto/descriptor.bin
index b05201b..740da50 100644
Binary files a/common/src/proto/descriptor.bin and b/common/src/proto/descriptor.bin differ
diff --git a/common/src/proto/komp_ac.search.rs b/common/src/proto/komp_ac.search.rs
index 2aa6f69..f023d36 100644
--- a/common/src/proto/komp_ac.search.rs
+++ b/common/src/proto/komp_ac.search.rs
@@ -1,14 +1,25 @@
 // This file is @generated by prost-build.
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct SearchRequest {
-    #[prost(string, optional, tag = "1")]
-    pub table_name: ::core::option::Option<::prost::alloc::string::String>,
+pub struct ColumnConstraint {
+    #[prost(string, tag = "1")]
+    pub column: ::prost::alloc::string::String,
     #[prost(string, tag = "2")]
     pub query: ::prost::alloc::string::String,
-    #[prost(string, tag = "3")]
+    #[prost(enumeration = "MatchMode", tag = "3")]
+    pub mode: i32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SearchRequest {
+    #[prost(string, tag = "1")]
     pub profile_name: ::prost::alloc::string::String,
-    #[prost(string, optional, tag = "4")]
-    pub column_name: ::core::option::Option<::prost::alloc::string::String>,
+    #[prost(string, optional, tag = "2")]
+    pub table_name: ::core::option::Option<::prost::alloc::string::String>,
+    #[prost(string, tag = "3")]
+    pub free_query: ::prost::alloc::string::String,
+    #[prost(message, repeated, tag = "4")]
+    pub must: ::prost::alloc::vec::Vec<ColumnConstraint>,
+    #[prost(uint32, optional, tag = "5")]
+    pub limit: ::core::option::Option<u32>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SearchResponse {
@@ -30,6 +41,35 @@ pub mod search_response {
         pub table_name: ::prost::alloc::string::String,
     }
 }
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum MatchMode {
+    Unspecified = 0,
+    Fuzzy = 1,
+    Exact = 2,
+}
+impl MatchMode {
+    /// String value of the enum field names used in the ProtoBuf definition.
+    ///
+    /// The values are not transformed in any way and thus are considered stable
+    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+    pub fn as_str_name(&self) -> &'static str {
+        match self {
+            Self::Unspecified => "MATCH_MODE_UNSPECIFIED",
+            Self::Fuzzy => "MATCH_MODE_FUZZY",
+            Self::Exact => "MATCH_MODE_EXACT",
+        }
+    }
+    /// Creates an enum from field names used in the ProtoBuf definition.
+    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+        match value {
+            "MATCH_MODE_UNSPECIFIED" => Some(Self::Unspecified),
+            "MATCH_MODE_FUZZY" => Some(Self::Fuzzy),
+            "MATCH_MODE_EXACT" => Some(Self::Exact),
+            _ => None,
+        }
+    }
+}
 /// Generated client implementations.
 pub mod searcher_client {
     #![allow(
@@ -121,7 +161,7 @@ pub mod searcher_client {
             self.inner = self.inner.max_encoding_message_size(limit);
             self
         }
-        pub async fn search_table(
+        pub async fn search(
            &mut self,
            request: impl tonic::IntoRequest<super::SearchRequest>,
        ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status> {
@@ -135,32 +175,11 @@ pub mod searcher_client {
             })?;
             let codec = tonic::codec::ProstCodec::default();
             let path = http::uri::PathAndQuery::from_static(
-                "/komp_ac.search.Searcher/SearchTable",
+                "/komp_ac.search.Searcher/Search",
             );
             let mut req = request.into_request();
             req.extensions_mut()
-                .insert(GrpcMethod::new("komp_ac.search.Searcher", "SearchTable"));
-            self.inner.unary(req, path, codec).await
-        }
-        pub async fn exact_search_table(
-            &mut self,
-            request: impl tonic::IntoRequest<super::SearchRequest>,
-        ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status> {
-            self.inner
-                .ready()
-                .await
-                .map_err(|e| {
-                    tonic::Status::unknown(
-                        format!("Service was not ready: {}", e.into()),
-                    )
-                })?;
-            let codec = tonic::codec::ProstCodec::default();
-            let path = http::uri::PathAndQuery::from_static(
-                "/komp_ac.search.Searcher/ExactSearchTable",
-            );
-            let mut req = request.into_request();
-            req.extensions_mut()
-                .insert(GrpcMethod::new("komp_ac.search.Searcher", "ExactSearchTable"));
+                .insert(GrpcMethod::new("komp_ac.search.Searcher", "Search"));
             self.inner.unary(req, path, codec).await
         }
     }
@@ -178,11 +197,7 @@ pub mod searcher_server {
     /// Generated trait containing gRPC methods that should be implemented for use with SearcherServer.
     #[async_trait]
     pub trait Searcher: std::marker::Send + std::marker::Sync + 'static {
-        async fn search_table(
-            &self,
-            request: tonic::Request<super::SearchRequest>,
-        ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status>;
-        async fn exact_search_table(
+        async fn search(
             &self,
             request: tonic::Request<super::SearchRequest>,
         ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status>;
@@ -263,11 +278,11 @@ pub mod searcher_server {
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
             match req.uri().path() {
-                "/komp_ac.search.Searcher/SearchTable" => {
+                "/komp_ac.search.Searcher/Search" => {
                     #[allow(non_camel_case_types)]
-                    struct SearchTableSvc<T: Searcher>(pub Arc<T>);
+                    struct SearchSvc<T: Searcher>(pub Arc<T>);
                     impl<T: Searcher> tonic::server::UnaryService<super::SearchRequest>
-                    for SearchTableSvc<T> {
+                    for SearchSvc<T> {
                         type Response = super::SearchResponse;
                         type Future = BoxFuture<
                             tonic::Response<Self::Response>,
@@ -279,7 +294,7 @@ pub mod searcher_server {
                         ) -> Self::Future {
                             let inner = Arc::clone(&self.0);
                             let fut = async move {
-                                <T as Searcher>::search_table(&inner, request).await
+                                <T as Searcher>::search(&inner, request).await
                             };
                             Box::pin(fut)
                         }
@@ -290,50 +305,7 @@ pub mod searcher_server {
                     let accept_compression_encodings = self.accept_compression_encodings;
                     let send_compression_encodings = self.send_compression_encodings;
                     let max_decoding_message_size = self.max_decoding_message_size;
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let method = SearchTableSvc(inner);
-                        let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = tonic::server::Grpc::new(codec)
-                            .apply_compression_config(
-                                accept_compression_encodings,
-                                send_compression_encodings,
-                            )
-                            .apply_max_message_size_config(
-                                max_decoding_message_size,
-                                max_encoding_message_size,
-                            );
-                        let res = grpc.unary(method, req).await;
-                        Ok(res)
-                    };
-                    Box::pin(fut)
-                }
-                "/komp_ac.search.Searcher/ExactSearchTable" => {
-                    #[allow(non_camel_case_types)]
-                    struct ExactSearchTableSvc<T: Searcher>(pub Arc<T>);
-                    impl<T: Searcher> tonic::server::UnaryService<super::SearchRequest>
-                    for ExactSearchTableSvc<T> {
-                        type Response = super::SearchResponse;
-                        type Future = BoxFuture<
-                            tonic::Response<Self::Response>,
-                            tonic::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: tonic::Request<super::SearchRequest>,
-                        ) -> Self::Future {
-                            let inner = Arc::clone(&self.0);
-                            let fut = async move {
-                                <T as Searcher>::exact_search_table(&inner, request).await
-                            };
-                            Box::pin(fut)
-                        }
-                    }
-                    let accept_compression_encodings = self.accept_compression_encodings;
-                    let send_compression_encodings = self.send_compression_encodings;
-                    let max_decoding_message_size = self.max_decoding_message_size;
-                    let max_encoding_message_size = self.max_encoding_message_size;
-                    let inner = self.inner.clone();
-                    let fut = async move {
-                        let method = ExactSearchTableSvc(inner);
+                        let method = SearchSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
                             .apply_compression_config(
diff --git a/common/src/search.rs b/common/src/search.rs
index 34cd127..b6f9123 100644
--- a/common/src/search.rs
+++ b/common/src/search.rs
@@ -1,10 +1,26 @@
-// common/src/search.rs
-
 use std::path::{Path, PathBuf};
-use tantivy::schema::*;
-use tantivy::tokenizer::*;
+
+use tantivy::schema::{
+    Field, IndexRecordOption, JsonObjectOptions, Schema, TextFieldIndexing, Term, INDEXED,
+    STORED, STRING,
+};
+use tantivy::tokenizer::{
+    AsciiFoldingFilter, LowerCaser, NgramTokenizer, RawTokenizer, RemoveLongFilter,
+    SimpleTokenizer, TextAnalyzer, TokenStream,
+};
 use tantivy::Index;
 
+pub const F_PG_ID: &str = "pg_id";
+pub const F_TABLE_NAME: &str = "table_name";
+pub const F_ROW_KEY: &str = "row_key";
+pub const F_DATA_WORD: &str = "data_word";
+pub const F_DATA_NGRAM: &str = "data_ngram";
+pub const F_DATA_EXACT: &str = "data_exact";
+
+pub const TOK_WORD: &str = "kw_word";
+pub const TOK_NGRAM: &str = "kw_ngram";
+pub const TOK_EXACT: &str = "kw_exact";
&str = "kw_exact"; + /// Returns the on-disk path for a profile search index. pub fn search_index_path(root: &Path, profile_name: &str) -> PathBuf { root.join(profile_name) @@ -15,117 +31,152 @@ pub fn search_row_key(table_name: &str, row_id: i64) -> String { format!("{}:{}", table_name, row_id) } -/// Normalizes user-entered search text while preserving letter case. -pub fn normalize_search_text(text: &str) -> String { - text.chars() - .map(|ch| match ch { - 'á' | 'à' | 'â' | 'ä' | 'ă' | 'ā' => 'a', - 'Á' | 'À' | 'Â' | 'Ä' | 'Ă' | 'Ā' => 'A', - 'é' | 'è' | 'ê' | 'ë' | 'ě' | 'ē' => 'e', - 'É' | 'È' | 'Ê' | 'Ë' | 'Ě' | 'Ē' => 'E', - 'í' | 'ì' | 'î' | 'ï' | 'ī' => 'i', - 'Í' | 'Ì' | 'Î' | 'Ï' | 'Ī' => 'I', - 'ó' | 'ò' | 'ô' | 'ö' | 'ō' | 'ő' => 'o', - 'Ó' | 'Ò' | 'Ô' | 'Ö' | 'Ō' | 'Ő' => 'O', - 'ú' | 'ù' | 'û' | 'ü' | 'ū' | 'ű' => 'u', - 'Ú' | 'Ù' | 'Û' | 'Ü' | 'Ū' | 'Ű' => 'U', - 'ý' | 'ỳ' | 'ŷ' | 'ÿ' => 'y', - 'Ý' | 'Ỳ' | 'Ŷ' | 'Ÿ' => 'Y', - 'č' => 'c', - 'Č' => 'C', - 'ď' => 'd', - 'Ď' => 'D', - 'ľ' => 'l', - 'Ľ' => 'L', - 'ň' => 'n', - 'Ň' => 'N', - 'ř' => 'r', - 'Ř' => 'R', - 'š' => 's', - 'Š' => 'S', - 'ť' => 't', - 'Ť' => 'T', - 'ž' => 'z', - 'Ž' => 'Z', - _ => ch, - }) - .collect() +/// Normalizes user-entered values for exact-mode terms. +pub fn normalize_exact(input: &str) -> String { + let trimmed = input.trim(); + if trimmed.is_empty() { + return String::new(); + } + + let mut analyzer = exact_analyzer(); + let mut stream = analyzer.token_stream(trimmed); + let mut out = String::with_capacity(trimmed.len()); + while let Some(token) = stream.next() { + out.push_str(&token.text); + } + out } -/// Normalizes an exact-match value so indexed data and user input use the same form. -pub fn normalize_exact_value(text: &str) -> String { - normalize_search_text(text).to_lowercase() +/// Normalizes a column name to the JSON-key form used at index time. +pub fn normalize_column_name(column: &str) -> String { + column.to_ascii_lowercase() } -/// Creates a hybrid Slovak search schema with optimized prefix fields. +/// Creates the column-aware search schema. pub fn create_search_schema() -> Schema { let mut schema_builder = Schema::builder(); - schema_builder.add_u64_field("pg_id", INDEXED | STORED); - schema_builder.add_text_field("table_name", STRING | STORED); - schema_builder.add_text_field("row_key", STRING | STORED); - schema_builder.add_text_field("column_exact", STRING); + schema_builder.add_u64_field(F_PG_ID, INDEXED | STORED); + schema_builder.add_text_field(F_TABLE_NAME, STRING | STORED); + schema_builder.add_text_field(F_ROW_KEY, STRING | STORED); - // For prefixes (1-4 chars). - let short_prefix_indexing = TextFieldIndexing::default() - .set_tokenizer("slovak_prefix_edge") - .set_index_option(IndexRecordOption::WithFreqsAndPositions); - let short_prefix_options = TextOptions::default() - .set_indexing_options(short_prefix_indexing) - .set_stored(); - schema_builder.add_text_field("prefix_edge", short_prefix_options); - - // For the full word. - let full_word_indexing = TextFieldIndexing::default() - .set_tokenizer("slovak_prefix_full") - .set_index_option(IndexRecordOption::WithFreqsAndPositions); - let full_word_options = TextOptions::default() - .set_indexing_options(full_word_indexing) - .set_stored(); - schema_builder.add_text_field("prefix_full", full_word_options); - - // NGRAM FIELD: For substring matching. 
-    let ngram_field_indexing = TextFieldIndexing::default()
-        .set_tokenizer("slovak_ngram")
-        .set_index_option(IndexRecordOption::WithFreqsAndPositions);
-    let ngram_options = TextOptions::default()
-        .set_indexing_options(ngram_field_indexing)
-        .set_stored();
-    schema_builder.add_text_field("text_ngram", ngram_options);
+    schema_builder.add_json_field(F_DATA_WORD, json_options(TOK_WORD, true, false));
+    schema_builder.add_json_field(F_DATA_NGRAM, json_options(TOK_NGRAM, true, false));
+    schema_builder.add_json_field(F_DATA_EXACT, json_options(TOK_EXACT, false, false));
 
     schema_builder.build()
 }
 
-/// Registers all necessary Slovak tokenizers with the index.
-///
-/// This must be called by ANY process that opens the index
-/// to ensure the tokenizers are loaded into memory.
-pub fn register_slovak_tokenizers(index: &Index) -> tantivy::Result<()> {
+fn json_options(
+    tokenizer_name: &str,
+    with_positions: bool,
+    stored: bool,
+) -> JsonObjectOptions {
+    let index_option = if with_positions {
+        IndexRecordOption::WithFreqsAndPositions
+    } else {
+        IndexRecordOption::Basic
+    };
+
+    let indexing = TextFieldIndexing::default()
+        .set_tokenizer(tokenizer_name)
+        .set_index_option(index_option);
+
+    let mut options = JsonObjectOptions::default().set_indexing_options(indexing);
+    if stored {
+        options = options.set_stored();
+    }
+    options
+}
+
+/// Registers all required tokenizers with the index.
+pub fn register_tokenizers(index: &Index) -> tantivy::Result<()> {
     let tokenizer_manager = index.tokenizers();
 
-    // TOKENIZER for `prefix_edge`: Edge N-gram (1-4 chars)
-    let edge_tokenizer = TextAnalyzer::builder(NgramTokenizer::new(1, 4, true)?)
-        .filter(RemoveLongFilter::limit(40))
-        .filter(LowerCaser)
-        .filter(AsciiFoldingFilter)
-        .build();
-    tokenizer_manager.register("slovak_prefix_edge", edge_tokenizer);
-
-    // TOKENIZER for `prefix_full`: Simple word tokenizer
-    let full_tokenizer = TextAnalyzer::builder(SimpleTokenizer::default())
-        .filter(RemoveLongFilter::limit(40))
-        .filter(LowerCaser)
-        .filter(AsciiFoldingFilter)
-        .build();
-    tokenizer_manager.register("slovak_prefix_full", full_tokenizer);
-
-    // NGRAM TOKENIZER: For substring matching.
-    let ngram_tokenizer = TextAnalyzer::builder(NgramTokenizer::new(3, 3, false)?)
-        .filter(RemoveLongFilter::limit(40))
-        .filter(LowerCaser)
-        .filter(AsciiFoldingFilter)
-        .build();
-    tokenizer_manager.register("slovak_ngram", ngram_tokenizer);
+    tokenizer_manager.register(TOK_WORD, word_analyzer());
+    tokenizer_manager.register(TOK_NGRAM, ngram_analyzer()?);
+    tokenizer_manager.register(TOK_EXACT, exact_analyzer());
 
     Ok(())
 }
+
+fn word_analyzer() -> TextAnalyzer {
+    TextAnalyzer::builder(SimpleTokenizer::default())
+        .filter(RemoveLongFilter::limit(80))
+        .filter(LowerCaser)
+        .filter(AsciiFoldingFilter)
+        .build()
+}
+
+fn ngram_analyzer() -> tantivy::Result<TextAnalyzer> {
+    Ok(TextAnalyzer::builder(NgramTokenizer::new(3, 3, false)?)
+        .filter(RemoveLongFilter::limit(80))
+        .filter(LowerCaser)
+        .filter(AsciiFoldingFilter)
+        .build())
+}
+
+fn exact_analyzer() -> TextAnalyzer {
+    TextAnalyzer::builder(RawTokenizer::default())
+        .filter(LowerCaser)
+        .filter(AsciiFoldingFilter)
+        .build()
+}
+
+/// Tokenizes text the same way `data_word` is indexed.
+pub fn tokenize_word(text: &str) -> Vec<String> {
+    tokenize_with(word_analyzer(), text)
+}
+
+/// Tokenizes text the same way `data_ngram` is indexed.
+pub fn tokenize_ngram(text: &str) -> Vec<String> {
+    match ngram_analyzer() {
+        Ok(analyzer) => tokenize_with(analyzer, text),
+        Err(_) => Vec::new(),
+    }
+}
+
+fn tokenize_with(mut analyzer: TextAnalyzer, text: &str) -> Vec<String> {
+    let mut stream = analyzer.token_stream(text);
+    let mut out = Vec::new();
+    while let Some(token) = stream.next() {
+        out.push(token.text.clone());
+    }
+    out
+}
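The three analyzers intentionally disagree about token boundaries: `kw_word` emits one lowercased, ASCII-folded token per word, `kw_ngram` emits rolling character trigrams, and `kw_exact` keeps the whole trimmed value as a single normalized token. A hedged sketch of the expected behavior, illustration only and not part of the patch (the precise trigram boundaries depend on tantivy's `NgramTokenizer`):

```rust
// Illustration only; uses the helpers defined in common/src/search.rs above.
use common::search::{normalize_exact, tokenize_ngram, tokenize_word};

fn main() {
    // kw_word: split on non-alphanumerics, lowercase, fold diacritics.
    println!("{:?}", tokenize_word("Čierny Potok")); // expected: ["cierny", "potok"]

    // kw_ngram: character trigrams, lowercased and folded.
    println!("{:?}", tokenize_ngram("rieka")); // expected: ["rie", "iek", "eka"]

    // kw_exact: the whole trimmed value as one normalized token.
    println!("{:?}", normalize_exact("  Čierny Potok  ")); // expected: "cierny potok"
}
```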
+
+/// Builds a term scoped to a specific JSON path within a JSON field.
+pub fn json_path_term(field: Field, column: &str, text: &str) -> Term {
+    let mut term = Term::from_field_json_path(field, column, false);
+    term.append_type_and_str(text);
+    term
+}
+
+/// Returns all required schema fields or fails loudly on mismatch.
+pub struct SchemaFields {
+    pub pg_id: Field,
+    pub table_name: Field,
+    pub row_key: Field,
+    pub data_word: Field,
+    pub data_ngram: Field,
+    pub data_exact: Field,
+}
+
+impl SchemaFields {
+    pub fn from(schema: &Schema) -> tantivy::Result<Self> {
+        Ok(Self {
+            pg_id: get_field(schema, F_PG_ID)?,
+            table_name: get_field(schema, F_TABLE_NAME)?,
+            row_key: get_field(schema, F_ROW_KEY)?,
+            data_word: get_field(schema, F_DATA_WORD)?,
+            data_ngram: get_field(schema, F_DATA_NGRAM)?,
+            data_exact: get_field(schema, F_DATA_EXACT)?,
+        })
+    }
+}
+
+fn get_field(schema: &Schema, name: &str) -> tantivy::Result<Field> {
+    schema.get_field(name).map_err(|e| {
+        tantivy::TantivyError::SchemaError(format!("schema is missing field '{name}': {e}"))
+    })
+}
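Taken together, `SchemaFields` and `json_path_term` are enough to probe a profile index by hand. A minimal sketch, not part of the patch, assuming an already-built index directory; it counts rows whose column value exactly matches a probe:

```rust
// Sketch only. Counts documents whose `column` exactly matches `value`.
use common::search::{json_path_term, normalize_exact, register_tokenizers, SchemaFields};
use tantivy::collector::Count;
use tantivy::query::TermQuery;
use tantivy::schema::IndexRecordOption;
use tantivy::Index;

fn count_exact(index: &Index, column: &str, value: &str) -> tantivy::Result<usize> {
    register_tokenizers(index)?;
    let fields = SchemaFields::from(&index.schema())?;
    // The exact field stores one lowercased, ASCII-folded token per value,
    // so the probe must be normalized the same way before building the term.
    let term = json_path_term(fields.data_exact, column, &normalize_exact(value));
    let query = TermQuery::new(term, IndexRecordOption::Basic);
    index.reader()?.searcher().search(&query, &Count)
}
```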
diff --git a/search/src/lib.rs b/search/src/lib.rs
index 83e8c57..be0654f 100644
--- a/search/src/lib.rs
+++ b/search/src/lib.rs
@@ -1,48 +1,186 @@
-use std::collections::HashMap;
-use std::path::{Path, PathBuf};
+mod query_builder;
 
-use tantivy::collector::TopDocs;
-use tantivy::query::{
-    BooleanQuery, BoostQuery, FuzzyTermQuery, Occur, Query, QueryParser, TermQuery,
-};
-use tantivy::schema::{IndexRecordOption, Value};
-use tantivy::{Index, TantivyDocument, Term};
-use tonic::{Request, Response, Status};
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::{Arc, Mutex};
 
 use common::proto::komp_ac::search::searcher_server::Searcher;
 use common::proto::komp_ac::search::{search_response::Hit, SearchRequest, SearchResponse};
 pub use common::proto::komp_ac::search::searcher_server::SearcherServer;
-use common::search::{
-    normalize_exact_value, normalize_search_text, register_slovak_tokenizers, search_index_path,
-};
+use common::search::{register_tokenizers, search_index_path, SchemaFields};
+use query_builder::{build_master_query, ConstraintMode, SearchConstraint};
 use sqlx::{PgPool, Row};
+use tantivy::collector::TopDocs;
+use tantivy::schema::Value;
+use tantivy::{Index, IndexReader, ReloadPolicy, TantivyDocument};
+use tonic::{Request, Response, Status};
 use tracing::info;
 
 const INDEX_ROOT: &str = "./tantivy_indexes";
-const DEFAULT_RESULT_LIMIT: usize = 5;
-const SEARCH_RESULT_LIMIT: usize = 100;
-
-#[derive(Clone, Copy)]
-enum SearchMode {
-    Fuzzy,
-    Exact,
-}
+const DEFAULT_RESULT_LIMIT: usize = 25;
+const HARD_RESULT_LIMIT: usize = 200;
+const DEFAULT_LIST_LIMIT: usize = 5;
 
 pub struct SearcherService {
     pub pool: PgPool,
+    profiles: Mutex<HashMap<String, Arc<ProfileIndex>>>,
 }
 
-struct SearchScope {
+impl SearcherService {
+    pub fn new(pool: PgPool) -> Self {
+        Self {
+            pool,
+            profiles: Mutex::new(HashMap::new()),
+        }
+    }
+
+    async fn run_rpc(&self, request: Request<SearchRequest>) -> Result<Response<SearchResponse>, Status> {
+        let req = request.into_inner();
+        let normalized = normalize_request(req)?;
+
+        if !profile_exists(&self.pool, &normalized.profile_name).await? {
+            return Err(Status::not_found(format!(
+                "Profile '{}' was not found",
+                normalized.profile_name
+            )));
+        }
+
+        if let Some(table_name) = normalized.table_name.as_deref() {
+            if !table_exists(&self.pool, &normalized.profile_name, table_name).await? {
+                return Err(Status::not_found(format!(
+                    "Table '{}' was not found in profile '{}'",
+                    table_name, normalized.profile_name
+                )));
+            }
+        }
+
+        if !normalized.has_input() {
+            let Some(table_name) = normalized.table_name.as_deref() else {
+                return Err(Status::invalid_argument(
+                    "table_name is required when query is empty",
+                ));
+            };
+
+            let hits = fetch_latest_rows(
+                &self.pool,
+                &normalized.profile_name,
+                table_name,
+                normalized.limit.unwrap_or(DEFAULT_LIST_LIMIT),
+            )
+            .await?;
+            return Ok(Response::new(SearchResponse { hits }));
+        }
+
+        let index_path = search_index_path(Path::new(INDEX_ROOT), &normalized.profile_name);
+        if !index_path.exists() {
+            return Err(Status::not_found(format!(
+                "No search index found for profile '{}'",
+                normalized.profile_name
+            )));
+        }
+
+        let profile = profile_index(&self.profiles, &normalized.profile_name, &index_path)?;
+        let mut hits = run_search(
+            &self.pool,
+            &profile,
+            &normalized.profile_name,
+            normalized.table_name.as_deref(),
+            &normalized.free_query,
+            &normalized.must,
+            normalized.limit.unwrap_or(DEFAULT_RESULT_LIMIT),
+        )
+        .await?;
+
+        hits.sort_by(|left, right| right.score.total_cmp(&left.score));
+        if let Some(limit) = normalized.limit {
+            if hits.len() > limit {
+                hits.truncate(limit);
+            }
+        }
+
+        info!(
+            "search: profile={} table={:?} free='{}' constraints={} hits={}",
+            normalized.profile_name,
+            normalized.table_name,
+            normalized.free_query,
+            normalized.must.len(),
+            hits.len()
+        );
+
+        Ok(Response::new(SearchResponse { hits }))
+    }
+}
+
+struct ProfileIndex {
+    index: Index,
+    reader: IndexReader,
+    fields: SchemaFields,
+}
+
+impl ProfileIndex {
+    fn open(path: &Path) -> Result<Self, Status> {
+        let index = Index::open_in_dir(path)
+            .map_err(|e| Status::internal(format!("Failed to open index: {}", e)))?;
+        register_tokenizers(&index)
+            .map_err(|e| Status::internal(format!("Failed to register tokenizers: {}", e)))?;
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::OnCommitWithDelay)
+            .try_into()
+            .map_err(|e| Status::internal(format!("Failed to build index reader: {}", e)))?;
+        let fields = SchemaFields::from(&index.schema()).map_err(|e| {
+            Status::internal(format!(
+                "Search index schema mismatch. Reindex required: {}",
Reindex required: {}", + e + )) + })?; + + Ok(Self { + index, + reader, + fields, + }) + } +} + +#[derive(Debug)] +struct NormalizedSearchRequest { profile_name: String, - requested_table: Option, - requested_column: Option, - index_path: PathBuf, + table_name: Option, + free_query: String, + must: Vec, + limit: Option, } -struct SearchCandidate { - score: f32, - pg_id: i64, - table_name: String, +impl NormalizedSearchRequest { + fn has_input(&self) -> bool { + !self.free_query.is_empty() || !self.must.is_empty() + } +} + +fn profile_index( + cache: &Mutex>>, + profile_name: &str, + path: &Path, +) -> Result, Status> { + { + let cache_guard = cache + .lock() + .map_err(|_| Status::internal("Profile index cache lock poisoned"))?; + if let Some(index) = cache_guard.get(profile_name) { + return Ok(index.clone()); + } + } + + let opened = Arc::new(ProfileIndex::open(path)?); + let mut cache_guard = cache + .lock() + .map_err(|_| Status::internal("Profile index cache lock poisoned"))?; + if let Some(index) = cache_guard.get(profile_name) { + return Ok(index.clone()); + } + cache_guard.insert(profile_name.to_string(), opened.clone()); + Ok(opened) } fn validate_identifier(value: &str, field_name: &str) -> Result<(), Status> { @@ -78,210 +216,97 @@ async fn profile_exists(pool: &PgPool, profile_name: &str) -> Result, - requested_column: Option<&str>, -) -> Result { - validate_identifier(profile_name, "profile_name")?; - - if !profile_exists(pool, profile_name).await? { - return Err(Status::not_found(format!( - "Profile '{}' was not found", - profile_name - ))); - } - - let requested_table = if let Some(table_name) = - requested_table.filter(|value| !value.trim().is_empty()) - { - validate_identifier(table_name, "table_name")?; - - let row = sqlx::query_scalar::<_, String>( - r#" - SELECT td.table_name +async fn table_exists(pool: &PgPool, profile_name: &str, table_name: &str) -> Result { + let exists = sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS( + SELECT 1 FROM table_definitions td JOIN schemas s ON td.schema_id = s.id WHERE s.name = $1 AND td.table_name = $2 - "#, ) - .bind(profile_name) - .bind(table_name) - .fetch_optional(pool) - .await - .map_err(|e| Status::internal(format!("Table lookup failed: {}", e)))?; + "#, + ) + .bind(profile_name) + .bind(table_name) + .fetch_one(pool) + .await + .map_err(|e| Status::internal(format!("Table lookup failed: {}", e)))?; + Ok(exists) +} - Some(row.ok_or_else(|| { - Status::not_found(format!( - "Table '{}' was not found in profile '{}'", - table_name, profile_name - )) - })?) 
-    } else {
-        None
+fn normalize_request(req: SearchRequest) -> Result<NormalizedSearchRequest, Status> {
+    let profile_name = req.profile_name.trim();
+    if profile_name.is_empty() {
+        return Err(Status::invalid_argument("profile_name is required"));
+    }
+    validate_identifier(profile_name, "profile_name")?;
+
+    let table_name = match req.table_name.as_deref().map(str::trim) {
+        Some(table_name) if !table_name.is_empty() => {
+            validate_identifier(table_name, "table_name")?;
+            Some(table_name.to_string())
+        }
+        _ => None,
     };
 
-    let requested_column = if let Some(column_name) =
-        requested_column.filter(|value| !value.trim().is_empty())
-    {
-        validate_identifier(column_name, "column_name")?;
-        Some(column_name.to_string())
-    } else {
-        None
-    };
+    let free_query = req.free_query.trim().to_string();
+    let mut must = Vec::new();
 
-    Ok(SearchScope {
+    for constraint in req.must {
+        let column = constraint.column.trim();
+        if column.is_empty() {
+            return Err(Status::invalid_argument(
+                "constraint.column must not be empty",
+            ));
+        }
+        validate_identifier(column, "constraint.column")?;
+
+        let query = constraint.query.trim();
+        if query.is_empty() {
+            return Err(Status::invalid_argument(
+                "constraint.query must not be empty",
+            ));
+        }
+
+        must.push(SearchConstraint {
+            column: column.to_string(),
+            query: query.to_string(),
+            mode: constraint_mode_from_proto(constraint.mode),
+        });
+    }
+
+    let limit = req.limit.map(|value| (value as usize).min(HARD_RESULT_LIMIT));
+
+    Ok(NormalizedSearchRequest {
         profile_name: profile_name.to_string(),
-        requested_table,
-        requested_column,
-        index_path: search_index_path(Path::new(INDEX_ROOT), profile_name),
+        table_name,
+        free_query,
+        must,
+        limit,
     })
 }
 
-// Query building
-fn build_query(
-    index: &Index,
-    normalized_query: &str,
-    mode: SearchMode,
-    table_filter: Option<&str>,
-    column_filter: Option<&str>,
-) -> Result<Option<BooleanQuery>, Status> {
-    let schema = index.schema();
-    let table_name_field = schema
-        .get_field("table_name")
-        .map_err(|_| Status::internal("Schema is missing the 'table_name' field."))?;
-    let column_exact_field = schema
-        .get_field("column_exact")
-        .map_err(|_| Status::internal("Schema is missing the 'column_exact' field."))?;
-    let prefix_edge_field = schema
-        .get_field("prefix_edge")
-        .map_err(|_| Status::internal("Schema is missing the 'prefix_edge' field."))?;
-    let prefix_full_field = schema
-        .get_field("prefix_full")
-        .map_err(|_| Status::internal("Schema is missing the 'prefix_full' field."))?;
-    let text_ngram_field = schema
-        .get_field("text_ngram")
-        .map_err(|_| Status::internal("Schema is missing the 'text_ngram' field."))?;
-
-    let words: Vec<&str> = normalized_query.split_whitespace().collect();
-    if words.is_empty() {
-        return Ok(None);
+fn constraint_mode_from_proto(raw_mode: i32) -> ConstraintMode {
+    match raw_mode {
+        2 => ConstraintMode::Exact,
+        _ => ConstraintMode::Fuzzy,
     }
-
-    let content_query: Box<dyn Query> = if matches!(mode, SearchMode::Exact) {
-        if let Some(column_name) = column_filter {
-            let exact_term = Term::from_field_text(
-                column_exact_field,
-                &format!(
-                    "{}:{}",
-                    column_name.to_ascii_lowercase(),
-                    normalize_exact_value(normalized_query)
-                ),
-            );
-            Box::new(TermQuery::new(exact_term, IndexRecordOption::Basic))
-        } else {
-            let exact_parser = QueryParser::for_index(index, vec![prefix_full_field]);
-            let exact_query_str = if words.len() == 1 {
-                normalized_query.to_string()
-            } else {
-                format!("\"{}\"", normalized_query)
-            };
-
-            let exact_query = exact_parser
-                .parse_query(&exact_query_str)
-                .map_err(|e| Status::internal(format!("Failed to build exact query: {}", e)))?;
-            Box::new(exact_query)
-        }
-    } else {
-        let mut query_layers: Vec<(Occur, Box<dyn Query>)> = Vec::new();
-
-        // Layer 1: prefix
-        {
-            let mut must_clauses: Vec<(Occur, Box<dyn Query>)> = Vec::new();
-            for word in &words {
-                let edge_term = Term::from_field_text(prefix_edge_field, word);
-                let full_term = Term::from_field_text(prefix_full_field, word);
-
-                let per_word_query = BooleanQuery::new(vec![
-                    (
-                        Occur::Should,
-                        Box::new(TermQuery::new(edge_term, IndexRecordOption::Basic)),
-                    ),
-                    (
-                        Occur::Should,
-                        Box::new(TermQuery::new(full_term, IndexRecordOption::Basic)),
-                    ),
-                ]);
-                must_clauses.push((Occur::Must, Box::new(per_word_query)));
-            }
-
-            if !must_clauses.is_empty() {
-                let prefix_query = BooleanQuery::new(must_clauses);
-                let boosted_query = BoostQuery::new(Box::new(prefix_query), 4.0);
-                query_layers.push((Occur::Should, Box::new(boosted_query)));
-            }
-        }
-
-        // Layer 2: fuzzy
-        {
-            let last_word = words
-                .last()
-                .ok_or_else(|| Status::internal("Query normalization lost all tokens"))?;
-            let fuzzy_term = Term::from_field_text(prefix_full_field, last_word);
-            let fuzzy_query = FuzzyTermQuery::new(fuzzy_term, 2, true);
-            let boosted_query = BoostQuery::new(Box::new(fuzzy_query), 3.0);
-            query_layers.push((Occur::Should, Box::new(boosted_query)));
-        }
-
-        // Layer 3: phrase
-        if words.len() > 1 {
-            let slop_parser = QueryParser::for_index(index, vec![prefix_full_field]);
-            let slop_query_str = format!("\"{}\"~3", normalized_query);
-            if let Ok(slop_query) = slop_parser.parse_query(&slop_query_str) {
-                let boosted_query = BoostQuery::new(slop_query, 2.0);
-                query_layers.push((Occur::Should, Box::new(boosted_query)));
-            }
-        }
-
-        // Layer 4: ngram
-        {
-            let ngram_parser = QueryParser::for_index(index, vec![text_ngram_field]);
-            if let Ok(ngram_query) = ngram_parser.parse_query(normalized_query) {
-                let boosted_query = BoostQuery::new(ngram_query, 1.0);
-                query_layers.push((Occur::Should, Box::new(boosted_query)));
-            }
-        }
-
-        Box::new(BooleanQuery::new(query_layers))
-    };
-
-    let mut clauses: Vec<(Occur, Box<dyn Query>)> = vec![(Occur::Must, content_query)];
-    if let Some(table_name) = table_filter {
-        let table_term = Term::from_field_text(table_name_field, table_name);
-        clauses.push((
-            Occur::Must,
-            Box::new(TermQuery::new(table_term, IndexRecordOption::Basic)),
-        ));
-    }
-
-    Ok(Some(BooleanQuery::new(clauses)))
 }
 
-// Empty query
-async fn fetch_default_hits(
+async fn fetch_latest_rows(
     pool: &PgPool,
     profile_name: &str,
     table_name: &str,
+    limit: usize,
 ) -> Result<Vec<Hit>, Status> {
     let sql = format!(
-        "SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE ORDER BY id DESC LIMIT {}",
-        qualify_profile_table(profile_name, table_name),
-        DEFAULT_RESULT_LIMIT
+        "SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE ORDER BY id DESC LIMIT $1",
+        qualify_profile_table(profile_name, table_name)
     );
 
     let rows = sqlx::query(&sql)
+        .bind(limit as i64)
         .fetch_all(pool)
        .await
        .map_err(|e| Status::internal(format!("DB query for default results failed: {}", e)))?;
@@ -301,74 +326,45 @@ async fn fetch_default_hits(
         .collect())
 }
 
-// Search + hydrate
-async fn search_profile(
+async fn run_search(
     pool: &PgPool,
-    scope: &SearchScope,
-    query_str: &str,
-    mode: SearchMode,
+    profile: &ProfileIndex,
+    profile_name: &str,
+    table_filter: Option<&str>,
+    free_query: &str,
+    must: &[SearchConstraint],
+    limit: usize,
 ) -> Result<Vec<Hit>, Status> {
-    if !scope.index_path.exists() {
-        return Ok(vec![]);
-    }
-
-    let index = Index::open_in_dir(&scope.index_path)
-        .map_err(|e| Status::internal(format!("Failed to open index: {}", e)))?;
Status::internal(format!("Failed to open index: {}", e)))?; - register_slovak_tokenizers(&index) - .map_err(|e| Status::internal(format!("Failed to register Slovak tokenizers: {}", e)))?; - - let Some(master_query) = build_query( - &index, - &normalize_search_text(query_str), - mode, - scope.requested_table.as_deref(), - scope.requested_column.as_deref(), - )? else { - return Ok(vec![]); - }; - - let reader = index - .reader() - .map_err(|e| Status::internal(format!("Failed to create index reader: {}", e)))?; - let searcher = reader.searcher(); - let schema = index.schema(); - let pg_id_field = schema - .get_field("pg_id") - .map_err(|_| Status::internal("Schema is missing the 'pg_id' field."))?; - let table_name_field = schema - .get_field("table_name") - .map_err(|_| Status::internal("Schema is missing the 'table_name' field."))?; + let master_query = + build_master_query(&profile.index, &profile.fields, free_query, must, table_filter)?; + let searcher = profile.reader.searcher(); let top_docs = searcher - .search(&master_query, &TopDocs::with_limit(SEARCH_RESULT_LIMIT)) + .search(&*master_query, &TopDocs::with_limit(limit)) .map_err(|e| Status::internal(format!("Search failed: {}", e)))?; if top_docs.is_empty() { return Ok(vec![]); } - let mut candidates: Vec = Vec::new(); + let mut candidates: Vec<(f32, i64, String)> = Vec::with_capacity(top_docs.len()); for (score, doc_address) in top_docs { let doc: TantivyDocument = searcher .doc(doc_address) .map_err(|e| Status::internal(format!("Failed to retrieve document: {}", e)))?; - let Some(pg_id_value) = doc.get_first(pg_id_field) else { + let Some(pg_id) = doc + .get_first(profile.fields.pg_id) + .and_then(|value| value.as_u64()) + else { continue; }; - let Some(table_name_value) = doc.get_first(table_name_field) else { + let Some(table_name) = doc + .get_first(profile.fields.table_name) + .and_then(|value| value.as_str()) + else { continue; }; - let Some(pg_id) = pg_id_value.as_u64() else { - continue; - }; - let Some(table_name) = table_name_value.as_str() else { - continue; - }; - candidates.push(SearchCandidate { - score, - pg_id: pg_id as i64, - table_name: table_name.to_string(), - }); + candidates.push((score, pg_id as i64, table_name.to_string())); } if candidates.is_empty() { @@ -376,11 +372,11 @@ async fn search_profile( } let mut ids_by_table: HashMap> = HashMap::new(); - for candidate in &candidates { + for (_, pg_id, table_name) in &candidates { ids_by_table - .entry(candidate.table_name.clone()) + .entry(table_name.clone()) .or_default() - .push(candidate.pg_id); + .push(*pg_id); } let mut content_map: HashMap<(String, i64), String> = HashMap::new(); @@ -388,7 +384,7 @@ async fn search_profile( validate_identifier(&table_name, "table_name")?; let sql = format!( "SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE AND id = ANY($1)", - qualify_profile_table(&scope.profile_name, &table_name) + qualify_profile_table(profile_name, &table_name) ); let rows = sqlx::query(&sql) @@ -406,14 +402,14 @@ async fn search_profile( Ok(candidates .into_iter() - .filter_map(|candidate| { + .filter_map(|(score, pg_id, table_name)| { content_map - .get(&(candidate.table_name.clone(), candidate.pg_id)) + .get(&(table_name.clone(), pg_id)) .map(|content_json| Hit { - id: candidate.pg_id, - score: candidate.score, + id: pg_id, + score, content_json: content_json.clone(), - table_name: candidate.table_name, + table_name, }) }) .collect()) @@ -421,86 +417,10 @@ async fn search_profile( #[tonic::async_trait] impl Searcher for 
-    async fn search_table(
+    async fn search(
         &self,
         request: Request<SearchRequest>,
     ) -> Result<Response<SearchResponse>, Status> {
-        self.run_search(request, SearchMode::Fuzzy).await
-    }
-
-    async fn exact_search_table(
-        &self,
-        request: Request<SearchRequest>,
-    ) -> Result<Response<SearchResponse>, Status> {
-        self.run_search(request, SearchMode::Exact).await
-    }
-}
-
-impl SearcherService {
-    async fn run_search(
-        &self,
-        request: Request<SearchRequest>,
-        mode: SearchMode,
-    ) -> Result<Response<SearchResponse>, Status> {
-        let req = request.into_inner();
-        let profile_name = req.profile_name.trim();
-        if profile_name.is_empty() {
-            return Err(Status::invalid_argument("profile_name is required"));
-        }
-
-        // Request scope
-        let scope =
-            resolve_search_scope(
-                &self.pool,
-                profile_name,
-                req.table_name.as_deref().map(str::trim),
-                req.column_name.as_deref().map(str::trim),
-            )
-            .await?;
-
-        let query = req.query.trim();
-        if query.is_empty() {
-            // Empty query
-            let Some(table_name) = scope.requested_table.as_deref() else {
-                return Err(Status::invalid_argument(
-                    "table_name is required when query is empty",
-                ));
-            };
-
-            let hits = fetch_default_hits(&self.pool, &scope.profile_name, table_name).await?;
-            info!(
-                "Empty query for profile '{}' table '{}'. Returning {} default hits.",
-                scope.profile_name,
-                table_name,
-                hits.len()
-            );
-            return Ok(Response::new(SearchResponse { hits }));
-        }
-
-        if scope.requested_table.is_some() && !scope.index_path.exists() {
-            return Err(Status::not_found(format!(
-                "No search index found for profile '{}'",
-                scope.profile_name
-            )));
-        }
-
-        let mut hits = search_profile(&self.pool, &scope, query, mode).await?;
-        hits.sort_by(|left, right| right.score.total_cmp(&left.score));
-        if hits.len() > SEARCH_RESULT_LIMIT {
-            hits.truncate(SEARCH_RESULT_LIMIT);
-        }
-
-        info!(
-            "Processed {} search for profile '{}' (table scope: {}). Returning {} hits.",
-            match mode {
-                SearchMode::Fuzzy => "fuzzy",
-                SearchMode::Exact => "exact",
-            },
-            scope.profile_name,
-            scope.requested_table.as_deref().unwrap_or("*"),
-            hits.len()
-        );
-
-        Ok(Response::new(SearchResponse { hits }))
+        self.run_rpc(request).await
     }
 }
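From the caller's side, the two old RPCs collapse into one request shape: exactness becomes a per-column property of a constraint rather than a method choice. A hypothetical tonic client call, not part of the patch (the endpoint address and field values are invented; `SearcherClient::connect` assumes tonic's `transport` feature):

```rust
// Hypothetical caller exercising the unified Search RPC.
use common::proto::komp_ac::search::searcher_client::SearcherClient;
use common::proto::komp_ac::search::{ColumnConstraint, MatchMode, SearchRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = SearcherClient::connect("http://127.0.0.1:50051").await?;
    let response = client
        .search(SearchRequest {
            profile_name: "demo".into(),
            table_name: Some("customers".into()),
            free_query: "cierny potok".into(),
            // Hard constraint: `city` must exactly match the normalized value.
            must: vec![ColumnConstraint {
                column: "city".into(),
                query: "Bratislava".into(),
                mode: MatchMode::Exact as i32,
            }],
            limit: Some(10),
        })
        .await?;
    for hit in response.into_inner().hits {
        println!("{}#{} score={:.2}", hit.table_name, hit.id, hit.score);
    }
    Ok(())
}
```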
+ } + }; + clauses.push((Occur::Must, predicate)); + has_search_clause = true; + } + + let free_words = tokenize_word(free_query); + if !free_words.is_empty() { + let predicate = fuzzy_predicate_unscoped(index, fields, &free_words)?; + clauses.push((Occur::Should, predicate)); + has_search_clause = true; + } + + if let Some(table_name) = table_filter { + let term = Term::from_field_text(fields.table_name, table_name); + clauses.push(( + Occur::Must, + Box::new(TermQuery::new(term, IndexRecordOption::Basic)), + )); + } + + if !has_search_clause { + return Ok(Box::new(EmptyQuery)); + } + + Ok(Box::new(BooleanQuery::new(clauses))) +} + +fn exact_predicate( + fields: &SchemaFields, + column: &str, + query: &str, +) -> Result, Status> { + let normalized_value = normalize_exact(query); + if normalized_value.is_empty() { + return Err(Status::invalid_argument( + "exact query is empty after normalization", + )); + } + + let term = json_path_term(fields.data_exact, column, &normalized_value); + Ok(Box::new(TermQuery::new(term, IndexRecordOption::Basic))) +} + +fn fuzzy_predicate_scoped( + fields: &SchemaFields, + column: &str, + query: &str, +) -> Result, Status> { + let words = tokenize_word(query); + if words.is_empty() { + return Err(Status::invalid_argument( + "fuzzy query has no searchable tokens", + )); + } + + let mut layers: Vec<(Occur, Box)> = Vec::new(); + + let mut per_word_clauses: Vec<(Occur, Box)> = Vec::new(); + for word in &words { + let term = json_path_term(fields.data_word, column, word); + let mut alternates: Vec<(Occur, Box)> = Vec::new(); + + alternates.push(( + Occur::Should, + Box::new(BoostQuery::new( + Box::new(TermQuery::new(term.clone(), IndexRecordOption::WithFreqs)), + 4.0, + )), + )); + + alternates.push(( + Occur::Should, + Box::new(BoostQuery::new( + Box::new(FuzzyTermQuery::new_prefix(term.clone(), 0, false)), + 3.0, + )), + )); + + if let Some(distance) = fuzzy_distance(word.chars().count()) { + alternates.push(( + Occur::Should, + Box::new(BoostQuery::new( + Box::new(FuzzyTermQuery::new(term.clone(), distance, true)), + 2.0, + )), + )); + } + + per_word_clauses.push((Occur::Must, Box::new(BooleanQuery::new(alternates)))); + } + layers.push((Occur::Should, Box::new(BooleanQuery::new(per_word_clauses)))); + + if words.len() > 1 { + let phrase_terms: Vec<(usize, Term)> = words + .iter() + .enumerate() + .map(|(offset, word)| (offset, json_path_term(fields.data_word, column, word))) + .collect(); + let phrase = PhraseQuery::new_with_offset_and_slop(phrase_terms, 3); + layers.push(( + Occur::Should, + Box::new(BoostQuery::new(Box::new(phrase), 2.0)), + )); + } + + let ngrams = tokenize_ngram(query); + if !ngrams.is_empty() { + let ngram_clauses: Vec<(Occur, Box)> = ngrams + .into_iter() + .map(|gram| { + let term = json_path_term(fields.data_ngram, column, &gram); + ( + Occur::Must, + Box::new(TermQuery::new(term, IndexRecordOption::Basic)) as Box, + ) + }) + .collect(); + layers.push(( + Occur::Should, + Box::new(BoostQuery::new(Box::new(BooleanQuery::new(ngram_clauses)), 1.0)), + )); + } + + Ok(Box::new(BooleanQuery::new(layers))) +} + +fn fuzzy_predicate_unscoped( + index: &Index, + fields: &SchemaFields, + words: &[String], +) -> Result, Status> { + let mut layers: Vec<(Occur, Box)> = Vec::new(); + + { + let parser = QueryParser::for_index(index, vec![fields.data_word]); + let query_string = words + .iter() + .map(|word| format!("+{}*", word)) + .collect::>() + .join(" "); + if let Ok(query) = parser.parse_query(&query_string) { + layers.push((Occur::Should, 
+        }
+    }
+
+    {
+        let parser = QueryParser::for_index(index, vec![fields.data_word]);
+        let query_string = words
+            .iter()
+            .map(|word| match fuzzy_distance(word.chars().count()) {
+                Some(distance) => format!("+{}~{}", word, distance),
+                None => format!("+{}", word),
+            })
+            .collect::<Vec<_>>()
+            .join(" ");
+        if let Ok(query) = parser.parse_query(&query_string) {
+            layers.push((Occur::Should, Box::new(BoostQuery::new(query, 2.0))));
+        }
+    }
+
+    if words.len() > 1 {
+        let parser = QueryParser::for_index(index, vec![fields.data_word]);
+        let query_string = format!("\"{}\"~3", words.join(" "));
+        if let Ok(query) = parser.parse_query(&query_string) {
+            layers.push((Occur::Should, Box::new(BoostQuery::new(query, 2.0))));
+        }
+    }
+
+    {
+        let parser = QueryParser::for_index(index, vec![fields.data_ngram]);
+        let query_string = words
+            .iter()
+            .map(|word| format!("+{}", word))
+            .collect::<Vec<_>>()
+            .join(" ");
+        if let Ok(query) = parser.parse_query(&query_string) {
+            layers.push((Occur::Should, Box::new(BoostQuery::new(query, 1.0))));
+        }
+    }
+
+    if layers.is_empty() {
+        return Ok(Box::new(EmptyQuery));
+    }
+
+    Ok(Box::new(BooleanQuery::new(layers)))
+}
+
+fn fuzzy_distance(word_len: usize) -> Option<u8> {
+    match word_len {
+        0..=3 => None,
+        4..=6 => Some(1),
+        _ => Some(2),
+    }
+}
diff --git a/server b/server
index 16ea6e1..82df1de 160000
--- a/server
+++ b/server
@@ -1 +1 @@
-Subproject commit 16ea6e14b5ccf3b98862ee35dfa04f8661b3e11f
+Subproject commit 82df1dea822ddf18fd3724b2d1cc413fd7fd0bde
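One detail worth pinning down with a test is the length-gated edit distance in `search/src/query_builder.rs`: short tokens get no fuzzy layer at all (only the exact and prefix alternates apply), mid-length tokens tolerate one edit, and long tokens two. A sketch of such a test, not part of the patch:

```rust
// Sketch only: pins down the edit-distance policy shared by the scoped
// and unscoped fuzzy layers.
#[cfg(test)]
mod fuzzy_distance_tests {
    use super::fuzzy_distance;

    #[test]
    fn distance_is_gated_by_token_length() {
        assert_eq!(fuzzy_distance(3), None);    // 0-3 chars: no fuzzy matching
        assert_eq!(fuzzy_distance(4), Some(1)); // 4-6 chars: one edit allowed
        assert_eq!(fuzzy_distance(7), Some(2)); // 7+ chars: two edits allowed
    }
}
```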