now working with the gen schema in the database

This commit is contained in:
filipriec
2025-06-02 12:39:23 +02:00
parent 59ed52814e
commit 6b5cbe854b
8 changed files with 229 additions and 75 deletions

View File

@@ -1,2 +1,3 @@
// src/shared/mod.rs // src/shared/mod.rs
pub mod date_utils; pub mod date_utils;
pub mod schema_qualifier;

View File

@@ -0,0 +1,34 @@
// src/shared/schema_qualifier.rs
use tonic::Status;
/// Qualifies table names with the appropriate schema
///
/// Rules:
/// - Tables created via PostTableDefinition (dynamically created tables) are in 'gen' schema
/// - System tables (like users, profiles) remain in 'public' schema
pub fn qualify_table_name(table_name: &str) -> String {
// Check if table matches the pattern of dynamically created tables (e.g., 2025_something)
if table_name.starts_with(|c: char| c.is_ascii_digit()) && table_name.contains('_') {
format!("gen.\"{}\"", table_name)
} else {
format!("\"{}\"", table_name)
}
}
/// Qualifies table names for data operations
pub fn qualify_table_name_for_data(table_name: &str) -> Result<String, Status> {
Ok(qualify_table_name(table_name))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_qualify_table_name() {
assert_eq!(qualify_table_name("2025_test_schema3"), "gen.\"2025_test_schema3\"");
assert_eq!(qualify_table_name("users"), "\"users\"");
assert_eq!(qualify_table_name("profiles"), "\"profiles\"");
assert_eq!(qualify_table_name("adresar"), "\"adresar\"");
}
}

View File

@@ -2,6 +2,7 @@
use tonic::Status; use tonic::Status;
use sqlx::PgPool; use sqlx::PgPool;
use common::proto::multieko2::tables_data::{DeleteTableDataRequest, DeleteTableDataResponse}; use common::proto::multieko2::tables_data::{DeleteTableDataRequest, DeleteTableDataResponse};
use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
pub async fn delete_table_data( pub async fn delete_table_data(
db_pool: &PgPool, db_pool: &PgPool,
@@ -36,20 +37,37 @@ pub async fn delete_table_data(
return Err(Status::not_found("Table not found in profile")); return Err(Status::not_found("Table not found in profile"));
} }
// Perform soft delete // Qualify table name with schema
let qualified_table = qualify_table_name_for_data(&request.table_name)?;
// Perform soft delete using qualified table name
let query = format!( let query = format!(
"UPDATE \"{}\" "UPDATE {}
SET deleted = true SET deleted = true
WHERE id = $1 AND deleted = false", WHERE id = $1 AND deleted = false",
request.table_name qualified_table
); );
let rows_affected = sqlx::query(&query) let result = sqlx::query(&query)
.bind(request.record_id) .bind(request.record_id)
.execute(db_pool) .execute(db_pool)
.await .await;
.map_err(|e| Status::internal(format!("Delete operation failed: {}", e)))?
.rows_affected(); let rows_affected = match result {
Ok(result) => result.rows_affected(),
Err(e) => {
// Handle "relation does not exist" error specifically
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}",
request.table_name, qualified_table
)));
}
}
return Err(Status::internal(format!("Delete operation failed: {}", e)));
}
};
Ok(DeleteTableDataResponse { Ok(DeleteTableDataResponse {
success: rows_affected > 0, success: rows_affected > 0,

View File

@@ -3,6 +3,7 @@ use tonic::Status;
use sqlx::{PgPool, Row}; use sqlx::{PgPool, Row};
use std::collections::HashMap; use std::collections::HashMap;
use common::proto::multieko2::tables_data::{GetTableDataRequest, GetTableDataResponse}; use common::proto::multieko2::tables_data::{GetTableDataRequest, GetTableDataResponse};
use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
pub async fn get_table_data( pub async fn get_table_data(
db_pool: &PgPool, db_pool: &PgPool,
@@ -69,20 +70,36 @@ pub async fn get_table_data(
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
// Qualify table name with schema
let qualified_table = qualify_table_name_for_data(&table_name)?;
let sql = format!( let sql = format!(
"SELECT {} FROM \"{}\" WHERE id = $1 AND deleted = false", "SELECT {} FROM {} WHERE id = $1 AND deleted = false",
columns_clause, table_name columns_clause, qualified_table
); );
// Execute query // Execute query with enhanced error handling
let row = sqlx::query(&sql) let row_result = sqlx::query(&sql)
.bind(record_id) .bind(record_id)
.fetch_one(db_pool) .fetch_one(db_pool)
.await .await;
.map_err(|e| match e {
sqlx::Error::RowNotFound => Status::not_found("Record not found"), let row = match row_result {
_ => Status::internal(format!("Database error: {}", e)), Ok(row) => row,
})?; Err(sqlx::Error::RowNotFound) => return Err(Status::not_found("Record not found")),
Err(e) => {
// Handle "relation does not exist" error specifically
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}",
table_name, qualified_table
)));
}
}
return Err(Status::internal(format!("Database error: {}", e)));
}
};
// Build response data // Build response data
let mut data = HashMap::new(); let mut data = HashMap::new();

View File

@@ -5,6 +5,7 @@ use common::proto::multieko2::tables_data::{
GetTableDataByPositionRequest, GetTableDataRequest, GetTableDataResponse GetTableDataByPositionRequest, GetTableDataRequest, GetTableDataResponse
}; };
use super::get_table_data; use super::get_table_data;
use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
pub async fn get_table_data_by_position( pub async fn get_table_data_by_position(
db_pool: &PgPool, db_pool: &PgPool,
@@ -27,39 +28,55 @@ pub async fn get_table_data_by_position(
let profile_id = profile.ok_or_else(|| Status::not_found("Profile not found"))?.id; let profile_id = profile.ok_or_else(|| Status::not_found("Profile not found"))?.id;
let table_exists = sqlx::query!( let table_exists = sqlx::query_scalar!(
r#"SELECT EXISTS( r#"SELECT EXISTS(
SELECT 1 FROM table_definitions SELECT 1 FROM table_definitions
WHERE profile_id = $1 AND table_name = $2 WHERE profile_id = $1 AND table_name = $2
)"#, ) AS "exists!""#,
profile_id, profile_id,
table_name table_name
) )
.fetch_one(db_pool) .fetch_one(db_pool)
.await .await
.map_err(|e| Status::internal(format!("Table verification error: {}", e)))? .map_err(|e| Status::internal(format!("Table verification error: {}", e)))?;
.exists
.unwrap_or(false);
if !table_exists { if !table_exists {
return Err(Status::not_found("Table not found")); return Err(Status::not_found("Table not found"));
} }
let id: i64 = sqlx::query_scalar( // Qualify table name with schema
let qualified_table = qualify_table_name_for_data(&table_name)?;
let id_result = sqlx::query_scalar(
&format!( &format!(
r#"SELECT id FROM "{}" r#"SELECT id FROM {}
WHERE deleted = FALSE WHERE deleted = FALSE
ORDER BY id ASC ORDER BY id ASC
OFFSET $1 OFFSET $1
LIMIT 1"#, LIMIT 1"#,
table_name qualified_table
) )
) )
.bind(request.position - 1) .bind(request.position - 1)
.fetch_optional(db_pool) .fetch_optional(db_pool)
.await .await;
.map_err(|e| Status::internal(format!("Position query failed: {}", e)))?
.ok_or_else(|| Status::not_found("Position out of bounds"))?; let id: i64 = match id_result {
Ok(Some(id)) => id,
Ok(None) => return Err(Status::not_found("Position out of bounds")),
Err(e) => {
// Handle "relation does not exist" error specifically
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}",
table_name, qualified_table
)));
}
}
return Err(Status::internal(format!("Position query failed: {}", e)));
}
};
get_table_data( get_table_data(
db_pool, db_pool,

View File

@@ -3,59 +3,93 @@ use tonic::Status;
use sqlx::PgPool; use sqlx::PgPool;
use common::proto::multieko2::common::CountResponse; use common::proto::multieko2::common::CountResponse;
use common::proto::multieko2::tables_data::GetTableDataCountRequest; use common::proto::multieko2::tables_data::GetTableDataCountRequest;
use crate::shared::schema_qualifier::qualify_table_name_for_data; // 1. IMPORT THE FUNCTION
pub async fn get_table_data_count( pub async fn get_table_data_count(
db_pool: &PgPool, db_pool: &PgPool,
request: GetTableDataCountRequest, request: GetTableDataCountRequest,
) -> Result<CountResponse, Status> { ) -> Result<CountResponse, Status> {
let profile_name = request.profile_name; // We still need to verify that the table is logically defined for the profile.
let table_name = request.table_name; // The schema qualifier handles *how* to access it physically, but this check
// ensures the request is valid in the context of the application's definitions.
// Lookup profile
let profile = sqlx::query!( let profile = sqlx::query!(
"SELECT id FROM profiles WHERE name = $1", "SELECT id FROM profiles WHERE name = $1",
profile_name request.profile_name
) )
.fetch_optional(db_pool) .fetch_optional(db_pool)
.await .await
.map_err(|e| Status::internal(format!("Profile lookup error: {}", e)))?; .map_err(|e| Status::internal(format!("Profile lookup error for '{}': {}", request.profile_name, e)))?;
let profile_id = profile.ok_or_else(|| Status::not_found("Profile not found"))?.id; let profile_id = match profile {
Some(p) => p.id,
None => return Err(Status::not_found(format!("Profile '{}' not found", request.profile_name))),
};
// Verify table exists and belongs to profile let table_defined_for_profile = sqlx::query_scalar!(
let table_exists = sqlx::query!(
r#"SELECT EXISTS( r#"SELECT EXISTS(
SELECT 1 FROM table_definitions SELECT 1 FROM table_definitions
WHERE profile_id = $1 AND table_name = $2 WHERE profile_id = $1 AND table_name = $2
)"#, ) AS "exists!" "#, // Added AS "exists!" for clarity with sqlx macro
profile_id, profile_id,
table_name request.table_name
) )
.fetch_one(db_pool) .fetch_one(db_pool)
.await .await
.map_err(|e| Status::internal(format!("Table verification error: {}", e)))? .map_err(|e| Status::internal(format!("Table definition verification error for '{}.{}': {}", request.profile_name, request.table_name, e)))?;
.exists
.unwrap_or(false);
if !table_exists { if !table_defined_for_profile {
return Err(Status::not_found("Table not found")); // If the table isn't even defined for this profile in table_definitions,
// it's an error, regardless of whether a physical table with that name exists somewhere.
return Err(Status::not_found(format!(
"Table '{}' is not defined for profile '{}'",
request.table_name, request.profile_name
)));
} }
// Get count of non-deleted records // 2. QUALIFY THE TABLE NAME using the imported function
let query = format!( let qualified_table_name = qualify_table_name_for_data(&request.table_name)?;
// 3. USE THE QUALIFIED NAME in the SQL query
let query_sql = format!(
r#" r#"
SELECT COUNT(*) AS count SELECT COUNT(*) AS count
FROM "{}" FROM {}
WHERE deleted = FALSE WHERE deleted = FALSE
"#, "#,
table_name qualified_table_name // Use the schema-qualified name here
); );
let count: i64 = sqlx::query_scalar::<_, Option<i64>>(&query) // The rest of the logic remains largely the same, but error messages can be more specific.
let count_result = sqlx::query_scalar::<_, Option<i64>>(&query_sql)
.fetch_one(db_pool) .fetch_one(db_pool)
.await .await;
.map_err(|e| Status::internal(format!("Count query failed: {}", e)))?
.unwrap_or(0);
Ok(CountResponse { count }) match count_result {
Ok(Some(count_val)) => Ok(CountResponse { count: count_val }),
Ok(None) => {
// This case should ideally not be reached with COUNT(*),
// as it always returns a row, even if the count is 0.
// If it does, it might indicate an issue or an empty table if the query was different.
// For COUNT(*), a 0 count is expected if no non-deleted rows.
Ok(CountResponse { count: 0 })
}
Err(e) => {
// Check if the error is "relation does not exist" (PostgreSQL error code 42P01)
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
// This means the table (e.g., gen."2025_test_schema3") does not physically exist,
// even though it was defined in table_definitions. This is an inconsistency.
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}.",
request.table_name, qualified_table_name
)));
}
}
// For other errors, provide a general message.
Err(Status::internal(format!(
"Count query failed for table {}: {}",
qualified_table_name, e
)))
}
}
} }

View File

@@ -6,6 +6,7 @@ use chrono::{DateTime, Utc};
use common::proto::multieko2::tables_data::{PostTableDataRequest, PostTableDataResponse}; use common::proto::multieko2::tables_data::{PostTableDataRequest, PostTableDataResponse};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
use crate::steel::server::execution::{self, Value}; use crate::steel::server::execution::{self, Value};
use crate::steel::server::functions::SteelContext; use crate::steel::server::functions::SteelContext;
@@ -97,7 +98,7 @@ pub async fn post_table_data(
// Validate all data columns // Validate all data columns
let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect(); let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect();
for key in data.keys() { for key in data.keys() {
if !system_columns_set.contains(key.as_str()) && if !system_columns_set.contains(key.as_str()) &&
!user_columns.contains(&&key.to_string()) { !user_columns.contains(&&key.to_string()) {
return Err(Status::invalid_argument(format!("Invalid column: {}", key))); return Err(Status::invalid_argument(format!("Invalid column: {}", key)));
} }
@@ -123,13 +124,12 @@ pub async fn post_table_data(
// Create execution context // Create execution context
let context = SteelContext { let context = SteelContext {
current_table: table_name.clone(), current_table: table_name.clone(), // Keep base name for scripts
profile_id, profile_id,
row_data: data.clone(), row_data: data.clone(),
db_pool: Arc::new(db_pool.clone()), db_pool: Arc::new(db_pool.clone()),
}; };
// Execute validation script // Execute validation script
let script_result = execution::execute_script( let script_result = execution::execute_script(
script_record.script, script_record.script,
@@ -220,17 +220,36 @@ pub async fn post_table_data(
return Err(Status::invalid_argument("No valid columns to insert")); return Err(Status::invalid_argument("No valid columns to insert"));
} }
// Qualify table name with schema
let qualified_table = qualify_table_name_for_data(&table_name)?;
let sql = format!( let sql = format!(
"INSERT INTO \"{}\" ({}) VALUES ({}) RETURNING id", "INSERT INTO {} ({}) VALUES ({}) RETURNING id",
table_name, qualified_table,
columns_list.join(", "), columns_list.join(", "),
placeholders.join(", ") placeholders.join(", ")
); );
let inserted_id: i64 = sqlx::query_scalar_with(&sql, params) // Execute query with enhanced error handling
let result = sqlx::query_scalar_with::<_, i64, _>(&sql, params)
.fetch_one(db_pool) .fetch_one(db_pool)
.await .await;
.map_err(|e| Status::internal(format!("Insert failed: {}", e)))?;
let inserted_id = match result {
Ok(id) => id,
Err(e) => {
// Handle "relation does not exist" error specifically
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}",
table_name, qualified_table
)));
}
}
return Err(Status::internal(format!("Insert failed: {}", e)));
}
};
Ok(PostTableDataResponse { Ok(PostTableDataResponse {
success: true, success: true,

View File

@@ -5,6 +5,7 @@ use sqlx::postgres::PgArguments;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use common::proto::multieko2::tables_data::{PutTableDataRequest, PutTableDataResponse}; use common::proto::multieko2::tables_data::{PutTableDataRequest, PutTableDataResponse};
use std::collections::HashMap; use std::collections::HashMap;
use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
pub async fn put_table_data( pub async fn put_table_data(
db_pool: &PgPool, db_pool: &PgPool,
@@ -13,18 +14,18 @@ pub async fn put_table_data(
let profile_name = request.profile_name; let profile_name = request.profile_name;
let table_name = request.table_name; let table_name = request.table_name;
let record_id = request.id; let record_id = request.id;
// Preprocess and validate data // Preprocess and validate data
let mut processed_data = HashMap::new(); let mut processed_data = HashMap::new();
let mut null_fields = Vec::new(); let mut null_fields = Vec::new();
for (key, value) in request.data { for (key, value) in request.data {
let trimmed = value.trim().to_string(); let trimmed = value.trim().to_string();
if key == "firma" && trimmed.is_empty() { if key == "firma" && trimmed.is_empty() {
return Err(Status::invalid_argument("Firma cannot be empty")); return Err(Status::invalid_argument("Firma cannot be empty"));
} }
// Store fields that should be set to NULL // Store fields that should be set to NULL
if key != "firma" && trimmed.is_empty() { if key != "firma" && trimmed.is_empty() {
null_fields.push(key); null_fields.push(key);
@@ -103,7 +104,6 @@ pub async fn put_table_data(
.ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))? .ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))?
}; };
// TODO strong testing by user pick in the future
match sql_type { match sql_type {
"TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => { "TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => {
if let Some(max_len) = sql_type.strip_prefix("VARCHAR(") if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
@@ -121,7 +121,7 @@ pub async fn put_table_data(
let val = value.parse::<bool>() let val = value.parse::<bool>()
.map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?; .map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?;
params.add(val) params.add(val)
.map_err(|e| Status::internal(format!("Failed to add boolean parameter for {}: {}", col, e)))?; .map_err(|e| Status::internal(format!("Failed to add boolean parameter for {} {}", col, e)))?;
}, },
"TIMESTAMPTZ" => { "TIMESTAMPTZ" => {
let dt = DateTime::parse_from_rfc3339(value) let dt = DateTime::parse_from_rfc3339(value)
@@ -154,25 +154,39 @@ pub async fn put_table_data(
params.add(record_id) params.add(record_id)
.map_err(|e| Status::internal(format!("Failed to add record_id parameter: {}", e)))?; .map_err(|e| Status::internal(format!("Failed to add record_id parameter: {}", e)))?;
// Qualify table name with schema
let qualified_table = qualify_table_name_for_data(&table_name)?;
let set_clause = set_clauses.join(", "); let set_clause = set_clauses.join(", ");
let sql = format!( let sql = format!(
"UPDATE \"{}\" SET {} WHERE id = ${} AND deleted = FALSE RETURNING id", "UPDATE {} SET {} WHERE id = ${} AND deleted = FALSE RETURNING id",
table_name, qualified_table,
set_clause, set_clause,
param_idx param_idx
); );
let result = sqlx::query_scalar_with::<Postgres, i64, _>(&sql, params) let result = sqlx::query_scalar_with::<Postgres, i64, _>(&sql, params)
.fetch_optional(db_pool) .fetch_optional(db_pool)
.await .await;
.map_err(|e| Status::internal(format!("Update failed: {}", e)))?;
match result { match result {
Some(updated_id) => Ok(PutTableDataResponse { Ok(Some(updated_id)) => Ok(PutTableDataResponse {
success: true, success: true,
message: "Data updated successfully".into(), message: "Data updated successfully".into(),
updated_id, updated_id,
}), }),
None => Err(Status::not_found("Record not found or already deleted")), Ok(None) => Err(Status::not_found("Record not found or already deleted")),
Err(e) => {
// Handle "relation does not exist" error specifically
if let Some(db_err) = e.as_database_error() {
if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
return Err(Status::internal(format!(
"Table '{}' is defined but does not physically exist in the database as {}",
table_name, qualified_table
)));
}
}
Err(Status::internal(format!("Update failed: {}", e)))
}
} }
} }