sql search2 added
This commit is contained in:
@@ -10,14 +10,16 @@ use crate::server::services::{
|
||||
TableDefinitionService,
|
||||
TablesDataService,
|
||||
TableScriptService,
|
||||
AuthServiceImpl
|
||||
AuthServiceImpl,
|
||||
Search2Service,
|
||||
};
|
||||
use common::proto::komp_ac::{
|
||||
table_structure::table_structure_service_server::TableStructureServiceServer,
|
||||
table_definition::table_definition_server::TableDefinitionServer,
|
||||
tables_data::tables_data_server::TablesDataServer,
|
||||
table_script::table_script_server::TableScriptServer,
|
||||
auth::auth_service_server::AuthServiceServer
|
||||
auth::auth_service_server::AuthServiceServer,
|
||||
search2::search2_server::Search2Server,
|
||||
};
|
||||
use search::{SearcherService, SearcherServer};
|
||||
|
||||
@@ -47,9 +49,8 @@ pub async fn run_server(db_pool: sqlx::PgPool) -> Result<(), Box<dyn std::error:
|
||||
};
|
||||
let table_script_service = TableScriptService { db_pool: db_pool.clone() };
|
||||
let auth_service = AuthServiceImpl { db_pool: db_pool.clone() };
|
||||
|
||||
// MODIFIED: Instantiate SearcherService with the database pool
|
||||
let search_service = SearcherService { pool: db_pool.clone() };
|
||||
let search2_service = Search2Service { db_pool: db_pool.clone() };
|
||||
|
||||
Server::builder()
|
||||
.add_service(TableStructureServiceServer::new(TableStructureHandler { db_pool: db_pool.clone() }))
|
||||
@@ -58,6 +59,7 @@ pub async fn run_server(db_pool: sqlx::PgPool) -> Result<(), Box<dyn std::error:
|
||||
.add_service(TableScriptServer::new(table_script_service))
|
||||
.add_service(AuthServiceServer::new(auth_service))
|
||||
.add_service(SearcherServer::new(search_service))
|
||||
.add_service(Search2Server::new(search2_service))
|
||||
.add_service(reflection_service)
|
||||
.serve(addr)
|
||||
.await?;
|
||||
|
||||
@@ -5,9 +5,11 @@ pub mod table_definition_service;
|
||||
pub mod tables_data_service;
|
||||
pub mod table_script_service;
|
||||
pub mod auth_service;
|
||||
pub mod search2_service;
|
||||
|
||||
pub use table_structure_service::TableStructureHandler;
|
||||
pub use table_definition_service::TableDefinitionService;
|
||||
pub use tables_data_service::TablesDataService;
|
||||
pub use table_script_service::TableScriptService;
|
||||
pub use auth_service::AuthServiceImpl;
|
||||
pub use search2_service::*;
|
||||
|
||||
202
server/src/server/services/search2_service.rs
Normal file
202
server/src/server/services/search2_service.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
// src/server/services/search2_service.rs
|
||||
use tonic::{Request, Response, Status};
|
||||
use sqlx::PgPool;
|
||||
use sqlx::Row;
|
||||
use common::proto::komp_ac::search2::{
|
||||
search2_server::Search2,
|
||||
Search2Request, Search2Response, ColumnFilter, FilterType,
|
||||
search2_response::Hit,
|
||||
};
|
||||
use crate::shared::schema_qualifier::qualify_table_name_for_data;
|
||||
|
||||
pub struct Search2Service {
|
||||
pub db_pool: PgPool,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl Search2 for Search2Service {
|
||||
async fn search_table(
|
||||
&self,
|
||||
request: Request<Search2Request>,
|
||||
) -> Result<Response<Search2Response>, Status> {
|
||||
let req = request.into_inner();
|
||||
|
||||
// Build SQL query - NOW PASS DB_POOL AND AWAIT
|
||||
let (sql, bind_values, count_sql) = build_search_query(&self.db_pool, &req).await
|
||||
.map_err(|e| Status::invalid_argument(format!("Query build error: {}", e)))?;
|
||||
|
||||
// Execute main query
|
||||
let rows = execute_query(&self.db_pool, &sql, &bind_values).await
|
||||
.map_err(|e| Status::internal(format!("Database error: {}", e)))?;
|
||||
|
||||
// Execute count query for pagination
|
||||
let total_count = execute_count_query(&self.db_pool, &count_sql, &bind_values).await
|
||||
.map_err(|e| Status::internal(format!("Count query error: {}", e)))?;
|
||||
|
||||
// Convert to response format
|
||||
let hits = rows.into_iter().map(|row| {
|
||||
Hit {
|
||||
id: row.id,
|
||||
content_json: row.content_json,
|
||||
match_info: Some(format!("SQL search on table: {}", req.table_name)),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(Response::new(Search2Response {
|
||||
hits,
|
||||
total_count,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct QueryRow {
|
||||
id: i64,
|
||||
content_json: String,
|
||||
}
|
||||
|
||||
// MAKE THIS FUNCTION ASYNC AND ADD DB_POOL PARAMETER
|
||||
async fn build_search_query(
|
||||
db_pool: &PgPool,
|
||||
request: &Search2Request
|
||||
) -> Result<(String, Vec<String>, String), String> {
|
||||
|
||||
// Since Search2Request doesn't have profile_name, use "default"
|
||||
// You can add profile_name to the proto later if needed
|
||||
let profile_name = "default";
|
||||
|
||||
// NOW AWAIT THE ASYNC FUNCTION CALL
|
||||
let qualified_table = qualify_table_name_for_data(db_pool, profile_name, &request.table_name)
|
||||
.await
|
||||
.map_err(|e| format!("Invalid table name: {}", e))?;
|
||||
|
||||
let mut conditions = vec!["deleted = FALSE".to_string()];
|
||||
let mut bind_values = Vec::new();
|
||||
|
||||
// Build WHERE conditions from column filters
|
||||
for filter in &request.column_filters {
|
||||
let condition = build_filter_condition(filter, &mut bind_values)?;
|
||||
conditions.push(condition);
|
||||
}
|
||||
|
||||
// Add text search fallback if provided
|
||||
if let Some(text_query) = &request.text_query {
|
||||
if !text_query.trim().is_empty() {
|
||||
bind_values.push(format!("%{}%", text_query.to_lowercase()));
|
||||
conditions.push(format!(
|
||||
"EXISTS (SELECT 1 FROM jsonb_each_text(to_jsonb(t)) as kv(key, value) WHERE LOWER(value) LIKE ${})",
|
||||
bind_values.len()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let where_clause = conditions.join(" AND ");
|
||||
|
||||
// Build ORDER BY clause
|
||||
let order_clause = if let Some(order_by) = &request.order_by {
|
||||
let direction = if request.order_desc.unwrap_or(false) { "DESC" } else { "ASC" };
|
||||
format!("ORDER BY \"{}\" {}", order_by, direction)
|
||||
} else {
|
||||
"ORDER BY id DESC".to_string()
|
||||
};
|
||||
|
||||
let limit_clause = format!("LIMIT {}", request.limit.unwrap_or(100));
|
||||
|
||||
// Main query
|
||||
let sql = format!(
|
||||
"SELECT id, to_jsonb(t) AS data FROM {} t WHERE {} {} {}",
|
||||
qualified_table, where_clause, order_clause, limit_clause
|
||||
);
|
||||
|
||||
// Count query (for pagination)
|
||||
let count_sql = format!(
|
||||
"SELECT COUNT(*) FROM {} t WHERE {}",
|
||||
qualified_table, where_clause
|
||||
);
|
||||
|
||||
Ok((sql, bind_values, count_sql))
|
||||
}
|
||||
|
||||
fn build_filter_condition(filter: &ColumnFilter, bind_values: &mut Vec<String>) -> Result<String, String> {
|
||||
// FIX DEPRECATED WARNING - USE TryFrom INSTEAD
|
||||
let filter_type = FilterType::try_from(filter.filter_type)
|
||||
.map_err(|_| "Invalid filter type".to_string())?;
|
||||
|
||||
let param_idx = bind_values.len() + 1;
|
||||
|
||||
let condition = match filter_type {
|
||||
FilterType::Equals => {
|
||||
bind_values.push(filter.value.clone());
|
||||
format!("\"{}\" = ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::Contains => {
|
||||
bind_values.push(format!("%{}%", filter.value));
|
||||
format!("\"{}\" ILIKE ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::StartsWith => {
|
||||
bind_values.push(format!("{}%", filter.value));
|
||||
format!("\"{}\" ILIKE ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::EndsWith => {
|
||||
bind_values.push(format!("%{}", filter.value));
|
||||
format!("\"{}\" ILIKE ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::Range => {
|
||||
bind_values.push(filter.value.clone());
|
||||
bind_values.push(filter.value2.as_ref().unwrap_or(&"".to_string()).clone());
|
||||
format!("\"{}\" BETWEEN ${} AND ${}", filter.column_name, param_idx, param_idx + 1)
|
||||
},
|
||||
FilterType::GreaterThan => {
|
||||
bind_values.push(filter.value.clone());
|
||||
format!("\"{}\" > ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::LessThan => {
|
||||
bind_values.push(filter.value.clone());
|
||||
format!("\"{}\" < ${}", filter.column_name, param_idx)
|
||||
},
|
||||
FilterType::IsNull => {
|
||||
format!("\"{}\" IS NULL", filter.column_name)
|
||||
},
|
||||
FilterType::IsNotNull => {
|
||||
format!("\"{}\" IS NOT NULL", filter.column_name)
|
||||
},
|
||||
};
|
||||
|
||||
Ok(condition)
|
||||
}
|
||||
|
||||
async fn execute_query(pool: &PgPool, sql: &str, bind_values: &[String]) -> Result<Vec<QueryRow>, sqlx::Error> {
|
||||
let mut query = sqlx::query(sql);
|
||||
|
||||
// Bind all parameters
|
||||
for value in bind_values {
|
||||
query = query.bind(value);
|
||||
}
|
||||
|
||||
let rows = query.fetch_all(pool).await?;
|
||||
|
||||
let results = rows.into_iter().map(|row| {
|
||||
QueryRow {
|
||||
id: row.try_get("id").unwrap_or(0),
|
||||
content_json: row.try_get::<serde_json::Value, _>("data")
|
||||
.unwrap_or(serde_json::Value::Null)
|
||||
.to_string(),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
async fn execute_count_query(pool: &PgPool, sql: &str, bind_values: &[String]) -> Result<i32, sqlx::Error> {
|
||||
let mut query = sqlx::query(sql);
|
||||
|
||||
// Bind all parameters
|
||||
for value in bind_values {
|
||||
query = query.bind(value);
|
||||
}
|
||||
|
||||
let row = query.fetch_one(pool).await?;
|
||||
let count: i64 = row.try_get(0).unwrap_or(0);
|
||||
|
||||
Ok(count as i32)
|
||||
}
|
||||
Reference in New Issue
Block a user