working full passer on put request
@@ -15,6 +15,7 @@ use server::table_definition::handlers::post_table_definition;
use server::tables_data::handlers::post_table_data;
// The put_table_data handler is the function we are testing.
use server::tables_data::handlers::put_table_data;
use rust_decimal_macros::dec;
use crate::common::setup_test_db;
use tokio::sync::mpsc;
use server::indexer::IndexCommand;
@@ -93,6 +94,8 @@ struct TestContext {
    indexer_tx: mpsc::Sender<IndexCommand>,
}

#[fixture]
async fn test_context() -> TestContext {
    let pool = setup_test_db().await;
@@ -537,3 +540,5 @@ async fn test_update_boolean_system_column_validation(

include!("put_table_data_test2.rs");
include!("put_table_data_test3.rs");
include!("put_table_data_test4.rs");
include!("put_table_data_test5.rs");
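// The include! calls above splice each file's tokens into this module at
// compile time, so fixtures and helpers defined earlier in this file remain
// visible to every included test file.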
@@ -37,7 +37,7 @@ async fn create_data_type_test_table(
        columns: vec![
            TableColumnDefinition { name: "my_text".into(), field_type: "text".into() },
            TableColumnDefinition { name: "my_bool".into(), field_type: "boolean".into() },
-           TableColumnDefinition { name: "my_timestamp".into(), field_type: "timestamptz".into() },
+           TableColumnDefinition { name: "my_timestamp".into(), field_type: "timestamp".into() },
            TableColumnDefinition { name: "my_bigint".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "my_money".into(), field_type: "decimal(19,4)".into() },
            TableColumnDefinition { name: "my_decimal".into(), field_type: "decimal(10,2)".into() },
@@ -831,3 +831,463 @@ async fn test_update_valid_timestamp_formats(
        assert!(result.is_ok(), "Failed for timestamp: {}", timestamp);
    }
}

#[rstest]
#[tokio::test]
async fn test_update_indexer_command_generation(#[future] test_context: TestContext) {
    let context = test_context.await;
    let (indexer_tx, mut indexer_rx) = mpsc::channel(100);

    // Create the initial record
    let initial_data = HashMap::from([
        ("firma".to_string(), string_to_proto_value("Initial Company")),
        ("kz".to_string(), string_to_proto_value("INIT123")),
    ]);

    let create_request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        data: initial_data,
    };

    let create_response = post_table_data(&context.pool, create_request, &indexer_tx).await.unwrap();
    let record_id = create_response.inserted_id;

    // Consume the indexer message produced by the create
    let _create_command = tokio::time::timeout(
        tokio::time::Duration::from_millis(100),
        indexer_rx.recv()
    ).await.unwrap().unwrap();

    // Now perform an update and verify the indexer command
    let mut update_data = HashMap::new();
    update_data.insert("firma".to_string(), string_to_proto_value("Updated Indexer Test Company"));
    update_data.insert("kz".to_string(), string_to_proto_value("UPD123"));

    let update_request = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        id: record_id,
        data: update_data,
    };

    let update_response = put_table_data(&context.pool, update_request, &indexer_tx).await.unwrap();
    assert!(update_response.success);

    // Check that an indexer command was sent for the UPDATE
    let indexer_command = tokio::time::timeout(
        tokio::time::Duration::from_millis(100),
        indexer_rx.recv()
    ).await;

    assert!(indexer_command.is_ok(), "Indexer command should be sent for PUT operation");
    let command = indexer_command.unwrap().unwrap();

    match command {
        IndexCommand::AddOrUpdate(data) => {
            assert_eq!(data.table_name, context.table_name);
            assert_eq!(data.row_id, record_id); // Should be the updated record's ID
            // Verify it is an update, not an insert (same ID as the original record)
            assert_eq!(data.row_id, update_response.updated_id);
        },
        IndexCommand::Delete(_) => panic!("Expected AddOrUpdate command for PUT operation, got Delete"),
    }
}
#[rstest]
#[tokio::test]
async fn test_update_indexer_failure_resilience(#[future] test_context: TestContext) {
    let context = test_context.await;

    // Create the initial record with a working indexer
    let (working_indexer_tx, _working_rx) = mpsc::channel(100);
    let initial_data = HashMap::from([
        ("firma".to_string(), string_to_proto_value("Resilience Test Company")),
        ("kz".to_string(), string_to_proto_value("RES123")),
    ]);

    let create_request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        data: initial_data,
    };

    let create_response = post_table_data(&context.pool, create_request, &working_indexer_tx).await.unwrap();
    let record_id = create_response.inserted_id;

    // Create a closed channel to simulate indexer failure for the update
    let (failed_indexer_tx, indexer_rx) = mpsc::channel(1);
    drop(indexer_rx); // Close the receiver to simulate failure

    let mut update_data = HashMap::new();
    update_data.insert("firma".to_string(), string_to_proto_value("Updated Despite Indexer Failure"));
    update_data.insert("kz".to_string(), string_to_proto_value("FAIL123"));

    let update_request = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        id: record_id,
        data: update_data,
    };

    // The update should still succeed even if the indexer fails
    let update_response = put_table_data(&context.pool, update_request, &failed_indexer_tx).await.unwrap();
    assert!(update_response.success);
    assert_eq!(update_response.updated_id, record_id);

    // Verify the data was updated despite the indexer failure
    let query = format!(
        r#"SELECT COUNT(*) FROM "{}"."{}" WHERE kz = 'FAIL123' AND firma = 'Updated Despite Indexer Failure'"#,
        context.profile_name, context.table_name
    );

    let count: i64 = sqlx::query_scalar(&query)
        .fetch_one(&context.pool)
        .await
        .unwrap();
    assert_eq!(count, 1);
}
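// Background for the resilience test above: dropping the receiver closes the
// channel, so every later Sender::send returns Err(SendError). The assertions
// imply the handler treats that send error as non-fatal and commits the row
// update anyway (inferred from this test, not shown here).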
// ========================================================================
// EMPTY STRINGS BECOME NULL COMPREHENSIVE TESTING
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_empty_strings_become_null(#[future] test_context: TestContext) {
    let context = test_context.await;

    let test_cases = vec![
        ("", "empty_string"),
        (" ", "whitespace_only"),
        ("\t\n", "tabs_newlines"),
        ("\r\n \t", "mixed_whitespace"),
        (" Normal Value ", "padded_value"),
        ("Keep This", "normal_value"),
    ];

    for (input, test_name) in test_cases {
        // Create an initial record with a value
        let record_id = create_initial_record(
            &context,
            HashMap::from([
                ("firma".to_string(), string_to_proto_value("Test Empty Strings")),
                ("ulica".to_string(), string_to_proto_value("Original Street")),
                ("telefon".to_string(), string_to_proto_value("555-1234")),
            ]),
        ).await;

        let mut update_data = HashMap::new();
        update_data.insert("firma".to_string(), string_to_proto_value(&format!("Test {}", test_name)));
        update_data.insert("ulica".to_string(), string_to_proto_value(input));
        update_data.insert("telefon".to_string(), string_to_proto_value(input));

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.table_name.clone(),
            id: record_id,
            data: update_data,
        };

        let response = put_table_data(&context.pool, request, &context.indexer_tx)
            .await
            .unwrap();
        assert!(response.success, "Failed for test case: {}", test_name);

        // Check what was actually stored
        let query = format!(
            r#"SELECT ulica, telefon FROM "{}"."{}" WHERE id = $1"#,
            context.profile_name, context.table_name
        );

        let row = sqlx::query(&query)
            .bind(record_id)
            .fetch_one(&context.pool)
            .await
            .unwrap();

        let stored_ulica: Option<String> = row.get("ulica");
        let stored_telefon: Option<String> = row.get("telefon");

        let trimmed = input.trim();
        if trimmed.is_empty() {
            assert!(stored_ulica.is_none(), "Empty/whitespace string should be NULL for ulica in: {}", test_name);
            assert!(stored_telefon.is_none(), "Empty/whitespace string should be NULL for telefon in: {}", test_name);
        } else {
            assert_eq!(stored_ulica.unwrap(), trimmed, "String should be trimmed for ulica in: {}", test_name);
            assert_eq!(stored_telefon.unwrap(), trimmed, "String should be trimmed for telefon in: {}", test_name);
        }
    }
}
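// A minimal sketch of the text normalization rule the assertions above imply
// (an inference from this test, not the actual handler code): trim the
// incoming string, then map an empty result to SQL NULL.
fn normalize_text_sketch(input: &str) -> Option<String> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        None // bound as NULL
    } else {
        Some(trimmed.to_string())
    }
}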
#[rstest]
#[tokio::test]
async fn test_update_empty_string_to_null_then_to_value(#[future] test_context: TestContext) {
    let context = test_context.await;

    // Create an initial record with values
    let record_id = create_initial_record(
        &context,
        HashMap::from([
            ("firma".to_string(), string_to_proto_value("String Transition Test")),
            ("ulica".to_string(), string_to_proto_value("Original Street")),
            ("telefon".to_string(), string_to_proto_value("555-1234")),
        ]),
    ).await;

    // First update: set fields to empty strings (should become NULL)
    let mut update_to_empty = HashMap::new();
    update_to_empty.insert("ulica".to_string(), string_to_proto_value(" ")); // Whitespace only
    update_to_empty.insert("telefon".to_string(), string_to_proto_value("")); // Empty string

    let request1 = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        id: record_id,
        data: update_to_empty,
    };

    let response1 = put_table_data(&context.pool, request1, &context.indexer_tx).await.unwrap();
    assert!(response1.success);

    // Verify the fields are now NULL
    let query = format!(
        r#"SELECT ulica, telefon FROM "{}"."{}" WHERE id = $1"#,
        context.profile_name, context.table_name
    );

    let row = sqlx::query(&query)
        .bind(record_id)
        .fetch_one(&context.pool)
        .await
        .unwrap();

    let ulica1: Option<String> = row.get("ulica");
    let telefon1: Option<String> = row.get("telefon");

    assert!(ulica1.is_none(), "ulica should be NULL after empty string update");
    assert!(telefon1.is_none(), "telefon should be NULL after empty string update");

    // Second update: set the fields back to actual values
    let mut update_to_values = HashMap::new();
    update_to_values.insert("ulica".to_string(), string_to_proto_value("New Street"));
    update_to_values.insert("telefon".to_string(), string_to_proto_value("999-8888"));

    let request2 = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        id: record_id,
        data: update_to_values,
    };

    let response2 = put_table_data(&context.pool, request2, &context.indexer_tx).await.unwrap();
    assert!(response2.success);

    // Verify the fields now have the new values
    let row2 = sqlx::query(&query)
        .bind(record_id)
        .fetch_one(&context.pool)
        .await
        .unwrap();

    let ulica2: Option<String> = row2.get("ulica");
    let telefon2: Option<String> = row2.get("telefon");

    assert_eq!(ulica2.unwrap(), "New Street", "ulica should have the new value");
    assert_eq!(telefon2.unwrap(), "999-8888", "telefon should have the new value");
}
// ========================================================================
// COMPREHENSIVE NULL HANDLING EDGE CASES
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_mixed_null_transitions_comprehensive(#[future] test_context: TestContext) {
    let context = test_context.await;

    // Create an initial record with mixed values and nulls
    let record_id = create_initial_record(
        &context,
        HashMap::from([
            ("firma".to_string(), string_to_proto_value("Null Transition Test")),
            ("ulica".to_string(), string_to_proto_value("Original Street")),
            ("telefon".to_string(), string_to_proto_value("555-1234")),
            ("mesto".to_string(), string_to_proto_value("Original City")),
        ]),
    ).await;

    // Define a sequence of updates that exercises various NULL transitions
    let update_sequences = vec![
        (
            "Step 1: Mix of values, nulls, and empty strings",
            HashMap::from([
                ("ulica".to_string(), string_to_proto_value("Updated Street")),
                ("telefon".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // Explicit NULL
                ("mesto".to_string(), string_to_proto_value(" ")), // Empty string -> NULL
                ("kz".to_string(), string_to_proto_value("NEW123")), // New value
            ])
        ),
        (
            "Step 2: Convert NULLs back to values",
            HashMap::from([
                ("telefon".to_string(), string_to_proto_value("999-8888")), // NULL -> value
                ("mesto".to_string(), string_to_proto_value("New City")), // NULL -> value
                ("ulica".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // value -> NULL
            ])
        ),
        (
            "Step 3: Mix empty strings and explicit nulls",
            HashMap::from([
                ("ulica".to_string(), string_to_proto_value("")), // NULL -> empty string -> NULL
                ("telefon".to_string(), string_to_proto_value("\t\n ")), // value -> whitespace -> NULL
                ("mesto".to_string(), string_to_proto_value("Final City")), // value -> value
                ("kz".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // value -> NULL
            ])
        ),
    ];

    for (step_description, update_data) in update_sequences {
        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.table_name.clone(),
            id: record_id,
            data: update_data.clone(),
        };

        let response = put_table_data(&context.pool, request, &context.indexer_tx).await.unwrap();
        assert!(response.success, "Failed at: {}", step_description);

        // Verify the state after this update
        let query = format!(
            r#"SELECT ulica, telefon, mesto, kz FROM "{}"."{}" WHERE id = $1"#,
            context.profile_name, context.table_name
        );

        let row = sqlx::query(&query)
            .bind(record_id)
            .fetch_one(&context.pool)
            .await
            .unwrap();

        // Verify each field based on what should have happened
        for (field_name, expected_value) in update_data.iter() {
            let db_value: Option<String> = row.get(field_name.as_str());

            match expected_value.kind.as_ref().unwrap() {
                Kind::StringValue(s) => {
                    let trimmed = s.trim();
                    if trimmed.is_empty() {
                        assert!(db_value.is_none(),
                            "Field {} should be NULL after empty string in {}", field_name, step_description);
                    } else {
                        assert_eq!(db_value.unwrap(), trimmed,
                            "Field {} should have the trimmed value in {}", field_name, step_description);
                    }
                },
                Kind::NullValue(_) => {
                    assert!(db_value.is_none(),
                        "Field {} should be NULL after explicit null in {}", field_name, step_description);
                },
                _ => {}
            }
        }
    }
}
// ========================================================================
// ADVANCED INDEXER INTEGRATION SCENARIOS
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_indexer_multiple_rapid_updates(#[future] test_context: TestContext) {
    let context = test_context.await;
    let (indexer_tx, mut indexer_rx) = mpsc::channel(1000);

    // Call post_table_data directly (instead of the helper) so the create
    // command flows through our local indexer channel.
    let create_request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        data: HashMap::from([(
            "firma".to_string(),
            string_to_proto_value("Rapid Update Test"),
        )]),
    };
    let response = post_table_data(&context.pool, create_request, &indexer_tx)
        .await
        .unwrap();
    let record_id = response.inserted_id;

    // Consume the initial create command
    let _create_command = tokio::time::timeout(
        tokio::time::Duration::from_millis(100),
        indexer_rx.recv(),
    )
    .await
    .expect("Timeout waiting for create command")
    .expect("Channel closed unexpectedly");

    let num_updates = 10;
    for i in 0..num_updates {
        let mut update_data = HashMap::new();
        update_data.insert(
            "firma".to_string(),
            string_to_proto_value(&format!("Rapid Update {}", i)),
        );
        update_data.insert(
            "kz".to_string(),
            string_to_proto_value(&format!("RAP{}", i)),
        );

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.table_name.clone(),
            id: record_id,
            data: update_data,
        };

        let response = put_table_data(&context.pool, request, &indexer_tx)
            .await
            .unwrap();
        assert!(response.success, "Rapid update {} should succeed", i);
    }

    // Verify all update commands were sent to the indexer
    let mut received_commands = 0;
    while let Ok(command) = tokio::time::timeout(
        tokio::time::Duration::from_millis(50),
        indexer_rx.recv()
    ).await {
        if command.is_none() { break; }
        match command.unwrap() {
            IndexCommand::AddOrUpdate(data) => {
                assert_eq!(data.table_name, context.table_name);
                assert_eq!(data.row_id, record_id);
                received_commands += 1;
            },
            IndexCommand::Delete(_) => panic!("Unexpected Delete command during updates"),
        }
    }

    assert_eq!(received_commands, num_updates, "Should receive an indexer command for each update");

    // Verify the final state
    let query = format!(
        r#"SELECT firma, kz FROM "{}"."{}" WHERE id = $1"#,
        context.profile_name, context.table_name
    );

    let row = sqlx::query(&query)
        .bind(record_id)
        .fetch_one(&context.pool)
        .await
        .unwrap();

    let final_firma: String = row.get("firma");
    let final_kz: String = row.get("kz");

    assert_eq!(final_firma, format!("Rapid Update {}", num_updates - 1));
    assert_eq!(final_kz, format!("RAP{}", num_updates - 1));
}
@@ -4,8 +4,6 @@
// ADDITIONAL HELPER FUNCTIONS FOR COMPREHENSIVE PUT TEST 3
// ========================================================================

use rust_decimal_macros::dec;

// Note: Helper functions like create_string_value, create_number_value, etc.
// are already defined in the main test file, so we don't redefine them here.
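// For reference, the shared string_to_proto_value helper presumably wraps its
// input as prost_types::Value { kind: Some(Kind::StringValue(s.to_string())) },
// mirroring the Value { kind: Some(Kind::NumberValue(..)) } literals built
// inline below (a sketch inferred from usage, not the actual definition).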
server/tests/tables_data/handlers/put_table_data_test4.rs (new file, 907 lines)
@@ -0,0 +1,907 @@
// tests/tables_data/handlers/put_table_data_test4.rs

#[derive(Clone)]
struct ComprehensiveIntegerTestContext {
    pool: PgPool,
    profile_name: String,
    mixed_integer_table: String,
    bigint_only_table: String,
    integer_only_table: String,
    indexer_tx: mpsc::Sender<IndexCommand>,
}

#[derive(Clone)]
struct AdvancedDecimalTestContext {
    pool: PgPool,
    profile_name: String,
    table_name: String,
    indexer_tx: mpsc::Sender<IndexCommand>,
}

#[derive(Clone)]
struct PerformanceTestContext {
    pool: PgPool,
    profile_name: String,
    stress_table: String,
    indexer_tx: mpsc::Sender<IndexCommand>,
}
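// The contexts derive Clone so the concurrency tests below can hand each
// spawned task its own copy; PgPool and mpsc::Sender are both cheap,
// reference-counted clones that share the underlying pool and channel.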
// ========================================================================
// TABLE CREATION HELPERS FOR COMPREHENSIVE TESTING
// ========================================================================

async fn create_comprehensive_integer_tables(
    pool: &PgPool,
    profile_name: &str,
) -> Result<ComprehensiveIntegerTestContext, tonic::Status> {
    let unique_id = generate_unique_id();
    let mixed_table = format!("comprehensive_mixed_table_{}", unique_id);
    let bigint_table = format!("comprehensive_bigint_table_{}", unique_id);
    let integer_table = format!("comprehensive_integer_table_{}", unique_id);

    // Table with both INTEGER and BIGINT columns for comprehensive testing
    let mixed_def = PostTableDefinitionRequest {
        profile_name: profile_name.into(),
        table_name: mixed_table.clone(),
        columns: vec![
            TableColumnDefinition { name: "name".into(), field_type: "text".into() },
            TableColumnDefinition { name: "small_int".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "big_int".into(), field_type: "biginteger".into() },
            TableColumnDefinition { name: "another_int".into(), field_type: "int".into() },
            TableColumnDefinition { name: "another_bigint".into(), field_type: "bigint".into() },
            TableColumnDefinition { name: "nullable_int".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "nullable_bigint".into(), field_type: "biginteger".into() },
        ],
        indexes: vec![],
        links: vec![],
    };
    post_table_definition(pool, mixed_def).await?;

    // Table with only BIGINT columns for edge case testing
    let bigint_def = PostTableDefinitionRequest {
        profile_name: profile_name.into(),
        table_name: bigint_table.clone(),
        columns: vec![
            TableColumnDefinition { name: "name".into(), field_type: "text".into() },
            TableColumnDefinition { name: "value1".into(), field_type: "biginteger".into() },
            TableColumnDefinition { name: "value2".into(), field_type: "bigint".into() },
            TableColumnDefinition { name: "extreme_value".into(), field_type: "biginteger".into() },
        ],
        indexes: vec![],
        links: vec![],
    };
    post_table_definition(pool, bigint_def).await?;

    // Table with only INTEGER columns for boundary testing
    let integer_def = PostTableDefinitionRequest {
        profile_name: profile_name.into(),
        table_name: integer_table.clone(),
        columns: vec![
            TableColumnDefinition { name: "name".into(), field_type: "text".into() },
            TableColumnDefinition { name: "value1".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "value2".into(), field_type: "int".into() },
            TableColumnDefinition { name: "boundary_test".into(), field_type: "integer".into() },
        ],
        indexes: vec![],
        links: vec![],
    };
    post_table_definition(pool, integer_def).await?;

    let (tx, mut rx) = mpsc::channel(100);
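    // Drain every IndexCommand so the bounded channel never fills up and
    // blocks the table-data handlers in tests that ignore indexing.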
    tokio::spawn(async move { while rx.recv().await.is_some() {} });

    Ok(ComprehensiveIntegerTestContext {
        pool: pool.clone(),
        profile_name: profile_name.to_string(),
        mixed_integer_table: mixed_table,
        bigint_only_table: bigint_table,
        integer_only_table: integer_table,
        indexer_tx: tx,
    })
}
async fn create_advanced_decimal_table(
    pool: &PgPool,
    table_name: &str,
    profile_name: &str,
) -> Result<(), tonic::Status> {
    let table_def_request = PostTableDefinitionRequest {
        profile_name: profile_name.into(),
        table_name: table_name.into(),
        columns: vec![
            TableColumnDefinition { name: "product_name".into(), field_type: "text".into() },
            TableColumnDefinition { name: "price".into(), field_type: "decimal(19, 4)".into() },
            TableColumnDefinition { name: "rate".into(), field_type: "decimal(10, 5)".into() },
            TableColumnDefinition { name: "discount".into(), field_type: "decimal(5, 3)".into() },
            TableColumnDefinition { name: "ultra_precise".into(), field_type: "decimal(28, 10)".into() },
            TableColumnDefinition { name: "percentage".into(), field_type: "decimal(5, 4)".into() },
        ],
        indexes: vec![],
        links: vec![],
    };

    post_table_definition(pool, table_def_request).await?;
    Ok(())
}
async fn create_performance_stress_table(
    pool: &PgPool,
    table_name: &str,
    profile_name: &str,
) -> Result<(), tonic::Status> {
    let table_def_request = PostTableDefinitionRequest {
        profile_name: profile_name.into(),
        table_name: table_name.into(),
        columns: vec![
            TableColumnDefinition { name: "test_name".into(), field_type: "text".into() },
            TableColumnDefinition { name: "int_val1".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "int_val2".into(), field_type: "integer".into() },
            TableColumnDefinition { name: "bigint_val1".into(), field_type: "biginteger".into() },
            TableColumnDefinition { name: "bigint_val2".into(), field_type: "biginteger".into() },
            TableColumnDefinition { name: "decimal_val".into(), field_type: "decimal(10, 2)".into() },
            TableColumnDefinition { name: "bool_val".into(), field_type: "boolean".into() },
            TableColumnDefinition { name: "timestamp_val".into(), field_type: "timestamptz".into() },
        ],
        indexes: vec![],
        links: vec![],
    };

    post_table_definition(pool, table_def_request).await?;
    Ok(())
}
// ========================================================================
// FIXTURES
// ========================================================================

#[fixture]
async fn comprehensive_integer_test_context() -> ComprehensiveIntegerTestContext {
    let pool = setup_test_db().await;
    let unique_id = generate_unique_id();
    let profile_name = format!("comp_int_profile_{}", unique_id);

    create_comprehensive_integer_tables(&pool, &profile_name).await
        .expect("Failed to create comprehensive integer test tables")
}

#[fixture]
async fn advanced_decimal_test_context() -> AdvancedDecimalTestContext {
    let pool = setup_test_db().await;
    let unique_id = generate_unique_id();
    let profile_name = format!("adv_decimal_profile_{}", unique_id);
    let table_name = format!("advanced_decimals_{}", unique_id);

    create_advanced_decimal_table(&pool, &table_name, &profile_name).await
        .expect("Failed to create advanced decimal test table");

    let (tx, mut rx) = mpsc::channel(100);
    tokio::spawn(async move { while rx.recv().await.is_some() {} });

    AdvancedDecimalTestContext { pool, profile_name, table_name, indexer_tx: tx }
}

#[fixture]
async fn performance_test_context() -> PerformanceTestContext {
    let pool = setup_test_db().await;
    let unique_id = generate_unique_id();
    let profile_name = format!("perf_profile_{}", unique_id);
    let stress_table = format!("stress_table_{}", unique_id);

    create_performance_stress_table(&pool, &stress_table, &profile_name).await
        .expect("Failed to create performance stress test table");

    let (tx, mut rx) = mpsc::channel(100);
    tokio::spawn(async move { while rx.recv().await.is_some() {} });

    PerformanceTestContext { pool, profile_name, stress_table, indexer_tx: tx }
}
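// rstest note: because these fixtures are async, the tests below receive them
// as #[future] parameters and must await them first, e.g.
// `let context = comprehensive_integer_test_context.await;`.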
// ========================================================================
// HELPER FUNCTIONS FOR CREATING INITIAL RECORDS
// ========================================================================

async fn create_initial_comprehensive_integer_record(
    context: &ComprehensiveIntegerTestContext,
    table_name: &str,
) -> i64 {
    let mut initial_data = HashMap::new();
    initial_data.insert("name".to_string(), string_to_proto_value("Initial Record"));

    match table_name {
        table if table.contains("mixed") => {
            initial_data.insert("small_int".to_string(), Value { kind: Some(Kind::NumberValue(100.0)) });
            initial_data.insert("big_int".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) });
            initial_data.insert("another_int".to_string(), Value { kind: Some(Kind::NumberValue(200.0)) });
            initial_data.insert("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) });
            initial_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(300.0)) });
            initial_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue(3000000000000.0)) });
        },
        table if table.contains("bigint") => {
            initial_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) });
            initial_data.insert("value2".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) });
            initial_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(9223372036854774784.0)) });
        },
        table if table.contains("integer") => {
            initial_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(100.0)) });
            initial_data.insert("value2".to_string(), Value { kind: Some(Kind::NumberValue(200.0)) });
            initial_data.insert("boundary_test".to_string(), Value { kind: Some(Kind::NumberValue(300.0)) });
        },
        _ => {}
    }

    let request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: table_name.to_string(),
        data: initial_data,
    };
    let response = post_table_data(&context.pool, request, &context.indexer_tx)
        .await
        .expect("Setup: Failed to create initial integer record");
    response.inserted_id
}
async fn create_initial_advanced_decimal_record(context: &AdvancedDecimalTestContext) -> i64 {
    let mut initial_data = HashMap::new();
    initial_data.insert("product_name".to_string(), string_to_proto_value("Initial Product"));
    initial_data.insert("price".to_string(), string_to_proto_value("100.0000"));
    initial_data.insert("rate".to_string(), string_to_proto_value("1.00000"));
    initial_data.insert("discount".to_string(), string_to_proto_value("0.100"));
    initial_data.insert("ultra_precise".to_string(), string_to_proto_value("123.4567890123"));
    initial_data.insert("percentage".to_string(), string_to_proto_value("0.9999"));

    let request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        data: initial_data,
    };
    let response = post_table_data(&context.pool, request, &context.indexer_tx)
        .await
        .expect("Setup: Failed to create initial decimal record");
    response.inserted_id
}
async fn create_initial_performance_record(context: &PerformanceTestContext) -> i64 {
    let mut initial_data = HashMap::new();
    initial_data.insert("test_name".to_string(), string_to_proto_value("Initial Performance Test"));
    initial_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) });
    initial_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(2.0)) });
    initial_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) });
    initial_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) });
    initial_data.insert("decimal_val".to_string(), string_to_proto_value("123.45"));
    initial_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue(false)) });
    initial_data.insert("timestamp_val".to_string(), string_to_proto_value("2024-01-01T00:00:00Z"));

    let request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.stress_table.clone(),
        data: initial_data,
    };
    let response = post_table_data(&context.pool, request, &context.indexer_tx)
        .await
        .expect("Setup: Failed to create initial performance record");
    response.inserted_id
}
// ========================================================================
// BIGINT SUCCESSFUL ROUNDTRIP VALUE TESTS
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_bigint_successful_roundtrip_values(
    #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext,
) {
    let context = comprehensive_integer_test_context.await;
    let record_id = create_initial_comprehensive_integer_record(&context, &context.bigint_only_table).await;

    // Values that SHOULD successfully round-trip and be accepted for updates
    let successful_values = vec![
        (9223372036854775808.0, "Exactly i64::MAX as f64 (legitimate value)"),
        (-9223372036854775808.0, "Exactly i64::MIN as f64 (legitimate value)"),
        (9223372036854774784.0, "Large but precisely representable in f64"),
        (-9223372036854774784.0, "Large negative but precisely representable in f64"),
        (0.0, "Zero"),
        (1.0, "One"),
        (-1.0, "Negative one"),
        (2147483647.0, "i32::MAX as f64"),
        (-2147483648.0, "i32::MIN as f64"),
        (4611686018427387904.0, "2^62 (about i64::MAX / 2)"),
        (-4611686018427387904.0, "-2^62 (about i64::MIN / 2)"),
        (1000000000000.0, "One trillion"),
        (-1000000000000.0, "Negative one trillion"),
    ];

    for (value, description) in successful_values {
        let mut update_data = HashMap::new();
        update_data.insert("name".to_string(), string_to_proto_value(&format!("Roundtrip test: {}", description)));
        update_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(value)) });
        update_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(value)) });

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.bigint_only_table.clone(),
            id: record_id,
            data: update_data,
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;
        assert!(result.is_ok(), "Should have succeeded for legitimate i64 update value {}: {}", value, description);

        // Verify the value was stored correctly
        if result.is_ok() {
            let query = format!(
                r#"SELECT value1, extreme_value FROM "{}"."{}" WHERE id = $1"#,
                context.profile_name, context.bigint_only_table
            );
            let row = sqlx::query(&query)
                .bind(record_id)
                .fetch_one(&context.pool)
                .await
                .unwrap();

            let stored_value1: i64 = row.get("value1");
            let stored_extreme_value: i64 = row.get("extreme_value");

            assert_eq!(stored_value1, value as i64, "Value1 should match for {}", description);
            assert_eq!(stored_extreme_value, value as i64, "Extreme value should match for {}", description);
        }
    }
}
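// A hedged sketch of the range check the roundtrip and overflow tests imply
// (an inference from the assertions, not the actual handler): accept finite
// whole f64 values inside the f64 image of the i64 range, then cast.
fn f64_to_i64_checked_sketch(v: f64) -> Option<i64> {
    if !v.is_finite() || v.fract() != 0.0 {
        return None; // NaN, infinities, and fractional inputs are rejected
    }
    // i64::MAX as f64 rounds up to 2^63, so 9223372036854775808.0 passes the
    // check (Rust's `as` cast then saturates to i64::MAX), while 1e20 and
    // f64::MAX fall outside the range and are rejected.
    if v < i64::MIN as f64 || v > i64::MAX as f64 {
        return None;
    }
    Some(v as i64)
}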
#[rstest]
#[tokio::test]
async fn test_update_bigint_overflow_rejection_comprehensive(
    #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext,
) {
    let context = comprehensive_integer_test_context.await;
    let record_id = create_initial_comprehensive_integer_record(&context, &context.bigint_only_table).await;

    // Values that should be rejected for BIGINT columns due to precision loss or overflow
    let overflow_values = vec![
        (f64::INFINITY, "Positive infinity"),
        (f64::NEG_INFINITY, "Negative infinity"),
        (1e20, "Very large number (100,000,000,000,000,000,000)"),
        (-1e20, "Very large negative number"),
        (1e25, "Extremely large number"),
        (-1e25, "Extremely large negative number"),
        (f64::MAX, "f64::MAX"),
        (f64::MIN, "f64::MIN"),
        (f64::NAN, "NaN"),
    ];

    for (value, description) in overflow_values {
        let mut update_data = HashMap::new();
        update_data.insert("name".to_string(), string_to_proto_value(&format!("i64 overflow update test: {}", description)));
        update_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(value)) });

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.bigint_only_table.clone(),
            id: record_id,
            data: update_data,
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;

        assert!(result.is_err(), "Should have failed for i64 overflow update value {}: {}", value, description);

        if let Err(err) = result {
            assert_eq!(err.code(), tonic::Code::InvalidArgument);
            let message = err.message();
            assert!(
                message.contains("Integer value out of range for BIGINT column") ||
                message.contains("Expected integer for column") ||
                message.contains("but got a float") ||
                message.contains("Invalid number"),
                "Unexpected error message for {}: {}",
                description,
                message
            );
        }
    }
}
// ========================================================================
// WRONG TYPE FOR MIXED INTEGER COLUMNS TESTS
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_wrong_type_for_mixed_integer_columns(
    #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext,
) {
    let context = comprehensive_integer_test_context.await;
    let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await;

    // Try to put values outside the i32 range into i32 columns (should fail)
    let wrong_type_tests = vec![
        ("small_int", 3000000000.0, "3 billion in i32 column"),
        ("another_int", -3000000000.0, "negative 3 billion in i32 column"),
        ("nullable_int", 2147483648.0, "i32::MAX + 1 in i32 column"),
        ("small_int", 9223372036854775807.0, "i64::MAX in i32 column"),
    ];

    for (column_name, value, description) in wrong_type_tests {
        let mut update_data = HashMap::new();
        update_data.insert("name".to_string(), string_to_proto_value(&format!("Wrong type test: {}", description)));
        update_data.insert(column_name.to_string(), Value { kind: Some(Kind::NumberValue(value)) });

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.mixed_integer_table.clone(),
            id: record_id,
            data: update_data,
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;
        assert!(result.is_err(), "Should fail when putting out-of-range value {} in i32 column {}", value, column_name);

        if let Err(err) = result {
            assert_eq!(err.code(), tonic::Code::InvalidArgument);
            assert!(err.message().contains("Integer value out of range for INTEGER column"));
        }
    }

    // Try fractional values in integer columns (should fail)
    let fractional_tests = vec![
        ("small_int", 42.5, "fractional in i32 column"),
        ("big_int", 1000000000000.1, "fractional in i64 column"),
        ("another_int", -42.9, "negative fractional in i32 column"),
        ("another_bigint", -1000000000000.9, "negative fractional in i64 column"),
    ];

    for (column_name, value, description) in fractional_tests {
        let mut update_data = HashMap::new();
        update_data.insert("name".to_string(), string_to_proto_value(&format!("Fractional test: {}", description)));
        update_data.insert(column_name.to_string(), Value { kind: Some(Kind::NumberValue(value)) });

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.mixed_integer_table.clone(),
            id: record_id,
            data: update_data,
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;
        assert!(result.is_err(), "Should fail for fractional value {} in column {}", value, column_name);

        if let Err(err) = result {
            assert_eq!(err.code(), tonic::Code::InvalidArgument);
            assert!(err.message().contains("Expected integer for column") && err.message().contains("but got a float"));
        }
    }
}
// ========================================================================
// CONCURRENT MIXED INTEGER UPDATES TESTS
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_concurrent_mixed_integer_updates(
    #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext,
) {
    let context = comprehensive_integer_test_context.await;

    // Create multiple records for concurrent updating
    let mut record_ids = Vec::new();
    for _ in 0..10 {
        let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await;
        record_ids.push(record_id);
    }

    // Test concurrent updates with different integer types
    let tasks: Vec<_> = record_ids.into_iter().enumerate().map(|(i, record_id)| {
        let context = context.clone();
        tokio::spawn(async move {
            let mut update_data = HashMap::new();
            update_data.insert("name".to_string(), string_to_proto_value(&format!("Concurrent update test {}", i)));
            update_data.insert("small_int".to_string(), Value { kind: Some(Kind::NumberValue((i * 1000) as f64)) });
            update_data.insert("big_int".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * 1000000000000) as f64)) });
            // Cast i to i32 before multiplying by a negative constant (i is an unsigned usize)
            update_data.insert("another_int".to_string(), Value { kind: Some(Kind::NumberValue(((i as i32) * -100) as f64)) });
            update_data.insert("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * -1000000000000) as f64)) });

            // Alternate between null and values for the nullable columns
            if i % 2 == 0 {
                update_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue((i * 42) as f64)) });
                update_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) });
            } else {
                update_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NullValue(0)) });
                update_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * 9999999999) as f64)) });
            }

            let request = PutTableDataRequest {
                profile_name: context.profile_name.clone(),
                table_name: context.mixed_integer_table.clone(),
                id: record_id,
                data: update_data,
            };
            put_table_data(&context.pool, request, &context.indexer_tx).await
        })
    }).collect();

    // Wait for all tasks to complete
    let results = futures::future::join_all(tasks).await;

    // All should succeed
    for (i, result) in results.into_iter().enumerate() {
        let task_result = result.expect("Task should not panic");
        assert!(task_result.is_ok(), "Concurrent integer update {} should succeed", i);
    }

    // Verify all records were updated correctly
    let query = format!(
        r#"SELECT COUNT(*) FROM "{}"."{}" WHERE name LIKE 'Concurrent update test%'"#,
        context.profile_name, context.mixed_integer_table
    );
    let count: i64 = sqlx::query_scalar(&query)
        .fetch_one(&context.pool)
        .await
        .unwrap();
    assert_eq!(count, 10);
}
// ========================================================================
// ADVANCED DECIMAL PRECISION EDGE CASES
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_ultra_high_precision_decimals(
    #[future] advanced_decimal_test_context: AdvancedDecimalTestContext,
) {
    let context = advanced_decimal_test_context.await;
    let record_id = create_initial_advanced_decimal_record(&context).await;

    let ultra_precision_tests = vec![
        ("ultra_precise", "123456789.1234567890", dec!(123456789.1234567890)),
        ("ultra_precise", "-999999999.9999999999", dec!(-999999999.9999999999)),
        ("ultra_precise", "0.0000000001", dec!(0.0000000001)),
        ("percentage", "0.9999", dec!(0.9999)), // decimal(5,4) - 0.9999 is the maximum tested
        ("percentage", "0.0001", dec!(0.0001)), // decimal(5,4) - minimum precision
    ];

    for (field, value_str, expected_decimal) in ultra_precision_tests {
        let mut update_data = HashMap::new();
        update_data.insert("product_name".to_string(), string_to_proto_value("Ultra precision test"));
        update_data.insert(field.to_string(), string_to_proto_value(value_str));

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.table_name.clone(),
            id: record_id,
            data: update_data,
        };

        let response = put_table_data(&context.pool, request, &context.indexer_tx)
            .await
            .unwrap();
        assert!(response.success);

        // Verify the ultra-high precision was preserved
        let query = format!(
            r#"SELECT {} FROM "{}"."{}" WHERE id = $1"#,
            field, context.profile_name, context.table_name
        );
        let stored_value: rust_decimal::Decimal = sqlx::query_scalar(&query)
            .bind(record_id)
            .fetch_one(&context.pool)
            .await
            .unwrap();

        assert_eq!(stored_value, expected_decimal, "Ultra precision mismatch for field {}", field);
    }
}
#[rstest]
#[tokio::test]
async fn test_update_decimal_edge_case_rounding(
    #[future] advanced_decimal_test_context: AdvancedDecimalTestContext,
) {
    let context = advanced_decimal_test_context.await;
    let record_id = create_initial_advanced_decimal_record(&context).await;

    // Test edge cases where rounding behavior is critical
    let edge_rounding_tests = vec![
        ("price", "12345.99995", dec!(12346.0000)), // Should round up at the 5
        ("rate", "1.999995", dec!(2.00000)), // Should round up
        ("discount", "0.9995", dec!(1.000)), // Should round up to 1.000
        ("percentage", "0.99995", dec!(1.0000)), // decimal(5,4) rounds to 1.0000
        ("ultra_precise", "1.99999999995", dec!(2.0000000000)), // Ultra precision rounding
    ];

    for (field, input_value, expected_rounded) in edge_rounding_tests {
        let mut update_data = HashMap::new();
        update_data.insert("product_name".to_string(), string_to_proto_value("Edge rounding test"));
        update_data.insert(field.to_string(), string_to_proto_value(input_value));

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.table_name.clone(),
            id: record_id,
            data: update_data,
        };

        let response = put_table_data(&context.pool, request, &context.indexer_tx)
            .await
            .unwrap();
        assert!(response.success);

        // Verify the edge case rounding was applied correctly
        let query = format!(
            r#"SELECT {} FROM "{}"."{}" WHERE id = $1"#,
            field, context.profile_name, context.table_name
        );
        let stored_value: rust_decimal::Decimal = sqlx::query_scalar(&query)
            .bind(record_id)
            .fetch_one(&context.pool)
            .await
            .unwrap();

        assert_eq!(stored_value, expected_rounded, "Edge rounding mismatch for field {}", field);
    }
}
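// A hedged sketch of the scale coercion the rounding assertions above imply
// (an inference, not the server's actual code): parse the string into a
// Decimal, then round half-away-from-zero to the column's declared scale.
fn coerce_to_scale_sketch(raw: &str, scale: u32) -> Option<rust_decimal::Decimal> {
    let parsed: rust_decimal::Decimal = raw.trim().parse().ok()?;
    Some(parsed.round_dp_with_strategy(
        scale,
        rust_decimal::RoundingStrategy::MidpointAwayFromZero,
    ))
}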
// ========================================================================
// PERFORMANCE AND STRESS TESTS
// ========================================================================

#[rstest]
#[tokio::test]
async fn test_update_rapid_integer_updates_stress(
    #[future] performance_test_context: PerformanceTestContext,
) {
    let context = performance_test_context.await;

    // Create initial records for stress testing
    let mut record_ids = Vec::new();
    for _ in 0..100 {
        let record_id = create_initial_performance_record(&context).await;
        record_ids.push(record_id);
    }

    // Rapid sequential updates with alternating integer types and complex data
    let start = std::time::Instant::now();

    for (i, record_id) in record_ids.iter().enumerate() {
        let mut update_data = HashMap::new();
        update_data.insert("test_name".to_string(), string_to_proto_value(&format!("Stress update test {}", i)));

        // Alternate between different boundary values for stress testing
        let small_val = match i % 4 {
            0 => 2147483647.0,  // i32::MAX
            1 => -2147483648.0, // i32::MIN
            2 => 0.0,
            _ => (i as f64) * 1000.0,
        };

        let big_val = match i % 4 {
            0 => 9223372036854774784.0,  // Near i64::MAX
            1 => -9223372036854774784.0, // Near i64::MIN
            2 => 0.0,
            _ => (i as f64) * 1000000000000.0,
        };

        update_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(small_val)) });
        update_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(small_val)) });
        update_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(big_val)) });
        update_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(big_val)) });

        // Add decimal, boolean, and timestamp updates for a comprehensive stress test
        update_data.insert("decimal_val".to_string(), string_to_proto_value(&format!("{}.{:02}", i * 10, i % 100)));
        update_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue(i % 2 == 0)) });
        update_data.insert("timestamp_val".to_string(), string_to_proto_value(&format!("2024-01-{:02}T{:02}:00:00Z", (i % 28) + 1, i % 24)));

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.stress_table.clone(),
            id: *record_id,
            data: update_data,
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;
        assert!(result.is_ok(), "Rapid stress update {} should succeed", i);
    }

    let duration = start.elapsed();
    println!("100 mixed data type stress updates took: {:?}", duration);

    // Should complete in a reasonable time (adjust the threshold as needed)
    assert!(duration.as_secs() < 15, "Stress test took too long: {:?}", duration);

    // Verify all records were updated correctly
    let query = format!(
        r#"SELECT COUNT(*) FROM "{}"."{}" WHERE test_name LIKE 'Stress update test%'"#,
        context.profile_name, context.stress_table
    );

    let count: i64 = sqlx::query_scalar(&query)
        .fetch_one(&context.pool)
        .await
        .unwrap();
    assert_eq!(count, 100);
}
#[rstest]
#[tokio::test]
async fn test_update_concurrent_stress_mixed_data_types(
    #[future] performance_test_context: PerformanceTestContext,
) {
    let context = performance_test_context.await;

    // Create initial records
    let mut record_ids = Vec::new();
    for _ in 0..20 {
        let record_id = create_initial_performance_record(&context).await;
        record_ids.push(record_id);
    }

    // Concurrent stress test with mixed data types
    let tasks: Vec<_> = record_ids.into_iter().enumerate().map(|(i, record_id)| {
        let context = context.clone();

        tokio::spawn(async move {
            let mut update_data = HashMap::new();
            update_data.insert("test_name".to_string(), string_to_proto_value(&format!("Concurrent stress {}", i)));

            // Use complex values that stress different validation paths
            let complex_int = match i % 3 {
                0 => 2147483647.0 - (i as f64),  // Near i32::MAX
                1 => -2147483648.0 + (i as f64), // Near i32::MIN
                _ => (i as f64) * 12345.0,
            };

            let complex_bigint = match i % 3 {
                0 => 9223372036854774784.0 - (i as f64 * 1000000000.0),
                1 => -9223372036854774784.0 + (i as f64 * 1000000000.0),
                _ => (i as f64) * 987654321012345.0,
            };

            update_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(complex_int)) });
            update_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(complex_int)) });
            update_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(complex_bigint)) });
            update_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(complex_bigint)) });
            update_data.insert("decimal_val".to_string(), string_to_proto_value(&format!("{}.{:02}", i * 33, (i * 7) % 100)));
            update_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue((i * 3) % 2 == 0)) });

            let request = PutTableDataRequest {
                profile_name: context.profile_name.clone(),
                table_name: context.stress_table.clone(),
                id: record_id,
                data: update_data,
            };

            put_table_data(&context.pool, request, &context.indexer_tx).await
        })
    }).collect();

    // Wait for all concurrent updates to complete
    let results = futures::future::join_all(tasks).await;

    // All should succeed
    for (i, result) in results.into_iter().enumerate() {
        let task_result = result.expect("Task should not panic");
        assert!(task_result.is_ok(), "Concurrent stress update {} should succeed", i);
    }

    // Verify all records were updated
    let query = format!(
        r#"SELECT COUNT(*) FROM "{}"."{}" WHERE test_name LIKE 'Concurrent stress%'"#,
        context.profile_name, context.stress_table
    );

    let count: i64 = sqlx::query_scalar(&query)
        .fetch_one(&context.pool)
        .await
        .unwrap();
    assert_eq!(count, 20);
}
// ========================================================================
|
||||
// EDGE CASE COMBINATION TESTS
|
||||
// ========================================================================
|
||||
|
||||
#[rstest]
|
||||
#[tokio::test]
|
||||
async fn test_update_complex_mixed_data_type_combinations(
|
||||
#[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext,
|
||||
) {
|
||||
let context = comprehensive_integer_test_context.await;
|
||||
let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await;
|
||||
|
||||
// Test complex combinations of data type updates that stress multiple validation paths
|
||||
let complex_combinations = vec![
|
||||
(
|
||||
"All boundary values",
|
||||
HashMap::from([
|
||||
("small_int".to_string(), Value { kind: Some(Kind::NumberValue(2147483647.0)) }),
|
||||
("big_int".to_string(), Value { kind: Some(Kind::NumberValue(9223372036854774784.0)) }),
|
||||
("another_int".to_string(), Value { kind: Some(Kind::NumberValue(-2147483648.0)) }),
|
||||
("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(-9223372036854774784.0)) }),
|
||||
("nullable_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }),
|
||||
("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) }),
|
||||
])
|
||||
),
|
||||
(
|
||||
"Mixed nulls and values",
|
||||
HashMap::from([
|
||||
("small_int".to_string(), Value { kind: Some(Kind::NumberValue(42.0)) }),
|
||||
("big_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }),
|
||||
("another_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }),
|
||||
("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) }),
|
||||
("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(123.0)) }),
|
||||
("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) }),
|
||||
])
|
||||
),
|
||||
(
|
||||
"Zero and near-zero values",
|
||||
HashMap::from([
|
||||
("small_int".to_string(), Value { kind: Some(Kind::NumberValue(0.0)) }),
|
||||
("big_int".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) }),
|
||||
("another_int".to_string(), Value { kind: Some(Kind::NumberValue(-1.0)) }),
|
||||
("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(0.0)) }),
|
||||
("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) }),
|
||||
("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue(-1.0)) }),
|
||||
])
|
||||
),
|
||||
];
|
||||
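    // Kind::NullValue wraps prost's NullValue enum as an i32; 0 is that enum's
    // only variant, which is why every null above is spelled NullValue(0).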

    for (description, mut update_data) in complex_combinations {
        update_data.insert("name".to_string(), string_to_proto_value(&format!("Complex combo: {}", description)));

        let request = PutTableDataRequest {
            profile_name: context.profile_name.clone(),
            table_name: context.mixed_integer_table.clone(),
            id: record_id,
            data: update_data.clone(),
        };

        let result = put_table_data(&context.pool, request, &context.indexer_tx).await;
        assert!(result.is_ok(), "Complex combination should succeed: {}", description);

        // Verify the complex combination was stored correctly
        let query = format!(
            r#"SELECT small_int, big_int, another_int, another_bigint, nullable_int, nullable_bigint FROM "{}"."{}" WHERE id = $1"#,
            context.profile_name, context.mixed_integer_table
        );
        let row = sqlx::query(&query)
            .bind(record_id)
            .fetch_one(&context.pool)
            .await
            .unwrap();
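        // Decoding into Option<T> below lets the same row read cover both
        // NULL and non-NULL columns without panicking on NULLs.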

        // Verify each field based on what was set in update_data
        for (field_name, expected_value) in update_data.iter() {
            if field_name == "name" { continue; } // Skip text field

            match expected_value.kind.as_ref().unwrap() {
                Kind::NumberValue(num) => {
                    match field_name.as_str() {
                        "small_int" | "another_int" | "nullable_int" => {
                            let stored: Option<i32> = row.get(field_name.as_str());
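                            // The assert only runs when a value came back; a
                            // NULL read is skipped here rather than failed.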
                            if let Some(stored_val) = stored {
                                assert_eq!(stored_val, *num as i32, "Field {} mismatch in {}", field_name, description);
                            }
                        },
                        "big_int" | "another_bigint" | "nullable_bigint" => {
                            let stored: Option<i64> = row.get(field_name.as_str());
                            if let Some(stored_val) = stored {
                                assert_eq!(stored_val, *num as i64, "Field {} mismatch in {}", field_name, description);
                            }
                        },
                        _ => {}
                    }
                },
                Kind::NullValue(_) => {
                    match field_name.as_str() {
                        "small_int" | "another_int" | "nullable_int" => {
                            let stored: Option<i32> = row.get(field_name.as_str());
                            assert!(stored.is_none(), "Field {} should be null in {}", field_name, description);
                        },
                        "big_int" | "another_bigint" | "nullable_bigint" => {
                            let stored: Option<i64> = row.get(field_name.as_str());
                            assert!(stored.is_none(), "Field {} should be null in {}", field_name, description);
                        },
                        _ => {}
                    }
                },
                _ => {}
            }
        }
    }
}
server/tests/tables_data/handlers/put_table_data_test5.rs (Normal file, 121 lines)
@@ -0,0 +1,121 @@
// tests/tables_data/handlers/put_table_data_test5.rs

// ========================================================================
// MISSING TEST SCENARIOS REPLICATED FROM POST TESTS
// ========================================================================

// Fixture to provide a closed database pool, simulating a connection error.
// This is needed for the database error test.
#[fixture]
async fn closed_test_context() -> TestContext {
    let context = test_context().await;
    context.pool.close().await;
    context
}
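// Note: sqlx's Pool::close() shuts the pool down, so any later acquire fails
// immediately; this lets the handler hit its database-error path without a
// real outage.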

// Test 1: Ensure that an update fails gracefully when the database is unavailable.
#[rstest]
#[tokio::test]
async fn test_update_table_data_database_error(
    #[future] closed_test_context: TestContext,
) {
    // Arrange
    let context = closed_test_context.await;
    // The record ID doesn't matter as the connection is already closed.
    let record_id = 1;

    // Act
    let mut update_data = HashMap::new();
    update_data.insert(
        "firma".to_string(),
        string_to_proto_value("This will fail"),
    );
    let request = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.table_name.clone(),
        id: record_id,
        data: update_data,
    };
    let result = put_table_data(&context.pool, request, &context.indexer_tx).await;

    // Assert
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().code(), tonic::Code::Internal);
}

// Test 2: Ensure that updating a required foreign key to NULL is not allowed.
// This uses the `foreign_key_update_test_context` from `put_table_data_test3.rs`.
#[rstest]
#[tokio::test]
async fn test_update_required_foreign_key_to_null_fails(
    #[future] foreign_key_update_test_context: ForeignKeyUpdateTestContext,
) {
    let context = foreign_key_update_test_context.await;

    // Arrange: Create a category and a product linked to it.
    let mut category_data = HashMap::new();
    category_data.insert("name".to_string(), string_to_proto_value("Test Category"));
    let category_request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.category_table.clone(),
        data: category_data,
    };
    let category_response = post_table_data(&context.pool, category_request, &context.indexer_tx)
        .await
        .unwrap();
    let category_id = category_response.inserted_id;

    let mut product_data = HashMap::new();
    product_data.insert("name".to_string(), string_to_proto_value("Test Product"));
    product_data.insert(
        format!("{}_id", context.category_table),
        Value { kind: Some(Kind::NumberValue(category_id as f64)) },
    );
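    // The FK column name is derived as `<category_table>_id`, presumably
    // matching the naming convention the foreign-key fixture's schema uses.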
    let product_request = PostTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.product_table.clone(),
        data: product_data,
    };
    let product_response = post_table_data(&context.pool, product_request, &context.indexer_tx)
        .await
        .unwrap();
    let product_id = product_response.inserted_id;

    // Act: Attempt to update the product's required foreign key to NULL.
    let mut update_data = HashMap::new();
    update_data.insert(
        format!("{}_id", context.category_table),
        Value { kind: Some(Kind::NullValue(0)) },
    );

    let update_request = PutTableDataRequest {
        profile_name: context.profile_name.clone(),
        table_name: context.product_table.clone(),
        id: product_id,
        data: update_data,
    };

    let result = put_table_data(&context.pool, update_request, &context.indexer_tx).await;

    // Assert: The operation should fail due to a database constraint.
    assert!(
        result.is_err(),
        "Update of required foreign key to NULL should fail"
    );
    let err = result.unwrap_err();
    // The database will likely return a NOT NULL violation, which our handler
    // wraps as an Internal error.
    assert_eq!(err.code(), tonic::Code::Internal);
    assert!(err.message().contains("Update failed"));
}