use tonic::Status; use sqlx::{PgPool, Transaction, Postgres}; use serde_json::json; use time::OffsetDateTime; use common::proto::multieko2::table_definition::{PostTableDefinitionRequest, TableDefinitionResponse}; const GENERATED_SCHEMA_NAME: &str = "gen"; const PREDEFINED_FIELD_TYPES: &[(&str, &str)] = &[ ("text", "TEXT"), ("psc", "TEXT"), ("phone", "VARCHAR(15)"), ("address", "TEXT"), ("email", "VARCHAR(255)"), ("boolean", "BOOLEAN"), ("timestamp", "TIMESTAMPTZ"), ]; fn is_valid_identifier(s: &str) -> bool { !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') && !s.starts_with('_') && !s.chars().next().unwrap().is_ascii_digit() } fn sanitize_table_name(s: &str) -> String { let year = OffsetDateTime::now_utc().year(); let cleaned = s.replace(|c: char| !c.is_ascii_alphanumeric() && c != '_', "") .trim() .to_lowercase(); format!("{}_{}", year, cleaned) } fn sanitize_identifier(s: &str) -> String { s.replace(|c: char| !c.is_ascii_alphanumeric() && c != '_', "") .trim() .to_lowercase() } fn map_field_type(field_type: &str) -> Result<&str, Status> { PREDEFINED_FIELD_TYPES .iter() .find(|(key, _)| *key == field_type.to_lowercase().as_str()) .map(|(_, sql_type)| *sql_type) .ok_or_else(|| Status::invalid_argument(format!("Invalid field type: {}", field_type))) } fn is_invalid_table_name(table_name: &str) -> bool { table_name.ends_with("_id") || table_name == "id" || table_name == "deleted" || table_name == "created_at" } pub async fn post_table_definition( db_pool: &PgPool, request: PostTableDefinitionRequest, ) -> Result { let base_name = sanitize_table_name(&request.table_name); let user_part_cleaned = request.table_name .replace(|c: char| !c.is_ascii_alphanumeric() && c != '_', "") .trim_matches('_') .to_lowercase(); // New validation check if is_invalid_table_name(&user_part_cleaned) { return Err(Status::invalid_argument( "Table name cannot be 'id', 'deleted', 'created_at' or end with '_id'" )); } if !user_part_cleaned.is_empty() && !is_valid_identifier(&user_part_cleaned) { return Err(Status::invalid_argument("Invalid table name")); } else if user_part_cleaned.is_empty() { return Err(Status::invalid_argument("Table name cannot be empty")); } let mut tx = db_pool.begin().await .map_err(|e| Status::internal(format!("Failed to start transaction: {}", e)))?; match execute_table_definition(&mut tx, request, base_name).await { Ok(response) => { tx.commit().await .map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?; Ok(response) }, Err(e) => { let _ = tx.rollback().await; Err(e) } } } async fn execute_table_definition( tx: &mut Transaction<'_, Postgres>, mut request: PostTableDefinitionRequest, table_name: String, ) -> Result { let profile = sqlx::query!( "INSERT INTO profiles (name) VALUES ($1) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING id", request.profile_name ) .fetch_one(&mut **tx) .await .map_err(|e| Status::internal(format!("Profile error: {}", e)))?; let mut links = Vec::new(); for link in request.links.drain(..) { let linked_table = sqlx::query!( "SELECT id FROM table_definitions WHERE profile_id = $1 AND table_name = $2", profile.id, link.linked_table_name ) .fetch_optional(&mut **tx) .await .map_err(|e| Status::internal(format!("Linked table lookup failed: {}", e)))?; let linked_id = linked_table.ok_or_else(|| Status::not_found(format!("Linked table {} not found", link.linked_table_name)) )?.id; links.push((linked_id, link.required)); } let mut columns = Vec::new(); for col_def in request.columns.drain(..) { let col_name = sanitize_identifier(&col_def.name); if !is_valid_identifier(&col_def.name) { return Err(Status::invalid_argument("Invalid column name")); } let sql_type = map_field_type(&col_def.field_type)?; columns.push(format!("\"{}\" {}", col_name, sql_type)); } let mut indexes = Vec::new(); for idx in request.indexes.drain(..) { let idx_name = sanitize_identifier(&idx); if !is_valid_identifier(&idx) { return Err(Status::invalid_argument(format!("Invalid index name: {}", idx))); } if !columns.iter().any(|c| c.starts_with(&format!("\"{}\"", idx_name))) { return Err(Status::invalid_argument(format!("Index column {} not found", idx_name))); } indexes.push(idx_name); } let (create_sql, index_sql) = generate_table_sql(tx, &table_name, &columns, &indexes, &links).await?; let table_def = sqlx::query!( r#"INSERT INTO table_definitions (profile_id, table_name, columns, indexes) VALUES ($1, $2, $3, $4) RETURNING id"#, profile.id, &table_name, json!(columns), json!(indexes) ) .fetch_one(&mut **tx) .await .map_err(|e| { if let Some(db_err) = e.as_database_error() { if db_err.constraint() == Some("idx_table_definitions_profile_table") { return Status::already_exists("Table already exists in this profile"); } } Status::internal(format!("Database error: {}", e)) })?; for (linked_id, is_required) in links { sqlx::query!( "INSERT INTO table_definition_links (source_table_id, linked_table_id, is_required) VALUES ($1, $2, $3)", table_def.id, linked_id, is_required ) .execute(&mut **tx) .await .map_err(|e| Status::internal(format!("Failed to save link: {}", e)))?; } sqlx::query(&create_sql) .execute(&mut **tx) .await .map_err(|e| Status::internal(format!("Table creation failed: {}", e)))?; for sql in &index_sql { sqlx::query(sql) .execute(&mut **tx) .await .map_err(|e| Status::internal(format!("Index creation failed: {}", e)))?; } Ok(TableDefinitionResponse { success: true, sql: format!("{}\n{}", create_sql, index_sql.join("\n")), }) } async fn generate_table_sql( tx: &mut Transaction<'_, Postgres>, table_name: &str, columns: &[String], indexes: &[String], links: &[(i64, bool)], ) -> Result<(String, Vec), Status> { let qualified_table = format!("{}.\"{}\"", GENERATED_SCHEMA_NAME, table_name); let mut system_columns = vec![ "id BIGSERIAL PRIMARY KEY".to_string(), "deleted BOOLEAN NOT NULL DEFAULT FALSE".to_string(), ]; for (linked_id, required) in links { let linked_table = get_table_name_by_id(tx, *linked_id).await?; let qualified_linked_table = format!("{}.\"{}\"", GENERATED_SCHEMA_NAME, linked_table); let base_name = linked_table.split_once('_') .map(|(_, rest)| rest) .unwrap_or(&linked_table) .to_string(); let null_clause = if *required { "NOT NULL" } else { "" }; system_columns.push( format!("\"{0}_id\" BIGINT {1} REFERENCES {2}(id)", base_name, null_clause, qualified_linked_table ) ); } let all_columns = system_columns .iter() .chain(columns.iter()) .cloned() .collect::>(); let create_sql = format!( "CREATE TABLE {} (\n {},\n created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP\n)", qualified_table, all_columns.join(",\n ") ); let mut all_indexes = Vec::new(); for (linked_id, _) in links { let linked_table = get_table_name_by_id(tx, *linked_id).await?; let base_name = linked_table.split_once('_') .map(|(_, rest)| rest) .unwrap_or(&linked_table) .to_string(); all_indexes.push(format!( "CREATE INDEX \"idx_{}_{}_fk\" ON {} (\"{}_id\")", table_name, base_name, qualified_table, base_name )); } for idx in indexes { all_indexes.push(format!( "CREATE INDEX \"idx_{}_{}\" ON {} (\"{}\")", table_name, idx, qualified_table, idx )); } Ok((create_sql, all_indexes)) } async fn get_table_name_by_id( tx: &mut Transaction<'_, Postgres>, table_id: i64, ) -> Result { let record = sqlx::query!( "SELECT table_name FROM table_definitions WHERE id = $1", table_id ) .fetch_one(&mut **tx) .await .map_err(|e| Status::internal(format!("Table lookup failed: {}", e)))?; Ok(record.table_name) }