// server/src/indexer.rs
use crate::search_schema;
use sqlx::{PgPool, Row};
use tantivy::schema::{Schema, Term};
use tantivy::{doc, IndexWriter};
use tokio::sync::mpsc::Receiver;
use tracing::{error, info, warn};

/// Defines the commands that can be sent to the indexer task.
#[derive(Debug)]
pub enum IndexCommand {
    /// Add a new document or update an existing one.
    AddOrUpdate(IndexCommandData),
    /// Remove a document from the index.
    Delete(IndexCommandData),
}
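
/// Identifies the row a command applies to: the table it lives in and its
/// primary key.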
#[derive(Debug)]
pub struct IndexCommandData {
    pub table_name: String,
    pub row_id: i64,
}

/// The main loop for the background indexer task.
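/// Commands are consumed one at a time from the channel, so Tantivy writes
/// for any given table never race with each other.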
pub async fn indexer_task(pool: PgPool, mut receiver: Receiver<IndexCommand>) {
    info!("Background indexer task started.");
    while let Some(command) = receiver.recv().await {
        info!("Indexer received command: {:?}", command);
        let result = match command {
            IndexCommand::AddOrUpdate(data) => handle_add_or_update(&pool, data).await,
            IndexCommand::Delete(data) => handle_delete(&pool, data).await,
        };
        if let Err(e) = result {
            error!("Failed to process index command: {}", e);
        }
    }
    warn!("Indexer channel closed. Task is shutting down.");
}

/// Handles adding or updating a document in a Tantivy index.
async fn handle_add_or_update(
    pool: &PgPool,
    data: IndexCommandData,
) -> anyhow::Result<()> {
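    // NOTE: table_name is interpolated into the SQL string because identifiers
    // cannot be bound as parameters, so it must come from a trusted source
    // (e.g. an internal allowlist), never directly from user input.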
    let qualified_table = format!("gen.\"{}\"", data.table_name);
    // to_jsonb(t) serialises the entire row, so newly added columns are
    // picked up here without code changes.
    let query_str = format!(
        "SELECT to_jsonb(t) AS data FROM {} t WHERE id = $1",
        qualified_table
    );
    let row = sqlx::query(&query_str)
        .bind(data.row_id)
        .fetch_one(pool)
        .await?;
    let json_data: serde_json::Value = row.try_get("data")?;
    let slovak_text = extract_text_content(&json_data);

    let (mut writer, schema) = get_index_writer(&data.table_name)?;
    // These fields are defined when search_schema builds the index, so the
    // lookups are expected to succeed.
    let pg_id_field = schema.get_field("pg_id").unwrap();
    let prefix_edge_field = schema.get_field("prefix_edge").unwrap();
    let prefix_full_field = schema.get_field("prefix_full").unwrap();
    let text_ngram_field = schema.get_field("text_ngram").unwrap();
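
    // Tantivy has no in-place update: delete any existing document carrying
    // this pg_id, then add the fresh version and commit.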
    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
    writer.delete_term(id_term);
    writer.add_document(doc!(
        pg_id_field => data.row_id as u64,
        prefix_edge_field => slovak_text.clone(),
        prefix_full_field => slovak_text.clone(),
        text_ngram_field => slovak_text
    ))?;
    writer.commit()?;
    info!(
        "Successfully indexed document id:{} for table:{}",
        data.row_id, data.table_name
    );
    Ok(())
}

/// Handles deleting a document from a Tantivy index.
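/// The pool is unused here but kept so both handlers share the same signature.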
async fn handle_delete(
    _pool: &PgPool,
    data: IndexCommandData,
) -> anyhow::Result<()> {
    let (mut writer, schema) = get_index_writer(&data.table_name)?;
    let pg_id_field = schema.get_field("pg_id").unwrap();
    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
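    // delete_term only marks matching documents as deleted; the space is
    // reclaimed later when Tantivy merges segments.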
    writer.delete_term(id_term);
    writer.commit()?;
    info!(
        "Successfully deleted document id:{} from table:{}",
        data.row_id, data.table_name
    );
    Ok(())
}

/// Helper to get or create an index and return its writer and schema.
fn get_index_writer(table_name: &str) -> anyhow::Result<(IndexWriter, Schema)> {
    let index = search_schema::get_or_create_index(table_name)?;
    let schema = index.schema();
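    // Tantivy allows only one live IndexWriter per index. Creating a fresh
    // writer per command is safe because the indexer task is the sole writer,
    // but it pays the writer-lock acquisition cost on every command.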
    let writer = index.writer(100_000_000)?; // 100 MB indexing heap
    Ok((writer, schema))
}

/// Extracts all text content from a JSON object for indexing.
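/// Only top-level string and number fields are collected; nested objects and
/// arrays are skipped.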
fn extract_text_content(json_data: &serde_json::Value) -> String {
    let mut full_text = String::new();
    if let Some(obj) = json_data.as_object() {
        for value in obj.values() {
            match value {
                serde_json::Value::String(s) => {
                    full_text.push_str(s);
                    full_text.push(' ');
                }
                serde_json::Value::Number(n) => {
                    full_text.push_str(&n.to_string());
                    full_text.push(' ');
                }
                _ => {}
            }
        }
    }
    full_text.trim().to_string()
}