fixing post with links
Cargo.lock (generated)
@@ -449,6 +449,7 @@ dependencies = [
  "dotenvy",
  "lazy_static",
  "prost",
+ "prost-types",
  "ratatui",
  "serde",
  "serde_json",
@@ -487,6 +488,7 @@ name = "common"
 version = "0.3.13"
 dependencies = [
  "prost",
+ "prost-types",
  "serde",
  "tantivy",
  "tonic",
@@ -2843,6 +2845,7 @@ dependencies = [
  "jsonwebtoken",
  "lazy_static",
  "prost",
+ "prost-types",
  "regex",
  "rstest",
  "rust-stemmers",

@@ -24,6 +24,7 @@ tokio = { version = "1.44.2", features = ["full"] }
 tonic = "0.13.0"
 prost = "0.13.5"
 async-trait = "0.1.88"
+prost-types = "0.13.0"
 
 # Data Handling & Serialization
 serde = { version = "1.0.219", features = ["derive"] }

@@ -9,6 +9,7 @@ anyhow = "1.0.98"
 async-trait = "0.1.88"
 common = { path = "../common" }
 
+prost-types = { workspace = true }
 crossterm = "0.28.1"
 dirs = "6.0.0"
 dotenvy = "0.15.7"

@@ -23,8 +23,9 @@ use common::proto::multieko2::tables_data::{
 use common::proto::multieko2::search::{
     searcher_client::SearcherClient, SearchRequest, SearchResponse,
 };
-use anyhow::{Context, Result}; // Added Context
-use std::collections::HashMap; // NEW
+use anyhow::{Context, Result};
+use std::collections::HashMap;
+use prost_types::Value;
 
 #[derive(Clone)]
 pub struct GrpcClient {
@@ -48,7 +49,6 @@ impl GrpcClient {
             TableDefinitionClient::new(channel.clone());
         let table_script_client = TableScriptClient::new(channel.clone());
         let tables_data_client = TablesDataClient::new(channel.clone());
-        // NEW: Instantiate the search client
         let search_client = SearcherClient::new(channel.clone());
 
         Ok(Self {
@@ -56,7 +56,7 @@ impl GrpcClient {
             table_definition_client,
             table_script_client,
             tables_data_client,
-            search_client, // NEW
+            search_client,
         })
     }
 
@@ -161,10 +161,16 @@ impl GrpcClient {
         table_name: String,
         data: HashMap<String, String>,
     ) -> Result<PostTableDataResponse> {
+        // 2. CONVERT THE HASHMAP
+        let data: HashMap<String, Value> = data
+            .into_iter()
+            .map(|(k, v)| (k, Value::from(v)))
+            .collect();
+
         let grpc_request = PostTableDataRequest {
             profile_name,
             table_name,
-            data,
+            data, // This is now the correct type
         };
         let request = tonic::Request::new(grpc_request);
         let response = self
@@ -182,11 +188,17 @@ impl GrpcClient {
         id: i64,
         data: HashMap<String, String>,
     ) -> Result<PutTableDataResponse> {
+        // 2. CONVERT THE HASHMAP
+        let data: HashMap<String, Value> = data
+            .into_iter()
+            .map(|(k, v)| (k, Value::from(v)))
+            .collect();
+
         let grpc_request = PutTableDataRequest {
             profile_name,
             table_name,
             id,
-            data,
+            data, // This is now the correct type
         };
         let request = tonic::Request::new(grpc_request);
         let response = self
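
Note on the client-side conversion above: it wraps every field as a protobuf string value, since Value::from(String) yields Kind::StringValue. A minimal sketch of the same idea, with hypothetical helper names and assuming prost-types 0.13:

    use std::collections::HashMap;
    use prost_types::{value::Kind, Value};

    /// Hypothetical helper: wrap plain strings as protobuf Values (always StringValue).
    fn to_proto_map(data: HashMap<String, String>) -> HashMap<String, Value> {
        data.into_iter().map(|(k, v)| (k, Value::from(v))).collect()
    }

    /// Hypothetical helper: build a typed Value when the caller knows the column type.
    fn bool_value(b: bool) -> Value {
        Value { kind: Some(Kind::BoolValue(b)) }
    }

Callers that target BOOLEAN or BIGINT columns would need typed kinds (BoolValue, NumberValue); a plain StringValue is rejected by the server-side match on Kind shown in the handler hunks below.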

@@ -5,6 +5,8 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+prost-types = { workspace = true }
+
 tonic = "0.13.0"
 prost = "0.13.5"
 serde = { version = "1.0.219", features = ["derive"] }

@@ -3,6 +3,7 @@ syntax = "proto3";
 package multieko2.tables_data;
 
 import "common.proto";
+import "google/protobuf/struct.proto";
 
 service TablesData {
   rpc PostTableData (PostTableDataRequest) returns (PostTableDataResponse);
@@ -16,7 +17,7 @@ service TablesData {
 message PostTableDataRequest {
   string profile_name = 1;
   string table_name = 2;
-  map<string, string> data = 3;
+  map<string, google.protobuf.Value> data = 3;
 }
 
 message PostTableDataResponse {
@@ -29,7 +30,7 @@ message PutTableDataRequest {
   string profile_name = 1;
   string table_name = 2;
   int64 id = 3;
-  map<string, string> data = 4;
+  map<string, google.protobuf.Value> data = 4;
 }
 
 message PutTableDataResponse {

Binary file not shown.

@@ -5,10 +5,10 @@ pub struct PostTableDataRequest {
     pub profile_name: ::prost::alloc::string::String,
     #[prost(string, tag = "2")]
     pub table_name: ::prost::alloc::string::String,
-    #[prost(map = "string, string", tag = "3")]
+    #[prost(map = "string, message", tag = "3")]
     pub data: ::std::collections::HashMap<
         ::prost::alloc::string::String,
-        ::prost::alloc::string::String,
+        ::prost_types::Value,
     >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -28,10 +28,10 @@ pub struct PutTableDataRequest {
     pub table_name: ::prost::alloc::string::String,
     #[prost(int64, tag = "3")]
     pub id: i64,
-    #[prost(map = "string, string", tag = "4")]
+    #[prost(map = "string, message", tag = "4")]
     pub data: ::std::collections::HashMap<
         ::prost::alloc::string::String,
-        ::prost::alloc::string::String,
+        ::prost_types::Value,
     >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
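
With the regenerated structs, `data` is a HashMap<String, prost_types::Value>, so callers can mix value types per column. A minimal usage sketch (the column names and values are hypothetical; the request fields match the generated struct above):

    use std::collections::HashMap;
    use prost_types::{value::Kind, Value};
    use common::proto::multieko2::tables_data::PostTableDataRequest;

    let mut data = HashMap::new();
    // Text column: StringValue
    data.insert("name".to_string(), Value { kind: Some(Kind::StringValue("Acme".into())) });
    // System boolean column: BoolValue
    data.insert("deleted".to_string(), Value { kind: Some(Kind::BoolValue(false)) });
    // Foreign-key (BIGINT) column: NumberValue, must be a whole number (see the handler below)
    data.insert("parent_id".to_string(), Value { kind: Some(Kind::NumberValue(42.0)) });

    let request = PostTableDataRequest {
        profile_name: "default".to_string(),
        table_name: "customers".to_string(),
        data,
    };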

@@ -10,6 +10,7 @@ search = { path = "../search" }
 
 anyhow = { workspace = true }
 tantivy = { workspace = true }
+prost-types = { workspace = true }
 chrono = { version = "0.4.40", features = ["serde"] }
 dotenvy = "0.15.7"
 prost = "0.13.5"

@@ -8,16 +8,15 @@ use common::proto::multieko2::tables_data::{PostTableDataRequest, PostTableDataR
 use std::collections::HashMap;
 use std::sync::Arc;
 use crate::shared::schema_qualifier::qualify_table_name_for_data;
+use prost_types::value::Kind; // NEW: Import the Kind enum
 
 use crate::steel::server::execution::{self, Value};
 use crate::steel::server::functions::SteelContext;
 
-// Add these imports
 use crate::indexer::{IndexCommand, IndexCommandData};
 use tokio::sync::mpsc;
 use tracing::error;
 
-// MODIFIED: Function signature now accepts the indexer sender
 pub async fn post_table_data(
     db_pool: &PgPool,
     request: PostTableDataRequest,
@@ -25,11 +24,12 @@ pub async fn post_table_data(
 ) -> Result<PostTableDataResponse, Status> {
     let profile_name = request.profile_name;
     let table_name = request.table_name;
-    let mut data = HashMap::new();
 
-    for (key, value) in request.data {
-        data.insert(key, value.trim().to_string());
-    }
+    // REMOVED: The old data conversion loop. We will process request.data directly.
+    // let mut data = HashMap::new();
+    // for (key, value) in request.data {
+    //     data.insert(key, value.trim().to_string());
+    // }
 
     // Lookup profile
     let profile = sqlx::query!(
@@ -94,13 +94,28 @@ pub async fn post_table_data(
 
     // Validate all data columns
     let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect();
-    for key in data.keys() {
+    for key in request.data.keys() { // CHANGED: Use request.data
         if !system_columns_set.contains(key.as_str()) &&
            !user_columns.contains(&&key.to_string()) {
             return Err(Status::invalid_argument(format!("Invalid column: {}", key)));
         }
     }
 
+    // NEW: Create a string-based map for backwards compatibility with Steel scripts.
+    let mut string_data_for_scripts = HashMap::new();
+    for (key, proto_value) in &request.data {
+        let str_val = match &proto_value.kind {
+            Some(Kind::StringValue(s)) => s.clone(),
+            Some(Kind::NumberValue(n)) => n.to_string(),
+            Some(Kind::BoolValue(b)) => b.to_string(),
+            Some(Kind::NullValue(_)) => String::new(),
+            Some(Kind::StructValue(_)) | Some(Kind::ListValue(_)) | None => {
+                return Err(Status::invalid_argument(format!("Unsupported type for script validation in column '{}'", key)));
+            }
+        };
+        string_data_for_scripts.insert(key.clone(), str_val);
+    }
+
     // Validate Steel scripts
     let scripts = sqlx::query!(
         "SELECT target_column, script FROM table_scripts WHERE table_definitions_id = $1",
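
The Steel scripting layer still expects string row data, so the handler flattens each protobuf Value to a string before running validation scripts. Restated as a standalone sketch (the helper name is hypothetical; the match mirrors the hunk above):

    use prost_types::{value::Kind, Value};

    /// Hypothetical helper: flatten a protobuf Value into the string form Steel scripts expect.
    fn flatten_for_scripts(v: &Value) -> Result<String, String> {
        match &v.kind {
            Some(Kind::StringValue(s)) => Ok(s.clone()),
            Some(Kind::NumberValue(n)) => Ok(n.to_string()), // e.g. 42.0 -> "42"
            Some(Kind::BoolValue(b)) => Ok(b.to_string()),   // true -> "true"
            Some(Kind::NullValue(_)) => Ok(String::new()),   // null -> ""
            Some(Kind::StructValue(_)) | Some(Kind::ListValue(_)) | None => {
                Err("unsupported type for script validation".to_string())
            }
        }
    }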
@@ -114,20 +129,20 @@ pub async fn post_table_data(
         let target_column = script_record.target_column;
 
         // Ensure target column exists in submitted data
-        let user_value = data.get(&target_column)
+        let user_value = string_data_for_scripts.get(&target_column) // CHANGED: Use the new string map
             .ok_or_else(|| Status::invalid_argument(
                 format!("Script target column '{}' is required", target_column)
             ))?;
 
         // Create execution context
         let context = SteelContext {
-            current_table: table_name.clone(), // Keep base name for scripts
+            current_table: table_name.clone(),
             profile_id,
-            row_data: data.clone(),
+            row_data: string_data_for_scripts.clone(), // CHANGED: Use the new string map
             db_pool: Arc::new(db_pool.clone()),
         };
 
-        // Execute validation script
+        // ... (rest of script execution is the same)
         let script_result = execution::execute_script(
             script_record.script,
             "STRINGS",
@@ -138,7 +153,6 @@ pub async fn post_table_data(
                 format!("Script execution failed for '{}': {}", target_column, e)
             ))?;
 
-        // Validate script output
         let Value::Strings(mut script_output) = script_result else {
             return Err(Status::internal("Script must return string values"));
         };
@@ -160,11 +174,12 @@ pub async fn post_table_data(
     let mut placeholders = Vec::new();
     let mut param_idx = 1;
 
-    for (col, value) in data {
+    // CHANGED: This entire loop is rewritten to handle prost_types::Value
+    for (col, proto_value) in request.data {
         let sql_type = if system_columns_set.contains(col.as_str()) {
             match col.as_str() {
                 "deleted" => "BOOLEAN",
-                _ if col.ends_with("_id") => "BIGINT", // Handle foreign keys
+                _ if col.ends_with("_id") => "BIGINT",
                 _ => return Err(Status::invalid_argument("Invalid system column")),
             }
         } else {
@@ -174,8 +189,13 @@ pub async fn post_table_data(
                 .ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))?
         };
 
+        let kind = proto_value.kind.ok_or_else(|| {
+            Status::invalid_argument(format!("Value for column '{}' cannot be null", col))
+        })?;
+
         match sql_type {
             "TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => {
+                if let Kind::StringValue(value) = kind {
                 if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
                     .and_then(|s| s.strip_suffix(')'))
                     .and_then(|s| s.parse::<usize>().ok())
@@ -186,24 +206,38 @@ pub async fn post_table_data(
                 }
                 params.add(value)
                     .map_err(|e| Status::invalid_argument(format!("Failed to add text parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected string for column '{}'", col)));
+                }
             },
             "BOOLEAN" => {
-                let val = value.parse::<bool>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?;
+                if let Kind::BoolValue(val) = kind {
                 params.add(val)
                     .map_err(|e| Status::invalid_argument(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected boolean for column '{}'", col)));
+                }
             },
             "TIMESTAMPTZ" => {
+                if let Kind::StringValue(value) = kind {
                 let dt = DateTime::parse_from_rfc3339(&value)
                     .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
                 params.add(dt.with_timezone(&Utc))
                     .map_err(|e| Status::invalid_argument(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected ISO 8601 string for column '{}'", col)));
+                }
             },
             "BIGINT" => {
-                let val = value.parse::<i64>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid integer for {}", col)))?;
-                params.add(val)
+                if let Kind::NumberValue(val) = kind {
+                    if val.fract() != 0.0 {
+                        return Err(Status::invalid_argument(format!("Expected integer for column '{}', but got a float", col)));
+                    }
+                    params.add(val as i64)
                     .map_err(|e| Status::invalid_argument(format!("Failed to add integer parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected number for column '{}'", col)));
+                }
             },
             _ => return Err(Status::invalid_argument(format!("Unsupported type {}", sql_type))),
         }
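
A note on the BIGINT arm: google.protobuf.Value carries every number as a double (NumberValue), so the handler accepts a value only when it has no fractional part and then casts it to i64. The same check as a standalone sketch (hypothetical helper name):

    /// Hypothetical helper: accept a protobuf NumberValue as a BIGINT parameter.
    fn number_to_bigint(val: f64, col: &str) -> Result<i64, String> {
        if val.fract() != 0.0 {
            return Err(format!("Expected integer for column '{}', but got a float", col));
        }
        Ok(val as i64)
    }

One caveat: f64 can only represent integers exactly up to 2^53, so very large ids sent as NumberValue would silently lose precision, and sending them as strings is not an option here because the BIGINT arm rejects StringValue.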
@@ -227,7 +261,7 @@ pub async fn post_table_data(
         placeholders.join(", ")
     );
 
-    // Execute query with enhanced error handling
+    // ... (rest of the function is unchanged)
     let result = sqlx::query_scalar_with::<_, i64, _>(&sql, params)
         .fetch_one(db_pool)
         .await;
@@ -235,7 +269,6 @@ pub async fn post_table_data(
     let inserted_id = match result {
         Ok(id) => id,
         Err(e) => {
-            // Handle "relation does not exist" error specifically
             if let Some(db_err) = e.as_database_error() {
                 if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
                     return Err(Status::internal(format!(
@@ -248,15 +281,12 @@ pub async fn post_table_data(
         }
     };
 
-    // After a successful insert, send a command to the indexer.
     let command = IndexCommand::AddOrUpdate(IndexCommandData {
         table_name: table_name.clone(),
        row_id: inserted_id,
     });
 
     if let Err(e) = indexer_tx.send(command).await {
-        // If sending fails, the DB is updated but the index will be stale.
-        // This is a critical situation to log and monitor.
        error!(
             "CRITICAL: DB insert for table '{}' (id: {}) succeeded but failed to queue for indexing: {}. Search index is now inconsistent.",
             table_name, inserted_id, e

@@ -4,8 +4,8 @@ use sqlx::{PgPool, Arguments, Postgres};
 use sqlx::postgres::PgArguments;
 use chrono::{DateTime, Utc};
 use common::proto::multieko2::tables_data::{PutTableDataRequest, PutTableDataResponse};
-use std::collections::HashMap;
-use crate::shared::schema_qualifier::qualify_table_name_for_data; // Import schema qualifier
+use crate::shared::schema_qualifier::qualify_table_name_for_data;
+use prost_types::value::Kind;
 
 pub async fn put_table_data(
     db_pool: &PgPool,
@@ -15,20 +15,9 @@ pub async fn put_table_data(
     let table_name = request.table_name;
     let record_id = request.id;
 
-    // Preprocess and validate data
-    let mut processed_data = HashMap::new();
-    let mut null_fields = Vec::new();
-
-    // CORRECTED: Generic handling for all fields.
-    // Any field with an empty string will be added to the null_fields list.
-    // The special, hardcoded logic for "firma" has been removed.
-    for (key, value) in request.data {
-        let trimmed = value.trim().to_string();
-        if trimmed.is_empty() {
-            null_fields.push(key);
-        } else {
-            processed_data.insert(key, trimmed);
-        }
+    // If no data is provided to update, it's an invalid request.
+    if request.data.is_empty() {
+        return Err(Status::invalid_argument("No fields provided to update."));
     }
 
     // Lookup profile
@@ -70,14 +59,29 @@ pub async fn put_table_data(
         columns.push((name, sql_type));
     }
 
-    // CORRECTED: "firma" is not a system column.
-    // It should be treated as a user-defined column.
-    let system_columns = ["deleted"];
+    // Get all foreign key columns for this table (needed for validation)
+    let fk_columns = sqlx::query!(
+        r#"SELECT ltd.table_name
+           FROM table_definition_links tdl
+           JOIN table_definitions ltd ON tdl.linked_table_id = ltd.id
+           WHERE tdl.source_table_id = $1"#,
+        table_def.id
+    )
+    .fetch_all(db_pool)
+    .await
+    .map_err(|e| Status::internal(format!("Foreign key lookup error: {}", e)))?;
+
+    let mut system_columns = vec!["deleted".to_string()];
+    for fk in fk_columns {
+        let base_name = fk.table_name.split_once('_').map_or(fk.table_name.as_str(), |(_, rest)| rest);
+        system_columns.push(format!("{}_id", base_name));
+    }
+    let system_columns_set: std::collections::HashSet<_> = system_columns.iter().map(|s| s.as_str()).collect();
     let user_columns: Vec<&String> = columns.iter().map(|(name, _)| name).collect();
 
     // Validate input columns
-    for key in processed_data.keys() {
-        if !system_columns.contains(&key.as_str()) && !user_columns.contains(&key) {
+    for key in request.data.keys() {
+        if !system_columns_set.contains(key.as_str()) && !user_columns.contains(&key) {
             return Err(Status::invalid_argument(format!("Invalid column: {}", key)));
         }
     }
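
The PUT handler now derives its allowed system columns from the table's foreign-key links: each linked table name loses everything up to the first underscore and gains an `_id` suffix. A small sketch of that derivation (the table names are hypothetical examples):

    /// Hypothetical helper: turn a linked table name into its foreign-key column name.
    /// e.g. "2024_customers" -> "customers_id"; "orders" (no underscore) -> "orders_id"
    fn fk_column_name(linked_table: &str) -> String {
        let base = linked_table
            .split_once('_')
            .map_or(linked_table, |(_, rest)| rest);
        format!("{}_id", base)
    }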
@@ -87,54 +91,65 @@ pub async fn put_table_data(
     let mut set_clauses = Vec::new();
     let mut param_idx = 1;
 
-    // Add data parameters for non-empty fields
-    for (col, value) in &processed_data {
-        // CORRECTED: The logic for "firma" is removed from this match.
-        // It will now fall through to the `else` block and have its type
-        // correctly looked up from the `columns` vector.
-        let sql_type = if system_columns.contains(&col.as_str()) {
+    for (col, proto_value) in request.data {
+        let sql_type = if system_columns_set.contains(col.as_str()) {
             match col.as_str() {
                 "deleted" => "BOOLEAN",
+                _ if col.ends_with("_id") => "BIGINT",
                 _ => return Err(Status::invalid_argument("Invalid system column")),
             }
         } else {
             columns.iter()
-                .find(|(name, _)| name == col)
+                .find(|(name, _)| name == &col)
                 .map(|(_, sql_type)| sql_type.as_str())
                 .ok_or_else(|| Status::invalid_argument(format!("Column not found: {}", col)))?
         };
 
+        // A provided value cannot be null or empty in a PUT request.
+        // To clear a field, it should be set to an empty string "" for text,
+        // or a specific value for other types if needed (though typically not done).
+        // For now, we reject nulls.
+        let kind = proto_value.kind.ok_or_else(|| {
+            Status::invalid_argument(format!("Value for column '{}' cannot be empty in a PUT request. To clear a text field, send an empty string.", col))
+        })?;
+
         match sql_type {
             "TEXT" | "VARCHAR(15)" | "VARCHAR(255)" => {
-                if let Some(max_len) = sql_type.strip_prefix("VARCHAR(")
-                    .and_then(|s| s.strip_suffix(')'))
-                    .and_then(|s| s.parse::<usize>().ok())
-                {
-                    if value.len() > max_len {
-                        return Err(Status::internal(format!("Value too long for {}", col)));
-                    }
-                }
+                if let Kind::StringValue(value) = kind {
                 params.add(value)
                     .map_err(|e| Status::internal(format!("Failed to add text parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected string for column '{}'", col)));
+                }
             },
             "BOOLEAN" => {
-                let val = value.parse::<bool>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid boolean for {}", col)))?;
+                if let Kind::BoolValue(val) = kind {
                 params.add(val)
                     .map_err(|e| Status::internal(format!("Failed to add boolean parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected boolean for column '{}'", col)));
+                }
             },
             "TIMESTAMPTZ" => {
-                let dt = DateTime::parse_from_rfc3339(value)
+                if let Kind::StringValue(value) = kind {
+                let dt = DateTime::parse_from_rfc3339(&value)
                     .map_err(|_| Status::invalid_argument(format!("Invalid timestamp for {}", col)))?;
                 params.add(dt.with_timezone(&Utc))
                     .map_err(|e| Status::internal(format!("Failed to add timestamp parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected ISO 8601 string for column '{}'", col)));
+                }
             },
-            // ADDED: BIGINT handling for completeness, if needed for other columns.
             "BIGINT" => {
-                let val = value.parse::<i64>()
-                    .map_err(|_| Status::invalid_argument(format!("Invalid integer for {}", col)))?;
-                params.add(val)
+                if let Kind::NumberValue(val) = kind {
+                    if val.fract() != 0.0 {
+                        return Err(Status::invalid_argument(format!("Expected integer for column '{}', but got a float", col)));
+                    }
+                    params.add(val as i64)
                     .map_err(|e| Status::internal(format!("Failed to add integer parameter for {}: {}", col, e)))?;
+                } else {
+                    return Err(Status::invalid_argument(format!("Expected number for column '{}'", col)));
+                }
             },
             _ => return Err(Status::invalid_argument(format!("Unsupported type {}", sql_type))),
         }
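
One behavioral consequence of this hunk: the old code turned empty strings into `col = NULL` set clauses (removed in the next hunk), while the new code stores empty strings as empty strings and rejects values with no kind, so this RPC can no longer set a column to SQL NULL. A tiny sketch of the value a caller would send to clear a text column under the new rules (hypothetical helper name):

    use prost_types::{value::Kind, Value};

    /// Hypothetical helper: the "clear this text column" value under the new PUT rules
    /// (stored as an empty string, not as SQL NULL).
    fn cleared_text_value() -> Value {
        Value { kind: Some(Kind::StringValue(String::new())) }
    }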
@@ -143,27 +158,10 @@ pub async fn put_table_data(
         param_idx += 1;
     }
 
-    // Add NULL clauses for empty fields
-    for field in null_fields {
-        // Make sure the field is valid
-        if !system_columns.contains(&field.as_str()) && !user_columns.contains(&&field) {
-            return Err(Status::invalid_argument(format!("Invalid column to set NULL: {}", field)));
-        }
-        set_clauses.push(format!("\"{}\" = NULL", field));
-    }
-
-    // Ensure we have at least one field to update
-    if set_clauses.is_empty() {
-        return Err(Status::invalid_argument("No valid fields to update"));
-    }
-
-    // Add ID parameter at the end
     params.add(record_id)
         .map_err(|e| Status::internal(format!("Failed to add record_id parameter: {}", e)))?;
-
-    // Qualify table name with schema
     let qualified_table = qualify_table_name_for_data(&table_name)?;
 
     let set_clause = set_clauses.join(", ");
     let sql = format!(
         "UPDATE {} SET {} WHERE id = ${} AND deleted = FALSE RETURNING id",
@@ -184,7 +182,6 @@ pub async fn put_table_data(
         }),
         Ok(None) => Err(Status::not_found("Record not found or already deleted")),
         Err(e) => {
-            // Handle "relation does not exist" error specifically
             if let Some(db_err) = e.as_database_error() {
                 if db_err.code() == Some(std::borrow::Cow::Borrowed("42P01")) {
                     return Err(Status::internal(format!(