diff --git a/Cargo.lock b/Cargo.lock
index 8e39ec1..b2858e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -449,6 +449,7 @@ dependencies = [
  "dotenvy",
  "lazy_static",
  "prost",
+ "prost-types",
  "ratatui",
  "serde",
  "serde_json",
@@ -487,6 +488,7 @@ name = "common"
 version = "0.3.13"
 dependencies = [
  "prost",
+ "prost-types",
  "serde",
  "tantivy",
  "tonic",
@@ -2843,6 +2845,7 @@ dependencies = [
  "jsonwebtoken",
  "lazy_static",
  "prost",
+ "prost-types",
  "regex",
  "rstest",
  "rust-stemmers",
diff --git a/Cargo.toml b/Cargo.toml
index 1a875ed..fcd15d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ tokio = { version = "1.44.2", features = ["full"] }
 tonic = "0.13.0"
 prost = "0.13.5"
 async-trait = "0.1.88"
+prost-types = "0.13.0"
 
 # Data Handling & Serialization
 serde = { version = "1.0.219", features = ["derive"] }
diff --git a/client/Cargo.toml b/client/Cargo.toml
index 223f580..3fdf47e 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -9,6 +9,7 @@
 anyhow = "1.0.98"
 async-trait = "0.1.88"
 common = { path = "../common" }
+prost-types = { workspace = true }
 crossterm = "0.28.1"
 dirs = "6.0.0"
 dotenvy = "0.15.7"
diff --git a/client/src/services/grpc_client.rs b/client/src/services/grpc_client.rs
index f2e2eea..1f793d0 100644
--- a/client/src/services/grpc_client.rs
+++ b/client/src/services/grpc_client.rs
@@ -23,8 +23,9 @@ use common::proto::multieko2::tables_data::{
 use common::proto::multieko2::search::{
     searcher_client::SearcherClient, SearchRequest, SearchResponse,
 };
-use anyhow::{Context, Result}; // Added Context
-use std::collections::HashMap; // NEW
+use anyhow::{Context, Result};
+use std::collections::HashMap;
+use prost_types::Value;
 
 #[derive(Clone)]
 pub struct GrpcClient {
@@ -48,7 +49,6 @@ impl GrpcClient {
             TableDefinitionClient::new(channel.clone());
         let table_script_client = TableScriptClient::new(channel.clone());
         let tables_data_client = TablesDataClient::new(channel.clone());
-        // NEW: Instantiate the search client
         let search_client = SearcherClient::new(channel.clone());
 
         Ok(Self {
@@ -56,7 +56,7 @@ impl GrpcClient {
             table_definition_client,
             table_script_client,
             tables_data_client,
-            search_client, // NEW
+            search_client,
         })
     }
 
@@ -135,7 +135,7 @@ impl GrpcClient {
         Ok(response.into_inner().count as u64)
     }
 
-    pub async fn get_table_data_by_position(
+pub async fn get_table_data_by_position(
         &mut self,
         profile_name: String,
         table_name: String,
@@ -155,16 +155,22 @@
         Ok(response.into_inner())
     }
 
-    pub async fn post_table_data(
+pub async fn post_table_data(
         &mut self,
         profile_name: String,
         table_name: String,
        data: HashMap<String, String>,
     ) -> Result<PostTableDataResponse> {
+        // Convert the HashMap<String, String> into the prost_types::Value map the proto expects.
+        let data: HashMap<String, Value> = data
+            .into_iter()
+            .map(|(k, v)| (k, Value::from(v)))
+            .collect();
+
         let grpc_request = PostTableDataRequest {
             profile_name,
             table_name,
-            data,
+            data, // This is now the correct type
         };
         let request = tonic::Request::new(grpc_request);
         let response = self
@@ -182,11 +188,17 @@
         id: i64,
         data: HashMap<String, String>,
     ) -> Result<PutTableDataResponse> {
+        // Convert the HashMap<String, String> into the prost_types::Value map the proto expects.
+        let data: HashMap<String, Value> = data
+            .into_iter()
+            .map(|(k, v)| (k, Value::from(v)))
+            .collect();
+
         let grpc_request = PutTableDataRequest {
             profile_name,
             table_name,
             id,
-            data,
+            data, // This is now the correct type
         };
         let request = tonic::Request::new(grpc_request);
         let response = self
diff --git a/common/Cargo.toml b/common/Cargo.toml
index e5f8b7b..5d19597 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -5,6 +5,8 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+prost-types = { workspace = true }
+
 tonic = "0.13.0"
 prost = "0.13.5"
 serde = { version = "1.0.219", features = ["derive"] }
diff --git a/common/proto/tables_data.proto b/common/proto/tables_data.proto
index c0e613c..ccef666 100644
--- a/common/proto/tables_data.proto
+++ b/common/proto/tables_data.proto
@@ -3,6 +3,7 @@ syntax = "proto3";
 package multieko2.tables_data;
 
 import "common.proto";
+import "google/protobuf/struct.proto";
 
 service TablesData {
   rpc PostTableData (PostTableDataRequest) returns (PostTableDataResponse);
@@ -16,7 +17,7 @@ service TablesData {
 message PostTableDataRequest {
   string profile_name = 1;
   string table_name = 2;
-  map<string, string> data = 3;
+  map<string, google.protobuf.Value> data = 3;
 }
 
 message PostTableDataResponse {
@@ -29,7 +30,7 @@ message PutTableDataRequest {
   string profile_name = 1;
   string table_name = 2;
   int64 id = 3;
-  map<string, string> data = 4;
+  map<string, google.protobuf.Value> data = 4;
 }
 
 message PutTableDataResponse {
diff --git a/common/src/proto/descriptor.bin b/common/src/proto/descriptor.bin
index 642c133..30d062c 100644
Binary files a/common/src/proto/descriptor.bin and b/common/src/proto/descriptor.bin differ
diff --git a/common/src/proto/multieko2.tables_data.rs b/common/src/proto/multieko2.tables_data.rs
index 1ae4d33..3aecea0 100644
--- a/common/src/proto/multieko2.tables_data.rs
+++ b/common/src/proto/multieko2.tables_data.rs
@@ -5,10 +5,10 @@ pub struct PostTableDataRequest {
     pub profile_name: ::prost::alloc::string::String,
     #[prost(string, tag = "2")]
     pub table_name: ::prost::alloc::string::String,
-    #[prost(map = "string, string", tag = "3")]
+    #[prost(map = "string, message", tag = "3")]
     pub data: ::std::collections::HashMap<
         ::prost::alloc::string::String,
-        ::prost::alloc::string::String,
+        ::prost_types::Value,
     >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -28,10 +28,10 @@ pub struct PutTableDataRequest {
     pub table_name: ::prost::alloc::string::String,
     #[prost(int64, tag = "3")]
     pub id: i64,
-    #[prost(map = "string, string", tag = "4")]
+    #[prost(map = "string, message", tag = "4")]
     pub data: ::std::collections::HashMap<
         ::prost::alloc::string::String,
-        ::prost::alloc::string::String,
+        ::prost_types::Value,
     >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 56c8f03..235806f 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -10,6 +10,7 @@ search = { path = "../search" }
 
 anyhow = { workspace = true }
 tantivy = { workspace = true }
+prost-types = { workspace = true }
 chrono = { version = "0.4.40", features = ["serde"] }
 dotenvy = "0.15.7"
 prost = "0.13.5"
diff --git a/server/src/tables_data/handlers/post_table_data.rs b/server/src/tables_data/handlers/post_table_data.rs
index cb310d1..af761b2 100644
--- a/server/src/tables_data/handlers/post_table_data.rs
+++ b/server/src/tables_data/handlers/post_table_data.rs
@@ -8,16 +8,15 @@ use common::proto::multieko2::tables_data::{PostTableDataRequest, PostTableDataResponse};
 use std::collections::HashMap;
 use std::sync::Arc;
 use crate::shared::schema_qualifier::qualify_table_name_for_data;
+use prost_types::value::Kind; // NEW: Import the Kind enum
 use crate::steel::server::execution::{self, Value};
 use crate::steel::server::functions::SteelContext;
 
-// Add these imports
 use crate::indexer::{IndexCommand, IndexCommandData};
 use tokio::sync::mpsc;
 use tracing::error;
 
-// MODIFIED: Function signature now accepts the indexer sender
 pub async fn post_table_data(
     db_pool: &PgPool,
     request: PostTableDataRequest,
@@ -25,11 +24,12 @@
 ) -> Result<PostTableDataResponse, Status> {
     let profile_name = request.profile_name;
     let table_name = request.table_name;
 
-    let mut data = HashMap::new();
-    for (key, value) in request.data {
-        data.insert(key, value.trim().to_string());
-    }
+    // REMOVED: The old data conversion loop. We will process request.data directly.
+    // let mut data = HashMap::new();
+    // for (key, value) in request.data {
+    //     data.insert(key, value.trim().to_string());
+    // }
 
     // Lookup profile
     let profile = sqlx::query!(
@@ -94,13 +94,28 @@
     // Validate all data columns
     let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect();
-    for key in data.keys() {
+    for key in request.data.keys() { // CHANGED: Use request.data
         if !system_columns_set.contains(key.as_str())
             && !user_columns.contains(&&key.to_string())
         {
             return Err(Status::invalid_argument(format!("Invalid column: {}", key)));
         }
     }
 
+    // NEW: Create a string-based map for backwards compatibility with Steel scripts.
+    let mut string_data_for_scripts = HashMap::new();
+    for (key, proto_value) in &request.data {
+        let str_val = match &proto_value.kind {
+            Some(Kind::StringValue(s)) => s.clone(),
+            Some(Kind::NumberValue(n)) => n.to_string(),
+            Some(Kind::BoolValue(b)) => b.to_string(),
+            Some(Kind::NullValue(_)) => String::new(),
+            Some(Kind::StructValue(_)) | Some(Kind::ListValue(_)) | None => {
+                return Err(Status::invalid_argument(format!("Unsupported type for script validation in column '{}'", key)));
+            }
+        };
+        string_data_for_scripts.insert(key.clone(), str_val);
+    }
+
     // Validate Steel scripts
     let scripts = sqlx::query!(
         "SELECT target_column, script FROM table_scripts WHERE table_definitions_id = $1",
@@ -114,20 +129,20 @@
         let target_column = script_record.target_column;
 
         // Ensure target column exists in submitted data
-        let user_value = data.get(&target_column)
+        let user_value = string_data_for_scripts.get(&target_column) // CHANGED: Use the new string map
             .ok_or_else(|| Status::invalid_argument(
                 format!("Script target column '{}' is required", target_column)
             ))?;
 
         // Create execution context
         let context = SteelContext {
-            current_table: table_name.clone(), // Keep base name for scripts
+            current_table: table_name.clone(),
             profile_id,
-            row_data: data.clone(),
+            row_data: string_data_for_scripts.clone(), // CHANGED: Use the new string map
             db_pool: Arc::new(db_pool.clone()),
         };
 
-        // Execute validation script
+        // ... (rest of script execution is the same)
         let script_result = execution::execute_script(
             script_record.script,
             "STRINGS",
@@ -138,7 +153,6 @@
                 format!("Script execution failed for '{}': {}", target_column, e)
             ))?;
 
-        // Validate script output
         let Value::Strings(mut script_output) = script_result else {
             return Err(Status::internal("Script must return string values"));
         };
@@ -160,11 +174,12 @@
     let mut placeholders = Vec::new();
     let mut param_idx = 1;
 
-    for (col, value) in data {
+    // CHANGED: This entire loop is rewritten to handle prost_types::Value
+    for (col, proto_value) in request.data {
         let sql_type = if system_columns_set.contains(col.as_str()) {
             match col.as_str() {
                 "deleted" => "BOOLEAN",
-                _ if col.ends_with("_id") => "BIGINT", // Handle foreign keys
+                _ if col.ends_with("_id") => "BIGINT",
                 _ => return Err(Status::invalid_argument("Invalid system column")),
             }
         } else {
@@ -174,36 +189,55 @@
                 .ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))?
         };
 
+        let kind = proto_value.kind.ok_or_else(|| {
+            Status::invalid_argument(format!("Value for column '{}' cannot be null", col))
+        })?;
+
         match sql_type {
             "TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => {
-                if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
-                    .and_then(|s| s.strip_suffix(')'))
-                    .and_then(|s| s.parse::<usize>().ok())
-                {
-                    if value.len() > max_len {
-                        return Err(Status::internal(format!("Value too long for {}", col)));
+                if let Kind::StringValue(value) = kind {
+                    if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
+                        .and_then(|s| s.strip_suffix(')'))
+                        .and_then(|s| s.parse::<usize>().ok())
+                    {
+                        if value.len() > max_len {
+                            return Err(Status::internal(format!("Value too long for {}", col)));
+                        }
                     }
+                    params.add(value)
+                        .map_err(|e| Status::invalid_argument(format!("Failed to add text parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected string for column '{}'", col)));
                 }
-                params.add(value)
-                    .map_err(|e| Status::invalid_argument(format!("Failed to add text parameter for {}: {}", col, e)))?;
             },
             "BOOLEAN" => {
-                let val = value.parse::<bool>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?;
-                params.add(val)
-                    .map_err(|e| Status::invalid_argument(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                if let Kind::BoolValue(val) = kind {
+                    params.add(val)
+                        .map_err(|e| Status::invalid_argument(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected boolean for column '{}'", col)));
+                }
             },
             "TIMESTAMPTZ" => {
-                let dt = DateTime::parse_from_rfc3339(&value)
-                    .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
-                params.add(dt.with_timezone(&Utc))
-                    .map_err(|e| Status::invalid_argument(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                if let Kind::StringValue(value) = kind {
+                    let dt = DateTime::parse_from_rfc3339(&value)
+                        .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
+                    params.add(dt.with_timezone(&Utc))
+                        .map_err(|e| Status::invalid_argument(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected ISO 8601 string for column '{}'", col)));
+                }
             },
             "BIGINT" => {
-                let val = value.parse::<i64>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid integer for {}", col)))?;
-                params.add(val)
-                    .map_err(|e| Status::invalid_argument(format!("Failed to add integer parameter for {}: {}", col, e)))?;
+                if let Kind::NumberValue(val) = kind {
+                    if val.fract() != 0.0 {
+                        return Err(Status::invalid_argument(format!("Expected integer for column '{}', but got a float", col)));
+                    }
+                    params.add(val as i64)
+                        .map_err(|e| Status::invalid_argument(format!("Failed to add integer parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected number for column '{}'", col)));
+                }
             },
             _ => return Err(Status::invalid_argument(format!("Unsupported type {}", sql_type))),
         }
@@ -227,7 +261,7 @@
         placeholders.join(", ")
     );
 
-    // Execute query with enhanced error handling
+    // ... (rest of the function is unchanged)
     let result = sqlx::query_scalar_with::<_, i64, _>(&sql, params)
         .fetch_one(db_pool)
         .await;
@@ -235,7 +269,6 @@
     let inserted_id = match result {
         Ok(id) => id,
         Err(e) => {
-            // Handle "relation does not exist" error specifically
            if let Some(db_err) = e.as_database_error() {
                if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
                    return Err(Status::internal(format!(
@@ -248,15 +281,12 @@
             }
         }
     };
 
-    // After a successful insert, send a command to the indexer.
     let command = IndexCommand::AddOrUpdate(IndexCommandData {
         table_name: table_name.clone(),
         row_id: inserted_id,
     });
 
     if let Err(e) = indexer_tx.send(command).await {
-        // If sending fails, the DB is updated but the index will be stale.
-        // This is a critical situation to log and monitor.
         error!(
             "CRITICAL: DB insert for table '{}' (id: {}) succeeded but failed to queue for indexing: {}. Search index is now inconsistent.",
             table_name, inserted_id, e
diff --git a/server/src/tables_data/handlers/put_table_data.rs b/server/src/tables_data/handlers/put_table_data.rs
index 6eef2d1..8f61338 100644
--- a/server/src/tables_data/handlers/put_table_data.rs
+++ b/server/src/tables_data/handlers/put_table_data.rs
@@ -4,8 +4,8 @@ use sqlx::{PgPool, Arguments, Postgres};
 use sqlx::postgres::PgArguments;
 use chrono::{DateTime, Utc};
 use common::proto::multieko2::tables_data::{PutTableDataRequest, PutTableDataResponse};
-use std::collections::HashMap;
-use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
+use crate::shared::schema_qualifier::qualify_table_name_for_data;
+use prost_types::value::Kind;
 
 pub async fn put_table_data(
     db_pool: &PgPool,
@@ -15,20 +15,9 @@
     let table_name = request.table_name;
     let record_id = request.id;
 
-    // Preprocess and validate data
-    let mut processed_data = HashMap::new();
-    let mut null_fields = Vec::new();
-
-    // CORRECTED: Generic handling for all fields.
-    // Any field with an empty string will be added to the null_fields list.
-    // The special, hardcoded logic for "firma" has been removed.
-    for (key, value) in request.data {
-        let trimmed = value.trim().to_string();
-        if trimmed.is_empty() {
-            null_fields.push(key);
-        } else {
-            processed_data.insert(key, trimmed);
-        }
+    // If no data is provided to update, it's an invalid request.
+    if request.data.is_empty() {
+        return Err(Status::invalid_argument("No fields provided to update."));
     }
 
     // Lookup profile
@@ -70,14 +59,29 @@
         columns.push((name, sql_type));
     }
 
-    // CORRECTED: "firma" is not a system column.
-    // It should be treated as a user-defined column.
-    let system_columns = ["deleted"];
+    // Get all foreign key columns for this table (needed for validation)
+    let fk_columns = sqlx::query!(
+        r#"SELECT ltd.table_name
+           FROM table_definition_links tdl
+           JOIN table_definitions ltd ON tdl.linked_table_id = ltd.id
+           WHERE tdl.source_table_id = $1"#,
+        table_def.id
+    )
+    .fetch_all(db_pool)
+    .await
+    .map_err(|e| Status::internal(format!("Foreign key lookup error: {}", e)))?;
+
+    let mut system_columns = vec!["deleted".to_string()];
+    for fk in fk_columns {
+        let base_name = fk.table_name.split_once('_').map_or(fk.table_name.as_str(), |(_, rest)| rest);
+        system_columns.push(format!("{}_id", base_name));
+    }
+    let system_columns_set: std::collections::HashSet<_> = system_columns.iter().map(|s| s.as_str()).collect();
     let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect();
 
     // Validate input columns
-    for key in processed_data.keys() {
-        if !system_columns.contains(&key.as_str()) && !user_columns.contains(&key) {
+    for key in request.data.keys() {
+        if !system_columns_set.contains(key.as_str()) && !user_columns.contains(&key) {
             return Err(Status::invalid_argument(format!("Invalid column: {}", key)));
         }
     }
@@ -87,54 +91,65 @@
     let mut set_clauses = Vec::new();
     let mut param_idx = 1;
 
-    // Add data parameters for non-empty fields
-    for (col, value) in &processed_data {
-        // CORRECTED: The logic for "firma" is removed from this match.
-        // It will now fall through to the `else` block and have its type
-        // correctly looked up from the `columns` vector.
-        let sql_type = if system_columns.contains(&col.as_str()) {
+    for (col, proto_value) in request.data {
+        let sql_type = if system_columns_set.contains(col.as_str()) {
             match col.as_str() {
                 "deleted" => "BOOLEAN",
+                _ if col.ends_with("_id") => "BIGINT",
                 _ => return Err(Status::invalid_argument("Invalid system column")),
             }
         } else {
             columns.iter()
-                .find(|(name, _)| name == col)
+                .find(|(name, _)| name == &col)
                 .map(|(_, sql_type)| sql_type.as_str())
                 .ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))?
         };
 
+        // A provided value cannot be null or empty in a PUT request.
+        // To clear a field, it should be set to an empty string "" for text,
+        // or a specific value for other types if needed (though typically not done).
+        // For now, we reject nulls.
+        let kind = proto_value.kind.ok_or_else(|| {
+            Status::invalid_argument(format!("Value for column '{}' cannot be empty in a PUT request. To clear a text field, send an empty string.", col))
+        })?;
+
         match sql_type {
             "TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => {
-                if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
-                    .and_then(|s| s.strip_suffix(')'))
-                    .and_then(|s| s.parse::<usize>().ok())
-                {
-                    if value.len() > max_len {
-                        return Err(Status::internal(format!("Value too long for {}", col)));
-                    }
+                if let Kind::StringValue(value) = kind {
+                    params.add(value)
+                        .map_err(|e| Status::internal(format!("Failed to add text parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected string for column '{}'", col)));
                 }
-                params.add(value)
-                    .map_err(|e| Status::internal(format!("Failed to add text parameter for {}: {}", col, e)))?;
             },
             "BOOLEAN" => {
-                let val = value.parse::<bool>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?;
-                params.add(val)
-                    .map_err(|e| Status::internal(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                if let Kind::BoolValue(val) = kind {
+                    params.add(val)
+                        .map_err(|e| Status::internal(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected boolean for column '{}'", col)));
+                }
             },
             "TIMESTAMPTZ" => {
-                let dt = DateTime::parse_from_rfc3339(value)
-                    .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
-                params.add(dt.with_timezone(&Utc))
-                    .map_err(|e| Status::internal(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                if let Kind::StringValue(value) = kind {
+                    let dt = DateTime::parse_from_rfc3339(&value)
+                        .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
+                    params.add(dt.with_timezone(&Utc))
+                        .map_err(|e| Status::internal(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected ISO 8601 string for column '{}'", col)));
+                }
             },
-            // ADDED: BIGINT handling for completeness, if needed for other columns.
"BIGINT" => { - let val = value.parse::() - .map_err(|_| Status::invalid_argument(format!("Invalid integer for {}", col)))?; - params.add(val) - .map_err(|e| Status::internal(format!("Failed to add integer parameter for {}: {}", col, e)))?; + if let Kind::NumberValue(val) = kind { + if val.fract() != 0.0 { + return Err(Status::invalid_argument(format!("Expected integer for column '{}', but got a float", col))); + } + params.add(val as i64) + .map_err(|e| Status::internal(format!("Failed to add integer parameter for {}: {}", col, e)))?; + } else { + return Err(Status::invalid_argument(format!("Expected number for column '{}'", col))); + } }, _ => return Err(Status::invalid_argument(format!("Unsupported type {}", sql_type))), } @@ -143,27 +158,10 @@ pub async fn put_table_data( param_idx += 1; } - // Add NULL clauses for empty fields - for field in null_fields { - // Make sure the field is valid - if !system_columns.contains(&field.as_str()) && !user_columns.contains(&&field) { - return Err(Status::invalid_argument(format!("Invalid column to set NULL: {}", field))); - } - set_clauses.push(format!("\"{}\" = NULL", field)); - } - - // Ensure we have at least one field to update - if set_clauses.is_empty() { - return Err(Status::invalid_argument("No valid fields to update")); - } - - // Add ID parameter at the end params.add(record_id) .map_err(|e| Status::internal(format!("Failed to add record_id parameter: {}", e)))?; - // Qualify table name with schema let qualified_table = qualify_table_name_for_data(&table_name)?; - let set_clause = set_clauses.join(", "); let sql = format!( "UPDATE {} SET {} WHERE id = ${} AND deleted = FALSE RETURNING id", @@ -184,7 +182,6 @@ pub async fn put_table_data( }), Ok(None) => Err(Status::not_found("Record not found or already deleted")), Err(e) => { - // Handle "relation does not exist" error specifically if let Some(db_err) = e.as_database_error() { if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) { return Err(Status::internal(format!(