// src/indexer.rs

use std::path::Path;
use sqlx::{PgPool, Row};
use tantivy::schema::{Schema, Term};
use tantivy::{doc, Index, IndexWriter};
use tokio::sync::mpsc::Receiver;
use tracing::{error, info, warn};
use crate::search_schema;

const INDEX_DIR: &str = "./tantivy_indexes";

/// Defines the commands that can be sent to the indexer task.
#[derive(Debug)]
pub enum IndexCommand {
    /// Add a new document or update an existing one.
    AddOrUpdate(IndexCommandData),
    /// Remove a document from the index.
    Delete(IndexCommandData),
}

#[derive(Debug)]
pub struct IndexCommandData {
    pub table_name: String,
    pub row_id: i64,
}
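
// For reference, one AddOrUpdate command could be built like this (the table name
// and row id are illustrative only, not defined anywhere in this crate):
//
//     IndexCommand::AddOrUpdate(IndexCommandData {
//         table_name: "articles".to_string(),
//         row_id: 42,
//     })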

/// The main loop for the background indexer task.
pub async fn indexer_task(pool: PgPool, mut receiver: Receiver<IndexCommand>) {
    info!("Background indexer task started.");
    while let Some(command) = receiver.recv().await {
        info!("Indexer received command: {:?}", command);
        let result = match command {
            IndexCommand::AddOrUpdate(data) => {
                handle_add_or_update(&pool, data).await
            }
            IndexCommand::Delete(data) => handle_delete(&pool, data).await,
        };
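
        // A failed command is logged and dropped rather than retried, so one bad
        // row cannot stall the loop or back up the channel.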
        if let Err(e) = result {
            error!("Failed to process index command: {}", e);
        }
    }
    warn!("Indexer channel closed. Task is shutting down.");
}
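
// A minimal wiring sketch (the channel capacity and the surrounding setup code are
// assumptions, not part of this module):
//
//     let (tx, rx) = tokio::sync::mpsc::channel::<IndexCommand>(128);
//     tokio::spawn(indexer_task(pool.clone(), rx));
//     // ... later, from any producer holding a clone of `tx`:
//     tx.send(command).await?;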

/// Handles adding or updating a document in a Tantivy index.
async fn handle_add_or_update(
    pool: &PgPool,
    data: IndexCommandData,
) -> anyhow::Result<()> {
    // 1. Fetch the full row data from PostgreSQL
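    // The table name cannot be bound as a SQL parameter (parameters cover values,
    // not identifiers), so it is interpolated into the query text here; it is
    // assumed to come from a trusted, validated source. `to_jsonb(t)` returns the
    // whole row as a single JSON value, so every column is available for text
    // extraction without listing columns explicitly.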
    let qualified_table = format!("gen.\"{}\"", data.table_name);
    let query_str = format!(
        "SELECT to_jsonb(t) AS data FROM {} t WHERE id = $1",
        qualified_table
    );
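
    // fetch_one returns an error if the row no longer exists (for example, it was
    // deleted before this command was processed); the `?` below hands that error
    // back to indexer_task, which logs it.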
    let row = sqlx::query(&query_str)
        .bind(data.row_id)
        .fetch_one(pool)
        .await?;
    let json_data: serde_json::Value = row.try_get("data")?;

    // 2. Extract all text content for Slovak processing
    let slovak_text = extract_text_content(&json_data);

    // 3. Open the index and write the document
    let (mut writer, schema) = get_index_writer(&data.table_name)?;
    let pg_id_field = schema.get_field("pg_id").unwrap();
    let text_sk_field = schema.get_field("text_sk").unwrap();

    // First, delete any existing document with this ID to handle updates
    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
    writer.delete_term(id_term);
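
    // Both the delete above and the add below are only buffered in the writer at
    // this point; they become visible together when commit() runs.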

    // Add the new document
    writer.add_document(doc!(
        pg_id_field => data.row_id as u64,
        text_sk_field => slovak_text
    ))?;

    // 4. Commit changes
    writer.commit()?;
    info!(
        "Successfully indexed Slovak document id:{} for table:{}",
        data.row_id, data.table_name
    );

    Ok(())
}

/// Handles deleting a document from a Tantivy index.
async fn handle_delete(
    _pool: &PgPool,
    data: IndexCommandData,
) -> anyhow::Result<()> {
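    // Deletion only needs the Tantivy side, so the database pool is unused here
    // (hence the leading underscore on the parameter).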
    let (mut writer, schema) = get_index_writer(&data.table_name)?;
    let pg_id_field = schema.get_field("pg_id").unwrap();

    let id_term = Term::from_field_u64(pg_id_field, data.row_id as u64);
    writer.delete_term(id_term);
    writer.commit()?;

    info!(
        "Successfully deleted document id:{} from table:{}",
        data.row_id, data.table_name
    );

    Ok(())
}

/// Helper to get or create an index and return its writer and schema.
fn get_index_writer(
    table_name: &str,
) -> anyhow::Result<(IndexWriter, Schema)> {
    let index = search_schema::get_or_create_index(table_name)?;
    let schema = index.schema();
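    // index.writer() takes Tantivy's per-index writer lock, so only one writer can
    // exist for an index at a time. That holds here because the indexer task
    // handles commands one by one and drops the writer at the end of each handler.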
    let writer = index.writer(100_000_000)?; // 100MB heap
    Ok((writer, schema))
}

/// Extract all text content from a JSON object for indexing.
fn extract_text_content(json_data: &serde_json::Value) -> String {
    let mut full_text = String::new();

    if let Some(obj) = json_data.as_object() {
        for value in obj.values() {
            match value {
                serde_json::Value::String(s) => {
                    full_text.push_str(s);
                    full_text.push(' ');
                }
                serde_json::Value::Number(n) => {
                    full_text.push_str(&n.to_string());
                    full_text.push(' ');
                }
                // Nested objects and arrays are skipped here; a recursive variant
                // is sketched below.
                _ => {}
            }
        }
    }

    full_text.trim().to_string()
}
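
// A possible recursive variant (a sketch only; nothing above calls it): walk
// nested objects and arrays as well, collecting every string and number found.
#[allow(dead_code)]
fn extract_text_content_recursive(value: &serde_json::Value, full_text: &mut String) {
    match value {
        serde_json::Value::String(s) => {
            full_text.push_str(s);
            full_text.push(' ');
        }
        serde_json::Value::Number(n) => {
            full_text.push_str(&n.to_string());
            full_text.push(' ');
        }
        serde_json::Value::Object(obj) => {
            for nested in obj.values() {
                extract_text_content_recursive(nested, full_text);
            }
        }
        serde_json::Value::Array(items) => {
            for nested in items {
                extract_text_content_recursive(nested, full_text);
            }
        }
        // Booleans and nulls carry no useful search text.
        _ => {}
    }
}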