// src/indexer.rs use std::path::Path; use sqlx::{PgPool, Row}; use tantivy::schema::{Schema, Term, TEXT, STORED, INDEXED}; use tantivy::{doc, Index, IndexWriter}; use tokio::sync::mpsc::Receiver; use tracing::{error, info, warn}; const INDEX_DIR: &str = "./tantivy_indexes"; /// Defines the commands that can be sent to the indexer task. #[derive(Debug)] pub enum IndexCommand { /// Add a new document or update an existing one. AddOrUpdate(IndexCommandData), /// Remove a document from the index. Delete(IndexCommandData), } #[derive(Debug)] pub struct IndexCommandData { pub table_name: String, pub row_id: i64, } /// The main loop for the background indexer task. /// It listens for commands on the receiver and updates the Tantivy index. pub async fn indexer_task(pool: PgPool, mut receiver: Receiver) { info!("Background indexer task started."); while let Some(command) = receiver.recv().await { info!("Indexer received command: {:?}", command); let result = match command { IndexCommand::AddOrUpdate(data) => { handle_add_or_update(&pool, data).await } IndexCommand::Delete(data) => handle_delete(&pool, data).await, }; if let Err(e) = result { error!("Failed to process index command: {}", e); } } warn!("Indexer channel closed. Task is shutting down."); } /// Handles adding or updating a document in a Tantivy index. async fn handle_add_or_update( pool: &PgPool, data: IndexCommandData, ) -> anyhow::Result<()> { // 1. Fetch the full row data from PostgreSQL let qualified_table = format!("gen.\"{}\"", data.table_name); let query_str = format!( "SELECT to_jsonb(t) AS data FROM {} t WHERE id = $1", qualified_table ); let row = sqlx::query(&query_str) .bind(data.row_id) .fetch_one(pool) .await?; let json_data: serde_json::Value = row.try_get("data")?; // 2. Prepare the Tantivy document let mut full_text = String::new(); if let Some(obj) = json_data.as_object() { for value in obj.values() { if let Some(s) = value.as_str() { full_text.push_str(s); full_text.push(' '); } } } // 3. Open the index and write the document let (mut writer, schema) = get_index_writer(&data.table_name)?; let pg_id_field = schema.get_field("pg_id").unwrap(); let all_text_field = schema.get_field("all_text").unwrap(); // First, delete any existing document with this ID to handle updates let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64); writer.delete_term(id_term); // Add the new document writer.add_document(doc!( pg_id_field => data.row_id as u64, all_text_field => full_text ))?; // 4. Commit changes writer.commit()?; info!( "Successfully indexed document id:{} for table:{}", data.row_id, data.table_name ); Ok(()) } /// Handles deleting a document from a Tantivy index. async fn handle_delete( _pool: &PgPool, data: IndexCommandData, ) -> anyhow::Result<()> { let (mut writer, schema) = get_index_writer(&data.table_name)?; let pg_id_field = schema.get_field("pg_id").unwrap(); let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64); writer.delete_term(id_term); writer.commit()?; info!( "Successfully deleted document id:{} from table:{}", data.row_id, data.table_name ); Ok(()) } /// Helper to get or create an index and return its writer and schema. fn get_index_writer( table_name: &str, ) -> anyhow::Result<(IndexWriter, Schema)> { let index_path = Path::new(INDEX_DIR).join(table_name); std::fs::create_dir_all(&index_path)?; let index = Index::open_in_dir(&index_path).or_else(|_| { // If it doesn't exist, create it with the standard schema let mut schema_builder = Schema::builder(); schema_builder.add_u64_field("pg_id", INDEXED | STORED); schema_builder.add_text_field("all_text", TEXT | STORED); let schema = schema_builder.build(); Index::create_in_dir(&index_path, schema) })?; let schema = index.schema(); let writer = index.writer(100_000_000)?; // 100MB heap Ok((writer, schema)) }