fixed status orchestration

This commit is contained in:
Stephan D
2026-02-05 15:33:41 +01:00
parent 7417b33de3
commit 5df02baa80
6 changed files with 85 additions and 68 deletions

View File

@@ -314,8 +314,6 @@ func applyCardPayoutUpdate(payment *model.Payment, payout *mntxv1.CardPayoutStat
}
payment.State = mapMntxStatusToState(payout.GetStatus())
switch payout.GetStatus() {
case mntxv1.PayoutStatus_PAYOUT_STATUS_SUCCESS:

View File

@@ -2,6 +2,7 @@ package orchestrator
import (
"context"
"fmt"
"strings"
paymodel "github.com/tech/sendico/payments/orchestrator/storage/model"
@@ -80,6 +81,7 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat
payment, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
s.logger.Warn("Failed to fetch payment from database", zap.Error(err))
return err
}
@@ -92,8 +94,8 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat
payment.Metadata["gateway_request_idempotency"] = exec.IdempotencyKey
// --- update exactly ONE step
updated := updateExecutionStepsFromGatewayExecution(s.logger, payment, exec)
if !updated {
if payment.State, err = updateExecutionStepsFromGatewayExecution(s.logger, payment, exec); err != nil {
s.logger.Warn("No execution step matched gateway result",
zap.String("payment_ref", paymentRef),
zap.String("operation_ref", exec.OperationRef),
@@ -137,45 +139,43 @@ func updateExecutionStepsFromGatewayExecution(
logger mlogger.Logger,
payment *paymodel.Payment,
exec *model.PaymentGatewayExecution,
) bool {
) (paymodel.PaymentState, error) {
log := logger.With(
zap.String("payment_ref", payment.PaymentRef),
zap.String("operation_ref", strings.TrimSpace(exec.OperationRef)),
zap.String("gateway_status", string(exec.Status)),
)
log.Debug("gateway execution received")
if payment == nil || payment.PaymentPlan == nil || exec == nil {
logger.Warn("updateExecutionSteps: invalid input",
zap.String("payment_ref", payment.PaymentRef),
)
return false
log.Warn("invalid input: payment/plan/exec is nil")
return paymodel.PaymentStateSubmitted,
merrors.DataConflict("payment is missing plan or execution step")
}
operationRef := strings.TrimSpace(exec.OperationRef)
if operationRef == "" {
logger.Warn("updateExecutionSteps: empty operation_ref from gateway",
zap.String("payment_ref", payment.PaymentRef),
)
return false
log.Warn("empty operation_ref from gateway")
return paymodel.PaymentStateSubmitted,
merrors.InvalidArgument("no operation reference provided")
}
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
if execPlan == nil {
logger.Error("updateExecutionSteps: execution plan missing",
zap.String("payment_ref", payment.PaymentRef),
)
return false
log.Warn("Execution plan missing")
return paymodel.PaymentStateSubmitted, merrors.InvalidArgument("execution plan missing")
}
status := executionStepStatusFromGatewayStatus(exec.Status)
if status == "" {
logger.Warn("updateExecutionSteps: unknown gateway status",
zap.String("payment_ref", payment.PaymentRef),
zap.String("gateway_status", string(exec.Status)),
)
return false
log.Warn("Unknown gateway status")
return paymodel.PaymentStateSubmitted,
merrors.DataConflict(fmt.Sprintf("unknown gateway status: %s", exec.Status))
}
logger.Debug("updateExecutionSteps: matching by operation_ref",
zap.String("payment_ref", payment.PaymentRef),
zap.String("operation_ref", operationRef),
zap.String("mapped_status", string(status)),
)
var matched bool
for idx, execStep := range execPlan.Steps {
if execStep == nil {
@@ -184,37 +184,83 @@ func updateExecutionStepsFromGatewayExecution(
if strings.EqualFold(strings.TrimSpace(execStep.OperationRef), operationRef) {
logger.Debug("updateExecutionSteps: matched execution step",
zap.String("payment_ref", payment.PaymentRef),
log.Debug("Execution step matched",
zap.Int("step_index", idx),
zap.String("step_code", execStep.Code),
zap.String("prev_state", string(execStep.State)),
)
// update transfer ref if not set yet
if execStep.TransferRef == "" && exec.TransferRef != "" {
execStep.TransferRef = strings.TrimSpace(exec.TransferRef)
log.Debug("Transfer_ref attached to step", zap.String("transfer_ref", execStep.TransferRef))
}
setExecutionStepStatus(execStep, status)
logger.Debug("updateExecutionSteps: step state updated",
zap.String("payment_ref", payment.PaymentRef),
log.Debug("Execution step state updated",
zap.Int("step_index", idx),
zap.String("step_code", execStep.Code),
zap.String("new_state", string(execStep.State)),
)
return true
matched = true
break
}
}
logger.Error("updateExecutionSteps: no execution step found for operation_ref",
zap.String("payment_ref", payment.PaymentRef),
zap.String("operation_ref", operationRef),
if !matched {
log.Warn("No execution step found for operation_ref")
return paymodel.PaymentStateSubmitted,
merrors.InvalidArgument(
fmt.Sprintf("execution step not found for operation reference: %s", operationRef),
)
}
// -------- GLOBAL REDUCTION --------
var (
hasSuccess bool
allDone = true
)
return false
for idx, step := range execPlan.Steps {
if step == nil {
continue
}
log.Debug("Evaluating step for payment state",
zap.Int("step_index", idx),
zap.String("step_code", step.Code),
zap.String("step_state", string(step.State)),
)
switch step.State {
case paymodel.OperationStateFailed:
payment.FailureReason = step.Error
log.Info("Payment marked as FAILED due to step failure",
zap.String("failed_step_code", step.Code),
zap.String("error", step.Error),
)
return paymodel.PaymentStateFailed, nil
case paymodel.OperationStateSuccess:
hasSuccess = true
case paymodel.OperationStateSkipped:
// ok
default:
allDone = false
}
}
if hasSuccess && allDone {
log.Info("Payment marked as SUCCESS (all steps completed)")
return paymodel.PaymentStateSuccess, nil
}
log.Info("Payment still PROCESSING (steps not finished)")
return paymodel.PaymentStateSubmitted, nil
}
func executionStepStatusFromGatewayStatus(status rail.OperationResult) paymodel.OperationState {

View File

@@ -90,20 +90,9 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or
if err := h.resumePlan(ctx, store, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
} else {
payment.State = model.PaymentStateSubmitted
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
}
return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)})
case chainv1.TransferStatus_TRANSFER_WAITING:
if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled {
payment.State = model.PaymentStateSubmitted
}
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)})
default:
if err := store.Update(ctx, payment); err != nil {
@@ -134,9 +123,7 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or
case chainv1.TransferStatus_TRANSFER_SUCCESS:
if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled {
if cardPayoutDependenciesConfirmed(payment.PaymentPlan, payment.ExecutionPlan) {
if payment.Execution.CardPayoutRef != "" {
payment.State = model.PaymentStateSubmitted
} else {
if payment.Execution.CardPayoutRef == "" {
payment.State = model.PaymentStateFundsReserved
if h.submitCardPayout == nil {
h.logger.Warn("card payout execution skipped", zap.String("payment_ref", payment.PaymentRef))
@@ -145,18 +132,11 @@ func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *or
payment.FailureCode = model.PaymentFailureCodePolicy
payment.FailureReason = strings.TrimSpace(err.Error())
h.logger.Warn("card payout execution failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
} else {
payment.State = model.PaymentStateSubmitted
}
}
} else {
payment.State = model.PaymentStateSubmitted
}
}
case chainv1.TransferStatus_TRANSFER_WAITING:
if payment.State != model.PaymentStateFailed && payment.State != model.PaymentStateCancelled && payment.State != model.PaymentStateSettled {
payment.State = model.PaymentStateSubmitted
}
default:
// keep current state
}

View File

@@ -251,15 +251,6 @@ func (p *paymentExecutor) executePaymentPlan(
return nil
}
if payment.State != model.PaymentStateSubmitted &&
payment.State != model.PaymentStateFundsReserved {
payment.State = model.PaymentStateSubmitted
if err := store.Update(ctx, payment); err != nil {
return err
}
}
if payment.ExecutionPlan == nil {
logger.Debug("Initializing execution plan from payment plan")
payment.ExecutionPlan = ensureExecutionPlanForPlan(payment, payment.PaymentPlan)