Compare commits

...

5 Commits

15 changed files with 388 additions and 194 deletions

32
Cargo.lock generated
View File

@@ -488,6 +488,7 @@ version = "0.3.13"
dependencies = [
"prost",
"serde",
"tantivy",
"tonic",
"tonic-build",
]
@@ -2739,6 +2740,7 @@ dependencies = [
"prost",
"serde",
"serde_json",
"sqlx",
"tantivy",
"tokio",
"tonic",
@@ -2843,6 +2845,7 @@ dependencies = [
"prost",
"regex",
"rstest",
"rust-stemmers",
"search",
"serde",
"serde_json",
@@ -3017,9 +3020,9 @@ dependencies = [
[[package]]
name = "sqlx"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e"
checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
dependencies = [
"sqlx-core",
"sqlx-macros",
@@ -3030,9 +3033,9 @@ dependencies = [
[[package]]
name = "sqlx-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3"
checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
dependencies = [
"base64",
"bytes",
@@ -3068,9 +3071,9 @@ dependencies = [
[[package]]
name = "sqlx-macros"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce"
checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
dependencies = [
"proc-macro2",
"quote",
@@ -3081,9 +3084,9 @@ dependencies = [
[[package]]
name = "sqlx-macros-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7"
checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
@@ -3100,16 +3103,15 @@ dependencies = [
"sqlx-postgres",
"sqlx-sqlite",
"syn 2.0.100",
"tempfile",
"tokio",
"url",
]
[[package]]
name = "sqlx-mysql"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7"
checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64",
@@ -3152,9 +3154,9 @@ dependencies = [
[[package]]
name = "sqlx-postgres"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6"
checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64",
@@ -3192,9 +3194,9 @@ dependencies = [
[[package]]
name = "sqlx-sqlite"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc"
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"chrono",

View File

@@ -9,5 +9,8 @@ tonic = "0.13.0"
prost = "0.13.5"
serde = { version = "1.0.219", features = ["derive"] }
# Search
tantivy = { workspace = true }
[build-dependencies]
tonic-build = "0.13.0"

View File

@@ -10,11 +10,11 @@ message SearchRequest {
string table_name = 1;
string query = 2;
}
message SearchResponse {
message Hit {
int64 id = 1; // The PostgreSQL row ID
int64 id = 1; // PostgreSQL row ID
float score = 2;
string content_json = 3;
}
repeated Hit hits = 1;
}

View File

@@ -1,4 +1,7 @@
// common/src/lib.rs
pub mod search;
pub mod proto {
pub mod multieko2 {
pub mod adresar {

Binary file not shown.

View File

@@ -13,13 +13,15 @@ pub struct SearchResponse {
}
/// Nested message and enum types in `SearchResponse`.
pub mod search_response {
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Hit {
/// The PostgreSQL row ID
/// PostgreSQL row ID
#[prost(int64, tag = "1")]
pub id: i64,
#[prost(float, tag = "2")]
pub score: f32,
#[prost(string, tag = "3")]
pub content_json: ::prost::alloc::string::String,
}
}
/// Generated client implementations.

78
common/src/search.rs Normal file
View File

@@ -0,0 +1,78 @@
// common/src/search.rs
use tantivy::schema::*;
use tantivy::tokenizer::*;
use tantivy::Index;
/// Creates a hybrid Slovak search schema with optimized prefix fields.
pub fn create_search_schema() -> Schema {
let mut schema_builder = Schema::builder();
schema_builder.add_u64_field("pg_id", INDEXED | STORED);
// FIELD 1: For prefixes (1-4 chars).
let short_prefix_indexing = TextFieldIndexing::default()
.set_tokenizer("slovak_prefix_edge")
.set_index_option(IndexRecordOption::WithFreqsAndPositions);
let short_prefix_options = TextOptions::default()
.set_indexing_options(short_prefix_indexing)
.set_stored();
schema_builder.add_text_field("prefix_edge", short_prefix_options);
// FIELD 2: For the full word.
let full_word_indexing = TextFieldIndexing::default()
.set_tokenizer("slovak_prefix_full")
.set_index_option(IndexRecordOption::WithFreqsAndPositions);
let full_word_options = TextOptions::default()
.set_indexing_options(full_word_indexing)
.set_stored();
schema_builder.add_text_field("prefix_full", full_word_options);
// NGRAM FIELD: For substring matching.
let ngram_field_indexing = TextFieldIndexing::default()
.set_tokenizer("slovak_ngram")
.set_index_option(IndexRecordOption::WithFreqsAndPositions);
let ngram_options = TextOptions::default()
.set_indexing_options(ngram_field_indexing)
.set_stored();
schema_builder.add_text_field("text_ngram", ngram_options);
schema_builder.build()
}
/// Registers all necessary Slovak tokenizers with the index.
///
/// This must be called by ANY process that opens the index
/// to ensure the tokenizers are loaded into memory.
pub fn register_slovak_tokenizers(index: &Index) -> tantivy::Result<()> {
let tokenizer_manager = index.tokenizers();
// TOKENIZER for `prefix_edge`: Edge N-gram (1-4 chars)
let edge_tokenizer =
TextAnalyzer::builder(NgramTokenizer::new(1, 4, true)?)
.filter(RemoveLongFilter::limit(40))
.filter(LowerCaser)
.filter(AsciiFoldingFilter)
.build();
tokenizer_manager.register("slovak_prefix_edge", edge_tokenizer);
// TOKENIZER for `prefix_full`: Simple word tokenizer
let full_tokenizer =
TextAnalyzer::builder(SimpleTokenizer::default())
.filter(RemoveLongFilter::limit(40))
.filter(LowerCaser)
.filter(AsciiFoldingFilter)
.build();
tokenizer_manager.register("slovak_prefix_full", full_tokenizer);
// NGRAM TOKENIZER: For substring matching.
let ngram_tokenizer =
TextAnalyzer::builder(NgramTokenizer::new(3, 3, false)?)
.filter(RemoveLongFilter::limit(40))
.filter(LowerCaser)
.filter(AsciiFoldingFilter)
.build();
tokenizer_manager.register("slovak_ngram", ngram_tokenizer);
Ok(())
}

View File

@@ -16,3 +16,4 @@ tantivy = { workspace = true }
common = { path = "../common" }
tonic-reflection = "0.13.1"
sqlx = { version = "0.8.6", features = ["postgres"] }

View File

@@ -1,17 +1,66 @@
// src/lib.rs
use std::collections::HashMap;
use std::path::Path;
use tantivy::{collector::TopDocs, query::QueryParser, Index, TantivyDocument};
use tantivy::collector::TopDocs;
use tantivy::query::{
BooleanQuery, BoostQuery, FuzzyTermQuery, Occur, Query, QueryParser,
TermQuery,
};
use tantivy::schema::{IndexRecordOption, Value};
use tantivy::{Index, TantivyDocument, Term};
use tonic::{Request, Response, Status};
use common::proto::multieko2::search::{
search_response::Hit, SearchRequest, SearchResponse,
};
use common::proto::multieko2::search::searcher_server::Searcher;
pub use common::proto::multieko2::search::searcher_server::SearcherServer;
use tantivy::schema::Value;
use common::proto::multieko2::search::searcher_server::Searcher;
use common::search::register_slovak_tokenizers;
use sqlx::{PgPool, Row}; // <-- Import PgPool and Row
pub struct SearcherService;
// We need to hold the database pool in our service struct.
pub struct SearcherService {
pub pool: PgPool,
}
// Normalize diacritics in queries (no changes here)
fn normalize_slovak_text(text: &str) -> String {
// ... function content is unchanged ...
text.chars()
.map(|c| match c {
'á' | 'à' | 'â' | 'ä' | 'ă' | 'ā' => 'a',
'Á' | 'À' | 'Â' | 'Ä' | 'Ă' | 'Ā' => 'A',
'é' | 'è' | 'ê' | 'ë' | 'ě' | 'ē' => 'e',
'É' | 'È' | 'Ê' | 'Ë' | 'Ě' | 'Ē' => 'E',
'í' | 'ì' | 'î' | 'ï' | 'ī' => 'i',
'Í' | 'Ì' | 'Î' | 'Ï' | 'Ī' => 'I',
'ó' | 'ò' | 'ô' | 'ö' | 'ō' | 'ő' => 'o',
'Ó' | 'Ò' | 'Ô' | 'Ö' | 'Ō' | 'Ő' => 'O',
'ú' | 'ù' | 'û' | 'ü' | 'ū' | 'ű' => 'u',
'Ú' | 'Ù' | 'Û' | 'Ü' | 'Ū' | 'Ű' => 'U',
'ý' | 'ỳ' | 'ŷ' | 'ÿ' => 'y',
'Ý' | 'Ỳ' | 'Ŷ' | 'Ÿ' => 'Y',
'č' => 'c',
'Č' => 'C',
'ď' => 'd',
'Ď' => 'D',
'ľ' => 'l',
'Ľ' => 'L',
'ň' => 'n',
'Ň' => 'N',
'ř' => 'r',
'Ř' => 'R',
'š' => 's',
'Š' => 'S',
'ť' => 't',
'Ť' => 'T',
'ž' => 'z',
'Ž' => 'Z',
_ => c,
})
.collect()
}
#[tonic::async_trait]
impl Searcher for SearcherService {
@@ -27,9 +76,7 @@ impl Searcher for SearcherService {
return Err(Status::invalid_argument("Query cannot be empty"));
}
// Open the index for this table
let index_path = Path::new("./tantivy_indexes").join(&table_name);
if !index_path.exists() {
return Err(Status::not_found(format!(
"No search index found for table '{}'",
@@ -37,72 +84,185 @@ impl Searcher for SearcherService {
)));
}
// Open the index
let index = Index::open_in_dir(&index_path).map_err(|e| {
Status::internal(format!("Failed to open index: {}", e))
let index = Index::open_in_dir(&index_path)
.map_err(|e| Status::internal(format!("Failed to open index: {}", e)))?;
register_slovak_tokenizers(&index).map_err(|e| {
Status::internal(format!("Failed to register Slovak tokenizers: {}", e))
})?;
// Create reader and searcher
let reader = index.reader().map_err(|e| {
Status::internal(format!("Failed to create index reader: {}", e))
})?;
let searcher = reader.searcher();
let schema = index.schema();
// Get the fields we need
let all_text_field = match schema.get_field("all_text") {
Ok(field) => field,
Err(_) => {
return Err(Status::internal(
"Schema is missing the 'all_text' field.",
))
}
};
let pg_id_field = match schema.get_field("pg_id") {
Ok(field) => field,
Err(_) => {
return Err(Status::internal(
"Schema is missing the 'pg_id' field.",
))
}
};
// Parse the query
let query_parser =
QueryParser::for_index(&index, vec![all_text_field]);
let query = query_parser.parse_query(&query_str).map_err(|e| {
Status::invalid_argument(format!("Invalid query: {}", e))
let prefix_edge_field = schema.get_field("prefix_edge").map_err(|_| {
Status::internal("Schema is missing the 'prefix_edge' field.")
})?;
let prefix_full_field = schema.get_field("prefix_full").map_err(|_| {
Status::internal("Schema is missing the 'prefix_full' field.")
})?;
let text_ngram_field = schema.get_field("text_ngram").map_err(|_| {
Status::internal("Schema is missing the 'text_ngram' field.")
})?;
let pg_id_field = schema.get_field("pg_id").map_err(|_| {
Status::internal("Schema is missing the 'pg_id' field.")
})?;
// Perform the search
// --- Query Building Logic (no changes here) ---
let prefix_edge_field = schema.get_field("prefix_edge").unwrap();
let prefix_full_field = schema.get_field("prefix_full").unwrap();
let text_ngram_field = schema.get_field("text_ngram").unwrap();
let normalized_query = normalize_slovak_text(&query_str);
let words: Vec<&str> = normalized_query.split_whitespace().collect();
if words.is_empty() {
return Ok(Response::new(SearchResponse { hits: vec![] }));
}
let mut query_layers: Vec<(Occur, Box<dyn Query>)> = Vec::new();
// ... all your query building layers remain exactly the same ...
// ===============================
// LAYER 1: PREFIX MATCHING (HIGHEST PRIORITY, Boost: 4.0)
// ===============================
{
let mut must_clauses: Vec<(Occur, Box<dyn Query>)> = Vec::new();
for word in &words {
let edge_term =
Term::from_field_text(prefix_edge_field, word);
let full_term =
Term::from_field_text(prefix_full_field, word);
let per_word_query = BooleanQuery::new(vec![
(
Occur::Should,
Box::new(TermQuery::new(
edge_term,
IndexRecordOption::Basic,
)),
),
(
Occur::Should,
Box::new(TermQuery::new(
full_term,
IndexRecordOption::Basic,
)),
),
]);
must_clauses.push((Occur::Must, Box::new(per_word_query) as Box<dyn Query>));
}
if !must_clauses.is_empty() {
let prefix_query = BooleanQuery::new(must_clauses);
let boosted_query =
BoostQuery::new(Box::new(prefix_query), 4.0);
query_layers.push((Occur::Should, Box::new(boosted_query)));
}
}
// ===============================
// LAYER 2: FUZZY MATCHING (HIGH PRIORITY, Boost: 3.0)
// ===============================
{
let last_word = words.last().unwrap();
let fuzzy_term =
Term::from_field_text(prefix_full_field, last_word);
let fuzzy_query = FuzzyTermQuery::new(fuzzy_term, 2, true);
let boosted_query = BoostQuery::new(Box::new(fuzzy_query), 3.0);
query_layers.push((Occur::Should, Box::new(boosted_query)));
}
// ===============================
// LAYER 3: PHRASE MATCHING WITH SLOP (MEDIUM PRIORITY, Boost: 2.0)
// ===============================
if words.len() > 1 {
let slop_parser =
QueryParser::for_index(&index, vec![prefix_full_field]);
let slop_query_str = format!("\"{}\"~3", normalized_query);
if let Ok(slop_query) = slop_parser.parse_query(&slop_query_str) {
let boosted_query = BoostQuery::new(slop_query, 2.0);
query_layers.push((Occur::Should, Box::new(boosted_query)));
}
}
// ===============================
// LAYER 4: NGRAM SUBSTRING MATCHING (LOWEST PRIORITY, Boost: 1.0)
// ===============================
{
let ngram_parser =
QueryParser::for_index(&index, vec![text_ngram_field]);
if let Ok(ngram_query) =
ngram_parser.parse_query(&normalized_query)
{
let boosted_query = BoostQuery::new(ngram_query, 1.0);
query_layers.push((Occur::Should, Box::new(boosted_query)));
}
}
let master_query = BooleanQuery::new(query_layers);
// --- End of Query Building Logic ---
let top_docs = searcher
.search(&query, &TopDocs::with_limit(100))
.search(&master_query, &TopDocs::with_limit(100))
.map_err(|e| Status::internal(format!("Search failed: {}", e)))?;
// Convert results to our response format
let mut hits = Vec::new();
for (score, doc_address) in top_docs {
let doc: TantivyDocument = searcher.doc(doc_address).map_err(
|e| {
Status::internal(format!(
"Failed to retrieve document: {}",
e
))
},
)?;
if top_docs.is_empty() {
return Ok(Response::new(SearchResponse { hits: vec![] }));
}
// --- NEW LOGIC: Fetch from DB and combine results ---
// Step 1: Extract (score, pg_id) from Tantivy results.
let mut scored_ids: Vec<(f32, u64)> = Vec::new();
for (score, doc_address) in top_docs {
let doc: TantivyDocument = searcher.doc(doc_address).map_err(|e| {
Status::internal(format!("Failed to retrieve document: {}", e))
})?;
if let Some(pg_id_value) = doc.get_first(pg_id_field) {
if let Some(pg_id) = pg_id_value.as_u64() {
hits.push(Hit {
id: pg_id as i64,
score,
});
scored_ids.push((score, pg_id));
}
}
}
// Step 2: Fetch all corresponding rows from Postgres in a single query.
let pg_ids: Vec<i64> =
scored_ids.iter().map(|(_, id)| *id as i64).collect();
let qualified_table = format!("gen.\"{}\"", table_name);
let query_str = format!(
"SELECT id, to_jsonb(t) AS data FROM {} t WHERE id = ANY($1)",
qualified_table
);
let rows = sqlx::query(&query_str)
.bind(&pg_ids)
.fetch_all(&self.pool)
.await
.map_err(|e| {
Status::internal(format!("Database query failed: {}", e))
})?;
// Step 3: Map the database results by ID for quick lookup.
let mut content_map: HashMap<i64, String> = HashMap::new();
for row in rows {
let id: i64 = row.try_get("id").unwrap_or(0);
let json_data: serde_json::Value =
row.try_get("data").unwrap_or(serde_json::Value::Null);
content_map.insert(id, json_data.to_string());
}
// Step 4: Build the final response, combining Tantivy scores with PG content.
let hits: Vec<Hit> = scored_ids
.into_iter()
.filter_map(|(score, pg_id)| {
content_map
.get(&(pg_id as i64))
.map(|content_json| Hit {
id: pg_id as i64,
score,
content_json: content_json.clone(),
})
})
.collect();
let response = SearchResponse { hits };
Ok(Response::new(response))
}

View File

@@ -31,6 +31,7 @@ bcrypt = "0.17.0"
validator = { version = "0.20.0", features = ["derive"] }
uuid = { version = "1.16.0", features = ["serde", "v4"] }
jsonwebtoken = "9.3.1"
rust-stemmers = "1.2.0"
[lib]
name = "server"

View File

@@ -1,83 +0,0 @@
// In server/src/bin/manual_indexer.rs
use sqlx::{PgPool, Row};
use tantivy::schema::*;
use tantivy::{doc, Index};
use std::path::Path;
// --- CONFIGURATION ---
// IMPORTANT: Change this to a table name that actually exists and has data in your test DB.
// From your grpcurl output, "2025_test_post" is a good candidate.
const TABLE_TO_INDEX: &str = "2025_test_post2";
const INDEX_DIR: &str = "./tantivy_indexes";
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// --- Database Connection ---
// This assumes you have a .env file with DATABASE_URL
dotenvy::dotenv().ok();
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in your .env file");
let pool = PgPool::connect(&database_url).await?;
println!("Connected to database.");
// --- Tantivy Schema Definition ---
let mut schema_builder = Schema::builder();
// This field will store the original Postgres row ID. It's crucial.
schema_builder.add_u64_field("pg_id", INDEXED | STORED);
// This field will contain ALL text data from the row, concatenated.
schema_builder.add_text_field("all_text", TEXT | STORED);
let schema = schema_builder.build();
// --- Index Creation ---
let index_path = Path::new(INDEX_DIR).join(TABLE_TO_INDEX);
if index_path.exists() {
println!("Removing existing index at: {}", index_path.display());
std::fs::remove_dir_all(&index_path)?;
}
std::fs::create_dir_all(&index_path)?;
let index = Index::create_in_dir(&index_path, schema.clone())?;
let mut index_writer = index.writer(100_000_000)?; // 100MB heap
println!("Indexing table: {}", TABLE_TO_INDEX);
// --- Data Fetching and Indexing ---
let qualified_table = format!("gen.\"{}\"", TABLE_TO_INDEX);
let query_str = format!("SELECT id, to_jsonb(t) AS data FROM {} t", qualified_table);
let rows = sqlx::query(&query_str).fetch_all(&pool).await?;
if rows.is_empty() {
println!("Warning: No rows found in table '{}'. Index will be empty.", TABLE_TO_INDEX);
}
let pg_id_field = schema.get_field("pg_id").unwrap();
let all_text_field = schema.get_field("all_text").unwrap();
for row in &rows {
let id: i64 = row.try_get("id")?;
let data: serde_json::Value = row.try_get("data")?;
// Concatenate all text values from the JSON into one big string.
let mut full_text = String::new();
if let Some(obj) = data.as_object() {
for value in obj.values() {
if let Some(s) = value.as_str() {
full_text.push_str(s);
full_text.push(' ');
}
}
}
// Add the document to Tantivy
index_writer.add_document(doc!(
pg_id_field => id as u64,
all_text_field => full_text
))?;
}
// --- Finalize ---
index_writer.commit()?;
println!("Successfully indexed {} documents into '{}'", rows.len(), index_path.display());
Ok(())
}

View File

@@ -1,11 +1,12 @@
// src/indexer.rs
// server/src/indexer.rs
use std::path::Path;
use sqlx::{PgPool, Row};
use tantivy::schema::{Schema, Term, TEXT, STORED, INDEXED};
use tantivy::{doc, Index, IndexWriter};
use tantivy::schema::Term;
use tantivy::{doc, IndexWriter};
use tokio::sync::mpsc::Receiver;
use tracing::{error, info, warn};
use tantivy::schema::Schema;
use crate::search_schema;
const INDEX_DIR: &str = "./tantivy_indexes";
@@ -25,7 +26,6 @@ pub struct IndexCommandData {
}
/// The main loop for the background indexer task.
/// It listens for commands on the receiver and updates the Tantivy index.
pub async fn indexer_task(pool: PgPool, mut receiver: Receiver<IndexCommand>) {
info!("Background indexer task started.");
while let Some(command) = receiver.recv().await {
@@ -49,52 +49,39 @@ async fn handle_add_or_update(
pool: &PgPool,
data: IndexCommandData,
) -> anyhow::Result<()> {
// 1. Fetch the full row data from PostgreSQL
let qualified_table = format!("gen.\"{}\"", data.table_name);
let query_str = format!(
"SELECT to_jsonb(t) AS data FROM {} t WHERE id = $1",
qualified_table
);
let row = sqlx::query(&query_str)
.bind(data.row_id)
.fetch_one(pool)
.await?;
let json_data: serde_json::Value = row.try_get("data")?;
let slovak_text = extract_text_content(&json_data);
// 2. Prepare the Tantivy document
let mut full_text = String::new();
if let Some(obj) = json_data.as_object() {
for value in obj.values() {
if let Some(s) = value.as_str() {
full_text.push_str(s);
full_text.push(' ');
}
}
}
// 3. Open the index and write the document
let (mut writer, schema) = get_index_writer(&data.table_name)?;
let pg_id_field = schema.get_field("pg_id").unwrap();
let all_text_field = schema.get_field("all_text").unwrap();
let prefix_edge_field = schema.get_field("prefix_edge").unwrap();
let prefix_full_field = schema.get_field("prefix_full").unwrap();
let text_ngram_field = schema.get_field("text_ngram").unwrap();
// First, delete any existing document with this ID to handle updates
let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
writer.delete_term(id_term);
// Add the new document
writer.add_document(doc!(
pg_id_field => data.row_id as u64,
all_text_field => full_text
prefix_edge_field => slovak_text.clone(),
prefix_full_field => slovak_text.clone(),
text_ngram_field => slovak_text
))?;
// 4. Commit changes
writer.commit()?;
info!(
"Successfully indexed document id:{} for table:{}",
data.row_id, data.table_name
);
Ok(())
}
@@ -122,19 +109,31 @@ async fn handle_delete(
fn get_index_writer(
table_name: &str,
) -> anyhow::Result<(IndexWriter, Schema)> {
let index_path = Path::new(INDEX_DIR).join(table_name);
std::fs::create_dir_all(&index_path)?;
let index = Index::open_in_dir(&index_path).or_else(|_| {
// If it doesn't exist, create it with the standard schema
let mut schema_builder = Schema::builder();
schema_builder.add_u64_field("pg_id", INDEXED | STORED);
schema_builder.add_text_field("all_text", TEXT | STORED);
let schema = schema_builder.build();
Index::create_in_dir(&index_path, schema)
})?;
let index = search_schema::get_or_create_index(table_name)?;
let schema = index.schema();
let writer = index.writer(100_000_000)?; // 100MB heap
Ok((writer, schema))
}
/// Extract all text content from a JSON object for indexing
fn extract_text_content(json_data: &serde_json::Value) -> String {
let mut full_text = String::new();
if let Some(obj) = json_data.as_object() {
for value in obj.values() {
match value {
serde_json::Value::String(s) => {
full_text.push_str(s);
full_text.push(' ');
}
serde_json::Value::Number(n) => {
full_text.push_str(&n.to_string());
full_text.push(' ');
}
_ => {}
}
}
}
full_text.trim().to_string()
}

View File

@@ -2,6 +2,7 @@
pub mod db;
pub mod auth;
pub mod indexer;
pub mod search_schema;
pub mod server;
pub mod adresar;
pub mod uctovnictvo;

View File

@@ -0,0 +1,26 @@
// server/src/search_schema.rs
use std::path::Path;
use tantivy::Index;
// Re-export the functions from the common crate.
// This makes them available as `crate::search_schema::create_search_schema`, etc.
pub use common::search::{create_search_schema, register_slovak_tokenizers};
/// Gets an existing index or creates a new one.
/// This function now uses the shared logic from the `common` crate.
pub fn get_or_create_index(table_name: &str) -> tantivy::Result<Index> {
let index_path = Path::new("./tantivy_indexes").join(table_name);
std::fs::create_dir_all(&index_path)?;
let index = if index_path.join("meta.json").exists() {
Index::open_in_dir(&index_path)?
} else {
let schema = create_search_schema();
Index::create_in_dir(&index_path, schema)?
};
// This now calls the single, authoritative function from `common`.
register_slovak_tokenizers(&index)?;
Ok(index)
}

View File

@@ -1,4 +1,3 @@
// src/server/run.rs
use tonic::transport::Server;
use tonic_reflection::server::Builder as ReflectionBuilder;
@@ -52,7 +51,9 @@ pub async fn run_server(db_pool: sqlx::PgPool) -> Result<(), Box<dyn std::error:
};
let table_script_service = TableScriptService { db_pool: db_pool.clone() };
let auth_service = AuthServiceImpl { db_pool: db_pool.clone() };
let search_service = SearcherService;
// MODIFIED: Instantiate SearcherService with the database pool
let search_service = SearcherService { pool: db_pool.clone() };
Server::builder()
.add_service(AdresarServer::new(AdresarService { db_pool: db_pool.clone() }))
@@ -62,7 +63,7 @@ pub async fn run_server(db_pool: sqlx::PgPool) -> Result<(), Box<dyn std::error:
.add_service(TablesDataServer::new(tables_data_service))
.add_service(TableScriptServer::new(table_script_service))
.add_service(AuthServiceServer::new(auth_service))
.add_service(SearcherServer::new(search_service))
.add_service(SearcherServer::new(search_service)) // This now works correctly
.add_service(reflection_service)
.serve(addr)
.await?;