indexing done by the profile and not table

This commit is contained in:
Priec
2026-04-29 01:08:59 +02:00
parent 1ceab57f3b
commit 036e12f345
2 changed files with 204 additions and 165 deletions

View File

@@ -5,9 +5,14 @@ use tantivy::schema::*;
use tantivy::tokenizer::*; use tantivy::tokenizer::*;
use tantivy::Index; use tantivy::Index;
/// Returns the on-disk path for a profile/table search index. /// Returns the on-disk path for a profile search index.
pub fn search_index_path(root: &Path, profile_name: &str, table_name: &str) -> PathBuf { pub fn search_index_path(root: &Path, profile_name: &str) -> PathBuf {
root.join(profile_name).join(table_name) root.join(profile_name)
}
/// Returns the unique index key for one table row inside a profile index.
pub fn search_row_key(table_name: &str, row_id: i64) -> String {
format!("{}:{}", table_name, row_id)
} }
/// Creates a hybrid Slovak search schema with optimized prefix fields. /// Creates a hybrid Slovak search schema with optimized prefix fields.
@@ -15,6 +20,8 @@ pub fn create_search_schema() -> Schema {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
schema_builder.add_u64_field("pg_id", INDEXED | STORED); schema_builder.add_u64_field("pg_id", INDEXED | STORED);
schema_builder.add_text_field("table_name", STRING | STORED);
schema_builder.add_text_field("row_key", STRING | STORED);
// For prefixes (1-4 chars). // For prefixes (1-4 chars).
let short_prefix_indexing = TextFieldIndexing::default() let short_prefix_indexing = TextFieldIndexing::default()

View File

@@ -30,12 +30,18 @@ pub struct SearcherService {
pub pool: PgPool, pub pool: PgPool,
} }
struct SearchTarget { struct SearchScope {
table_name: String, profile_name: String,
qualified_table: String, requested_table: Option<String>,
index_path: PathBuf, index_path: PathBuf,
} }
struct SearchCandidate {
score: f32,
pg_id: i64,
table_name: String,
}
fn normalize_slovak_text(text: &str) -> String { fn normalize_slovak_text(text: &str) -> String {
text.chars() text.chars()
.map(|c| match c { .map(|c| match c {
@@ -96,7 +102,8 @@ fn qualify_profile_table(profile_name: &str, table_name: &str) -> String {
} }
async fn profile_exists(pool: &PgPool, profile_name: &str) -> Result<bool, Status> { async fn profile_exists(pool: &PgPool, profile_name: &str) -> Result<bool, Status> {
let exists = sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM schemas WHERE name = $1)") let exists =
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM schemas WHERE name = $1)")
.bind(profile_name) .bind(profile_name)
.fetch_one(pool) .fetch_one(pool)
.await .await
@@ -105,11 +112,11 @@ async fn profile_exists(pool: &PgPool, profile_name: &str) -> Result<bool, Statu
} }
// Scope resolution // Scope resolution
async fn resolve_search_targets( async fn resolve_search_scope(
pool: &PgPool, pool: &PgPool,
profile_name: &str, profile_name: &str,
requested_table: Option<&str>, requested_table: Option<&str>,
) -> Result<Vec<SearchTarget>, Status> { ) -> Result<SearchScope, Status> {
validate_identifier(profile_name, "profile_name")?; validate_identifier(profile_name, "profile_name")?;
if !profile_exists(pool, profile_name).await? { if !profile_exists(pool, profile_name).await? {
@@ -119,7 +126,9 @@ async fn resolve_search_targets(
))); )));
} }
let tables = if let Some(table_name) = requested_table.filter(|value| !value.trim().is_empty()) { let requested_table = if let Some(table_name) =
requested_table.filter(|value| !value.trim().is_empty())
{
validate_identifier(table_name, "table_name")?; validate_identifier(table_name, "table_name")?;
let row = sqlx::query_scalar::<_, String>( let row = sqlx::query_scalar::<_, String>(
@@ -136,38 +145,21 @@ async fn resolve_search_targets(
.await .await
.map_err(|e| Status::internal(format!("Table lookup failed: {}", e)))?; .map_err(|e| Status::internal(format!("Table lookup failed: {}", e)))?;
let table_name = row.ok_or_else(|| { Some(row.ok_or_else(|| {
Status::not_found(format!( Status::not_found(format!(
"Table '{}' was not found in profile '{}'", "Table '{}' was not found in profile '{}'",
table_name, profile_name table_name, profile_name
)) ))
})?; })?)
vec![table_name]
} else { } else {
sqlx::query_scalar::<_, String>( None
r#"
SELECT td.table_name
FROM table_definitions td
JOIN schemas s ON td.schema_id = s.id
WHERE s.name = $1
ORDER BY td.table_name
"#,
)
.bind(profile_name)
.fetch_all(pool)
.await
.map_err(|e| Status::internal(format!("Profile table lookup failed: {}", e)))?
}; };
Ok(tables Ok(SearchScope {
.into_iter() profile_name: profile_name.to_string(),
.map(|table_name| SearchTarget { requested_table,
qualified_table: qualify_profile_table(profile_name, &table_name), index_path: search_index_path(Path::new(INDEX_ROOT), profile_name),
index_path: search_index_path(Path::new(INDEX_ROOT), profile_name, &table_name),
table_name,
}) })
.collect())
} }
// Query building // Query building
@@ -175,8 +167,12 @@ fn build_query(
index: &Index, index: &Index,
normalized_query: &str, normalized_query: &str,
mode: SearchMode, mode: SearchMode,
table_filter: Option<&str>,
) -> Result<Option<BooleanQuery>, Status> { ) -> Result<Option<BooleanQuery>, Status> {
let schema = index.schema(); let schema = index.schema();
let table_name_field = schema
.get_field("table_name")
.map_err(|_| Status::internal("Schema is missing the 'table_name' field."))?;
let prefix_edge_field = schema let prefix_edge_field = schema
.get_field("prefix_edge") .get_field("prefix_edge")
.map_err(|_| Status::internal("Schema is missing the 'prefix_edge' field."))?; .map_err(|_| Status::internal("Schema is missing the 'prefix_edge' field."))?;
@@ -192,7 +188,7 @@ fn build_query(
return Ok(None); return Ok(None);
} }
if matches!(mode, SearchMode::Exact) { let content_query: Box<dyn Query> = if matches!(mode, SearchMode::Exact) {
let exact_parser = QueryParser::for_index(index, vec![prefix_full_field]); let exact_parser = QueryParser::for_index(index, vec![prefix_full_field]);
let exact_query_str = if words.len() == 1 { let exact_query_str = if words.len() == 1 {
normalized_query.to_string() normalized_query.to_string()
@@ -203,13 +199,8 @@ fn build_query(
let exact_query = exact_parser let exact_query = exact_parser
.parse_query(&exact_query_str) .parse_query(&exact_query_str)
.map_err(|e| Status::internal(format!("Failed to build exact query: {}", e)))?; .map_err(|e| Status::internal(format!("Failed to build exact query: {}", e)))?;
Box::new(exact_query)
return Ok(Some(BooleanQuery::new(vec![( } else {
Occur::Must,
Box::new(exact_query),
)])));
}
let mut query_layers: Vec<(Occur, Box<dyn Query>)> = Vec::new(); let mut query_layers: Vec<(Occur, Box<dyn Query>)> = Vec::new();
// Layer 1: prefix // Layer 1: prefix
@@ -269,14 +260,31 @@ fn build_query(
} }
} }
Ok(Some(BooleanQuery::new(query_layers))) Box::new(BooleanQuery::new(query_layers))
};
let mut clauses: Vec<(Occur, Box<dyn Query>)> = vec![(Occur::Must, content_query)];
if let Some(table_name) = table_filter {
let table_term = Term::from_field_text(table_name_field, table_name);
clauses.push((
Occur::Must,
Box::new(TermQuery::new(table_term, IndexRecordOption::Basic)),
));
}
Ok(Some(BooleanQuery::new(clauses)))
} }
// Empty query // Empty query
async fn fetch_default_hits(pool: &PgPool, target: &SearchTarget) -> Result<Vec<Hit>, Status> { async fn fetch_default_hits(
pool: &PgPool,
profile_name: &str,
table_name: &str,
) -> Result<Vec<Hit>, Status> {
let sql = format!( let sql = format!(
"SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE ORDER BY id DESC LIMIT {}", "SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE ORDER BY id DESC LIMIT {}",
target.qualified_table, DEFAULT_RESULT_LIMIT qualify_profile_table(profile_name, table_name),
DEFAULT_RESULT_LIMIT
); );
let rows = sqlx::query(&sql) let rows = sqlx::query(&sql)
@@ -293,29 +301,34 @@ async fn fetch_default_hits(pool: &PgPool, target: &SearchTarget) -> Result<Vec<
id, id,
score: 0.0, score: 0.0,
content_json: json_data.to_string(), content_json: json_data.to_string(),
table_name: target.table_name.clone(), table_name: table_name.to_string(),
} }
}) })
.collect()) .collect())
} }
// Search + hydrate // Search + hydrate
async fn search_target( async fn search_profile(
pool: &PgPool, pool: &PgPool,
target: &SearchTarget, scope: &SearchScope,
query_str: &str, query_str: &str,
mode: SearchMode, mode: SearchMode,
) -> Result<Vec<Hit>, Status> { ) -> Result<Vec<Hit>, Status> {
if !target.index_path.exists() { if !scope.index_path.exists() {
return Ok(vec![]); return Ok(vec![]);
} }
let index = Index::open_in_dir(&target.index_path) let index = Index::open_in_dir(&scope.index_path)
.map_err(|e| Status::internal(format!("Failed to open index: {}", e)))?; .map_err(|e| Status::internal(format!("Failed to open index: {}", e)))?;
register_slovak_tokenizers(&index) register_slovak_tokenizers(&index)
.map_err(|e| Status::internal(format!("Failed to register Slovak tokenizers: {}", e)))?; .map_err(|e| Status::internal(format!("Failed to register Slovak tokenizers: {}", e)))?;
let Some(master_query) = build_query(&index, &normalize_slovak_text(query_str), mode)? else { let Some(master_query) = build_query(
&index,
&normalize_slovak_text(query_str),
mode,
scope.requested_table.as_deref(),
)? else {
return Ok(vec![]); return Ok(vec![]);
}; };
@@ -327,6 +340,9 @@ async fn search_target(
let pg_id_field = schema let pg_id_field = schema
.get_field("pg_id") .get_field("pg_id")
.map_err(|_| Status::internal("Schema is missing the 'pg_id' field."))?; .map_err(|_| Status::internal("Schema is missing the 'pg_id' field."))?;
let table_name_field = schema
.get_field("table_name")
.map_err(|_| Status::internal("Schema is missing the 'table_name' field."))?;
let top_docs = searcher let top_docs = searcher
.search(&master_query, &TopDocs::with_limit(SEARCH_RESULT_LIMIT)) .search(&master_query, &TopDocs::with_limit(SEARCH_RESULT_LIMIT))
@@ -336,26 +352,48 @@ async fn search_target(
return Ok(vec![]); return Ok(vec![]);
} }
let mut scored_ids: Vec<(f32, u64)> = Vec::new(); let mut candidates: Vec<SearchCandidate> = Vec::new();
for (score, doc_address) in top_docs { for (score, doc_address) in top_docs {
let doc: TantivyDocument = searcher let doc: TantivyDocument = searcher
.doc(doc_address) .doc(doc_address)
.map_err(|e| Status::internal(format!("Failed to retrieve document: {}", e)))?; .map_err(|e| Status::internal(format!("Failed to retrieve document: {}", e)))?;
if let Some(pg_id_value) = doc.get_first(pg_id_field) { let Some(pg_id_value) = doc.get_first(pg_id_field) else {
if let Some(pg_id) = pg_id_value.as_u64() { continue;
scored_ids.push((score, pg_id)); };
} let Some(table_name_value) = doc.get_first(table_name_field) else {
} continue;
};
let Some(pg_id) = pg_id_value.as_u64() else {
continue;
};
let Some(table_name) = table_name_value.as_str() else {
continue;
};
candidates.push(SearchCandidate {
score,
pg_id: pg_id as i64,
table_name: table_name.to_string(),
});
} }
if scored_ids.is_empty() { if candidates.is_empty() {
return Ok(vec![]); return Ok(vec![]);
} }
let pg_ids: Vec<i64> = scored_ids.iter().map(|(_, id)| *id as i64).collect(); let mut ids_by_table: HashMap<String, Vec<i64>> = HashMap::new();
for candidate in &candidates {
ids_by_table
.entry(candidate.table_name.clone())
.or_default()
.push(candidate.pg_id);
}
let mut content_map: HashMap<(String, i64), String> = HashMap::new();
for (table_name, pg_ids) in ids_by_table {
validate_identifier(&table_name, "table_name")?;
let sql = format!( let sql = format!(
"SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE AND id = ANY($1)", "SELECT id, to_jsonb(t) AS data FROM {} t WHERE deleted = FALSE AND id = ANY($1)",
target.qualified_table qualify_profile_table(&scope.profile_name, &table_name)
); );
let rows = sqlx::query(&sql) let rows = sqlx::query(&sql)
@@ -364,21 +402,23 @@ async fn search_target(
.await .await
.map_err(|e| Status::internal(format!("Database query failed: {}", e)))?; .map_err(|e| Status::internal(format!("Database query failed: {}", e)))?;
let mut content_map: HashMap<i64, String> = HashMap::new();
for row in rows { for row in rows {
let id: i64 = row.try_get("id").unwrap_or_default(); let id: i64 = row.try_get("id").unwrap_or_default();
let json_data: serde_json::Value = row.try_get("data").unwrap_or_default(); let json_data: serde_json::Value = row.try_get("data").unwrap_or_default();
content_map.insert(id, json_data.to_string()); content_map.insert((table_name.clone(), id), json_data.to_string());
}
} }
Ok(scored_ids Ok(candidates
.into_iter() .into_iter()
.filter_map(|(score, pg_id)| { .filter_map(|candidate| {
content_map.get(&(pg_id as i64)).map(|content_json| Hit { content_map
id: pg_id as i64, .get(&(candidate.table_name.clone(), candidate.pg_id))
score, .map(|content_json| Hit {
id: candidate.pg_id,
score: candidate.score,
content_json: content_json.clone(), content_json: content_json.clone(),
table_name: target.table_name.clone(), table_name: candidate.table_name,
}) })
}) })
.collect()) .collect())
@@ -414,45 +454,37 @@ impl SearcherService {
} }
// Request scope // Request scope
let requested_table = req.table_name.as_deref().map(str::trim); let scope =
let targets = resolve_search_targets(&self.pool, profile_name, requested_table).await?; resolve_search_scope(&self.pool, profile_name, req.table_name.as_deref().map(str::trim))
.await?;
if targets.is_empty() {
return Ok(Response::new(SearchResponse { hits: vec![] }));
}
let query = req.query.trim(); let query = req.query.trim();
if query.is_empty() { if query.is_empty() {
// Empty query // Empty query
if targets.len() != 1 { let Some(table_name) = scope.requested_table.as_deref() else {
return Err(Status::invalid_argument( return Err(Status::invalid_argument(
"table_name is required when query is empty", "table_name is required when query is empty",
)); ));
} };
let hits = fetch_default_hits(&self.pool, &targets[0]).await?; let hits = fetch_default_hits(&self.pool, &scope.profile_name, table_name).await?;
info!( info!(
"Empty query for profile '{}' table '{}'. Returning {} default hits.", "Empty query for profile '{}' table '{}'. Returning {} default hits.",
profile_name, scope.profile_name,
targets[0].table_name, table_name,
hits.len() hits.len()
); );
return Ok(Response::new(SearchResponse { hits })); return Ok(Response::new(SearchResponse { hits }));
} }
if requested_table.is_some() && targets.len() == 1 && !targets[0].index_path.exists() { if scope.requested_table.is_some() && !scope.index_path.exists() {
return Err(Status::not_found(format!( return Err(Status::not_found(format!(
"No search index found for table '{}'", "No search index found for profile '{}'",
targets[0].table_name scope.profile_name
))); )));
} }
// Merge per-table hits let mut hits = search_profile(&self.pool, &scope, query, mode).await?;
let mut hits = Vec::new();
for target in &targets {
hits.extend(search_target(&self.pool, target, query, mode).await?);
}
hits.sort_by(|left, right| right.score.total_cmp(&left.score)); hits.sort_by(|left, right| right.score.total_cmp(&left.score));
if hits.len() > SEARCH_RESULT_LIMIT { if hits.len() > SEARCH_RESULT_LIMIT {
hits.truncate(SEARCH_RESULT_LIMIT); hits.truncate(SEARCH_RESULT_LIMIT);
@@ -464,8 +496,8 @@ impl SearcherService {
SearchMode::Fuzzy => "fuzzy", SearchMode::Fuzzy => "fuzzy",
SearchMode::Exact => "exact", SearchMode::Exact => "exact",
}, },
profile_name, scope.profile_name,
requested_table.unwrap_or("*"), scope.requested_table.as_deref().unwrap_or("*"),
hits.len() hits.len()
); );