// src/table_script/handlers/post_table_script.rs // TODO MAKE THE SCRIPTS PUSH ONLY TO THE EMPTY FILES use tonic::Status; use sqlx::{PgPool, Error as SqlxError}; use common::proto::multieko2::table_script::{PostTableScriptRequest, TableScriptResponse}; use serde_json::Value; use steel_decimal::SteelDecimal; use crate::table_script::handlers::dependency_analyzer::DependencyAnalyzer; const SYSTEM_COLUMNS: &[&str] = &["id", "deleted", "created_at"]; /// Validates the target column and ensures it is not a system column. /// Returns the column type if valid. fn validate_target_column( table_name: &str, target: &str, table_columns: &Value, ) -> Result { if SYSTEM_COLUMNS.contains(&target) { return Err(format!("Cannot override system column: {}", target)); } // Parse the columns JSON into a vector of strings let columns: Vec = serde_json::from_value(table_columns.clone()) .map_err(|e| format!("Invalid column data: {}", e))?; // Extract column names and types let column_info: Vec<(&str, &str)> = columns .iter() .filter_map(|c| { let mut parts = c.split_whitespace(); let name = parts.next()?.trim_matches('"'); let data_type = parts.next()?; Some((name, data_type)) }) .collect(); // Find the target column and return its type column_info .iter() .find(|(name, _)| *name == target) .map(|(_, dt)| dt.to_string()) .ok_or_else(|| format!("Target column '{}' not defined in table '{}'", target, table_name)) } /// Handles the creation of a new table script with dependency validation. pub async fn post_table_script( db_pool: &PgPool, request: PostTableScriptRequest, ) -> Result { // Start a transaction for ALL operations - critical for atomicity let mut tx = db_pool.begin().await .map_err(|e| Status::internal(format!("Failed to start transaction: {}", e)))?; // Fetch the table definition let table_def = sqlx::query!( r#"SELECT id, table_name, columns, schema_id FROM table_definitions WHERE id = $1"#, request.table_definition_id ) .fetch_optional(&mut *tx) .await .map_err(|e| Status::internal(format!("Failed to fetch table definition: {}", e)))? .ok_or_else(|| Status::not_found("Table definition not found"))?; // Validate the target column and get its type let column_type = validate_target_column( &table_def.table_name, &request.target_column, &table_def.columns, ) .map_err(|e| Status::invalid_argument(e))?; // Create dependency analyzer for this schema let analyzer = DependencyAnalyzer::new(table_def.schema_id, db_pool.clone()); // Analyze script dependencies let dependencies = analyzer .analyze_script_dependencies(&request.script) .map_err(|e| Status::from(e))?; // Check for circular dependencies BEFORE making any changes // Pass the transaction to ensure we see any existing dependencies analyzer .check_for_cycles(&mut tx, table_def.id, &dependencies) .await .map_err(|e| Status::from(e))?; // Transform the script using steel_decimal let steel_decimal = SteelDecimal::new(); let parsed_script = steel_decimal.transform(&request.script); // Insert or update the script let script_record = sqlx::query!( r#"INSERT INTO table_scripts (table_definitions_id, target_table, target_column, target_column_type, script, description, schema_id) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (table_definitions_id, target_column) DO UPDATE SET script = EXCLUDED.script, description = EXCLUDED.description, target_column_type = EXCLUDED.target_column_type RETURNING id"#, request.table_definition_id, table_def.table_name, request.target_column, column_type, parsed_script, request.description, table_def.schema_id ) .fetch_one(&mut *tx) .await .map_err(|e| { match e { SqlxError::Database(db_err) if db_err.constraint() == Some("table_scripts_table_definitions_id_target_column_key") => { Status::already_exists("Script already exists for this column") } _ => Status::internal(format!("Failed to insert script: {}", e)), } })?; // Save the dependencies within the same transaction analyzer .save_dependencies(&mut tx, script_record.id, table_def.id, &dependencies) .await .map_err(|e| Status::from(e))?; // Only now commit the entire transaction - script + dependencies together tx.commit().await .map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?; // Generate warnings for potential issues let warnings = generate_warnings(&dependencies, &table_def.table_name); Ok(TableScriptResponse { id: script_record.id, warnings, }) } /// Generate helpful warnings for script dependencies fn generate_warnings(dependencies: &[crate::table_script::handlers::dependency_analyzer::Dependency], table_name: &str) -> String { let mut warnings = Vec::new(); // Check for self-references if dependencies.iter().any(|d| d.target_table == table_name) { warnings.push("Warning: Script references its own table, which may cause issues during initial population.".to_string()); } // Check for complex SQL queries let sql_deps_count = dependencies.iter() .filter(|d| matches!(d.dependency_type, crate::table_script::handlers::dependency_analyzer::DependencyType::SqlQuery { .. })) .count(); if sql_deps_count > 0 { warnings.push(format!( "Warning: Script contains {} raw SQL quer{}, ensure they are read-only and reference valid tables.", sql_deps_count, if sql_deps_count == 1 { "y" } else { "ies" } )); } // Check for many dependencies if dependencies.len() > 5 { warnings.push(format!( "Warning: Script depends on {} tables, which may affect processing performance.", dependencies.len() )); } // Count structured access dependencies let structured_deps_count = dependencies.iter() .filter(|d| matches!( d.dependency_type, crate::table_script::handlers::dependency_analyzer::DependencyType::ColumnAccess { .. } | crate::table_script::handlers::dependency_analyzer::DependencyType::IndexedAccess { .. } )) .count(); if structured_deps_count > 0 { warnings.push(format!( "Info: Script uses {} linked table{} via steel_get_column functions.", structured_deps_count, if structured_deps_count == 1 { "" } else { "s" } )); } warnings.join(" ") }