diff --git a/api-reference-v2/openapi_spec.json b/api-reference-v2/openapi_spec.json index 892931256bc..521141e9a18 100644 --- a/api-reference-v2/openapi_spec.json +++ b/api-reference-v2/openapi_spec.json @@ -8422,7 +8422,7 @@ }, "is_delivery_successful": { "type": "boolean", - "description": "Indicates whether the webhook delivery attempt was successful." + "description": "Indicates whether the webhook was ultimately delivery." }, "initial_attempt_id": { "type": "string", diff --git a/api-reference/openapi_spec.json b/api-reference/openapi_spec.json index c5368b6f256..b4d465050c3 100644 --- a/api-reference/openapi_spec.json +++ b/api-reference/openapi_spec.json @@ -5219,6 +5219,16 @@ "type": "string", "nullable": true } + }, + { + "name": "is_delivered", + "in": "query", + "description": "Only include Events which are ultimately delivered to the merchant.", + "required": false, + "schema": { + "type": "boolean", + "nullable": true + } } ], "responses": { @@ -11109,7 +11119,7 @@ }, "is_delivery_successful": { "type": "boolean", - "description": "Indicates whether the webhook delivery attempt was successful." + "description": "Indicates whether the webhook was ultimately delivery." }, "initial_attempt_id": { "type": "string", @@ -25543,6 +25553,7 @@ }, "TotalEventsResponse": { "type": "object", + "description": "The response body of list initial delivery attempts api call.", "required": [ "events", "total_count" @@ -25552,11 +25563,13 @@ "type": "array", "items": { "$ref": "#/components/schemas/EventListItemResponse" - } + }, + "description": "The list of events" }, "total_count": { "type": "integer", - "format": "int64" + "format": "int64", + "description": "Count of total events" } } }, diff --git a/crates/api_models/src/webhook_events.rs b/crates/api_models/src/webhook_events.rs index c0e275c199e..826c55f60d4 100644 --- a/crates/api_models/src/webhook_events.rs +++ b/crates/api_models/src/webhook_events.rs @@ -28,6 +28,9 @@ pub struct EventListConstraints { /// Filter all events associated with the specified business profile ID. #[schema(value_type = Option)] pub profile_id: Option, + + // Filter all events by overall_delivery_status. + pub is_delivered: Option, } #[derive(Debug)] @@ -37,6 +40,7 @@ pub enum EventListConstraintsInternal { created_before: Option, limit: Option, offset: Option, + is_delivered: Option, }, ObjectIdFilter { object_id: String, @@ -68,7 +72,7 @@ pub struct EventListItemResponse { /// Specifies the class of event (the type of object: Payment, Refund, etc.) pub event_class: EventClass, - /// Indicates whether the webhook delivery attempt was successful. + /// Indicates whether the webhook was ultimately delivery. pub is_delivery_successful: bool, /// The identifier for the initial delivery attempt. This will be the same as `event_id` for diff --git a/crates/diesel_models/src/events.rs b/crates/diesel_models/src/events.rs index 82b2b58f80b..531cb13127d 100644 --- a/crates/diesel_models/src/events.rs +++ b/crates/diesel_models/src/events.rs @@ -26,6 +26,7 @@ pub struct EventNew { pub response: Option, pub delivery_attempt: Option, pub metadata: Option, + pub is_overall_delivery_successful: bool, } #[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)] @@ -33,6 +34,7 @@ pub struct EventNew { pub struct EventUpdateInternal { pub is_webhook_notified: Option, pub response: Option, + pub is_overall_delivery_successful: Option, } #[derive(Clone, Debug, Deserialize, Serialize, Identifiable, Queryable, Selectable)] @@ -57,6 +59,7 @@ pub struct Event { pub response: Option, pub delivery_attempt: Option, pub metadata: Option, + pub is_overall_delivery_successful: bool, } #[derive(Clone, Debug, Deserialize, Serialize, AsExpression, diesel::FromSqlRow)] diff --git a/crates/diesel_models/src/query/events.rs b/crates/diesel_models/src/query/events.rs index eb1c4de5662..f5d87cb993e 100644 --- a/crates/diesel_models/src/query/events.rs +++ b/crates/diesel_models/src/query/events.rs @@ -56,6 +56,7 @@ impl Event { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, ) -> StorageResult> { use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{debug_query, pg::Pg, QueryDsl}; @@ -81,6 +82,7 @@ impl Event { (dsl::created_at, created_after, created_before), limit, offset, + is_delivered, ); logger::debug!(query = %debug_query::(&query).to_string()); @@ -134,6 +136,7 @@ impl Event { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, ) -> StorageResult> { use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{debug_query, pg::Pg, QueryDsl}; @@ -159,6 +162,7 @@ impl Event { (dsl::created_at, created_after, created_before), limit, offset, + is_delivered, ); logger::debug!(query = %debug_query::(&query).to_string()); @@ -217,6 +221,7 @@ impl Event { ), limit: Option, offset: Option, + is_delivered: Option, ) -> T where T: diesel::query_dsl::methods::LimitDsl @@ -233,6 +238,10 @@ impl Event { diesel::dsl::Eq, Output = T, >, + T: diesel::query_dsl::methods::FilterDsl< + diesel::dsl::Eq, + Output = T, + >, { if let Some(profile_id) = profile_id { query = query.filter(dsl::business_profile_id.eq(profile_id)); @@ -250,6 +259,10 @@ impl Event { query = query.offset(offset); } + if let Some(is_delivered) = is_delivered { + query = query.filter(dsl::is_overall_delivery_successful.eq(is_delivered)); + } + query } @@ -259,6 +272,7 @@ impl Event { profile_id: Option, created_after: time::PrimitiveDateTime, created_before: time::PrimitiveDateTime, + is_delivered: Option, ) -> StorageResult { use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{debug_query, pg::Pg, QueryDsl}; @@ -284,6 +298,7 @@ impl Event { (dsl::created_at, created_after, created_before), None, None, + is_delivered, ); logger::debug!(query = %debug_query::(&query).to_string()); diff --git a/crates/diesel_models/src/schema.rs b/crates/diesel_models/src/schema.rs index 7930ad5d9ae..f82a7d3929c 100644 --- a/crates/diesel_models/src/schema.rs +++ b/crates/diesel_models/src/schema.rs @@ -467,6 +467,7 @@ diesel::table! { response -> Nullable, delivery_attempt -> Nullable, metadata -> Nullable, + is_overall_delivery_successful -> Bool, } } diff --git a/crates/diesel_models/src/schema_v2.rs b/crates/diesel_models/src/schema_v2.rs index 13abf8ee64b..1e21b51af4c 100644 --- a/crates/diesel_models/src/schema_v2.rs +++ b/crates/diesel_models/src/schema_v2.rs @@ -479,6 +479,7 @@ diesel::table! { response -> Nullable, delivery_attempt -> Nullable, metadata -> Nullable, + is_overall_delivery_successful -> Bool, } } diff --git a/crates/openapi/src/routes/webhook_events.rs b/crates/openapi/src/routes/webhook_events.rs index c65a61768f4..5bee7109fb4 100644 --- a/crates/openapi/src/routes/webhook_events.rs +++ b/crates/openapi/src/routes/webhook_events.rs @@ -45,6 +45,11 @@ Query, description = "Only include Events associated with the Profile identified by the specified Profile ID." ), + ( + "is_delivered" = Option, + Query, + description = "Only include Events which are ultimately delivered to the merchant." + ), ), responses( (status = 200, description = "List of Events retrieved successfully", body = TotalEventsResponse), diff --git a/crates/router/src/core/webhooks/outgoing.rs b/crates/router/src/core/webhooks/outgoing.rs index 4bb45327d87..ebef5c04493 100644 --- a/crates/router/src/core/webhooks/outgoing.rs +++ b/crates/router/src/core/webhooks/outgoing.rs @@ -133,6 +133,7 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook( response: None, delivery_attempt: Some(delivery_attempt), metadata: Some(event_metadata), + is_overall_delivery_successful: false, }; let event_insert_result = state @@ -822,7 +823,7 @@ async fn update_event_in_storage( .attach_printable("Failed to encrypt outgoing webhook response content")?, ), }; - state + let current_event = state .store .update_event_by_merchant_id_event_id( key_manager_state, @@ -832,7 +833,36 @@ async fn update_event_in_storage( &merchant_key_store, ) .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)?; + + let parent_update = domain::EventUpdate::ParentUpdate { + is_overall_delivery_successful: status_code.is_success(), + }; + + let parent_event_id = current_event.initial_attempt_id.clone(); + let delivery_attempt = current_event.delivery_attempt; + + if let Some(( + parent_event_id, + enums::WebhookDeliveryAttempt::InitialAttempt + | enums::WebhookDeliveryAttempt::AutomaticRetry, + )) = parent_event_id.zip(delivery_attempt) + { + state + .store + .update_event_by_merchant_id_event_id( + key_manager_state, + merchant_id, + parent_event_id.as_str(), + parent_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to update parent event")?; + } + + Ok(current_event) } fn increment_webhook_outgoing_received_count(merchant_id: &common_utils::id_type::MerchantId) { diff --git a/crates/router/src/core/webhooks/webhook_events.rs b/crates/router/src/core/webhooks/webhook_events.rs index b1bbc963037..8e1ac593475 100644 --- a/crates/router/src/core/webhooks/webhook_events.rs +++ b/crates/router/src/core/webhooks/webhook_events.rs @@ -64,6 +64,7 @@ pub async fn list_initial_delivery_attempts( created_before, limit, offset, + is_delivered } => { let limit = match limit { Some(limit) if limit <= INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_LIMIT => Ok(Some(limit)), @@ -114,6 +115,7 @@ pub async fn list_initial_delivery_attempts( created_before, limit, offset, + is_delivered, &key_store, ) .await, @@ -124,6 +126,7 @@ pub async fn list_initial_delivery_attempts( created_before, limit, offset, + is_delivered, &key_store, ) .await, @@ -143,12 +146,15 @@ pub async fn list_initial_delivery_attempts( .unwrap_or(events_list_begin_time); let created_before = api_constraints.created_before.unwrap_or(now); + let is_delivered = api_constraints.is_delivered; + let total_count = store .count_initial_events_by_constraints( &merchant_id, profile_id, created_after, created_before, + is_delivered, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -269,6 +275,7 @@ pub async fn retry_delivery_attempt( response: None, delivery_attempt: Some(delivery_attempt), metadata: event_to_retry.metadata, + is_overall_delivery_successful: false, }; let event = store diff --git a/crates/router/src/db/events.rs b/crates/router/src/db/events.rs index b15dd9bb5bd..0efcd771a3e 100644 --- a/crates/router/src/db/events.rs +++ b/crates/router/src/db/events.rs @@ -53,6 +53,7 @@ where created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; @@ -81,6 +82,7 @@ where created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; @@ -99,6 +101,7 @@ where profile_id: Option, created_after: time::PrimitiveDateTime, created_before: time::PrimitiveDateTime, + is_delivered: Option, ) -> CustomResult; } @@ -193,6 +196,7 @@ impl EventInterface for Store { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let conn = connection::pg_connection_read(self).await?; @@ -203,6 +207,7 @@ impl EventInterface for Store { created_before, limit, offset, + is_delivered, ) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -304,6 +309,7 @@ impl EventInterface for Store { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let conn = connection::pg_connection_read(self).await?; @@ -314,6 +320,7 @@ impl EventInterface for Store { created_before, limit, offset, + is_delivered, ) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -366,6 +373,7 @@ impl EventInterface for Store { profile_id: Option, created_after: time::PrimitiveDateTime, created_before: time::PrimitiveDateTime, + is_delivered: Option, ) -> CustomResult { let conn = connection::pg_connection_read(self).await?; storage::Event::count_initial_attempts_by_constraints( @@ -374,6 +382,7 @@ impl EventInterface for Store { profile_id, created_after, created_before, + is_delivered, ) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -483,17 +492,23 @@ impl EventInterface for MockDb { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let locked_events = self.events.lock().await; let events_iter = locked_events.iter().filter(|event| { - let check = event.merchant_id == Some(merchant_id.to_owned()) + let mut check = event.merchant_id == Some(merchant_id.to_owned()) && event.initial_attempt_id.as_ref() == Some(&event.event_id) && (event.created_at >= created_after) && (event.created_at <= created_before); + if let Some(is_delivered) = is_delivered { + check = check && (event.is_overall_delivery_successful == is_delivered); + } + check }); + let offset: usize = if let Some(offset) = offset { if offset < 0 { Err(errors::StorageError::MockDbError)?; @@ -614,15 +629,20 @@ impl EventInterface for MockDb { created_before: time::PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let locked_events = self.events.lock().await; let events_iter = locked_events.iter().filter(|event| { - let check = event.business_profile_id == Some(profile_id.to_owned()) + let mut check = event.business_profile_id == Some(profile_id.to_owned()) && event.initial_attempt_id.as_ref() == Some(&event.event_id) && (event.created_at >= created_after) && (event.created_at <= created_before); + if let Some(is_delivered) = is_delivered { + check = check && (event.is_overall_delivery_successful == is_delivered); + } + check }); @@ -694,6 +714,9 @@ impl EventInterface for MockDb { event_to_update.is_webhook_notified = is_webhook_notified; event_to_update.response = response.map(Into::into); } + domain::EventUpdate::ParentUpdate { + is_overall_delivery_successful, + } => event_to_update.is_overall_delivery_successful = is_overall_delivery_successful, } event_to_update @@ -713,16 +736,21 @@ impl EventInterface for MockDb { profile_id: Option, created_after: time::PrimitiveDateTime, created_before: time::PrimitiveDateTime, + is_delivered: Option, ) -> CustomResult { let locked_events = self.events.lock().await; let iter_events = locked_events.iter().filter(|event| { - let check = event.initial_attempt_id.as_ref() == Some(&event.event_id) + let mut check = event.initial_attempt_id.as_ref() == Some(&event.event_id) && (event.merchant_id == Some(merchant_id.to_owned())) && (event.business_profile_id == profile_id) && (event.created_at >= created_after) && (event.created_at <= created_before); + if let Some(is_delivered) = is_delivered { + check = check && (event.is_overall_delivery_successful == is_delivered); + } + check }); @@ -843,6 +871,7 @@ mod tests { ) .unwrap(), }), + is_overall_delivery_successful: false, }, &merchant_key_store, ) diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 23cbdf303f1..d0fbe87f9c6 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -750,6 +750,7 @@ impl EventInterface for KafkaStore { created_before: PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { self.diesel_store @@ -760,6 +761,7 @@ impl EventInterface for KafkaStore { created_before, limit, offset, + is_delivered, merchant_key_store, ) .await @@ -807,6 +809,7 @@ impl EventInterface for KafkaStore { created_before: PrimitiveDateTime, limit: Option, offset: Option, + is_delivered: Option, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { self.diesel_store @@ -817,6 +820,7 @@ impl EventInterface for KafkaStore { created_before, limit, offset, + is_delivered, merchant_key_store, ) .await @@ -847,6 +851,7 @@ impl EventInterface for KafkaStore { profile_id: Option, created_after: PrimitiveDateTime, created_before: PrimitiveDateTime, + is_delivered: Option, ) -> CustomResult { self.diesel_store .count_initial_events_by_constraints( @@ -854,6 +859,7 @@ impl EventInterface for KafkaStore { profile_id, created_after, created_before, + is_delivered, ) .await } diff --git a/crates/router/src/types/domain/event.rs b/crates/router/src/types/domain/event.rs index db2d7530164..ddb0460ef52 100644 --- a/crates/router/src/types/domain/event.rs +++ b/crates/router/src/types/domain/event.rs @@ -19,24 +19,58 @@ use crate::{ #[derive(Clone, Debug, router_derive::ToEncryption)] pub struct Event { + /// A string that uniquely identifies the event. pub event_id: String, + + /// Represents the type of event for the webhook. pub event_type: EventType, + + /// Represents the class of event for the webhook. pub event_class: EventClass, + + /// Indicates whether the current webhook delivery was successful. pub is_webhook_notified: bool, + + /// Reference to the object for which the webhook was created. pub primary_object_id: String, + + /// Reference to the object type for which the webhook was created. pub primary_object_type: EventObjectType, + + /// The timestamp when the webhook was created. pub created_at: time::PrimitiveDateTime, + + /// Merchant identifier to which the webhook was sent. pub merchant_id: Option, + + /// Business Profile identifier to which the webhook was sent. pub business_profile_id: Option, + + /// The timestamp when the primary object was created. pub primary_object_created_at: Option, + + /// This allows the event to be uniquely identified to prevent multiple processing. pub idempotent_event_id: Option, + + /// Links to the initial attempt of the event. pub initial_attempt_id: Option, + + /// This field contains the encrypted request data sent as part of the event. #[encrypt] pub request: Option>>, + + /// This field contains the encrypted response data received as part of the event. #[encrypt] pub response: Option>>, + + /// Represents the event delivery type. pub delivery_attempt: Option, + + /// Holds any additional data related to the event. pub metadata: Option, + + /// Indicates whether the event was ultimately delivered. + pub is_overall_delivery_successful: bool, } #[derive(Debug)] @@ -45,6 +79,9 @@ pub enum EventUpdate { is_webhook_notified: bool, response: OptionalEncryptableSecretString, }, + ParentUpdate { + is_overall_delivery_successful: bool, + }, } impl From for EventUpdateInternal { @@ -56,6 +93,14 @@ impl From for EventUpdateInternal { } => Self { is_webhook_notified: Some(is_webhook_notified), response: response.map(Into::into), + is_overall_delivery_successful: None, + }, + EventUpdate::ParentUpdate { + is_overall_delivery_successful, + } => Self { + is_webhook_notified: None, + response: None, + is_overall_delivery_successful: Some(is_overall_delivery_successful), }, } } @@ -84,6 +129,7 @@ impl super::behaviour::Conversion for Event { response: self.response.map(Into::into), delivery_attempt: self.delivery_attempt, metadata: self.metadata, + is_overall_delivery_successful: self.is_overall_delivery_successful, }) } @@ -133,6 +179,7 @@ impl super::behaviour::Conversion for Event { response: encryptable_event.response, delivery_attempt: item.delivery_attempt, metadata: item.metadata, + is_overall_delivery_successful: item.is_overall_delivery_successful, }) } @@ -154,6 +201,7 @@ impl super::behaviour::Conversion for Event { response: self.response.map(Into::into), delivery_attempt: self.delivery_attempt, metadata: self.metadata, + is_overall_delivery_successful: self.is_overall_delivery_successful, }) } } diff --git a/crates/router/src/types/transformers.rs b/crates/router/src/types/transformers.rs index 06682f596ed..92b5bcdb08e 100644 --- a/crates/router/src/types/transformers.rs +++ b/crates/router/src/types/transformers.rs @@ -1895,6 +1895,7 @@ impl ForeignTryFrom created_before: item.created_before, limit: item.limit.map(i64::from), offset: item.offset.map(i64::from), + is_delivered: item.is_delivered, }), } } @@ -1930,7 +1931,7 @@ impl TryFrom for api_models::webhook_events::EventListItemRespons object_id: item.primary_object_id, event_type: item.event_type, event_class: item.event_class, - is_delivery_successful: item.is_webhook_notified, + is_delivery_successful: item.is_overall_delivery_successful, initial_attempt_id, created: item.created_at, }) diff --git a/crates/router/src/workflows/outgoing_webhook_retry.rs b/crates/router/src/workflows/outgoing_webhook_retry.rs index c7df4aeff0c..8424de26648 100644 --- a/crates/router/src/workflows/outgoing_webhook_retry.rs +++ b/crates/router/src/workflows/outgoing_webhook_retry.rs @@ -117,6 +117,7 @@ impl ProcessTrackerWorkflow for OutgoingWebhookRetryWorkflow { response: None, delivery_attempt: Some(delivery_attempt), metadata: initial_event.metadata, + is_overall_delivery_successful: false, }; let event = db diff --git a/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/down.sql b/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/down.sql new file mode 100644 index 00000000000..e71de9ac7c6 --- /dev/null +++ b/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE events DROP COLUMN IF EXISTS is_overall_delivery_successful; \ No newline at end of file diff --git a/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/up.sql b/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/up.sql new file mode 100644 index 00000000000..ab6653f6fc5 --- /dev/null +++ b/migrations/2025-02-19-072457_add_is_overall_delivery_successful_column_in_events_table/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +ALTER TABLE events ADD COLUMN IF NOT EXISTS is_overall_delivery_successful BOOLEAN NOT NULL DEFAULT FALSE;