fixed succcess operation matching #661
@@ -104,9 +104,12 @@ func (s *Service) onPaymentGatewayExecution(ctx context.Context, msg *pmodel.Pay
|
|||||||
|
|
||||||
event, ok := buildGatewayExecutionEvent(payment, msg)
|
event, ok := buildGatewayExecutionEvent(payment, msg)
|
||||||
if !ok {
|
if !ok {
|
||||||
s.logger.Debug("Skipping payment gateway execution event with unsupported status",
|
s.logger.Debug("Dropping payment gateway execution event",
|
||||||
zap.String("payment_ref", paymentRef),
|
zap.String("payment_ref", paymentRef),
|
||||||
zap.String("status", strings.TrimSpace(string(msg.Status))),
|
zap.String("status", strings.TrimSpace(string(msg.Status))),
|
||||||
|
zap.String("operation_ref", strings.TrimSpace(msg.OperationRef)),
|
||||||
|
zap.String("transfer_ref", strings.TrimSpace(msg.TransferRef)),
|
||||||
|
zap.String("drop_reason", gatewayExecutionDropReason(payment, msg)),
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -138,9 +141,15 @@ func buildGatewayExecutionEvent(payment *agg.Payment, msg *pmodel.PaymentGateway
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
stepRef, gatewayInstanceID := matchExecutionStep(payment, msg)
|
|
||||||
operationRef := strings.TrimSpace(msg.OperationRef)
|
operationRef := strings.TrimSpace(msg.OperationRef)
|
||||||
transferRef := strings.TrimSpace(msg.TransferRef)
|
transferRef := strings.TrimSpace(msg.TransferRef)
|
||||||
|
stepRef, gatewayInstanceID, matched := matchExecutionStep(payment, msg)
|
||||||
|
// Drop unmatched events that include correlation refs. This prevents
|
||||||
|
// unrelated gateway events (for the same payment_ref) from being applied to
|
||||||
|
// a running observe step via fallback inference.
|
||||||
|
if !matched && (operationRef != "" || transferRef != "") {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
if stepRef == "" && operationRef == "" && transferRef == "" {
|
if stepRef == "" && operationRef == "" && transferRef == "" {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
@@ -185,33 +194,58 @@ func mapGatewayExecutionStatus(status rail.OperationResult) (erecon.GatewayStatu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchExecutionStep(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (stepRef string, gatewayInstanceID string) {
|
func matchExecutionStep(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (stepRef string, gatewayInstanceID string, matched bool) {
|
||||||
if payment == nil || msg == nil {
|
if payment == nil || msg == nil {
|
||||||
return "", ""
|
return "", "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
transferRef := strings.TrimSpace(msg.TransferRef)
|
transferRef := strings.TrimSpace(msg.TransferRef)
|
||||||
if transferRef != "" {
|
if transferRef != "" {
|
||||||
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindTransfer, transferRef); ok {
|
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindTransfer, transferRef); ok {
|
||||||
return stepRef, gatewayInstanceID
|
return stepRef, gatewayInstanceID, true
|
||||||
}
|
}
|
||||||
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindCardPayout, transferRef); ok {
|
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindCardPayout, transferRef); ok {
|
||||||
return stepRef, gatewayInstanceID
|
return stepRef, gatewayInstanceID, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
operationRef := strings.TrimSpace(msg.OperationRef)
|
operationRef := strings.TrimSpace(msg.OperationRef)
|
||||||
if operationRef != "" {
|
if operationRef != "" {
|
||||||
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindOperation, operationRef); ok {
|
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindOperation, operationRef); ok {
|
||||||
return stepRef, gatewayInstanceID
|
return stepRef, gatewayInstanceID, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fallback inference is allowed only when the event has no refs at all.
|
||||||
|
// If refs are present but unmatched, treat it as unrelated and skip.
|
||||||
|
if transferRef != "" || operationRef != "" {
|
||||||
|
return "", "", false
|
||||||
|
}
|
||||||
|
|
||||||
candidates := runningObserveCandidates(payment)
|
candidates := runningObserveCandidates(payment)
|
||||||
if len(candidates) == 1 {
|
if len(candidates) == 1 {
|
||||||
return candidates[0].stepRef, candidates[0].gatewayInstanceID
|
return candidates[0].stepRef, candidates[0].gatewayInstanceID, true
|
||||||
}
|
}
|
||||||
return "", ""
|
return "", "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func gatewayExecutionDropReason(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) string {
|
||||||
|
if msg == nil {
|
||||||
|
return "nil_event"
|
||||||
|
}
|
||||||
|
if _, ok := mapGatewayExecutionStatus(msg.Status); !ok {
|
||||||
|
return "unsupported_status"
|
||||||
|
}
|
||||||
|
operationRef := strings.TrimSpace(msg.OperationRef)
|
||||||
|
transferRef := strings.TrimSpace(msg.TransferRef)
|
||||||
|
_, _, matched := matchExecutionStep(payment, msg)
|
||||||
|
if (operationRef != "" || transferRef != "") && !matched {
|
||||||
|
return "unmatched_refs"
|
||||||
|
}
|
||||||
|
if operationRef == "" && transferRef == "" && !matched {
|
||||||
|
return "missing_refs_and_no_observe_candidate"
|
||||||
|
}
|
||||||
|
return "not_accepted"
|
||||||
}
|
}
|
||||||
|
|
||||||
func findStepByExternalRef(payment *agg.Payment, kind, ref string) (stepRef string, gatewayInstanceID string, ok bool) {
|
func findStepByExternalRef(payment *agg.Payment, kind, ref string) (stepRef string, gatewayInstanceID string, ok bool) {
|
||||||
|
|||||||
@@ -134,6 +134,73 @@ func TestBuildGatewayExecutionEvent_MatchesCardObserveByCardPayoutRef(t *testing
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBuildGatewayExecutionEvent_SkipsUnmatchedRefsEvenWithSingleRunningObserve(t *testing.T) {
|
||||||
|
orgID := bson.NewObjectID()
|
||||||
|
payment := &agg.Payment{
|
||||||
|
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
|
||||||
|
PaymentRef: "payment-settlement-1",
|
||||||
|
StepExecutions: []agg.StepExecution{
|
||||||
|
{
|
||||||
|
StepRef: "hop_2_settlement_observe",
|
||||||
|
StepCode: "hop.2.settlement.observe",
|
||||||
|
State: agg.StepStateRunning,
|
||||||
|
ExternalRefs: []agg.ExternalRef{
|
||||||
|
{
|
||||||
|
GatewayInstanceID: "payment_gateway_settlement",
|
||||||
|
Kind: erecon.ExternalRefKindTransfer,
|
||||||
|
Ref: "settlement-transfer-ref",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// This models a foreign success event (e.g. crypto fee transfer) that should
|
||||||
|
// never close settlement observe by "single running observe" fallback.
|
||||||
|
_, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
|
||||||
|
PaymentRef: payment.PaymentRef,
|
||||||
|
Status: rail.OperationResultSuccess,
|
||||||
|
OperationRef: "payment-1:hop_1_crypto_send:fee",
|
||||||
|
TransferRef: "fee-transfer-ref",
|
||||||
|
})
|
||||||
|
if ok {
|
||||||
|
t.Fatal("expected unmatched gateway execution event to be skipped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildGatewayExecutionEvent_AllowsSingleRunningObserveFallbackWhenRefsMissing(t *testing.T) {
|
||||||
|
orgID := bson.NewObjectID()
|
||||||
|
payment := &agg.Payment{
|
||||||
|
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
|
||||||
|
PaymentRef: "payment-settlement-2",
|
||||||
|
StepExecutions: []agg.StepExecution{
|
||||||
|
{
|
||||||
|
StepRef: "hop_2_settlement_observe",
|
||||||
|
StepCode: "hop.2.settlement.observe",
|
||||||
|
State: agg.StepStateRunning,
|
||||||
|
ExternalRefs: []agg.ExternalRef{
|
||||||
|
{
|
||||||
|
GatewayInstanceID: "payment_gateway_settlement",
|
||||||
|
Kind: erecon.ExternalRefKindTransfer,
|
||||||
|
Ref: "settlement-transfer-ref",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
event, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
|
||||||
|
PaymentRef: payment.PaymentRef,
|
||||||
|
Status: rail.OperationResultSuccess,
|
||||||
|
})
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected gateway execution event to be accepted")
|
||||||
|
}
|
||||||
|
if got, want := event.StepRef, "hop_2_settlement_observe"; got != want {
|
||||||
|
t.Fatalf("step_ref mismatch: got=%q want=%q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOnPaymentGatewayExecution_ReconcilesUsingGlobalPaymentLookup(t *testing.T) {
|
func TestOnPaymentGatewayExecution_ReconcilesUsingGlobalPaymentLookup(t *testing.T) {
|
||||||
orgID := bson.NewObjectID()
|
orgID := bson.NewObjectID()
|
||||||
payment := &agg.Payment{
|
payment := &agg.Payment{
|
||||||
|
|||||||
Reference in New Issue
Block a user