diff --git a/server/src/table_definition/handlers/post_table_definition.rs b/server/src/table_definition/handlers/post_table_definition.rs index d399b38..3cafa7d 100644 --- a/server/src/table_definition/handlers/post_table_definition.rs +++ b/server/src/table_definition/handlers/post_table_definition.rs @@ -10,6 +10,7 @@ const PREDEFINED_FIELD_TYPES: &[(&str, &str)] = &[ ("string", "TEXT"), ("boolean", "BOOLEAN"), ("timestamp", "TIMESTAMPTZ"), + ("timestamptz", "TIMESTAMPTZ"), ("time", "TIMESTAMPTZ"), ("money", "NUMERIC(14, 4)"), ("integer", "INTEGER"), diff --git a/server/tests/tables_data/handlers/put_table_data_test.rs b/server/tests/tables_data/handlers/put_table_data_test.rs index 30812a0..7f61d4e 100644 --- a/server/tests/tables_data/handlers/put_table_data_test.rs +++ b/server/tests/tables_data/handlers/put_table_data_test.rs @@ -15,6 +15,7 @@ use server::table_definition::handlers::post_table_definition; use server::tables_data::handlers::post_table_data; // The put_table_data handler is the function we are testing. use server::tables_data::handlers::put_table_data; +use rust_decimal_macros::dec; use crate::common::setup_test_db; use tokio::sync::mpsc; use server::indexer::IndexCommand; @@ -93,6 +94,8 @@ struct TestContext { indexer_tx: mpsc::Sender, } + + #[fixture] async fn test_context() -> TestContext { let pool = setup_test_db().await; @@ -537,3 +540,5 @@ async fn test_update_boolean_system_column_validation( include!("put_table_data_test2.rs"); include!("put_table_data_test3.rs"); +include!("put_table_data_test4.rs"); +include!("put_table_data_test5.rs"); diff --git a/server/tests/tables_data/handlers/put_table_data_test2.rs b/server/tests/tables_data/handlers/put_table_data_test2.rs index 73b76b0..c96fdb3 100644 --- a/server/tests/tables_data/handlers/put_table_data_test2.rs +++ b/server/tests/tables_data/handlers/put_table_data_test2.rs @@ -37,7 +37,7 @@ async fn create_data_type_test_table( columns: vec![ TableColumnDefinition { name: "my_text".into(), field_type: "text".into() }, TableColumnDefinition { name: "my_bool".into(), field_type: "boolean".into() }, - TableColumnDefinition { name: "my_timestamp".into(), field_type: "timestamptz".into() }, + TableColumnDefinition { name: "my_timestamp".into(), field_type: "timestamp".into() }, TableColumnDefinition { name: "my_bigint".into(), field_type: "integer".into() }, TableColumnDefinition { name: "my_money".into(), field_type: "decimal(19,4)".into() }, TableColumnDefinition { name: "my_decimal".into(), field_type: "decimal(10,2)".into() }, @@ -831,3 +831,463 @@ async fn test_update_valid_timestamp_formats( assert!(result.is_ok(), "Failed for timestamp: {}", timestamp); } } + +#[rstest] +#[tokio::test] +async fn test_update_indexer_command_generation(#[future] test_context: TestContext) { + let context = test_context.await; + let (indexer_tx, mut indexer_rx) = mpsc::channel(100); + + // Create initial record + let initial_data = HashMap::from([ + ("firma".to_string(), string_to_proto_value("Initial Company")), + ("kz".to_string(), string_to_proto_value("INIT123")), + ]); + + let create_request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + data: initial_data, + }; + + let create_response = post_table_data(&context.pool, create_request, &indexer_tx).await.unwrap(); + let record_id = create_response.inserted_id; + + // Consume the create indexer message + let _create_command = tokio::time::timeout( + tokio::time::Duration::from_millis(100), + indexer_rx.recv() + ).await.unwrap().unwrap(); + + // Now perform an update and verify the indexer command + let mut update_data = HashMap::new(); + update_data.insert("firma".to_string(), string_to_proto_value("Updated Indexer Test Company")); + update_data.insert("kz".to_string(), string_to_proto_value("UPD123")); + + let update_request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + let update_response = put_table_data(&context.pool, update_request, &indexer_tx).await.unwrap(); + assert!(update_response.success); + + // Check that indexer command was sent for the UPDATE + let indexer_command = tokio::time::timeout( + tokio::time::Duration::from_millis(100), + indexer_rx.recv() + ).await; + + assert!(indexer_command.is_ok(), "Indexer command should be sent for PUT operation"); + let command = indexer_command.unwrap().unwrap(); + + match command { + IndexCommand::AddOrUpdate(data) => { + assert_eq!(data.table_name, context.table_name); + assert_eq!(data.row_id, record_id); // Should be the updated record ID + // Verify it's an update, not an insert (same ID as the original record) + assert_eq!(data.row_id, update_response.updated_id); + }, + IndexCommand::Delete(_) => panic!("Expected AddOrUpdate command for PUT operation, got Delete"), + } +} + +#[rstest] +#[tokio::test] +async fn test_update_indexer_failure_resilience(#[future] test_context: TestContext) { + let context = test_context.await; + + // Create initial record with working indexer + let (working_indexer_tx, mut _working_rx) = mpsc::channel(100); + let initial_data = HashMap::from([ + ("firma".to_string(), string_to_proto_value("Resilience Test Company")), + ("kz".to_string(), string_to_proto_value("RES123")), + ]); + + let create_request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + data: initial_data, + }; + + let create_response = post_table_data(&context.pool, create_request, &working_indexer_tx).await.unwrap(); + let record_id = create_response.inserted_id; + + // Create a closed channel to simulate indexer failure for the update + let (failed_indexer_tx, indexer_rx) = mpsc::channel(1); + drop(indexer_rx); // Close receiver to simulate failure + + let mut update_data = HashMap::new(); + update_data.insert("firma".to_string(), string_to_proto_value("Updated Despite Indexer Failure")); + update_data.insert("kz".to_string(), string_to_proto_value("FAIL123")); + + let update_request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + // Update should still succeed even if indexer fails + let update_response = put_table_data(&context.pool, update_request, &failed_indexer_tx).await.unwrap(); + assert!(update_response.success); + assert_eq!(update_response.updated_id, record_id); + + // Verify data was updated despite indexer failure + let query = format!( + r#"SELECT COUNT(*) FROM "{}"."{}" WHERE kz = 'FAIL123' AND firma = 'Updated Despite Indexer Failure'"#, + context.profile_name, context.table_name + ); + + let count: i64 = sqlx::query_scalar(&query) + .fetch_one(&context.pool) + .await + .unwrap(); + assert_eq!(count, 1); +} + +// ======================================================================== +// EMPTY STRINGS BECOME NULL COMPREHENSIVE TESTING +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_empty_strings_become_null(#[future] test_context: TestContext) { + let context = test_context.await; + + let test_cases = vec![ + ("", "empty_string"), + (" ", "whitespace_only"), + ("\t\n", "tabs_newlines"), + ("\r\n \t", "mixed_whitespace"), + (" Normal Value ", "padded_value"), + ("Keep This", "normal_value"), + ]; + + for (input, test_name) in test_cases { + // Create initial record with a value + let record_id = create_initial_record( + &context, + HashMap::from([ + ("firma".to_string(), string_to_proto_value("Test Empty Strings")), + ("ulica".to_string(), string_to_proto_value("Original Street")), + ("telefon".to_string(), string_to_proto_value("555-1234")), + ]), + ).await; + + let mut update_data = HashMap::new(); + update_data.insert("firma".to_string(), string_to_proto_value(&format!("Test {}", test_name))); + update_data.insert("ulica".to_string(), string_to_proto_value(input)); + update_data.insert("telefon".to_string(), string_to_proto_value(input)); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + let response = put_table_data(&context.pool, request, &context.indexer_tx) + .await + .unwrap(); + assert!(response.success, "Failed for test case: {}", test_name); + + // Check what was actually stored + let query = format!( + r#"SELECT ulica, telefon FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.table_name + ); + + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + let stored_ulica: Option = row.get("ulica"); + let stored_telefon: Option = row.get("telefon"); + + let trimmed = input.trim(); + if trimmed.is_empty() { + assert!(stored_ulica.is_none(), "Empty/whitespace string should be NULL for ulica in: {}", test_name); + assert!(stored_telefon.is_none(), "Empty/whitespace string should be NULL for telefon in: {}", test_name); + } else { + assert_eq!(stored_ulica.unwrap(), trimmed, "String should be trimmed for ulica in: {}", test_name); + assert_eq!(stored_telefon.unwrap(), trimmed, "String should be trimmed for telefon in: {}", test_name); + } + } +} + +#[rstest] +#[tokio::test] +async fn test_update_empty_string_to_null_then_to_value(#[future] test_context: TestContext) { + let context = test_context.await; + + // Create initial record with values + let record_id = create_initial_record( + &context, + HashMap::from([ + ("firma".to_string(), string_to_proto_value("String Transition Test")), + ("ulica".to_string(), string_to_proto_value("Original Street")), + ("telefon".to_string(), string_to_proto_value("555-1234")), + ]), + ).await; + + // First update: set fields to empty strings (should become NULL) + let mut update_to_empty = HashMap::new(); + update_to_empty.insert("ulica".to_string(), string_to_proto_value(" ")); // Whitespace only + update_to_empty.insert("telefon".to_string(), string_to_proto_value("")); // Empty string + + let request1 = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_to_empty, + }; + + let response1 = put_table_data(&context.pool, request1, &context.indexer_tx).await.unwrap(); + assert!(response1.success); + + // Verify fields are now NULL + let query = format!( + r#"SELECT ulica, telefon FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.table_name + ); + + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + let ulica1: Option = row.get("ulica"); + let telefon1: Option = row.get("telefon"); + + assert!(ulica1.is_none(), "ulica should be NULL after empty string update"); + assert!(telefon1.is_none(), "telefon should be NULL after empty string update"); + + // Second update: set fields back to actual values + let mut update_to_values = HashMap::new(); + update_to_values.insert("ulica".to_string(), string_to_proto_value("New Street")); + update_to_values.insert("telefon".to_string(), string_to_proto_value("999-8888")); + + let request2 = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_to_values, + }; + + let response2 = put_table_data(&context.pool, request2, &context.indexer_tx).await.unwrap(); + assert!(response2.success); + + // Verify fields now have the new values + let row2 = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + let ulica2: Option = row2.get("ulica"); + let telefon2: Option = row2.get("telefon"); + + assert_eq!(ulica2.unwrap(), "New Street", "ulica should have new value"); + assert_eq!(telefon2.unwrap(), "999-8888", "telefon should have new value"); +} + +// ======================================================================== +// COMPREHENSIVE NULL HANDLING EDGE CASES +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_mixed_null_transitions_comprehensive(#[future] test_context: TestContext) { + let context = test_context.await; + + // Create initial record with mixed values and nulls + let record_id = create_initial_record( + &context, + HashMap::from([ + ("firma".to_string(), string_to_proto_value("Null Transition Test")), + ("ulica".to_string(), string_to_proto_value("Original Street")), + ("telefon".to_string(), string_to_proto_value("555-1234")), + ("mesto".to_string(), string_to_proto_value("Original City")), + ]), + ).await; + + // Define a sequence of updates that test various NULL transitions + let update_sequences = vec![ + ( + "Step 1: Mix of values, nulls, and empty strings", + HashMap::from([ + ("ulica".to_string(), string_to_proto_value("Updated Street")), + ("telefon".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // Explicit NULL + ("mesto".to_string(), string_to_proto_value(" ")), // Empty string -> NULL + ("kz".to_string(), string_to_proto_value("NEW123")), // New value + ]) + ), + ( + "Step 2: Convert NULLs back to values", + HashMap::from([ + ("telefon".to_string(), string_to_proto_value("999-8888")), // NULL -> value + ("mesto".to_string(), string_to_proto_value("New City")), // NULL -> value + ("ulica".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // value -> NULL + ]) + ), + ( + "Step 3: Mix empty strings and explicit nulls", + HashMap::from([ + ("ulica".to_string(), string_to_proto_value("")), // NULL -> empty string -> NULL + ("telefon".to_string(), string_to_proto_value("\t\n ")), // value -> whitespace -> NULL + ("mesto".to_string(), string_to_proto_value("Final City")), // value -> value + ("kz".to_string(), Value { kind: Some(Kind::NullValue(0)) }), // value -> NULL + ]) + ), + ]; + + for (step_description, update_data) in update_sequences { + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data.clone(), + }; + + let response = put_table_data(&context.pool, request, &context.indexer_tx).await.unwrap(); + assert!(response.success, "Failed at: {}", step_description); + + // Verify the state after this update + let query = format!( + r#"SELECT ulica, telefon, mesto, kz FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.table_name + ); + + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + // Verify each field based on what should have happened + for (field_name, expected_value) in update_data.iter() { + let db_value: Option = row.get(field_name.as_str()); + + match expected_value.kind.as_ref().unwrap() { + Kind::StringValue(s) => { + let trimmed = s.trim(); + if trimmed.is_empty() { + assert!(db_value.is_none(), + "Field {} should be NULL after empty string in {}", field_name, step_description); + } else { + assert_eq!(db_value.unwrap(), trimmed, + "Field {} should have trimmed value in {}", field_name, step_description); + } + }, + Kind::NullValue(_) => { + assert!(db_value.is_none(), + "Field {} should be NULL after explicit null in {}", field_name, step_description); + }, + _ => {} + } + } + } +} + +// ======================================================================== +// ADVANCED INDEXER INTEGRATION SCENARIOS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_indexer_multiple_rapid_updates(#[future] test_context: TestContext) { + let context = test_context.await; + let (indexer_tx, mut indexer_rx) = mpsc::channel(1000); + + // FIX: Call post_table_data directly instead of using the helper + let create_request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + data: HashMap::from([( + "firma".to_string(), + string_to_proto_value("Rapid Update Test"), + )]), + }; + let response = post_table_data(&context.pool, create_request, &indexer_tx) + .await + .unwrap(); + let record_id = response.inserted_id; + + // Consume the initial create command + let _create_command = tokio::time::timeout( + tokio::time::Duration::from_millis(100), + indexer_rx.recv(), + ) + .await + .expect("Timeout waiting for create command") // More descriptive panic + .expect("Channel closed unexpectedly"); + + // ... rest of the test remains the same + let num_updates = 10; + for i in 0..num_updates { + let mut update_data = HashMap::new(); + update_data.insert( + "firma".to_string(), + string_to_proto_value(&format!("Rapid Update {}", i)), + ); + update_data.insert( + "kz".to_string(), + string_to_proto_value(&format!("RAP{}", i)), + ); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + let response = put_table_data(&context.pool, request, &indexer_tx) + .await + .unwrap(); + assert!(response.success, "Rapid update {} should succeed", i); + } + + // Verify all update commands were sent to indexer + let mut received_commands = 0; + while let Ok(command) = tokio::time::timeout( + tokio::time::Duration::from_millis(50), + indexer_rx.recv() + ).await { + if command.is_none() { break; } + match command.unwrap() { + IndexCommand::AddOrUpdate(data) => { + assert_eq!(data.table_name, context.table_name); + assert_eq!(data.row_id, record_id); + received_commands += 1; + }, + IndexCommand::Delete(_) => panic!("Unexpected Delete command during updates"), + } + } + + assert_eq!(received_commands, num_updates, "Should receive indexer command for each update"); + + // Verify final state + let query = format!( + r#"SELECT firma, kz FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.table_name + ); + + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + let final_firma: String = row.get("firma"); + let final_kz: String = row.get("kz"); + + assert_eq!(final_firma, format!("Rapid Update {}", num_updates - 1)); + assert_eq!(final_kz, format!("RAP{}", num_updates - 1)); +} diff --git a/server/tests/tables_data/handlers/put_table_data_test3.rs b/server/tests/tables_data/handlers/put_table_data_test3.rs index cfd470f..d2f93b7 100644 --- a/server/tests/tables_data/handlers/put_table_data_test3.rs +++ b/server/tests/tables_data/handlers/put_table_data_test3.rs @@ -4,8 +4,6 @@ // ADDITIONAL HELPER FUNCTIONS FOR COMPREHENSIVE PUT TEST 3 // ======================================================================== -use rust_decimal_macros::dec; - // Note: Helper functions like create_string_value, create_number_value, etc. // are already defined in the main test file, so we don't redefine them here. diff --git a/server/tests/tables_data/handlers/put_table_data_test4.rs b/server/tests/tables_data/handlers/put_table_data_test4.rs new file mode 100644 index 0000000..c39a1d6 --- /dev/null +++ b/server/tests/tables_data/handlers/put_table_data_test4.rs @@ -0,0 +1,907 @@ +// tests/tables_data/handlers/put_table_data_test4.rs + + +#[derive(Clone)] +struct ComprehensiveIntegerTestContext { + pool: PgPool, + profile_name: String, + mixed_integer_table: String, + bigint_only_table: String, + integer_only_table: String, + indexer_tx: mpsc::Sender, +} + +#[derive(Clone)] +struct AdvancedDecimalTestContext { + pool: PgPool, + profile_name: String, + table_name: String, + indexer_tx: mpsc::Sender, +} + +#[derive(Clone)] +struct PerformanceTestContext { + pool: PgPool, + profile_name: String, + stress_table: String, + indexer_tx: mpsc::Sender, +} + +// ======================================================================== +// TABLE CREATION HELPERS FOR COMPREHENSIVE TESTING +// ======================================================================== + +async fn create_comprehensive_integer_tables( + pool: &PgPool, + profile_name: &str, +) -> Result { + let unique_id = generate_unique_id(); + let mixed_table = format!("comprehensive_mixed_table_{}", unique_id); + let bigint_table = format!("comprehensive_bigint_table_{}", unique_id); + let integer_table = format!("comprehensive_integer_table_{}", unique_id); + + // Table with both INTEGER and BIGINT columns for comprehensive testing + let mixed_def = PostTableDefinitionRequest { + profile_name: profile_name.into(), + table_name: mixed_table.clone(), + columns: vec![ + TableColumnDefinition { name: "name".into(), field_type: "text".into() }, + TableColumnDefinition { name: "small_int".into(), field_type: "integer".into() }, + TableColumnDefinition { name: "big_int".into(), field_type: "biginteger".into() }, + TableColumnDefinition { name: "another_int".into(), field_type: "int".into() }, + TableColumnDefinition { name: "another_bigint".into(), field_type: "bigint".into() }, + TableColumnDefinition { name: "nullable_int".into(), field_type: "integer".into() }, + TableColumnDefinition { name: "nullable_bigint".into(), field_type: "biginteger".into() }, + ], + indexes: vec![], + links: vec![], + }; + post_table_definition(pool, mixed_def).await?; + + // Table with only BIGINT columns for edge case testing + let bigint_def = PostTableDefinitionRequest { + profile_name: profile_name.into(), + table_name: bigint_table.clone(), + columns: vec![ + TableColumnDefinition { name: "name".into(), field_type: "text".into() }, + TableColumnDefinition { name: "value1".into(), field_type: "biginteger".into() }, + TableColumnDefinition { name: "value2".into(), field_type: "bigint".into() }, + TableColumnDefinition { name: "extreme_value".into(), field_type: "biginteger".into() }, + ], + indexes: vec![], + links: vec![], + }; + post_table_definition(pool, bigint_def).await?; + + // Table with only INTEGER columns for boundary testing + let integer_def = PostTableDefinitionRequest { + profile_name: profile_name.into(), + table_name: integer_table.clone(), + columns: vec![ + TableColumnDefinition { name: "name".into(), field_type: "text".into() }, + TableColumnDefinition { name: "value1".into(), field_type: "integer".into() }, + TableColumnDefinition { name: "value2".into(), field_type: "int".into() }, + TableColumnDefinition { name: "boundary_test".into(), field_type: "integer".into() }, + ], + indexes: vec![], + links: vec![], + }; + post_table_definition(pool, integer_def).await?; + + let (tx, mut rx) = mpsc::channel(100); + tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + Ok(ComprehensiveIntegerTestContext { + pool: pool.clone(), + profile_name: profile_name.to_string(), + mixed_integer_table: mixed_table, + bigint_only_table: bigint_table, + integer_only_table: integer_table, + indexer_tx: tx, + }) +} + +async fn create_advanced_decimal_table( + pool: &PgPool, + table_name: &str, + profile_name: &str, +) -> Result<(), tonic::Status> { + let table_def_request = PostTableDefinitionRequest { + profile_name: profile_name.into(), + table_name: table_name.into(), + columns: vec![ + TableColumnDefinition { name: "product_name".into(), field_type: "text".into() }, + TableColumnDefinition { name: "price".into(), field_type: "decimal(19, 4)".into() }, + TableColumnDefinition { name: "rate".into(), field_type: "decimal(10, 5)".into() }, + TableColumnDefinition { name: "discount".into(), field_type: "decimal(5, 3)".into() }, + TableColumnDefinition { name: "ultra_precise".into(), field_type: "decimal(28, 10)".into() }, + TableColumnDefinition { name: "percentage".into(), field_type: "decimal(5, 4)".into() }, + ], + indexes: vec![], + links: vec![], + }; + + post_table_definition(pool, table_def_request).await?; + Ok(()) +} + +async fn create_performance_stress_table( + pool: &PgPool, + table_name: &str, + profile_name: &str, +) -> Result<(), tonic::Status> { + let table_def_request = PostTableDefinitionRequest { + profile_name: profile_name.into(), + table_name: table_name.into(), + columns: vec![ + TableColumnDefinition { name: "test_name".into(), field_type: "text".into() }, + TableColumnDefinition { name: "int_val1".into(), field_type: "integer".into() }, + TableColumnDefinition { name: "int_val2".into(), field_type: "integer".into() }, + TableColumnDefinition { name: "bigint_val1".into(), field_type: "biginteger".into() }, + TableColumnDefinition { name: "bigint_val2".into(), field_type: "biginteger".into() }, + TableColumnDefinition { name: "decimal_val".into(), field_type: "decimal(10, 2)".into() }, + TableColumnDefinition { name: "bool_val".into(), field_type: "boolean".into() }, + TableColumnDefinition { name: "timestamp_val".into(), field_type: "timestamptz".into() }, + ], + indexes: vec![], + links: vec![], + }; + + post_table_definition(pool, table_def_request).await?; + Ok(()) +} + +// ======================================================================== +// FIXTURES +// ======================================================================== + +#[fixture] +async fn comprehensive_integer_test_context() -> ComprehensiveIntegerTestContext { + let pool = setup_test_db().await; + let unique_id = generate_unique_id(); + let profile_name = format!("comp_int_profile_{}", unique_id); + + create_comprehensive_integer_tables(&pool, &profile_name).await + .expect("Failed to create comprehensive integer test tables") +} + +#[fixture] +async fn advanced_decimal_test_context() -> AdvancedDecimalTestContext { + let pool = setup_test_db().await; + let unique_id = generate_unique_id(); + let profile_name = format!("adv_decimal_profile_{}", unique_id); + let table_name = format!("advanced_decimals_{}", unique_id); + + create_advanced_decimal_table(&pool, &table_name, &profile_name).await + .expect("Failed to create advanced decimal test table"); + + let (tx, mut rx) = mpsc::channel(100); + tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + AdvancedDecimalTestContext { pool, profile_name, table_name, indexer_tx: tx } +} + +#[fixture] +async fn performance_test_context() -> PerformanceTestContext { + let pool = setup_test_db().await; + let unique_id = generate_unique_id(); + let profile_name = format!("perf_profile_{}", unique_id); + let stress_table = format!("stress_table_{}", unique_id); + + create_performance_stress_table(&pool, &stress_table, &profile_name).await + .expect("Failed to create performance stress test table"); + + let (tx, mut rx) = mpsc::channel(100); + tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + PerformanceTestContext { pool, profile_name, stress_table, indexer_tx: tx } +} + +// ======================================================================== +// HELPER FUNCTIONS FOR CREATING INITIAL RECORDS +// ======================================================================== + +async fn create_initial_comprehensive_integer_record( + context: &ComprehensiveIntegerTestContext, + table_name: &str +) -> i64 { + let mut initial_data = HashMap::new(); + initial_data.insert("name".to_string(), string_to_proto_value("Initial Record")); + + match table_name { + table if table.contains("mixed") => { + initial_data.insert("small_int".to_string(), Value { kind: Some(Kind::NumberValue(100.0)) }); + initial_data.insert("big_int".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) }); + initial_data.insert("another_int".to_string(), Value { kind: Some(Kind::NumberValue(200.0)) }); + initial_data.insert("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) }); + initial_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(300.0)) }); + initial_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue(3000000000000.0)) }); + }, + table if table.contains("bigint") => { + initial_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) }); + initial_data.insert("value2".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) }); + initial_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(9223372036854774784.0)) }); + }, + table if table.contains("integer") => { + initial_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(100.0)) }); + initial_data.insert("value2".to_string(), Value { kind: Some(Kind::NumberValue(200.0)) }); + initial_data.insert("boundary_test".to_string(), Value { kind: Some(Kind::NumberValue(300.0)) }); + }, + _ => {} + } + + let request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: table_name.to_string(), + data: initial_data, + }; + let response = post_table_data(&context.pool, request, &context.indexer_tx) + .await + .expect("Setup: Failed to create initial integer record"); + response.inserted_id +} + +async fn create_initial_advanced_decimal_record(context: &AdvancedDecimalTestContext) -> i64 { + let mut initial_data = HashMap::new(); + initial_data.insert("product_name".to_string(), string_to_proto_value("Initial Product")); + initial_data.insert("price".to_string(), string_to_proto_value("100.0000")); + initial_data.insert("rate".to_string(), string_to_proto_value("1.00000")); + initial_data.insert("discount".to_string(), string_to_proto_value("0.100")); + initial_data.insert("ultra_precise".to_string(), string_to_proto_value("123.4567890123")); + initial_data.insert("percentage".to_string(), string_to_proto_value("0.9999")); + + let request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + data: initial_data, + }; + let response = post_table_data(&context.pool, request, &context.indexer_tx) + .await + .expect("Setup: Failed to create initial decimal record"); + response.inserted_id +} + +async fn create_initial_performance_record(context: &PerformanceTestContext) -> i64 { + let mut initial_data = HashMap::new(); + initial_data.insert("test_name".to_string(), string_to_proto_value("Initial Performance Test")); + initial_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) }); + initial_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(2.0)) }); + initial_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) }); + initial_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(2000000000000.0)) }); + initial_data.insert("decimal_val".to_string(), string_to_proto_value("123.45")); + initial_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue(false)) }); + initial_data.insert("timestamp_val".to_string(), string_to_proto_value("2024-01-01T00:00:00Z")); + + let request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.stress_table.clone(), + data: initial_data, + }; + let response = post_table_data(&context.pool, request, &context.indexer_tx) + .await + .expect("Setup: Failed to create initial performance record"); + response.inserted_id +} + +// ======================================================================== +// BIGINT SUCCESSFUL ROUNDTRIP VALUE TESTS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_bigint_successful_roundtrip_values( + #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext, +) { + let context = comprehensive_integer_test_context.await; + let record_id = create_initial_comprehensive_integer_record(&context, &context.bigint_only_table).await; + + // Values that SHOULD successfully round-trip and be accepted for updates + let successful_values = vec![ + (9223372036854775808.0, "Exactly i64::MAX as f64 (legitimate value)"), + (-9223372036854775808.0, "Exactly i64::MIN as f64 (legitimate value)"), + (9223372036854774784.0, "Large but precisely representable in f64"), + (-9223372036854774784.0, "Large negative but precisely representable in f64"), + (0.0, "Zero"), + (1.0, "One"), + (-1.0, "Negative one"), + (2147483647.0, "i32::MAX as f64"), + (-2147483648.0, "i32::MIN as f64"), + (4611686018427387904.0, "i64::MAX / 2"), + (-4611686018427387904.0, "i64::MIN / 2"), + (1000000000000.0, "One trillion"), + (-1000000000000.0, "Negative one trillion"), + ]; + + for (value, description) in successful_values { + let mut update_data = HashMap::new(); + update_data.insert("name".to_string(), string_to_proto_value(&format!("Roundtrip test: {}", description))); + update_data.insert("value1".to_string(), Value { kind: Some(Kind::NumberValue(value)) }); + update_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(value)) }); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.bigint_only_table.clone(), + id: record_id, + data: update_data, + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + assert!(result.is_ok(), "Should have succeeded for legitimate i64 update value {}: {}", value, description); + + // Verify it was stored correctly + if let Ok(response) = result { + let query = format!( + r#"SELECT value1, extreme_value FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.bigint_only_table + ); + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + let stored_value1: i64 = row.get("value1"); + let stored_extreme_value: i64 = row.get("extreme_value"); + + assert_eq!(stored_value1, value as i64, "Value1 should match for {}", description); + assert_eq!(stored_extreme_value, value as i64, "Extreme value should match for {}", description); + } + } +} + +#[rstest] +#[tokio::test] +async fn test_update_bigint_overflow_rejection_comprehensive( + #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext, +) { + let context = comprehensive_integer_test_context.await; + let record_id = create_initial_comprehensive_integer_record(&context, &context.bigint_only_table).await; + + // Values that should be rejected for BIGINT columns due to precision loss or overflow + let overflow_values = vec![ + (f64::INFINITY, "Positive infinity"), + (f64::NEG_INFINITY, "Negative infinity"), + (1e20, "Very large number (100,000,000,000,000,000,000)"), + (-1e20, "Very large negative number"), + (1e25, "Extremely large number"), + (-1e25, "Extremely large negative number"), + (f64::MAX, "f64::MAX"), + (f64::MIN, "f64::MIN"), + (f64::NAN, "NaN"), + ]; + + for (value, description) in overflow_values { + let mut update_data = HashMap::new(); + update_data.insert("name".to_string(), string_to_proto_value(&format!("i64 Overflow update test: {}", description))); + update_data.insert("extreme_value".to_string(), Value { kind: Some(Kind::NumberValue(value)) }); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.bigint_only_table.clone(), + id: record_id, + data: update_data, + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + + assert!(result.is_err(), "Should have failed for i64 overflow update value {}: {}", value, description); + + if let Err(err) = result { + assert_eq!(err.code(), tonic::Code::InvalidArgument); + let message = err.message(); + assert!( + message.contains("Integer value out of range for BIGINT column") || + message.contains("Expected integer for column") || + message.contains("but got a float") || + message.contains("Invalid number"), + "Unexpected error message for {}: {}", + description, + message + ); + } + } +} + +// ======================================================================== +// WRONG TYPE FOR MIXED INTEGER COLUMNS TESTS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_wrong_type_for_mixed_integer_columns( + #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext, +) { + let context = comprehensive_integer_test_context.await; + let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await; + + // Try to put i64 values into i32 columns (should fail) + let wrong_type_tests = vec![ + ("small_int", 3000000000.0, "3 billion in i32 column"), + ("another_int", -3000000000.0, "negative 3 billion in i32 column"), + ("nullable_int", 2147483648.0, "i32::MAX + 1 in i32 column"), + ("small_int", 9223372036854775807.0, "i64::MAX in i32 column"), + ]; + + for (column_name, value, description) in wrong_type_tests { + let mut update_data = HashMap::new(); + update_data.insert("name".to_string(), string_to_proto_value(&format!("Wrong type test: {}", description))); + update_data.insert(column_name.to_string(), Value { kind: Some(Kind::NumberValue(value)) }); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.mixed_integer_table.clone(), + id: record_id, + data: update_data, + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + assert!(result.is_err(), "Should fail when putting i64 value {} in i32 column {}", value, column_name); + + if let Err(err) = result { + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("Integer value out of range for INTEGER column")); + } + } + + // Try fractional values in integer columns (should fail) + let fractional_tests = vec![ + ("small_int", 42.5, "fractional in i32 column"), + ("big_int", 1000000000000.1, "fractional in i64 column"), + ("another_int", -42.9, "negative fractional in i32 column"), + ("another_bigint", -1000000000000.9, "negative fractional in i64 column"), + ]; + + for (column_name, value, description) in fractional_tests { + let mut update_data = HashMap::new(); + update_data.insert("name".to_string(), string_to_proto_value(&format!("Fractional test: {}", description))); + update_data.insert(column_name.to_string(), Value { kind: Some(Kind::NumberValue(value)) }); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.mixed_integer_table.clone(), + id: record_id, + data: update_data, + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + assert!(result.is_err(), "Should fail for fractional value {} in column {}", value, column_name); + + if let Err(err) = result { + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("Expected integer for column") && err.message().contains("but got a float")); + } + } +} + +// ======================================================================== +// CONCURRENT MIXED INTEGER UPDATES TESTS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_concurrent_mixed_integer_updates( + #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext, +) { + let context = comprehensive_integer_test_context.await; + // Create multiple records for concurrent updating + let mut record_ids = Vec::new(); + for i in 0..10 { + let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await; + record_ids.push(record_id); + } + // Test concurrent updates with different integer types + let tasks: Vec<_> = record_ids.into_iter().enumerate().map(|(i, record_id)| { + let context = context.clone(); + tokio::spawn(async move { + let mut update_data = HashMap::new(); + update_data.insert("name".to_string(), string_to_proto_value(&format!("Concurrent update test {}", i))); + update_data.insert("small_int".to_string(), Value { kind: Some(Kind::NumberValue((i * 1000) as f64)) }); + update_data.insert("big_int".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * 1000000000000) as f64)) }); + // Fix: Cast i to i32 first, then multiply by negative number + update_data.insert("another_int".to_string(), Value { kind: Some(Kind::NumberValue(((i as i32) * -100) as f64)) }); + update_data.insert("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * -1000000000000) as f64)) }); + // Alternate between null and values for nullable columns + if i % 2 == 0 { + update_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue((i * 42) as f64)) }); + update_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) }); + } else { + update_data.insert("nullable_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }); + update_data.insert("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue((i as i64 * 9999999999) as f64)) }); + } + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.mixed_integer_table.clone(), + id: record_id, + data: update_data, + }; + put_table_data(&context.pool, request, &context.indexer_tx).await + }) + }).collect(); + // Wait for all tasks to complete + let results = futures::future::join_all(tasks).await; + // All should succeed + for (i, result) in results.into_iter().enumerate() { + let task_result = result.expect("Task should not panic"); + assert!(task_result.is_ok(), "Concurrent integer update {} should succeed", i); + } + // Verify all records were updated correctly + let query = format!( + r#"SELECT COUNT(*) FROM "{}"."{}" WHERE name LIKE 'Concurrent update test%'"#, + context.profile_name, context.mixed_integer_table + ); + let count: i64 = sqlx::query_scalar(&query) + .fetch_one(&context.pool) + .await + .unwrap(); + assert_eq!(count, 10); +} + +// ======================================================================== +// ADVANCED DECIMAL PRECISION EDGE CASES +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_ultra_high_precision_decimals( + #[future] advanced_decimal_test_context: AdvancedDecimalTestContext, +) { + let context = advanced_decimal_test_context.await; + let record_id = create_initial_advanced_decimal_record(&context).await; + + let ultra_precision_tests = vec![ + ("ultra_precise", "123456789.1234567890", dec!(123456789.1234567890)), + ("ultra_precise", "-999999999.9999999999", dec!(-999999999.9999999999)), + ("ultra_precise", "0.0000000001", dec!(0.0000000001)), + ("percentage", "0.9999", dec!(0.9999)), // decimal(5,4) - 0.9999 is max + ("percentage", "0.0001", dec!(0.0001)), // decimal(5,4) - minimum precision + ]; + + for (field, value_str, expected_decimal) in ultra_precision_tests { + let mut update_data = HashMap::new(); + update_data.insert("product_name".to_string(), string_to_proto_value("Ultra precision test")); + update_data.insert(field.to_string(), string_to_proto_value(value_str)); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + let response = put_table_data(&context.pool, request, &context.indexer_tx) + .await + .unwrap(); + assert!(response.success); + + // Verify ultra high precision was preserved + let query = format!( + r#"SELECT {} FROM "{}"."{}" WHERE id = $1"#, + field, context.profile_name, context.table_name + ); + let stored_value: rust_decimal::Decimal = sqlx::query_scalar(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + assert_eq!(stored_value, expected_decimal, "Ultra precision mismatch for field {}", field); + } +} + +#[rstest] +#[tokio::test] +async fn test_update_decimal_edge_case_rounding( + #[future] advanced_decimal_test_context: AdvancedDecimalTestContext, +) { + let context = advanced_decimal_test_context.await; + let record_id = create_initial_advanced_decimal_record(&context).await; + + // Test edge cases where rounding behavior is critical + let edge_rounding_tests = vec![ + ("price", "12345.99995", dec!(12346.0000)), // Should round up at 5 + ("rate", "1.999995", dec!(2.00000)), // Should round up + ("discount", "0.9995", dec!(1.000)), // Should round up to 1.000 + ("percentage", "0.99995", dec!(1.0000)), // decimal(5,4) rounds to 1.0000 + ("ultra_precise", "1.99999999995", dec!(2.0000000000)), // Ultra precision rounding + ]; + + for (field, input_value, expected_rounded) in edge_rounding_tests { + let mut update_data = HashMap::new(); + update_data.insert("product_name".to_string(), string_to_proto_value("Edge rounding test")); + update_data.insert(field.to_string(), string_to_proto_value(input_value)); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + + let response = put_table_data(&context.pool, request, &context.indexer_tx) + .await + .unwrap(); + assert!(response.success); + + // Verify edge case rounding was applied correctly + let query = format!( + r#"SELECT {} FROM "{}"."{}" WHERE id = $1"#, + field, context.profile_name, context.table_name + ); + let stored_value: rust_decimal::Decimal = sqlx::query_scalar(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + assert_eq!(stored_value, expected_rounded, "Edge rounding mismatch for field {}", field); + } +} + +// ======================================================================== +// PERFORMANCE AND STRESS TESTS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_rapid_integer_updates_stress( + #[future] performance_test_context: PerformanceTestContext, +) { + let context = performance_test_context.await; + + // Create initial records for stress testing + let mut record_ids = Vec::new(); + for i in 0..100 { + let record_id = create_initial_performance_record(&context).await; + record_ids.push(record_id); + } + + // Rapid sequential updates with alternating integer types and complex data + let start = std::time::Instant::now(); + + for (i, record_id) in record_ids.iter().enumerate() { + let mut update_data = HashMap::new(); + update_data.insert("test_name".to_string(), string_to_proto_value(&format!("Stress update test {}", i))); + + // Alternate between different boundary values for stress testing + let small_val = match i % 4 { + 0 => 2147483647.0, // i32::MAX + 1 => -2147483648.0, // i32::MIN + 2 => 0.0, + _ => (i as f64) * 1000.0, + }; + + let big_val = match i % 4 { + 0 => 9223372036854774784.0, // Near i64::MAX + 1 => -9223372036854774784.0, // Near i64::MIN + 2 => 0.0, + _ => (i as f64) * 1000000000000.0, + }; + + update_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(small_val)) }); + update_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(small_val)) }); + update_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(big_val)) }); + update_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(big_val)) }); + + // Add some decimal and other type updates for comprehensive stress test + update_data.insert("decimal_val".to_string(), string_to_proto_value(&format!("{}.{:02}", i * 10, i % 100))); + update_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue(i % 2 == 0)) }); + update_data.insert("timestamp_val".to_string(), string_to_proto_value(&format!("2024-01-{:02}T{:02}:00:00Z", (i % 28) + 1, i % 24))); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.stress_table.clone(), + id: *record_id, + data: update_data, + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + assert!(result.is_ok(), "Rapid stress update {} should succeed", i); + } + + let duration = start.elapsed(); + println!("100 mixed data type stress updates took: {:?}", duration); + + // Should complete in reasonable time (adjust threshold as needed) + assert!(duration.as_secs() < 15, "Stress test took too long: {:?}", duration); + + // Verify all records were updated correctly + let query = format!( + r#"SELECT COUNT(*) FROM "{}"."{}" WHERE test_name LIKE 'Stress update test%'"#, + context.profile_name, context.stress_table + ); + + let count: i64 = sqlx::query_scalar(&query) + .fetch_one(&context.pool) + .await + .unwrap(); + assert_eq!(count, 100); +} + +#[rstest] +#[tokio::test] +async fn test_update_concurrent_stress_mixed_data_types( + #[future] performance_test_context: PerformanceTestContext, +) { + let context = performance_test_context.await; + + // Create initial records + let mut record_ids = Vec::new(); + for i in 0..20 { + let record_id = create_initial_performance_record(&context).await; + record_ids.push(record_id); + } + + // Concurrent stress test with mixed data types + let tasks: Vec<_> = record_ids.into_iter().enumerate().map(|(i, record_id)| { + let context = context.clone(); + + tokio::spawn(async move { + let mut update_data = HashMap::new(); + update_data.insert("test_name".to_string(), string_to_proto_value(&format!("Concurrent stress {}", i))); + + // Use complex values that stress different validation paths + let complex_int = match i % 3 { + 0 => 2147483647.0 - (i as f64), // Near i32::MAX + 1 => -2147483648.0 + (i as f64), // Near i32::MIN + _ => (i as f64) * 12345.0, + }; + + let complex_bigint = match i % 3 { + 0 => 9223372036854774784.0 - (i as f64 * 1000000000.0), + 1 => -9223372036854774784.0 + (i as f64 * 1000000000.0), + _ => (i as f64) * 987654321012345.0, + }; + + update_data.insert("int_val1".to_string(), Value { kind: Some(Kind::NumberValue(complex_int)) }); + update_data.insert("int_val2".to_string(), Value { kind: Some(Kind::NumberValue(complex_int)) }); + update_data.insert("bigint_val1".to_string(), Value { kind: Some(Kind::NumberValue(complex_bigint)) }); + update_data.insert("bigint_val2".to_string(), Value { kind: Some(Kind::NumberValue(complex_bigint)) }); + update_data.insert("decimal_val".to_string(), string_to_proto_value(&format!("{}.{:02}", i * 33, (i * 7) % 100))); + update_data.insert("bool_val".to_string(), Value { kind: Some(Kind::BoolValue((i * 3) % 2 == 0)) }); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.stress_table.clone(), + id: record_id, + data: update_data, + }; + + put_table_data(&context.pool, request, &context.indexer_tx).await + }) + }).collect(); + + // Wait for all concurrent updates to complete + let results = futures::future::join_all(tasks).await; + + // All should succeed + for (i, result) in results.into_iter().enumerate() { + let task_result = result.expect("Task should not panic"); + assert!(task_result.is_ok(), "Concurrent stress update {} should succeed", i); + } + + // Verify all records were updated + let query = format!( + r#"SELECT COUNT(*) FROM "{}"."{}" WHERE test_name LIKE 'Concurrent stress%'"#, + context.profile_name, context.stress_table + ); + + let count: i64 = sqlx::query_scalar(&query) + .fetch_one(&context.pool) + .await + .unwrap(); + assert_eq!(count, 20); +} + +// ======================================================================== +// EDGE CASE COMBINATION TESTS +// ======================================================================== + +#[rstest] +#[tokio::test] +async fn test_update_complex_mixed_data_type_combinations( + #[future] comprehensive_integer_test_context: ComprehensiveIntegerTestContext, +) { + let context = comprehensive_integer_test_context.await; + let record_id = create_initial_comprehensive_integer_record(&context, &context.mixed_integer_table).await; + + // Test complex combinations of data type updates that stress multiple validation paths + let complex_combinations = vec![ + ( + "All boundary values", + HashMap::from([ + ("small_int".to_string(), Value { kind: Some(Kind::NumberValue(2147483647.0)) }), + ("big_int".to_string(), Value { kind: Some(Kind::NumberValue(9223372036854774784.0)) }), + ("another_int".to_string(), Value { kind: Some(Kind::NumberValue(-2147483648.0)) }), + ("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(-9223372036854774784.0)) }), + ("nullable_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }), + ("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) }), + ]) + ), + ( + "Mixed nulls and values", + HashMap::from([ + ("small_int".to_string(), Value { kind: Some(Kind::NumberValue(42.0)) }), + ("big_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }), + ("another_int".to_string(), Value { kind: Some(Kind::NullValue(0)) }), + ("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(1000000000000.0)) }), + ("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(123.0)) }), + ("nullable_bigint".to_string(), Value { kind: Some(Kind::NullValue(0)) }), + ]) + ), + ( + "Zero and near-zero values", + HashMap::from([ + ("small_int".to_string(), Value { kind: Some(Kind::NumberValue(0.0)) }), + ("big_int".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) }), + ("another_int".to_string(), Value { kind: Some(Kind::NumberValue(-1.0)) }), + ("another_bigint".to_string(), Value { kind: Some(Kind::NumberValue(0.0)) }), + ("nullable_int".to_string(), Value { kind: Some(Kind::NumberValue(1.0)) }), + ("nullable_bigint".to_string(), Value { kind: Some(Kind::NumberValue(-1.0)) }), + ]) + ), + ]; + + for (description, mut update_data) in complex_combinations { + update_data.insert("name".to_string(), string_to_proto_value(&format!("Complex combo: {}", description))); + + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.mixed_integer_table.clone(), + id: record_id, + data: update_data.clone(), + }; + + let result = put_table_data(&context.pool, request, &context.indexer_tx).await; + assert!(result.is_ok(), "Complex combination should succeed: {}", description); + + // Verify the complex combination was stored correctly + let query = format!( + r#"SELECT small_int, big_int, another_int, another_bigint, nullable_int, nullable_bigint FROM "{}"."{}" WHERE id = $1"#, + context.profile_name, context.mixed_integer_table + ); + let row = sqlx::query(&query) + .bind(record_id) + .fetch_one(&context.pool) + .await + .unwrap(); + + // Verify each field based on what was set in update_data + for (field_name, expected_value) in update_data.iter() { + if field_name == "name" { continue; } // Skip text field + + match expected_value.kind.as_ref().unwrap() { + Kind::NumberValue(num) => { + match field_name.as_str() { + "small_int" | "another_int" | "nullable_int" => { + let stored: Option = row.get(field_name.as_str()); + if let Some(stored_val) = stored { + assert_eq!(stored_val, *num as i32, "Field {} mismatch in {}", field_name, description); + } + }, + "big_int" | "another_bigint" | "nullable_bigint" => { + let stored: Option = row.get(field_name.as_str()); + if let Some(stored_val) = stored { + assert_eq!(stored_val, *num as i64, "Field {} mismatch in {}", field_name, description); + } + }, + _ => {} + } + }, + Kind::NullValue(_) => { + match field_name.as_str() { + "small_int" | "another_int" | "nullable_int" => { + let stored: Option = row.get(field_name.as_str()); + assert!(stored.is_none(), "Field {} should be null in {}", field_name, description); + }, + "big_int" | "another_bigint" | "nullable_bigint" => { + let stored: Option = row.get(field_name.as_str()); + assert!(stored.is_none(), "Field {} should be null in {}", field_name, description); + }, + _ => {} + } + }, + _ => {} + } + } + } +} diff --git a/server/tests/tables_data/handlers/put_table_data_test5.rs b/server/tests/tables_data/handlers/put_table_data_test5.rs new file mode 100644 index 0000000..a805e1e --- /dev/null +++ b/server/tests/tables_data/handlers/put_table_data_test5.rs @@ -0,0 +1,121 @@ +// tests/tables_data/handlers/put_table_data_test5.rs + +// ======================================================================== +// MISSING TEST SCENARIOS REPLICATED FROM POST TESTS +// ======================================================================== + +// Fixture to provide a closed database pool, simulating a connection error. +// This is needed for the database error test. +#[fixture] +async fn closed_test_context() -> TestContext { + let mut context = test_context().await; + context.pool.close().await; + context +} + +// Test 1: Ensure that an update fails gracefully when the database is unavailable. +#[rstest] +#[tokio::test] +async fn test_update_table_data_database_error( + #[future] closed_test_context: TestContext, +) { + // Arrange + let context = closed_test_context.await; + // The record ID doesn't matter as the connection is already closed. + let record_id = 1; + + // Act + let mut update_data = HashMap::new(); + update_data.insert( + "firma".to_string(), + string_to_proto_value("This will fail"), + ); + let request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.table_name.clone(), + id: record_id, + data: update_data, + }; + let result = + put_table_data(&context.pool, request, &context.indexer_tx).await; + + // Assert + assert!(result.is_err()); + assert_eq!(result.unwrap_err().code(), tonic::Code::Internal); +} + +// Test 2: Ensure that updating a required foreign key to NULL is not allowed. +// This uses the `foreign_key_update_test_context` from `put_table_data_test3.rs`. +#[rstest] +#[tokio::test] +async fn test_update_required_foreign_key_to_null_fails( + #[future] + foreign_key_update_test_context: ForeignKeyUpdateTestContext, +) { + let context = foreign_key_update_test_context.await; + + // Arrange: Create a category and a product linked to it. + let mut category_data = HashMap::new(); + category_data + .insert("name".to_string(), string_to_proto_value("Test Category")); + let category_request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.category_table.clone(), + data: category_data, + }; + let category_response = post_table_data( + &context.pool, + category_request, + &context.indexer_tx, + ) + .await + .unwrap(); + let category_id = category_response.inserted_id; + + let mut product_data = HashMap::new(); + product_data + .insert("name".to_string(), string_to_proto_value("Test Product")); + product_data.insert( + format!("{}_id", context.category_table), + Value { kind: Some(Kind::NumberValue(category_id as f64)) }, + ); + let product_request = PostTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.product_table.clone(), + data: product_data, + }; + let product_response = + post_table_data(&context.pool, product_request, &context.indexer_tx) + .await + .unwrap(); + let product_id = product_response.inserted_id; + + // Act: Attempt to update the product's required foreign key to NULL. + let mut update_data = HashMap::new(); + update_data.insert( + format!("{}_id", context.category_table), + Value { kind: Some(Kind::NullValue(0)) }, + ); + + let update_request = PutTableDataRequest { + profile_name: context.profile_name.clone(), + table_name: context.product_table.clone(), + id: product_id, + data: update_data, + }; + + let result = + put_table_data(&context.pool, update_request, &context.indexer_tx) + .await; + + // Assert: The operation should fail due to a database constraint. + assert!( + result.is_err(), + "Update of required foreign key to NULL should fail" + ); + let err = result.unwrap_err(); + // The database will likely return a NOT NULL violation, which our handler + // wraps as an Internal error. + assert_eq!(err.code(), tonic::Code::Internal); + assert!(err.message().contains("Update failed")); +}