fixed succcess operation matching #661

Merged
tech merged 1 commits from po-660 into main 2026-03-05 11:24:54 +00:00
2 changed files with 110 additions and 9 deletions

View File

@@ -104,9 +104,12 @@ func (s *Service) onPaymentGatewayExecution(ctx context.Context, msg *pmodel.Pay
event, ok := buildGatewayExecutionEvent(payment, msg)
if !ok {
s.logger.Debug("Skipping payment gateway execution event with unsupported status",
s.logger.Debug("Dropping payment gateway execution event",
zap.String("payment_ref", paymentRef),
zap.String("status", strings.TrimSpace(string(msg.Status))),
zap.String("operation_ref", strings.TrimSpace(msg.OperationRef)),
zap.String("transfer_ref", strings.TrimSpace(msg.TransferRef)),
zap.String("drop_reason", gatewayExecutionDropReason(payment, msg)),
)
return nil
}
@@ -138,9 +141,15 @@ func buildGatewayExecutionEvent(payment *agg.Payment, msg *pmodel.PaymentGateway
return nil, false
}
stepRef, gatewayInstanceID := matchExecutionStep(payment, msg)
operationRef := strings.TrimSpace(msg.OperationRef)
transferRef := strings.TrimSpace(msg.TransferRef)
stepRef, gatewayInstanceID, matched := matchExecutionStep(payment, msg)
// Drop unmatched events that include correlation refs. This prevents
// unrelated gateway events (for the same payment_ref) from being applied to
// a running observe step via fallback inference.
if !matched && (operationRef != "" || transferRef != "") {
return nil, false
}
if stepRef == "" && operationRef == "" && transferRef == "" {
return nil, false
}
@@ -185,33 +194,58 @@ func mapGatewayExecutionStatus(status rail.OperationResult) (erecon.GatewayStatu
}
}
func matchExecutionStep(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (stepRef string, gatewayInstanceID string) {
func matchExecutionStep(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (stepRef string, gatewayInstanceID string, matched bool) {
if payment == nil || msg == nil {
return "", ""
return "", "", false
}
transferRef := strings.TrimSpace(msg.TransferRef)
if transferRef != "" {
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindTransfer, transferRef); ok {
return stepRef, gatewayInstanceID
return stepRef, gatewayInstanceID, true
}
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindCardPayout, transferRef); ok {
return stepRef, gatewayInstanceID
return stepRef, gatewayInstanceID, true
}
}
operationRef := strings.TrimSpace(msg.OperationRef)
if operationRef != "" {
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindOperation, operationRef); ok {
return stepRef, gatewayInstanceID
return stepRef, gatewayInstanceID, true
}
}
// Fallback inference is allowed only when the event has no refs at all.
// If refs are present but unmatched, treat it as unrelated and skip.
if transferRef != "" || operationRef != "" {
return "", "", false
}
candidates := runningObserveCandidates(payment)
if len(candidates) == 1 {
return candidates[0].stepRef, candidates[0].gatewayInstanceID
return candidates[0].stepRef, candidates[0].gatewayInstanceID, true
}
return "", ""
return "", "", false
}
func gatewayExecutionDropReason(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) string {
if msg == nil {
return "nil_event"
}
if _, ok := mapGatewayExecutionStatus(msg.Status); !ok {
return "unsupported_status"
}
operationRef := strings.TrimSpace(msg.OperationRef)
transferRef := strings.TrimSpace(msg.TransferRef)
_, _, matched := matchExecutionStep(payment, msg)
if (operationRef != "" || transferRef != "") && !matched {
return "unmatched_refs"
}
if operationRef == "" && transferRef == "" && !matched {
return "missing_refs_and_no_observe_candidate"
}
return "not_accepted"
}
func findStepByExternalRef(payment *agg.Payment, kind, ref string) (stepRef string, gatewayInstanceID string, ok bool) {

View File

@@ -134,6 +134,73 @@ func TestBuildGatewayExecutionEvent_MatchesCardObserveByCardPayoutRef(t *testing
}
}
func TestBuildGatewayExecutionEvent_SkipsUnmatchedRefsEvenWithSingleRunningObserve(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-settlement-1",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_2_settlement_observe",
StepCode: "hop.2.settlement.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{
GatewayInstanceID: "payment_gateway_settlement",
Kind: erecon.ExternalRefKindTransfer,
Ref: "settlement-transfer-ref",
},
},
},
},
}
// This models a foreign success event (e.g. crypto fee transfer) that should
// never close settlement observe by "single running observe" fallback.
_, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
PaymentRef: payment.PaymentRef,
Status: rail.OperationResultSuccess,
OperationRef: "payment-1:hop_1_crypto_send:fee",
TransferRef: "fee-transfer-ref",
})
if ok {
t.Fatal("expected unmatched gateway execution event to be skipped")
}
}
func TestBuildGatewayExecutionEvent_AllowsSingleRunningObserveFallbackWhenRefsMissing(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-settlement-2",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_2_settlement_observe",
StepCode: "hop.2.settlement.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{
GatewayInstanceID: "payment_gateway_settlement",
Kind: erecon.ExternalRefKindTransfer,
Ref: "settlement-transfer-ref",
},
},
},
},
}
event, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
PaymentRef: payment.PaymentRef,
Status: rail.OperationResultSuccess,
})
if !ok {
t.Fatal("expected gateway execution event to be accepted")
}
if got, want := event.StepRef, "hop_2_settlement_observe"; got != want {
t.Fatalf("step_ref mismatch: got=%q want=%q", got, want)
}
}
func TestOnPaymentGatewayExecution_ReconcilesUsingGlobalPaymentLookup(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{