Fix circular-dependency detection in the post-table-script logic
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
-- migrations/20250710201933_create_script_dependencies.sql
|
||||
|
||||
-- This table stores the dependency graph for table scripts
-- More efficient design with better indexing
CREATE TABLE script_dependencies (
    id BIGSERIAL PRIMARY KEY,

    -- The script that creates this dependency
    -- Cascade delete: removing a script removes its recorded dependencies
    script_id BIGINT NOT NULL REFERENCES table_scripts(id) ON DELETE CASCADE,

    -- The table that depends on another (source of dependency)
    source_table_id BIGINT NOT NULL REFERENCES table_definitions(id) ON DELETE CASCADE,

    -- The table being depended upon (target of dependency)
    target_table_id BIGINT NOT NULL REFERENCES table_definitions(id) ON DELETE CASCADE,

    -- What type of dependency (for better debugging)
    -- Values must stay in sync with DependencyType::as_str() in dependency_analyzer.rs
    dependency_type TEXT NOT NULL CHECK (dependency_type IN ('column_access', 'sql_query', 'indexed_access')),

    -- Additional context (column name, query snippet, etc.)
    context_info JSONB,

    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- Prevent duplicate dependencies for the same script
    UNIQUE (script_id, source_table_id, target_table_id, dependency_type)
);
|
||||
|
||||
-- Optimized indexes for fast cycle detection
-- (source, target) supports edge lookups during DFS; the single-column
-- indexes support per-script deletes and reverse-dependency queries.
CREATE INDEX idx_script_deps_source_target ON script_dependencies (source_table_id, target_table_id);
CREATE INDEX idx_script_deps_by_script ON script_dependencies (script_id);
CREATE INDEX idx_script_deps_by_target ON script_dependencies (target_table_id);
|
||||
|
||||
-- View for easy dependency analysis
-- Note: schema_name is derived from the SOURCE table's schema only;
-- the target table's schema is not exposed here.
CREATE VIEW dependency_graph AS
SELECT
    sd.source_table_id,
    sd.target_table_id,
    st.table_name AS source_table,
    tt.table_name AS target_table,
    sd.dependency_type,
    sd.context_info,
    s.name AS schema_name
FROM script_dependencies sd
JOIN table_definitions st ON sd.source_table_id = st.id
JOIN table_definitions tt ON sd.target_table_id = tt.id
JOIN schemas s ON st.schema_id = s.id;
|
||||
@@ -1,4 +1,8 @@
|
||||
// src/table_script/handlers.rs
|
||||
pub mod dependency_analyzer;
|
||||
pub mod dependency_utils;
|
||||
pub mod post_table_script;
|
||||
|
||||
pub use post_table_script::post_table_script;
|
||||
pub use dependency_analyzer::{DependencyAnalyzer, DependencyError, Dependency, DependencyType};
|
||||
pub use dependency_utils::*;
|
||||
pub use post_table_script::*;
|
||||
|
||||
404
server/src/table_script/handlers/dependency_analyzer.rs
Normal file
404
server/src/table_script/handlers/dependency_analyzer.rs
Normal file
@@ -0,0 +1,404 @@
|
||||
use std::collections::HashMap;
|
||||
use tonic::Status;
|
||||
use sqlx::PgPool;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
/// Per-node color for DFS cycle detection (classic white/grey/black scheme).
/// `Eq` and `Debug` are derived so states can be compared with `==`/`assert_eq!`
/// and printed in diagnostics; both are backward-compatible additions.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodeState {
    Unvisited,
    Visiting, // Currently in recursion stack
    Visited,  // Completely processed
}
|
||||
|
||||
/// A single table dependency extracted from a Steel script.
#[derive(Debug, Clone)]
pub struct Dependency {
    /// Name of the table the script reads from.
    pub target_table: String,
    /// How the dependency was established (column access, indexed access, raw SQL).
    pub dependency_type: DependencyType,
    /// Optional extra metadata. NOTE(review): the extractors in this file
    /// always set this to `None`; confirm whether any other caller populates it.
    pub context: Option<Value>,
}
|
||||
|
||||
/// Classifies how a script depends on another table.
/// Variants mirror the `dependency_type` CHECK constraint in the
/// `script_dependencies` migration ('column_access', 'indexed_access', 'sql_query').
#[derive(Debug, Clone)]
pub enum DependencyType {
    /// Direct read of a column, e.g. `(steel_get_column "table" "col")`.
    ColumnAccess { column: String },
    /// Read of a column at a specific row index.
    IndexedAccess { column: String, index: i64 },
    /// Table referenced from inside a `steel_query_sql` SQL string.
    SqlQuery { query_fragment: String },
}
|
||||
|
||||
impl DependencyType {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
DependencyType::ColumnAccess { .. } => "column_access",
|
||||
DependencyType::IndexedAccess { .. } => "indexed_access",
|
||||
DependencyType::SqlQuery { .. } => "sql_query",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn context_json(&self) -> Value {
|
||||
match self {
|
||||
DependencyType::ColumnAccess { column } => {
|
||||
json!({ "column": column })
|
||||
}
|
||||
DependencyType::IndexedAccess { column, index } => {
|
||||
json!({ "column": column, "index": index })
|
||||
}
|
||||
DependencyType::SqlQuery { query_fragment } => {
|
||||
json!({ "query_fragment": query_fragment })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Errors produced while analyzing, validating, or persisting script dependencies.
#[derive(Debug)]
pub enum DependencyError {
    /// Adding the script would create a cycle in the dependency graph.
    CircularDependency {
        cycle_path: Vec<String>,
        involving_script: String
    },
    /// The script references a table that does not exist in the schema.
    InvalidTableReference {
        table_name: String,
        script_context: String
    },
    /// The script text could not be parsed (bad regex, unparseable index, ...).
    ScriptParseError {
        error: String
    },
    /// Any underlying database failure, stringified.
    DatabaseError {
        error: String
    },
}
|
||||
|
||||
impl From<DependencyError> for Status {
|
||||
fn from(error: DependencyError) -> Self {
|
||||
match error {
|
||||
DependencyError::CircularDependency { cycle_path, involving_script } => {
|
||||
Status::failed_precondition(format!(
|
||||
"Circular dependency detected in script for '{}': {}",
|
||||
involving_script,
|
||||
cycle_path.join(" -> ")
|
||||
))
|
||||
}
|
||||
DependencyError::InvalidTableReference { table_name, script_context } => {
|
||||
Status::not_found(format!(
|
||||
"Table '{}' referenced in script '{}' does not exist",
|
||||
table_name, script_context
|
||||
))
|
||||
}
|
||||
DependencyError::ScriptParseError { error } => {
|
||||
Status::invalid_argument(format!("Script parsing failed: {}", error))
|
||||
}
|
||||
DependencyError::DatabaseError { error } => {
|
||||
Status::internal(format!("Database error: {}", error))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts table dependencies from Steel scripts and validates/persists
/// them, scoped to a single schema.
pub struct DependencyAnalyzer {
    /// Schema that all table-name lookups are restricted to.
    schema_id: i64,
    /// Connection pool. NOTE(review): no method in this file reads `pool` —
    /// every query runs on a caller-supplied transaction. Confirm whether it
    /// is used elsewhere before removing.
    pool: PgPool,
}
|
||||
|
||||
impl DependencyAnalyzer {
    /// Creates an analyzer scoped to `schema_id`, holding the given pool.
    pub fn new(schema_id: i64, pool: PgPool) -> Self {
        Self { schema_id, pool }
    }

    /// Analyzes a Steel script to extract all table dependencies.
    /// Uses regex patterns to find function calls that create dependencies;
    /// purely textual — the script is never executed here.
    pub fn analyze_script_dependencies(&self, script: &str) -> Result<Vec<Dependency>, DependencyError> {
        let mut dependencies = Vec::new();

        // Extract function calls and SQL dependencies using regex
        dependencies.extend(self.extract_function_calls(script)?);
        dependencies.extend(self.extract_sql_dependencies(script)?);

        Ok(dependencies)
    }

    /// Extract `steel_get_column` / `steel_get_column_with_index` calls via regex.
    /// NOTE(review): the regexes are recompiled on every call; hoisting them
    /// into `OnceLock`/`LazyLock` statics would avoid that — confirm MSRV first.
    fn extract_function_calls(&self, script: &str) -> Result<Vec<Dependency>, DependencyError> {
        let mut dependencies = Vec::new();

        // Look for steel_get_column patterns: capture 1 = table, capture 2 = column
        let column_pattern = regex::Regex::new(r#"\(\s*steel_get_column\s+"([^"]+)"\s+"([^"]+)""#)
            .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?;

        for caps in column_pattern.captures_iter(script) {
            let table = caps[1].to_string();
            let column = caps[2].to_string();
            dependencies.push(Dependency {
                target_table: table,
                dependency_type: DependencyType::ColumnAccess { column },
                context: None,
            });
        }

        // Look for steel_get_column_with_index patterns:
        // capture 1 = table, capture 2 = numeric index, capture 3 = column
        let indexed_pattern = regex::Regex::new(r#"\(\s*steel_get_column_with_index\s+"([^"]+)"\s+(\d+)\s+"([^"]+)""#)
            .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?;

        for caps in indexed_pattern.captures_iter(script) {
            let table = caps[1].to_string();
            // The regex guarantees digits, but parse can still fail on i64
            // overflow; that surfaces as a ScriptParseError.
            let index: i64 = caps[2].parse()
                .map_err(|e: std::num::ParseIntError| DependencyError::ScriptParseError { error: e.to_string() })?;
            let column = caps[3].to_string();
            dependencies.push(Dependency {
                target_table: table,
                dependency_type: DependencyType::IndexedAccess { column, index },
                context: None,
            });
        }

        Ok(dependencies)
    }

    /// Extract table references from SQL queries in `steel_query_sql` calls.
    /// One Dependency is produced per table named in each query string.
    fn extract_sql_dependencies(&self, script: &str) -> Result<Vec<Dependency>, DependencyError> {
        let mut dependencies = Vec::new();

        // Look for steel_query_sql calls and extract table names from the SQL
        let sql_pattern = regex::Regex::new(r#"\(\s*steel_query_sql\s+"([^"]+)""#)
            .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?;

        for caps in sql_pattern.captures_iter(script) {
            let query = caps[1].to_string();
            let table_refs = self.extract_table_references_from_sql(&query)?;

            for table in table_refs {
                dependencies.push(Dependency {
                    target_table: table.clone(),
                    dependency_type: DependencyType::SqlQuery {
                        query_fragment: query.clone()
                    },
                    context: None,
                });
            }
        }

        Ok(dependencies)
    }

    /// Extract table names from SQL query text.
    /// Simple extraction — looks for FROM and JOIN clauses only, so
    /// subqueries, CTEs, and comma-separated table lists beyond the first
    /// name are not fully covered. Could be made more sophisticated with a
    /// proper SQL parser.
    fn extract_table_references_from_sql(&self, sql: &str) -> Result<Vec<String>, DependencyError> {
        let mut tables = Vec::new();

        // Case-insensitive; matches either a quoted identifier (capture 1)
        // or a bare word (capture 2) after FROM/JOIN.
        let table_pattern = regex::Regex::new(r#"(?i)\b(?:FROM|JOIN)\s+(?:"([^"]+)"|(\w+))"#)
            .map_err(|e: regex::Error| DependencyError::ScriptParseError { error: e.to_string() })?;

        for caps in table_pattern.captures_iter(sql) {
            let table = caps.get(1)
                .or_else(|| caps.get(2))
                .map(|m| m.as_str().to_string());

            if let Some(table_name) = table {
                tables.push(table_name);
            }
        }

        Ok(tables)
    }

    /// Check for cycles in the dependency graph using proper DFS.
    /// Loads the schema's existing edges, overlays `new_dependencies` as
    /// extra edges from `table_id`, and fails with CircularDependency if the
    /// combined graph contains a cycle. Runs entirely on the caller's
    /// transaction so uncommitted edges are visible.
    pub async fn check_for_cycles(
        &self,
        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
        table_id: i64,
        new_dependencies: &[Dependency],
    ) -> Result<(), DependencyError> {
        // Get current dependency graph for this schema
        let current_deps = sqlx::query!(
            r#"SELECT sd.source_table_id, sd.target_table_id, st.table_name as source_name, tt.table_name as target_name
            FROM script_dependencies sd
            JOIN table_definitions st ON sd.source_table_id = st.id
            JOIN table_definitions tt ON sd.target_table_id = tt.id
            WHERE st.schema_id = $1"#,
            self.schema_id
        )
        .fetch_all(&mut **tx)
        .await
        .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?;

        // Build adjacency list; table_names is kept for readable error paths
        let mut graph: HashMap<i64, Vec<i64>> = HashMap::new();
        let mut table_names: HashMap<i64, String> = HashMap::new();

        for dep in current_deps {
            graph.entry(dep.source_table_id).or_default().push(dep.target_table_id);
            table_names.insert(dep.source_table_id, dep.source_name);
            table_names.insert(dep.target_table_id, dep.target_name);
        }

        // Add new dependencies to test
        for dep in new_dependencies {
            // Look up target table ID; a missing table is reported as
            // InvalidTableReference rather than a generic DB error.
            let target_id = sqlx::query_scalar!(
                "SELECT id FROM table_definitions WHERE schema_id = $1 AND table_name = $2",
                self.schema_id,
                dep.target_table
            )
            .fetch_optional(&mut **tx)
            .await
            .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?
            .ok_or_else(|| DependencyError::InvalidTableReference {
                table_name: dep.target_table.clone(),
                script_context: format!("table_id_{}", table_id),
            })?;

            graph.entry(table_id).or_default().push(target_id);

            // Get table name for error reporting (only fetched once)
            if !table_names.contains_key(&table_id) {
                let source_name = sqlx::query_scalar!(
                    "SELECT table_name FROM table_definitions WHERE id = $1",
                    table_id
                )
                .fetch_one(&mut **tx)
                .await
                .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?;

                table_names.insert(table_id, source_name);
            }
        }

        // Detect cycles using proper DFS algorithm
        self.detect_cycles_dfs(&graph, &table_names, table_id)?;

        Ok(())
    }

    /// Proper DFS-based cycle detection with state tracking.
    /// Visits every component of the graph, not just the one containing
    /// `starting_table`; `starting_table` is only used to name the offending
    /// script in the error message.
    fn detect_cycles_dfs(
        &self,
        graph: &HashMap<i64, Vec<i64>>,
        table_names: &HashMap<i64, String>,
        starting_table: i64,
    ) -> Result<(), DependencyError> {
        let mut states: HashMap<i64, NodeState> = HashMap::new();

        // Initialize all nodes as unvisited
        for &node in graph.keys() {
            states.insert(node, NodeState::Unvisited);
        }

        // Run DFS from each unvisited node
        for &node in graph.keys() {
            if states.get(&node) == Some(&NodeState::Unvisited) {
                let mut path = Vec::new();
                if let Err(cycle_error) = self.dfs_visit(
                    node,
                    &mut states,
                    graph,
                    &mut path,
                    table_names,
                    starting_table
                ) {
                    return Err(cycle_error);
                }
            }
        }

        Ok(())
    }

    /// Recursive DFS step. `path` holds the current recursion stack so a
    /// detected back-edge can be expanded into a readable cycle description.
    fn dfs_visit(
        &self,
        node: i64,
        states: &mut HashMap<i64, NodeState>,
        graph: &HashMap<i64, Vec<i64>>,
        path: &mut Vec<i64>,
        table_names: &HashMap<i64, String>,
        starting_table: i64,
    ) -> Result<(), DependencyError> {
        states.insert(node, NodeState::Visiting);
        path.push(node);

        if let Some(neighbors) = graph.get(&node) {
            for &neighbor in neighbors {
                // Ensure neighbor is in states map (it may be a pure sink
                // with no outgoing edges and thus absent from graph.keys())
                if !states.contains_key(&neighbor) {
                    states.insert(neighbor, NodeState::Unvisited);
                }

                match states.get(&neighbor).copied().unwrap_or(NodeState::Unvisited) {
                    NodeState::Visiting => {
                        // Found a cycle! Build the cycle path from the first
                        // occurrence of `neighbor` on the stack back to itself.
                        let cycle_start_idx = path.iter().position(|&x| x == neighbor).unwrap_or(0);
                        let cycle_path: Vec<String> = path[cycle_start_idx..]
                            .iter()
                            .chain(std::iter::once(&neighbor))
                            .map(|&id| table_names.get(&id).cloned().unwrap_or_else(|| id.to_string()))
                            .collect();

                        let involving_script = table_names.get(&starting_table)
                            .cloned()
                            .unwrap_or_else(|| starting_table.to_string());

                        return Err(DependencyError::CircularDependency {
                            cycle_path,
                            involving_script,
                        });
                    }
                    NodeState::Unvisited => {
                        // Recursively visit unvisited neighbor
                        self.dfs_visit(neighbor, states, graph, path, table_names, starting_table)?;
                    }
                    NodeState::Visited => {
                        // Already processed, no cycle through this path
                    }
                }
            }
        }

        path.pop();
        states.insert(node, NodeState::Visited);
        Ok(())
    }

    /// Save dependencies to database within an existing transaction.
    /// Replace-all semantics: existing rows for `script_id` are deleted and
    /// the supplied set is inserted fresh.
    pub async fn save_dependencies(
        &self,
        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
        script_id: i64,
        table_id: i64,
        dependencies: &[Dependency],
    ) -> Result<(), DependencyError> {
        // Clear existing dependencies for this script
        sqlx::query!("DELETE FROM script_dependencies WHERE script_id = $1", script_id)
            .execute(&mut **tx)
            .await
            .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?;

        // Insert new dependencies
        for dep in dependencies {
            // Resolve target name -> id within this schema; missing table is
            // a caller error (InvalidTableReference), not a DB failure.
            let target_id = sqlx::query_scalar!(
                "SELECT id FROM table_definitions WHERE schema_id = $1 AND table_name = $2",
                self.schema_id,
                dep.target_table
            )
            .fetch_optional(&mut **tx)
            .await
            .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?
            .ok_or_else(|| DependencyError::InvalidTableReference {
                table_name: dep.target_table.clone(),
                script_context: format!("script_id_{}", script_id),
            })?;

            sqlx::query!(
                r#"INSERT INTO script_dependencies
                (script_id, source_table_id, target_table_id, dependency_type, context_info)
                VALUES ($1, $2, $3, $4, $5)"#,
                script_id,
                table_id,
                target_id,
                dep.dependency_type.as_str(),
                dep.dependency_type.context_json()
            )
            .execute(&mut **tx)
            .await
            .map_err(|e| DependencyError::DatabaseError { error: e.to_string() })?;
        }

        Ok(())
    }
}
|
||||
228
server/src/table_script/handlers/dependency_utils.rs
Normal file
228
server/src/table_script/handlers/dependency_utils.rs
Normal file
@@ -0,0 +1,228 @@
|
||||
// src/table_script/handlers/dependency_utils.rs
|
||||
// Utility functions for dependency analysis and debugging
|
||||
|
||||
use sqlx::PgPool;
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Get a visual representation of the dependency graph for a schema.
/// Returns JSON shaped as:
/// `{ "schema_id": ..., "dependency_graph": [ { "table": ..., "dependencies": [...] } ] }`
pub async fn get_dependency_graph_visualization(
    db_pool: &PgPool,
    schema_id: i64,
) -> Result<Value, sqlx::Error> {
    // One row per distinct (source, target, type, context) edge, with a count
    // of how many script dependencies collapse onto that edge.
    let dependencies = sqlx::query!(
        r#"SELECT
            st.table_name as source_table,
            tt.table_name as target_table,
            sd.dependency_type,
            sd.context_info,
            COUNT(*) as dependency_count
        FROM script_dependencies sd
        JOIN table_definitions st ON sd.source_table_id = st.id
        JOIN table_definitions tt ON sd.target_table_id = tt.id
        WHERE st.schema_id = $1
        GROUP BY st.table_name, tt.table_name, sd.dependency_type, sd.context_info
        ORDER BY st.table_name, tt.table_name"#,
        schema_id
    )
    .fetch_all(db_pool)
    .await?;

    // Group edges by source table into {"table": ..., "dependencies": [...]}
    let mut graph = HashMap::new();
    for dep in dependencies {
        let source_entry = graph.entry(dep.source_table.clone()).or_insert_with(|| json!({
            "table": dep.source_table,
            "dependencies": []
        }));

        if let Some(deps_array) = source_entry.get_mut("dependencies") {
            if let Some(deps) = deps_array.as_array_mut() {
                deps.push(json!({
                    "target": dep.target_table,
                    "type": dep.dependency_type,
                    "context": dep.context_info,
                    "count": dep.dependency_count.unwrap_or(0)
                }));
            }
        }
    }

    Ok(json!({
        "schema_id": schema_id,
        "dependency_graph": graph.into_values().collect::<Vec<_>>()
    }))
}
|
||||
|
||||
/// Check if a schema has any circular dependencies.
/// Returns one "a -> b" string per back-edge found; an empty Vec means no
/// cycle was detected.
pub async fn check_schema_for_cycles(
    db_pool: &PgPool,
    schema_id: i64,
) -> Result<Vec<String>, sqlx::Error> {
    // This is a simplified cycle detection for monitoring purposes
    let dependencies = sqlx::query!(
        r#"SELECT sd.source_table_id, sd.target_table_id, st.table_name as source_table, tt.table_name as target_table
        FROM script_dependencies sd
        JOIN table_definitions st ON sd.source_table_id = st.id
        JOIN table_definitions tt ON sd.target_table_id = tt.id
        WHERE st.schema_id = $1"#,
        schema_id
    )
    .fetch_all(db_pool)
    .await?;

    // Adjacency list keyed by source table id; names kept for messages
    let mut graph: HashMap<i64, Vec<i64>> = HashMap::new();
    let mut table_names: HashMap<i64, String> = HashMap::new();

    for dep in dependencies {
        graph.entry(dep.source_table_id).or_default().push(dep.target_table_id);
        table_names.insert(dep.source_table_id, dep.source_table);
        table_names.insert(dep.target_table_id, dep.target_table);
    }

    // Simple cycle detection using DFS; `visited` persists across starts so
    // each node is explored at most once, `rec_stack` tracks the current path
    let mut visited = std::collections::HashSet::new();
    let mut rec_stack = std::collections::HashSet::new();
    let mut cycles = Vec::new();

    for &node in graph.keys() {
        if !visited.contains(&node) {
            if let Some(cycle) = detect_cycle_dfs(
                node,
                &graph,
                &mut visited,
                &mut rec_stack,
                &table_names
            ) {
                cycles.push(cycle);
            }
        }
    }

    Ok(cycles)
}
|
||||
|
||||
/// DFS helper for `check_schema_for_cycles`.
/// Returns `Some("source -> target")` describing the first back-edge found
/// (a simplified cycle report — only the closing edge, not the full path),
/// or `None` if no cycle is reachable from `node`.
fn detect_cycle_dfs(
    node: i64,
    graph: &HashMap<i64, Vec<i64>>,
    visited: &mut std::collections::HashSet<i64>,
    rec_stack: &mut std::collections::HashSet<i64>,
    table_names: &HashMap<i64, String>,
) -> Option<String> {
    visited.insert(node);
    rec_stack.insert(node);

    // Resolve an id to its table name, falling back to the numeric id.
    let label = |id: i64| table_names.get(&id).cloned().unwrap_or_else(|| id.to_string());

    for &next in graph.get(&node).into_iter().flatten() {
        if visited.contains(&next) {
            // A neighbor still on the recursion stack means a back-edge,
            // i.e. a cycle closing at `next`.
            if rec_stack.contains(&next) {
                return Some(format!("{} -> {}", label(node), label(next)));
            }
        } else if let Some(found) = detect_cycle_dfs(next, graph, visited, rec_stack, table_names) {
            return Some(found);
        }
    }

    rec_stack.remove(&node);
    None
}
|
||||
|
||||
/// Get processing order for tables based on dependencies
|
||||
/// Tables with no dependencies come first, then tables that depend only on already-processed tables
|
||||
pub async fn get_table_processing_order(
|
||||
db_pool: &PgPool,
|
||||
schema_id: i64,
|
||||
) -> Result<Vec<String>, sqlx::Error> {
|
||||
let all_tables = sqlx::query!(
|
||||
"SELECT id, table_name FROM table_definitions WHERE schema_id = $1 ORDER BY table_name",
|
||||
schema_id
|
||||
)
|
||||
.fetch_all(db_pool)
|
||||
.await?;
|
||||
|
||||
let dependencies = sqlx::query!(
|
||||
r#"SELECT source_table_id, target_table_id
|
||||
FROM script_dependencies sd
|
||||
JOIN table_definitions td ON sd.source_table_id = td.id
|
||||
WHERE td.schema_id = $1"#,
|
||||
schema_id
|
||||
)
|
||||
.fetch_all(db_pool)
|
||||
.await?;
|
||||
|
||||
// Build dependency graph
|
||||
let mut graph: HashMap<i64, Vec<i64>> = HashMap::new();
|
||||
let mut in_degree: HashMap<i64, usize> = HashMap::new();
|
||||
let mut table_names: HashMap<i64, String> = HashMap::new();
|
||||
|
||||
// Initialize all tables
|
||||
for table in &all_tables {
|
||||
in_degree.insert(table.id, 0);
|
||||
table_names.insert(table.id, table.table_name.clone());
|
||||
graph.insert(table.id, Vec::new());
|
||||
}
|
||||
|
||||
// Build graph and calculate in-degrees
|
||||
for dep in dependencies {
|
||||
graph.entry(dep.target_table_id).or_default().push(dep.source_table_id);
|
||||
*in_degree.entry(dep.source_table_id).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
// Topological sort using Kahn's algorithm
|
||||
let mut queue: Vec<i64> = in_degree
|
||||
.iter()
|
||||
.filter(|(_, °ree)| degree == 0)
|
||||
.map(|(&id, _)| id)
|
||||
.collect();
|
||||
|
||||
let mut result = Vec::new();
|
||||
|
||||
while let Some(node) = queue.pop() {
|
||||
result.push(table_names[&node].clone());
|
||||
|
||||
if let Some(neighbors) = graph.get(&node) {
|
||||
for &neighbor in neighbors {
|
||||
if let Some(degree) = in_degree.get_mut(&neighbor) {
|
||||
*degree -= 1;
|
||||
if *degree == 0 {
|
||||
queue.push(neighbor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If result doesn't contain all tables, there are cycles
|
||||
if result.len() != all_tables.len() {
|
||||
return Err(sqlx::Error::Protocol("Circular dependencies detected - cannot determine processing order".into()));
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cycle_detection_simple() {
        // Test the cycle detection logic with a simple case
        let mut graph = HashMap::new();
        graph.insert(1, vec![2]);
        graph.insert(2, vec![1]); // 1 -> 2 -> 1 (cycle)

        let mut table_names = HashMap::new();
        table_names.insert(1, "table_a".to_string());
        table_names.insert(2, "table_b".to_string());

        let mut visited = std::collections::HashSet::new();
        let mut rec_stack = std::collections::HashSet::new();

        let cycle = detect_cycle_dfs(1, &graph, &mut visited, &mut rec_stack, &table_names);
        // Bind the unwrapped String once: the previous code called
        // `cycle.unwrap()` twice, which moves the Option on the first call
        // and fails to compile (use of moved value) on the second.
        let cycle = cycle.expect("expected a cycle to be detected");
        assert!(cycle.contains("table_a") && cycle.contains("table_b"));
    }
}
|
||||
@@ -1,13 +1,16 @@
|
||||
// src/table_script/handlers/post_table_script.rs
|
||||
|
||||
use tonic::Status;
|
||||
use sqlx::{PgPool, Error as SqlxError};
|
||||
use common::proto::multieko2::table_script::{PostTableScriptRequest, TableScriptResponse};
|
||||
use serde_json::Value;
|
||||
use steel_decimal::SteelDecimal;
|
||||
|
||||
use crate::table_script::handlers::dependency_analyzer::DependencyAnalyzer;
|
||||
|
||||
const SYSTEM_COLUMNS: &[&str] = &["id", "deleted", "created_at"];
|
||||
|
||||
// TODO MAKE SCRIPT PUSH ONLY TO THE EMPTY TABLES
|
||||
// TODO MAKE THE SCRIPTS PUSH ONLY TO THE EMPTY FILES
|
||||
/// Validates the target column and ensures it is not a system column.
|
||||
/// Returns the column type if valid.
|
||||
fn validate_target_column(
|
||||
@@ -42,22 +45,24 @@ fn validate_target_column(
|
||||
.ok_or_else(|| format!("Target column '{}' not defined in table '{}'", target, table_name))
|
||||
}
|
||||
|
||||
/// Handles the creation of a new table script.
|
||||
/// Handles the creation of a new table script with dependency validation.
|
||||
pub async fn post_table_script(
|
||||
db_pool: &PgPool,
|
||||
request: PostTableScriptRequest,
|
||||
) -> Result<TableScriptResponse, Status> {
|
||||
// Start a transaction for ALL operations - critical for atomicity
|
||||
let mut tx = db_pool.begin().await
|
||||
.map_err(|e| Status::internal(format!("Failed to start transaction: {}", e)))?;
|
||||
|
||||
// Fetch the table definition
|
||||
let table_def = sqlx::query!(
|
||||
r#"SELECT id, table_name, columns, schema_id
|
||||
FROM table_definitions WHERE id = $1"#,
|
||||
request.table_definition_id
|
||||
)
|
||||
.fetch_optional(db_pool)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Status::internal(format!("Failed to fetch table definition: {}", e))
|
||||
})?
|
||||
.map_err(|e| Status::internal(format!("Failed to fetch table definition: {}", e)))?
|
||||
.ok_or_else(|| Status::not_found("Table definition not found"))?;
|
||||
|
||||
// Validate the target column and get its type
|
||||
@@ -68,18 +73,36 @@ pub async fn post_table_script(
|
||||
)
|
||||
.map_err(|e| Status::invalid_argument(e))?;
|
||||
|
||||
// Use the steel_decimal for script transformation
|
||||
let steel_decimal = SteelDecimal::new();
|
||||
// Create dependency analyzer for this schema
|
||||
let analyzer = DependencyAnalyzer::new(table_def.schema_id, db_pool.clone());
|
||||
|
||||
// Transform the script using steel_decimal (no context needed for basic transformation)
|
||||
// Analyze script dependencies
|
||||
let dependencies = analyzer
|
||||
.analyze_script_dependencies(&request.script)
|
||||
.map_err(|e| Status::from(e))?;
|
||||
|
||||
// Check for circular dependencies BEFORE making any changes
|
||||
// Pass the transaction to ensure we see any existing dependencies
|
||||
analyzer
|
||||
.check_for_cycles(&mut tx, table_def.id, &dependencies)
|
||||
.await
|
||||
.map_err(|e| Status::from(e))?;
|
||||
|
||||
// Transform the script using steel_decimal
|
||||
let steel_decimal = SteelDecimal::new();
|
||||
let parsed_script = steel_decimal.transform(&request.script);
|
||||
|
||||
// Insert the script into the database
|
||||
// Insert or update the script
|
||||
let script_record = sqlx::query!(
|
||||
r#"INSERT INTO table_scripts
|
||||
(table_definitions_id, target_table, target_column,
|
||||
target_column_type, script, description, schema_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (table_definitions_id, target_column)
|
||||
DO UPDATE SET
|
||||
script = EXCLUDED.script,
|
||||
description = EXCLUDED.description,
|
||||
target_column_type = EXCLUDED.target_column_type
|
||||
RETURNING id"#,
|
||||
request.table_definition_id,
|
||||
table_def.table_name,
|
||||
@@ -89,18 +112,66 @@ pub async fn post_table_script(
|
||||
request.description,
|
||||
table_def.schema_id
|
||||
)
|
||||
.fetch_one(db_pool)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
SqlxError::Database(db_err) if db_err.constraint() == Some("table_scripts_table_definitions_id_target_column_key") => {
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
SqlxError::Database(db_err)
|
||||
if db_err.constraint() == Some("table_scripts_table_definitions_id_target_column_key") => {
|
||||
Status::already_exists("Script already exists for this column")
|
||||
}
|
||||
_ => Status::internal(format!("Failed to insert script: {}", e)),
|
||||
}
|
||||
})?;
|
||||
|
||||
// Return the response with the new script ID
|
||||
// Save the dependencies within the same transaction
|
||||
analyzer
|
||||
.save_dependencies(&mut tx, script_record.id, table_def.id, &dependencies)
|
||||
.await
|
||||
.map_err(|e| Status::from(e))?;
|
||||
|
||||
// Only now commit the entire transaction - script + dependencies together
|
||||
tx.commit().await
|
||||
.map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?;
|
||||
|
||||
// Generate warnings for potential issues
|
||||
let warnings = generate_warnings(&dependencies, &table_def.table_name);
|
||||
|
||||
Ok(TableScriptResponse {
|
||||
id: script_record.id,
|
||||
warnings: String::new(), // No warnings for now
|
||||
warnings,
|
||||
})
|
||||
}
|
||||
|
||||
/// Generate helpful warnings for script dependencies
|
||||
fn generate_warnings(dependencies: &[crate::table_script::handlers::dependency_analyzer::Dependency], table_name: &str) -> String {
|
||||
let mut warnings = Vec::new();
|
||||
|
||||
// Check for self-references
|
||||
if dependencies.iter().any(|d| d.target_table == table_name) {
|
||||
warnings.push("Warning: Script references its own table, which may cause issues during initial population.".to_string());
|
||||
}
|
||||
|
||||
// Check for complex SQL queries
|
||||
let sql_deps_count = dependencies.iter()
|
||||
.filter(|d| matches!(d.dependency_type, crate::table_script::handlers::dependency_analyzer::DependencyType::SqlQuery { .. }))
|
||||
.count();
|
||||
|
||||
if sql_deps_count > 0 {
|
||||
warnings.push(format!(
|
||||
"Warning: Script contains {} SQL quer{}, ensure they are read-only and reference valid tables.",
|
||||
sql_deps_count,
|
||||
if sql_deps_count == 1 { "y" } else { "ies" }
|
||||
));
|
||||
}
|
||||
|
||||
// Check for many dependencies
|
||||
if dependencies.len() > 5 {
|
||||
warnings.push(format!(
|
||||
"Warning: Script depends on {} tables, which may affect processing performance.",
|
||||
dependencies.len()
|
||||
));
|
||||
}
|
||||
|
||||
warnings.join(" ")
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// src/tables_data/mod.rs
|
||||
|
||||
// src/table_script/mod.rs
|
||||
pub mod handlers;
|
||||
|
||||
pub use handlers::*;
|
||||
|
||||
Reference in New Issue
Block a user