From 5df02baa80d7a55cd603146a967559e286ac0ce9 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Thu, 5 Feb 2026 15:33:41 +0100 Subject: [PATCH] fixed status orchestration --- .../service/gateway/driver/tron/transfer.go | 3 +- .../orchestrator/card_payout_submit.go | 2 - .../gateway_execution_consumer.go | 116 ++++++++++++------ .../service/orchestrator/handlers_events.go | 22 +--- .../orchestrator/payment_plan_executor.go | 9 -- .../orchestrator/storage/model/payment.go | 1 + 6 files changed, 85 insertions(+), 68 deletions(-) diff --git a/api/gateway/tron/internal/service/gateway/driver/tron/transfer.go b/api/gateway/tron/internal/service/gateway/driver/tron/transfer.go index 5eab782f..d261e404 100644 --- a/api/gateway/tron/internal/service/gateway/driver/tron/transfer.go +++ b/api/gateway/tron/internal/service/gateway/driver/tron/transfer.go @@ -73,7 +73,8 @@ func SubmitTransferNative( if contract != "" { normalizedContract, err := normalizeAddress(contract) if err != nil { - logger.Warn("Invalid TRON contract address", zap.String("contract_address", contract), zap.String("transfer_ref", transfer.TransferRef), zap.Error(err)) + logger.Warn("Invalid TRON contract address", zap.String("transfer_ref", transfer.TransferRef), + zap.String("contract_address", contract), zap.Error(err)) return "", err } contract = normalizedContract diff --git a/api/payments/orchestrator/internal/service/orchestrator/card_payout_submit.go b/api/payments/orchestrator/internal/service/orchestrator/card_payout_submit.go index 0f3407ed..eacac6c9 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/card_payout_submit.go +++ b/api/payments/orchestrator/internal/service/orchestrator/card_payout_submit.go @@ -314,8 +314,6 @@ func applyCardPayoutUpdate(payment *model.Payment, payout *mntxv1.CardPayoutStat } - payment.State = mapMntxStatusToState(payout.GetStatus()) - switch payout.GetStatus() { case mntxv1.PayoutStatus_PAYOUT_STATUS_SUCCESS: diff --git a/api/payments/orchestrator/internal/service/orchestrator/gateway_execution_consumer.go b/api/payments/orchestrator/internal/service/orchestrator/gateway_execution_consumer.go index d62bbb84..3ef0c74d 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/gateway_execution_consumer.go +++ b/api/payments/orchestrator/internal/service/orchestrator/gateway_execution_consumer.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "fmt" "strings" paymodel "github.com/tech/sendico/payments/orchestrator/storage/model" @@ -80,6 +81,7 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat payment, err := store.GetByPaymentRef(ctx, paymentRef) if err != nil { + s.logger.Warn("Failed to fetch payment from database", zap.Error(err)) return err } @@ -92,8 +94,8 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat payment.Metadata["gateway_request_idempotency"] = exec.IdempotencyKey // --- update exactly ONE step - updated := updateExecutionStepsFromGatewayExecution(s.logger, payment, exec) - if !updated { + + if payment.State, err = updateExecutionStepsFromGatewayExecution(s.logger, payment, exec); err != nil { s.logger.Warn("No execution step matched gateway result", zap.String("payment_ref", paymentRef), zap.String("operation_ref", exec.OperationRef), @@ -137,45 +139,43 @@ func updateExecutionStepsFromGatewayExecution( logger mlogger.Logger, payment *paymodel.Payment, exec *model.PaymentGatewayExecution, -) bool { +) (paymodel.PaymentState, error) { + + log := logger.With( + zap.String("payment_ref", payment.PaymentRef), + zap.String("operation_ref", strings.TrimSpace(exec.OperationRef)), + zap.String("gateway_status", string(exec.Status)), + ) + + log.Debug("gateway execution received") if payment == nil || payment.PaymentPlan == nil || exec == nil { - logger.Warn("updateExecutionSteps: invalid input", - zap.String("payment_ref", payment.PaymentRef), - ) - return false + log.Warn("invalid input: payment/plan/exec is nil") + return paymodel.PaymentStateSubmitted, + merrors.DataConflict("payment is missing plan or execution step") } operationRef := strings.TrimSpace(exec.OperationRef) if operationRef == "" { - logger.Warn("updateExecutionSteps: empty operation_ref from gateway", - zap.String("payment_ref", payment.PaymentRef), - ) - return false + log.Warn("empty operation_ref from gateway") + return paymodel.PaymentStateSubmitted, + merrors.InvalidArgument("no operation reference provided") } execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan) if execPlan == nil { - logger.Error("updateExecutionSteps: execution plan missing", - zap.String("payment_ref", payment.PaymentRef), - ) - return false + log.Warn("Execution plan missing") + return paymodel.PaymentStateSubmitted, merrors.InvalidArgument("execution plan missing") } status := executionStepStatusFromGatewayStatus(exec.Status) if status == "" { - logger.Warn("updateExecutionSteps: unknown gateway status", - zap.String("payment_ref", payment.PaymentRef), - zap.String("gateway_status", string(exec.Status)), - ) - return false + log.Warn("Unknown gateway status") + return paymodel.PaymentStateSubmitted, + merrors.DataConflict(fmt.Sprintf("unknown gateway status: %s", exec.Status)) } - logger.Debug("updateExecutionSteps: matching by operation_ref", - zap.String("payment_ref", payment.PaymentRef), - zap.String("operation_ref", operationRef), - zap.String("mapped_status", string(status)), - ) + var matched bool for idx, execStep := range execPlan.Steps { if execStep == nil { @@ -184,37 +184,83 @@ func updateExecutionStepsFromGatewayExecution( if strings.EqualFold(strings.TrimSpace(execStep.OperationRef), operationRef) { - logger.Debug("updateExecutionSteps: matched execution step", - zap.String("payment_ref", payment.PaymentRef), + log.Debug("Execution step matched", zap.Int("step_index", idx), zap.String("step_code", execStep.Code), zap.String("prev_state", string(execStep.State)), ) - // update transfer ref if not set yet if execStep.TransferRef == "" && exec.TransferRef != "" { execStep.TransferRef = strings.TrimSpace(exec.TransferRef) + log.Debug("Transfer_ref attached to step", zap.String("transfer_ref", execStep.TransferRef)) } setExecutionStepStatus(execStep, status) - logger.Debug("updateExecutionSteps: step state updated", - zap.String("payment_ref", payment.PaymentRef), + log.Debug("Execution step state updated", zap.Int("step_index", idx), zap.String("step_code", execStep.Code), zap.String("new_state", string(execStep.State)), ) - return true + matched = true + break } } - logger.Error("updateExecutionSteps: no execution step found for operation_ref", - zap.String("payment_ref", payment.PaymentRef), - zap.String("operation_ref", operationRef), + if !matched { + log.Warn("No execution step found for operation_ref") + return paymodel.PaymentStateSubmitted, + merrors.InvalidArgument( + fmt.Sprintf("execution step not found for operation reference: %s", operationRef), + ) + } + + // -------- GLOBAL REDUCTION -------- + + var ( + hasSuccess bool + allDone = true ) - return false + for idx, step := range execPlan.Steps { + if step == nil { + continue + } + + log.Debug("Evaluating step for payment state", + zap.Int("step_index", idx), + zap.String("step_code", step.Code), + zap.String("step_state", string(step.State)), + ) + + switch step.State { + case paymodel.OperationStateFailed: + payment.FailureReason = step.Error + log.Info("Payment marked as FAILED due to step failure", + zap.String("failed_step_code", step.Code), + zap.String("error", step.Error), + ) + return paymodel.PaymentStateFailed, nil + + case paymodel.OperationStateSuccess: + hasSuccess = true + + case paymodel.OperationStateSkipped: + // ok + + default: + allDone = false + } + } + + if hasSuccess && allDone { + log.Info("Payment marked as SUCCESS (all steps completed)") + return paymodel.PaymentStateSuccess, nil + } + + log.Info("Payment still PROCESSING (steps not finished)") + return paymodel.PaymentStateSubmitted, nil } func executionStepStatusFromGatewayStatus(status rail.OperationResult) paymodel.OperationState { diff --git a/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go b/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go index becd52a3..121111a6 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go +++ b/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go @@ -90,20 +90,9 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or if err := h.resumePlan(ctx, store, payment); err != nil { return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) } - } else { - payment.State = model.PaymentStateSubmitted - if err := store.Update(ctx, payment); err != nil { - return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) - } } return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)}) case chainv1.TransferStatus_TRANSFER_WAITING: - if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled { - payment.State = model.PaymentStateSubmitted - } - if err := store.Update(ctx, payment); err != nil { - return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) - } return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)}) default: if err := store.Update(ctx, payment); err != nil { @@ -134,9 +123,7 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or case chainv1.TransferStatus_TRANSFER_SUCCESS: if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled { if cardPayoutDependenciesConfirmed(payment.PaymentPlan, payment.ExecutionPlan) { - if payment.Execution.CardPayoutRef != "" { - payment.State = model.PaymentStateSubmitted - } else { + if payment.Execution.CardPayoutRef == "" { payment.State = model.PaymentStateFundsReserved if h.submitCardPayout == nil { h.logger.Warn("card payout execution skipped", zap.String("payment_ref", payment.PaymentRef)) @@ -145,18 +132,11 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or payment.FailureCode = model.PaymentFailureCodePolicy payment.FailureReason = strings.TrimSpace(err.Error()) h.logger.Warn("card payout execution failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef)) - } else { - payment.State = model.PaymentStateSubmitted } } - } else { - payment.State = model.PaymentStateSubmitted } } case chainv1.TransferStatus_TRANSFER_WAITING: - if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled { - payment.State = model.PaymentStateSubmitted - } default: // keep current state } diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go index 26bb4034..51fdedf8 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go @@ -251,15 +251,6 @@ func (p *paymentExecutor) executePaymentPlan( return nil } - if payment.State != model.PaymentStateSubmitted && - payment.State != model.PaymentStateFundsReserved { - - payment.State = model.PaymentStateSubmitted - if err := store.Update(ctx, payment); err != nil { - return err - } - } - if payment.ExecutionPlan == nil { logger.Debug("Initializing execution plan from payment plan") payment.ExecutionPlan = ensureExecutionPlanForPlan(payment, payment.PaymentPlan) diff --git a/api/payments/orchestrator/storage/model/payment.go b/api/payments/orchestrator/storage/model/payment.go index d86591b5..3b3f540a 100644 --- a/api/payments/orchestrator/storage/model/payment.go +++ b/api/payments/orchestrator/storage/model/payment.go @@ -49,6 +49,7 @@ const ( PaymentStateAccepted PaymentState = "accepted" PaymentStateFundsReserved PaymentState = "funds_reserved" PaymentStateSubmitted PaymentState = "submitted" + PaymentStateSuccess PaymentState = "success" PaymentStateSettled PaymentState = "settled" PaymentStateFailed PaymentState = "failed" PaymentStateCancelled PaymentState = "cancelled"