diff --git a/server/src/indexer.rs b/server/src/indexer.rs
new file mode 100644
index 0000000..87c40de
--- /dev/null
+++ b/server/src/indexer.rs
@@ -0,0 +1,140 @@
+// src/indexer.rs
+
+use std::path::Path;
+use sqlx::{PgPool, Row};
+use tantivy::schema::{Schema, Term, TEXT, STORED, INDEXED};
+use tantivy::{doc, Index, IndexWriter};
+use tokio::sync::mpsc::Receiver;
+use tracing::{error, info, warn};
+
+const INDEX_DIR: &str = "./tantivy_indexes";
+
+/// Defines the commands that can be sent to the indexer task.
+#[derive(Debug)]
+pub enum IndexCommand {
+    /// Add a new document or update an existing one.
+    AddOrUpdate(IndexCommandData),
+    /// Remove a document from the index.
+    Delete(IndexCommandData),
+}
+
+#[derive(Debug)]
+pub struct IndexCommandData {
+    pub table_name: String,
+    pub row_id: i64,
+}
+
+/// The main loop for the background indexer task.
+/// It listens for commands on the receiver and updates the Tantivy index.
+pub async fn indexer_task(pool: PgPool, mut receiver: Receiver<IndexCommand>) {
+    info!("Background indexer task started.");
+    while let Some(command) = receiver.recv().await {
+        info!("Indexer received command: {:?}", command);
+        let result = match command {
+            IndexCommand::AddOrUpdate(data) => {
+                handle_add_or_update(&pool, data).await
+            }
+            IndexCommand::Delete(data) => handle_delete(&pool, data).await,
+        };
+
+        if let Err(e) = result {
+            error!("Failed to process index command: {}", e);
+        }
+    }
+    warn!("Indexer channel closed. Task is shutting down.");
+}
+
+/// Handles adding or updating a document in a Tantivy index.
+async fn handle_add_or_update(
+    pool: &PgPool,
+    data: IndexCommandData,
+) -> anyhow::Result<()> {
+    // 1. Fetch the full row data from PostgreSQL
+    let qualified_table = format!("gen.\"{}\"", data.table_name);
+    let query_str = format!(
+        "SELECT to_jsonb(t) AS data FROM {} t WHERE id = $1",
+        qualified_table
+    );
+
+    let row = sqlx::query(&query_str)
+        .bind(data.row_id)
+        .fetch_one(pool)
+        .await?;
+    let json_data: serde_json::Value = row.try_get("data")?;
+
+    // 2. Prepare the Tantivy document
+    let mut full_text = String::new();
+    if let Some(obj) = json_data.as_object() {
+        for value in obj.values() {
+            if let Some(s) = value.as_str() {
+                full_text.push_str(s);
+                full_text.push(' ');
+            }
+        }
+    }
+
+    // 3. Open the index and write the document
+    let (mut writer, schema) = get_index_writer(&data.table_name)?;
+    let pg_id_field = schema.get_field("pg_id").unwrap();
+    let all_text_field = schema.get_field("all_text").unwrap();
+
+    // First, delete any existing document with this ID to handle updates
+    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
+    writer.delete_term(id_term);
+
+    // Add the new document
+    writer.add_document(doc!(
+        pg_id_field => data.row_id as u64,
+        all_text_field => full_text
+    ))?;
+
+    // 4. Commit changes
+    writer.commit()?;
+    info!(
+        "Successfully indexed document id:{} for table:{}",
+        data.row_id, data.table_name
+    );
+
+    Ok(())
+}
+
+/// Handles deleting a document from a Tantivy index.
+async fn handle_delete(
+    _pool: &PgPool,
+    data: IndexCommandData,
+) -> anyhow::Result<()> {
+    let (mut writer, schema) = get_index_writer(&data.table_name)?;
+    let pg_id_field = schema.get_field("pg_id").unwrap();
+
+    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
+    writer.delete_term(id_term);
+    writer.commit()?;
+
+    info!(
+        "Successfully deleted document id:{} from table:{}",
+        data.row_id, data.table_name
+    );
+
+    Ok(())
+}
+
+/// Helper to get or create an index and return its writer and schema.
+fn get_index_writer(
+    table_name: &str,
+) -> anyhow::Result<(IndexWriter, Schema)> {
+    let index_path = Path::new(INDEX_DIR).join(table_name);
+    std::fs::create_dir_all(&index_path)?;
+
+    let index = Index::open_in_dir(&index_path).or_else(|_| {
+        // If it doesn't exist, create it with the standard schema
+        let mut schema_builder = Schema::builder();
+        schema_builder.add_u64_field("pg_id", INDEXED | STORED);
+        schema_builder.add_text_field("all_text", TEXT | STORED);
+        let schema = schema_builder.build();
+        Index::create_in_dir(&index_path, schema)
+    })?;
+
+    let schema = index.schema();
+    let writer = index.writer(100_000_000)?; // 100MB heap
+    Ok((writer, schema))
+}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 42a848f..40ab7d8 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -1,6 +1,7 @@
 // src/lib.rs
 pub mod db;
 pub mod auth;
+pub mod indexer;
 pub mod server;
 pub mod adresar;
 pub mod uctovnictvo;
diff --git a/server/src/server/run.rs b/server/src/server/run.rs
index 40d6e11..22b1ef8 100644
--- a/server/src/server/run.rs
+++ b/server/src/server/run.rs
@@ -2,6 +2,9 @@
 use tonic::transport::Server;
 use tonic_reflection::server::Builder as ReflectionBuilder;
 
+use tokio::sync::mpsc;
+use crate::indexer::{indexer_task, IndexCommand};
+
 use common::proto::multieko2::FILE_DESCRIPTOR_SET;
 use crate::server::services::{
     AdresarService,
@@ -30,13 +33,23 @@ pub async fn run_server(db_pool: sqlx::PgPool) -> Result<(), Box<dyn std::error::Error>> {
+    // 1. Create the MPSC channel for sending commands to the indexer
+    let (indexer_tx, indexer_rx) = mpsc::channel::<IndexCommand>(100); // Buffer of 100 messages
+
+    // 2. Spawn the background indexer task
+    let indexer_pool = db_pool.clone();
+    tokio::spawn(indexer_task(indexer_pool, indexer_rx));
+
     let reflection_service = ReflectionBuilder::configure()
         .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
         .build_v1()?;
 
-    // Initialize services
+    // Initialize services, passing the indexer sender to the relevant ones
     let table_definition_service = TableDefinitionService { db_pool: db_pool.clone() };
-    let tables_data_service = TablesDataService { db_pool: db_pool.clone() };
+    let tables_data_service = TablesDataService {
+        db_pool: db_pool.clone(),
+        indexer_tx: indexer_tx.clone(), // Pass the sender
+    };
     let table_script_service = TableScriptService { db_pool: db_pool.clone() };
     let auth_service = AuthServiceImpl { db_pool: db_pool.clone() };
     let search_service = SearcherService;
diff --git a/server/src/server/services/tables_data_service.rs b/server/src/server/services/tables_data_service.rs
index 3b566da..8c9d71a 100644
--- a/server/src/server/services/tables_data_service.rs
+++ b/server/src/server/services/tables_data_service.rs
@@ -1,5 +1,10 @@
 // src/server/services/tables_data_service.rs
+
 use tonic::{Request, Response, Status};
+// Add these imports
+use tokio::sync::mpsc;
+use crate::indexer::IndexCommand;
+
 use common::proto::multieko2::tables_data::tables_data_server::TablesData;
 use common::proto::multieko2::common::CountResponse;
 use common::proto::multieko2::tables_data::{
@@ -15,6 +20,8 @@ use sqlx::PgPool;
 #[derive(Debug)]
 pub struct TablesDataService {
     pub db_pool: PgPool,
+    // MODIFIED: Add the sender field
+    pub indexer_tx: mpsc::Sender<IndexCommand>,
 }
 
 #[tonic::async_trait]
@@ -24,25 +31,34 @@ impl TablesData for TablesDataService {
     async fn post_table_data(
         &self,
         request: Request<PostTableDataRequest>,
     ) -> Result<Response<PostTableDataResponse>, Status> {
         let request = request.into_inner();
-        let response = post_table_data(&self.db_pool, request).await?;
+        // MODIFIED: Pass the indexer_tx to the handler
+        let response = post_table_data(
+            &self.db_pool,
+            request,
+            &self.indexer_tx,
+        )
+        .await?;
         Ok(Response::new(response))
     }
-    // Add the new method implementation
+    // You will later apply the same pattern to put_table_data...
     async fn put_table_data(
         &self,
         request: Request<PutTableDataRequest>,
     ) -> Result<Response<PutTableDataResponse>, Status> {
         let request = request.into_inner();
+        // TODO: Update put_table_data handler to accept and use indexer_tx
         let response = put_table_data(&self.db_pool, request).await?;
         Ok(Response::new(response))
     }
+    // ...and delete_table_data
     async fn delete_table_data(
         &self,
         request: Request<DeleteTableDataRequest>,
     ) -> Result<Response<DeleteTableDataResponse>, Status> {
         let request = request.into_inner();
+        // TODO: Update delete_table_data handler to accept and use indexer_tx
         let response = delete_table_data(&self.db_pool, request).await?;
         Ok(Response::new(response))
     }
diff --git a/server/src/tables_data/handlers/post_table_data.rs b/server/src/tables_data/handlers/post_table_data.rs
index 6559dbb..63614c2 100644
--- a/server/src/tables_data/handlers/post_table_data.rs
+++ b/server/src/tables_data/handlers/post_table_data.rs
@@ -1,4 +1,5 @@
 // src/tables_data/handlers/post_table_data.rs
+
 use tonic::Status;
 use sqlx::{PgPool, Arguments};
 use sqlx::postgres::PgArguments;
@@ -6,22 +7,26 @@ use chrono::{DateTime, Utc};
 use common::proto::multieko2::tables_data::{PostTableDataRequest, PostTableDataResponse};
 use std::collections::HashMap;
 use std::sync::Arc;
-use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
+use crate::shared::schema_qualifier::qualify_table_name_for_data;
 use crate::steel::server::execution::{self, Value};
 use crate::steel::server::functions::SteelContext;
+// Add these imports
+use crate::indexer::{IndexCommand, IndexCommandData};
+use tokio::sync::mpsc;
+use tracing::error;
+
+// MODIFIED: Function signature now accepts the indexer sender
 pub async fn post_table_data(
     db_pool: &PgPool,
     request: PostTableDataRequest,
+    indexer_tx: &mpsc::Sender<IndexCommand>,
 ) -> Result<PostTableDataResponse, Status> {
     let profile_name = request.profile_name;
     let table_name = request.table_name;
     let mut data = HashMap::new();
 
-    // CORRECTED: Process and trim all incoming data values.
-    // We remove the hardcoded validation. We will let the database's
-    // NOT NULL constraints or Steel validation scripts handle required fields.
     for (key, value) in request.data {
         data.insert(key, value.trim().to_string());
     }
@@ -243,6 +248,21 @@ pub async fn post_table_data(
         }
     };
 
+    // After a successful insert, send a command to the indexer.
+    let command = IndexCommand::AddOrUpdate(IndexCommandData {
+        table_name: table_name.clone(),
+        row_id: inserted_id,
+    });
+
+    if let Err(e) = indexer_tx.send(command).await {
+        // If sending fails, the DB is updated but the index will be stale.
+        // This is a critical situation to log and monitor.
+        error!(
+            "CRITICAL: DB insert for table '{}' (id: {}) succeeded but failed to queue for indexing: {}. Search index is now inconsistent.",
+            table_name, inserted_id, e
+        );
+    }
+
     Ok(PostTableDataResponse {
         success: true,
         message: "Data inserted successfully".into(),
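
Note on the remaining work: this patch only wires the indexer into post_table_data; the TODO comments above mark the same change for put_table_data and delete_table_data. Below is a minimal sketch of a shared helper those handlers could call after their own database write succeeds. The helper name enqueue_index_command and the deleted flag are illustrative assumptions, not part of this patch.

    // Hypothetical helper the put/delete handlers could call once their DB write succeeds.
    use crate::indexer::{IndexCommand, IndexCommandData};
    use tokio::sync::mpsc;
    use tracing::error;

    pub async fn enqueue_index_command(
        indexer_tx: &mpsc::Sender<IndexCommand>,
        table_name: &str,
        row_id: i64,
        deleted: bool,
    ) {
        let data = IndexCommandData { table_name: table_name.to_string(), row_id };
        let command = if deleted {
            IndexCommand::Delete(data)
        } else {
            IndexCommand::AddOrUpdate(data)
        };
        // Mirrors post_table_data: log loudly if the command cannot be queued,
        // because the database and the search index are then out of sync.
        if let Err(e) = indexer_tx.send(command).await {
            error!(
                "CRITICAL: DB write for table '{}' (id: {}) succeeded but indexing could not be queued: {}",
                table_name, row_id, e
            );
        }
    }

For context on the read side: pg_id is written with STORED, which lets a searcher map Tantivy hits back to PostgreSQL rows. The sketch below shows one way to query the per-table index created by get_index_writer. It assumes the pre-0.22 tantivy document API (Document, get_first, Value::as_u64), consistent with the writer code above; the function search_table is illustrative and not an existing method of SearcherService.

    use std::path::Path;
    use tantivy::collector::TopDocs;
    use tantivy::query::QueryParser;
    use tantivy::Index;

    /// Returns the PostgreSQL ids of the best-matching rows in one table's index.
    fn search_table(table_name: &str, query_text: &str, limit: usize) -> anyhow::Result<Vec<u64>> {
        let index_path = Path::new("./tantivy_indexes").join(table_name);
        let index = Index::open_in_dir(&index_path)?;
        let schema = index.schema();
        let pg_id_field = schema.get_field("pg_id").unwrap();
        let all_text_field = schema.get_field("all_text").unwrap();

        let reader = index.reader()?;
        let searcher = reader.searcher();
        let query = QueryParser::for_index(&index, vec![all_text_field])
            .parse_query(query_text)?;
        let top_docs = searcher.search(&query, &TopDocs::with_limit(limit))?;

        // Map each hit back to the stored PostgreSQL id.
        let mut ids = Vec::with_capacity(top_docs.len());
        for (_score, address) in top_docs {
            let doc = searcher.doc(address)?;
            if let Some(id) = doc.get_first(pg_id_field).and_then(|v| v.as_u64()) {
                ids.push(id);
            }
        }
        Ok(ids)
    }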