diff --git a/server/migrations/20250710201933_create_script_dependencies.sql b/server/migrations/20250710201933_create_script_dependencies.sql new file mode 100644 index 0000000..62c3575 --- /dev/null +++ b/server/migrations/20250710201933_create_script_dependencies.sql @@ -0,0 +1,47 @@ +-- migrations/20250710201933_create_script_dependencies.sql + +-- This table stores the dependency graph for table scripts +-- More efficient design with better indexing +CREATE TABLE script_dependencies ( + id BIGSERIAL PRIMARY KEY, + + -- The script that creates this dependency + script_id BIGINT NOT NULL REFERENCES table_scripts(id) ON DELETE CASCADE, + + -- The table that depends on another (source of dependency) + source_table_id BIGINT NOT NULL REFERENCES table_definitions(id) ON DELETE CASCADE, + + -- The table being depended upon (target of dependency) + target_table_id BIGINT NOT NULL REFERENCES table_definitions(id) ON DELETE CASCADE, + + -- What type of dependency (for better debugging) + dependency_type TEXT NOT NULL CHECK (dependency_type IN ('column_access', 'sql_query', 'indexed_access')), + + -- Additional context (column name, query snippet, etc.) 
+ context_info JSONB, + + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + + -- Prevent duplicate dependencies for the same script + UNIQUE (script_id, source_table_id, target_table_id, dependency_type) +); + +-- Optimized indexes for fast cycle detection +CREATE INDEX idx_script_deps_source_target ON script_dependencies (source_table_id, target_table_id); +CREATE INDEX idx_script_deps_by_script ON script_dependencies (script_id); +CREATE INDEX idx_script_deps_by_target ON script_dependencies (target_table_id); + +-- View for easy dependency analysis +CREATE VIEW dependency_graph AS +SELECT + sd.source_table_id, + sd.target_table_id, + st.table_name AS source_table, + tt.table_name AS target_table, + sd.dependency_type, + sd.context_info, + s.name AS schema_name +FROM script_dependencies sd +JOIN table_definitions st ON sd.source_table_id = st.id +JOIN table_definitions tt ON sd.target_table_id = tt.id +JOIN schemas s ON st.schema_id = s.id; diff --git a/server/src/table_script/handlers.rs b/server/src/table_script/handlers.rs index da0a988..dd95ada 100644 --- a/server/src/table_script/handlers.rs +++ b/server/src/table_script/handlers.rs @@ -1,4 +1,8 @@ // src/table_script/handlers.rs +pub mod dependency_analyzer; +pub mod dependency_utils; pub mod post_table_script; -pub use post_table_script::post_table_script; +pub use dependency_analyzer::{DependencyAnalyzer, DependencyError, Dependency, DependencyType}; +pub use dependency_utils::*; +pub use post_table_script::*; diff --git a/server/src/table_script/handlers/dependency_analyzer.rs b/server/src/table_script/handlers/dependency_analyzer.rs new file mode 100644 index 0000000..084c875 --- /dev/null +++ b/server/src/table_script/handlers/dependency_analyzer.rs @@ -0,0 +1,404 @@ +use std::collections::HashMap; +use tonic::Status; +use sqlx::PgPool; +use serde_json::{json, Value}; + +#[derive(Clone, Copy, PartialEq)] +enum NodeState { + Unvisited, + Visiting, // Currently in recursion stack + Visited, // 
Completely processed +} + +#[derive(Debug, Clone)] +pub struct Dependency { + pub target_table: String, + pub dependency_type: DependencyType, + pub context: Option, +} + +#[derive(Debug, Clone)] +pub enum DependencyType { + ColumnAccess { column: String }, + IndexedAccess { column: String, index: i64 }, + SqlQuery { query_fragment: String }, +} + +impl DependencyType { + pub fn as_str(&self) -> &'static str { + match self { + DependencyType::ColumnAccess { .. } => "column_access", + DependencyType::IndexedAccess { .. } => "indexed_access", + DependencyType::SqlQuery { .. } => "sql_query", + } + } + + pub fn context_json(&self) -> Value { + match self { + DependencyType::ColumnAccess { column } => { + json!({ "column": column }) + } + DependencyType::IndexedAccess { column, index } => { + json!({ "column": column, "index": index }) + } + DependencyType::SqlQuery { query_fragment } => { + json!({ "query_fragment": query_fragment }) + } + } + } +} + +#[derive(Debug)] +pub enum DependencyError { + CircularDependency { + cycle_path: Vec, + involving_script: String + }, + InvalidTableReference { + table_name: String, + script_context: String + }, + ScriptParseError { + error: String + }, + DatabaseError { + error: String + }, +} + +impl From for Status { + fn from(error: DependencyError) -> Self { + match error { + DependencyError::CircularDependency { cycle_path, involving_script } => { + Status::failed_precondition(format!( + "Circular dependency detected in script for '{}': {}", + involving_script, + cycle_path.join(" -> ") + )) + } + DependencyError::InvalidTableReference { table_name, script_context } => { + Status::not_found(format!( + "Table '{}' referenced in script '{}' does not exist", + table_name, script_context + )) + } + DependencyError::ScriptParseError { error } => { + Status::invalid_argument(format!("Script parsing failed: {}", error)) + } + DependencyError::DatabaseError { error } => { + Status::internal(format!("Database error: {}", error)) + } + } + 
} +} + +pub struct DependencyAnalyzer { + schema_id: i64, + pool: PgPool, +} + +impl DependencyAnalyzer { + pub fn new(schema_id: i64, pool: PgPool) -> Self { + Self { schema_id, pool } + } + + /// Analyzes a Steel script to extract all table dependencies + /// Uses regex patterns to find function calls that create dependencies + pub fn analyze_script_dependencies(&self, script: &str) -> Result, DependencyError> { + let mut dependencies = Vec::new(); + + // Extract function calls and SQL dependencies using regex + dependencies.extend(self.extract_function_calls(script)?); + dependencies.extend(self.extract_sql_dependencies(script)?); + + Ok(dependencies) + } + + /// Extract function calls using regex patterns + fn extract_function_calls(&self, script: &str) -> Result, DependencyError> { + let mut dependencies = Vec::new(); + + // Look for steel_get_column patterns + let column_pattern = regex::Regex::new(r#"\(\s*steel_get_column\s+"([^"]+)"\s+"([^"]+)""#) + .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?; + + for caps in column_pattern.captures_iter(script) { + let table = caps[1].to_string(); + let column = caps[2].to_string(); + dependencies.push(Dependency { + target_table: table, + dependency_type: DependencyType::ColumnAccess { column }, + context: None, + }); + } + + // Look for steel_get_column_with_index patterns + let indexed_pattern = regex::Regex::new(r#"\(\s*steel_get_column_with_index\s+"([^"]+)"\s+(\d+)\s+"([^"]+)""#) + .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?; + + for caps in indexed_pattern.captures_iter(script) { + let table = caps[1].to_string(); + let index: i64 = caps[2].parse() + .map_err(|e: std::num::ParseIntError| DependencyError::ScriptParseError { error: e.to_string() })?; + let column = caps[3].to_string(); + dependencies.push(Dependency { + target_table: table, + dependency_type: DependencyType::IndexedAccess { column, index }, + context: None, + }); + 
} + + Ok(dependencies) + } + + /// Extract table references from SQL queries in steel_query_sql calls + fn extract_sql_dependencies(&self, script: &str) -> Result, DependencyError> { + let mut dependencies = Vec::new(); + + // Look for steel_query_sql calls and extract table names from the SQL + let sql_pattern = regex::Regex::new(r#"\(\s*steel_query_sql\s+"([^"]+)""#) + .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?; + + for caps in sql_pattern.captures_iter(script) { + let query = caps[1].to_string(); + let table_refs = self.extract_table_references_from_sql(&query)?; + + for table in table_refs { + dependencies.push(Dependency { + target_table: table.clone(), + dependency_type: DependencyType::SqlQuery { + query_fragment: query.clone() + }, + context: None, + }); + } + } + + Ok(dependencies) + } + + /// Extract table names from SQL query text + fn extract_table_references_from_sql(&self, sql: &str) -> Result, DependencyError> { + let mut tables = Vec::new(); + + // Simple extraction - look for FROM and JOIN clauses + // This could be made more sophisticated with a proper SQL parser + let table_pattern = regex::Regex::new(r#"(?i)\b(?:FROM|JOIN)\s+(?:"([^"]+)"|(\w+))"#) + .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?; + + for caps in table_pattern.captures_iter(sql) { + let table = caps.get(1) + .or_else(|| caps.get(2)) + .map(|m| m.as_str().to_string()); + + if let Some(table_name) = table { + tables.push(table_name); + } + } + + Ok(tables) + } + + /// Check for cycles in the dependency graph using proper DFS + pub async fn check_for_cycles( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + table_id: i64, + new_dependencies: &[Dependency], + ) -> Result<(), DependencyError> { + // Get current dependency graph for this schema + let current_deps = sqlx::query!( + r#"SELECT sd.source_table_id, sd.target_table_id, st.table_name as source_name, tt.table_name as target_name + 
FROM script_dependencies sd + JOIN table_definitions st ON sd.source_table_id = st.id + JOIN table_definitions tt ON sd.target_table_id = tt.id + WHERE st.schema_id = $1"#, + self.schema_id + ) + .fetch_all(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?; + + // Build adjacency list + let mut graph: HashMap> = HashMap::new(); + let mut table_names: HashMap = HashMap::new(); + + for dep in current_deps { + graph.entry(dep.source_table_id).or_default().push(dep.target_table_id); + table_names.insert(dep.source_table_id, dep.source_name); + table_names.insert(dep.target_table_id, dep.target_name); + } + + // Add new dependencies to test + for dep in new_dependencies { + // Look up target table ID + let target_id = sqlx::query_scalar!( + "SELECT id FROM table_definitions WHERE schema_id = $1 AND table_name = $2", + self.schema_id, + dep.target_table + ) + .fetch_optional(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })? 
+ .ok_or_else(|| DependencyError::InvalidTableReference { + table_name: dep.target_table.clone(), + script_context: format!("table_id_{}", table_id), + })?; + + graph.entry(table_id).or_default().push(target_id); + + // Get table name for error reporting + if !table_names.contains_key(&table_id) { + let source_name = sqlx::query_scalar!( + "SELECT table_name FROM table_definitions WHERE id = $1", + table_id + ) + .fetch_one(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?; + + table_names.insert(table_id, source_name); + } + } + + // Detect cycles using proper DFS algorithm + self.detect_cycles_dfs(&graph, &table_names, table_id)?; + + Ok(()) + } + + /// Proper DFS-based cycle detection with state tracking + fn detect_cycles_dfs( + &self, + graph: &HashMap>, + table_names: &HashMap, + starting_table: i64, + ) -> Result<(), DependencyError> { + let mut states: HashMap = HashMap::new(); + + // Initialize all nodes as unvisited + for &node in graph.keys() { + states.insert(node, NodeState::Unvisited); + } + + // Run DFS from each unvisited node + for &node in graph.keys() { + if states.get(&node) == Some(&NodeState::Unvisited) { + let mut path = Vec::new(); + if let Err(cycle_error) = self.dfs_visit( + node, + &mut states, + graph, + &mut path, + table_names, + starting_table + ) { + return Err(cycle_error); + } + } + } + + Ok(()) + } + + fn dfs_visit( + &self, + node: i64, + states: &mut HashMap, + graph: &HashMap>, + path: &mut Vec, + table_names: &HashMap, + starting_table: i64, + ) -> Result<(), DependencyError> { + states.insert(node, NodeState::Visiting); + path.push(node); + + if let Some(neighbors) = graph.get(&node) { + for &neighbor in neighbors { + // Ensure neighbor is in states map + if !states.contains_key(&neighbor) { + states.insert(neighbor, NodeState::Unvisited); + } + + match states.get(&neighbor).copied().unwrap_or(NodeState::Unvisited) { + NodeState::Visiting => { + // Found a cycle! 
Build the cycle path + let cycle_start_idx = path.iter().position(|&x| x == neighbor).unwrap_or(0); + let cycle_path: Vec = path[cycle_start_idx..] + .iter() + .chain(std::iter::once(&neighbor)) + .map(|&id| table_names.get(&id).cloned().unwrap_or_else(|| id.to_string())) + .collect(); + + let involving_script = table_names.get(&starting_table) + .cloned() + .unwrap_or_else(|| starting_table.to_string()); + + return Err(DependencyError::CircularDependency { + cycle_path, + involving_script, + }); + } + NodeState::Unvisited => { + // Recursively visit unvisited neighbor + self.dfs_visit(neighbor, states, graph, path, table_names, starting_table)?; + } + NodeState::Visited => { + // Already processed, no cycle through this path + } + } + } + } + + path.pop(); + states.insert(node, NodeState::Visited); + Ok(()) + } + + /// Save dependencies to database within an existing transaction + pub async fn save_dependencies( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + script_id: i64, + table_id: i64, + dependencies: &[Dependency], + ) -> Result<(), DependencyError> { + // Clear existing dependencies for this script + sqlx::query!("DELETE FROM script_dependencies WHERE script_id = $1", script_id) + .execute(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?; + + // Insert new dependencies + for dep in dependencies { + let target_id = sqlx::query_scalar!( + "SELECT id FROM table_definitions WHERE schema_id = $1 AND table_name = $2", + self.schema_id, + dep.target_table + ) + .fetch_optional(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })? 
+ .ok_or_else(|| DependencyError::InvalidTableReference { + table_name: dep.target_table.clone(), + script_context: format!("script_id_{}", script_id), + })?; + + sqlx::query!( + r#"INSERT INTO script_dependencies + (script_id, source_table_id, target_table_id, dependency_type, context_info) + VALUES ($1, $2, $3, $4, $5)"#, + script_id, + table_id, + target_id, + dep.dependency_type.as_str(), + dep.dependency_type.context_json() + ) + .execute(&mut **tx) + .await + .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?; + } + + Ok(()) + } +} diff --git a/server/src/table_script/handlers/dependency_utils.rs b/server/src/table_script/handlers/dependency_utils.rs new file mode 100644 index 0000000..d63ef9b --- /dev/null +++ b/server/src/table_script/handlers/dependency_utils.rs @@ -0,0 +1,228 @@ +// src/table_script/handlers/dependency_utils.rs +// Utility functions for dependency analysis and debugging + +use sqlx::PgPool; +use serde_json::{json, Value}; +use std::collections::HashMap; + +/// Get a visual representation of the dependency graph for a schema +pub async fn get_dependency_graph_visualization( + db_pool: &PgPool, + schema_id: i64, +) -> Result { + let dependencies = sqlx::query!( + r#"SELECT + st.table_name as source_table, + tt.table_name as target_table, + sd.dependency_type, + sd.context_info, + COUNT(*) as dependency_count + FROM script_dependencies sd + JOIN table_definitions st ON sd.source_table_id = st.id + JOIN table_definitions tt ON sd.target_table_id = tt.id + WHERE st.schema_id = $1 + GROUP BY st.table_name, tt.table_name, sd.dependency_type, sd.context_info + ORDER BY st.table_name, tt.table_name"#, + schema_id + ) + .fetch_all(db_pool) + .await?; + + let mut graph = HashMap::new(); + for dep in dependencies { + let source_entry = graph.entry(dep.source_table.clone()).or_insert_with(|| json!({ + "table": dep.source_table, + "dependencies": [] + })); + + if let Some(deps_array) = source_entry.get_mut("dependencies") { + if let 
Some(deps) = deps_array.as_array_mut() { + deps.push(json!({ + "target": dep.target_table, + "type": dep.dependency_type, + "context": dep.context_info, + "count": dep.dependency_count.unwrap_or(0) + })); + } + } + } + + Ok(json!({ + "schema_id": schema_id, + "dependency_graph": graph.into_values().collect::>() + })) +} + +/// Check if a schema has any circular dependencies +pub async fn check_schema_for_cycles( + db_pool: &PgPool, + schema_id: i64, +) -> Result, sqlx::Error> { + // This is a simplified cycle detection for monitoring purposes + let dependencies = sqlx::query!( + r#"SELECT sd.source_table_id, sd.target_table_id, st.table_name as source_table, tt.table_name as target_table + FROM script_dependencies sd + JOIN table_definitions st ON sd.source_table_id = st.id + JOIN table_definitions tt ON sd.target_table_id = tt.id + WHERE st.schema_id = $1"#, + schema_id + ) + .fetch_all(db_pool) + .await?; + + let mut graph: HashMap> = HashMap::new(); + let mut table_names: HashMap = HashMap::new(); + + for dep in dependencies { + graph.entry(dep.source_table_id).or_default().push(dep.target_table_id); + table_names.insert(dep.source_table_id, dep.source_table); + table_names.insert(dep.target_table_id, dep.target_table); + } + + // Simple cycle detection using DFS + let mut visited = std::collections::HashSet::new(); + let mut rec_stack = std::collections::HashSet::new(); + let mut cycles = Vec::new(); + + for &node in graph.keys() { + if !visited.contains(&node) { + if let Some(cycle) = detect_cycle_dfs( + node, + &graph, + &mut visited, + &mut rec_stack, + &table_names + ) { + cycles.push(cycle); + } + } + } + + Ok(cycles) +} + +fn detect_cycle_dfs( + node: i64, + graph: &HashMap>, + visited: &mut std::collections::HashSet, + rec_stack: &mut std::collections::HashSet, + table_names: &HashMap, +) -> Option { + visited.insert(node); + rec_stack.insert(node); + + if let Some(neighbors) = graph.get(&node) { + for &neighbor in neighbors { + if 
!visited.contains(&neighbor) {
+                if let Some(cycle) = detect_cycle_dfs(neighbor, graph, visited, rec_stack, table_names) {
+                    return Some(cycle);
+                }
+            } else if rec_stack.contains(&neighbor) {
+                // Found a cycle
+                let node_name = table_names.get(&node).cloned().unwrap_or_else(|| node.to_string());
+                let neighbor_name = table_names.get(&neighbor).cloned().unwrap_or_else(|| neighbor.to_string());
+                return Some(format!("{} -> {}", node_name, neighbor_name));
+            }
+        }
+    }
+
+    rec_stack.remove(&node);
+    None
+}
+
+/// Get processing order for tables based on dependencies
+/// Tables with no dependencies come first, then tables that depend only on already-processed tables
+pub async fn get_table_processing_order(
+    db_pool: &PgPool,
+    schema_id: i64,
+) -> Result<Vec<String>, sqlx::Error> {
+    let all_tables = sqlx::query!(
+        "SELECT id, table_name FROM table_definitions WHERE schema_id = $1 ORDER BY table_name",
+        schema_id
+    )
+    .fetch_all(db_pool)
+    .await?;
+
+    let dependencies = sqlx::query!(
+        r#"SELECT source_table_id, target_table_id
+           FROM script_dependencies sd
+           JOIN table_definitions td ON sd.source_table_id = td.id
+           WHERE td.schema_id = $1"#,
+        schema_id
+    )
+    .fetch_all(db_pool)
+    .await?;
+
+    // Build dependency graph
+    let mut graph: HashMap<i64, Vec<i64>> = HashMap::new();
+    let mut in_degree: HashMap<i64, i32> = HashMap::new();
+    let mut table_names: HashMap<i64, String> = HashMap::new();
+
+    // Initialize all tables
+    for table in &all_tables {
+        in_degree.insert(table.id, 0);
+        table_names.insert(table.id, table.table_name.clone());
+        graph.insert(table.id, Vec::new());
+    }
+
+    // Build graph and calculate in-degrees
+    for dep in dependencies {
+        graph.entry(dep.target_table_id).or_default().push(dep.source_table_id);
+        *in_degree.entry(dep.source_table_id).or_insert(0) += 1;
+    }
+
+    // Topological sort using Kahn's algorithm
+    let mut queue: Vec<i64> = in_degree
+        .iter()
+        .filter(|(_, &degree)| degree == 0)
+        .map(|(&id, _)| id)
+        .collect();
+
+    let mut result = Vec::new();
+
+    while let Some(node) = 
queue.pop() {
+        result.push(table_names[&node].clone());
+
+        if let Some(neighbors) = graph.get(&node) {
+            for &neighbor in neighbors {
+                if let Some(degree) = in_degree.get_mut(&neighbor) {
+                    *degree -= 1;
+                    if *degree == 0 {
+                        queue.push(neighbor);
+                    }
+                }
+            }
+        }
+    }
+
+    // If result doesn't contain all tables, there are cycles
+    if result.len() != all_tables.len() {
+        return Err(sqlx::Error::Protocol("Circular dependencies detected - cannot determine processing order".into()));
+    }
+
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_cycle_detection_simple() {
+        // Test the cycle detection logic with a simple case
+        let mut graph = HashMap::new();
+        graph.insert(1, vec![2]);
+        graph.insert(2, vec![1]); // 1 -> 2 -> 1 (cycle)
+
+        let mut table_names = HashMap::new();
+        table_names.insert(1, "table_a".to_string());
+        table_names.insert(2, "table_b".to_string());
+
+        let mut visited = std::collections::HashSet::new();
+        let mut rec_stack = std::collections::HashSet::new();
+
+        let cycle = detect_cycle_dfs(1, &graph, &mut visited, &mut rec_stack, &table_names);
+        assert!(cycle.is_some());
+        let cycle = cycle.unwrap();
+        assert!(cycle.contains("table_a") && cycle.contains("table_b"));
+    }
+}
diff --git a/server/src/table_script/handlers/post_table_script.rs b/server/src/table_script/handlers/post_table_script.rs index d09e06c..ea29323 100644 --- a/server/src/table_script/handlers/post_table_script.rs +++ b/server/src/table_script/handlers/post_table_script.rs @@ -1,13 +1,16 @@ // src/table_script/handlers/post_table_script.rs + use tonic::Status; use sqlx::{PgPool, Error as SqlxError}; use common::proto::multieko2::table_script::{PostTableScriptRequest, TableScriptResponse}; use serde_json::Value; use steel_decimal::SteelDecimal; +use crate::table_script::handlers::dependency_analyzer::DependencyAnalyzer; + const SYSTEM_COLUMNS: &[&str] = &["id", "deleted", "created_at"]; -// TODO MAKE SCRIPT PUSH ONLY TO THE EMPTY TABLES +// TODO MAKE THE 
SCRIPTS PUSH ONLY TO THE EMPTY TABLES /// Validates the target column and ensures it is not a system column. /// Returns the column type if valid. fn validate_target_column( @@ -42,22 +45,24 @@ fn validate_target_column( .ok_or_else(|| format!("Target column '{}' not defined in table '{}'", target, table_name)) } -/// Handles the creation of a new table script. +/// Handles the creation of a new table script with dependency validation. pub async fn post_table_script( db_pool: &PgPool, request: PostTableScriptRequest, ) -> Result<TableScriptResponse, Status> { + // Start a transaction for ALL operations - critical for atomicity + let mut tx = db_pool.begin().await + .map_err(|e| Status::internal(format!("Failed to start transaction: {}", e)))?; + // Fetch the table definition let table_def = sqlx::query!( r#"SELECT id, table_name, columns, schema_id FROM table_definitions WHERE id = $1"#, request.table_definition_id ) - .fetch_optional(db_pool) + .fetch_optional(&mut *tx) .await - .map_err(|e| { - Status::internal(format!("Failed to fetch table definition: {}", e)) - })? + .map_err(|e| Status::internal(format!("Failed to fetch table definition: {}", e)))? 
.ok_or_else(|| Status::not_found("Table definition not found"))?; // Validate the target column and get its type @@ -68,18 +73,36 @@ pub async fn post_table_script( ) .map_err(|e| Status::invalid_argument(e))?; - // Use the steel_decimal for script transformation + // Create dependency analyzer for this schema + let analyzer = DependencyAnalyzer::new(table_def.schema_id, db_pool.clone()); + + // Analyze script dependencies + let dependencies = analyzer + .analyze_script_dependencies(&request.script) + .map_err(|e| Status::from(e))?; + + // Check for circular dependencies BEFORE making any changes + // Pass the transaction to ensure we see any existing dependencies + analyzer + .check_for_cycles(&mut tx, table_def.id, &dependencies) + .await + .map_err(|e| Status::from(e))?; + + // Transform the script using steel_decimal let steel_decimal = SteelDecimal::new(); - - // Transform the script using steel_decimal (no context needed for basic transformation) let parsed_script = steel_decimal.transform(&request.script); - // Insert the script into the database + // Insert or update the script let script_record = sqlx::query!( r#"INSERT INTO table_scripts (table_definitions_id, target_table, target_column, target_column_type, script, description, schema_id) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (table_definitions_id, target_column) + DO UPDATE SET + script = EXCLUDED.script, + description = EXCLUDED.description, + target_column_type = EXCLUDED.target_column_type RETURNING id"#, request.table_definition_id, table_def.table_name, @@ -89,18 +112,66 @@ pub async fn post_table_script( request.description, table_def.schema_id ) - .fetch_one(db_pool) + .fetch_one(&mut *tx) .await - .map_err(|e| match e { - SqlxError::Database(db_err) if db_err.constraint() == Some("table_scripts_table_definitions_id_target_column_key") => { - Status::already_exists("Script already exists for this column") + .map_err(|e| { + match e { + SqlxError::Database(db_err) + if 
db_err.constraint() == Some("table_scripts_table_definitions_id_target_column_key") => { + Status::already_exists("Script already exists for this column") + } + _ => Status::internal(format!("Failed to insert script: {}", e)), } - _ => Status::internal(format!("Failed to insert script: {}", e)), })?; - // Return the response with the new script ID + // Save the dependencies within the same transaction + analyzer + .save_dependencies(&mut tx, script_record.id, table_def.id, &dependencies) + .await + .map_err(|e| Status::from(e))?; + + // Only now commit the entire transaction - script + dependencies together + tx.commit().await + .map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?; + + // Generate warnings for potential issues + let warnings = generate_warnings(&dependencies, &table_def.table_name); + Ok(TableScriptResponse { id: script_record.id, - warnings: String::new(), // No warnings for now + warnings, }) } + +/// Generate helpful warnings for script dependencies +fn generate_warnings(dependencies: &[crate::table_script::handlers::dependency_analyzer::Dependency], table_name: &str) -> String { + let mut warnings = Vec::new(); + + // Check for self-references + if dependencies.iter().any(|d| d.target_table == table_name) { + warnings.push("Warning: Script references its own table, which may cause issues during initial population.".to_string()); + } + + // Check for complex SQL queries + let sql_deps_count = dependencies.iter() + .filter(|d| matches!(d.dependency_type, crate::table_script::handlers::dependency_analyzer::DependencyType::SqlQuery { .. 
})) + .count(); + + if sql_deps_count > 0 { + warnings.push(format!( + "Warning: Script contains {} SQL quer{}, ensure they are read-only and reference valid tables.", + sql_deps_count, + if sql_deps_count == 1 { "y" } else { "ies" } + )); + } + + // Check for many dependencies + if dependencies.len() > 5 { + warnings.push(format!( + "Warning: Script depends on {} tables, which may affect processing performance.", + dependencies.len() + )); + } + + warnings.join(" ") +} diff --git a/server/src/table_script/mod.rs b/server/src/table_script/mod.rs index 95dc60e..4d35650 100644 --- a/server/src/table_script/mod.rs +++ b/server/src/table_script/mod.rs @@ -1,3 +1,4 @@ -// src/tables_data/mod.rs - +// src/table_script/mod.rs pub mod handlers; + +pub use handlers::*;