Improved payment handling

This commit is contained in:
Stephan D
2026-02-25 19:25:51 +01:00
parent da11be526a
commit af4b68f4c7
65 changed files with 3890 additions and 259 deletions

View File

@@ -58,9 +58,9 @@ func (i *Imp) Start() error {
if broker != nil {
opts = append(opts, orchestrator.WithPaymentGatewayBroker(broker))
}
svc := orchestrator.NewService(logger, repo, opts...)
svc, err := orchestrator.NewService(logger, repo, opts...)
i.service = svc
return svc, nil
return svc, err
}
app, err := grpcapp.NewApp(i.logger, "payments.orchestrator", cfg.Config, i.debug, repoFactory, serviceFactory)

View File

@@ -43,54 +43,54 @@ const (
// StepShell defines one initial step telemetry item.
type StepShell struct {
StepRef string
StepCode string
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
}
// StepExecution is runtime telemetry for one step.
type StepExecution struct {
StepRef string
StepCode string
State StepState
Attempt uint32
StartedAt *time.Time
CompletedAt *time.Time
FailureCode string
FailureMsg string
ExternalRefs []ExternalRef
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
State StepState `bson:"state" json:"state"`
Attempt uint32 `bson:"attempt" json:"attempt"`
StartedAt *time.Time `bson:"startedAt,omitempty" json:"startedAt,omitempty"`
CompletedAt *time.Time `bson:"completedAt,omitempty" json:"completedAt,omitempty"`
FailureCode string `bson:"failureCode,omitempty" json:"failureCode,omitempty"`
FailureMsg string `bson:"failureMsg,omitempty" json:"failureMsg,omitempty"`
ExternalRefs []ExternalRef `bson:"externalRefs,omitempty" json:"externalRefs,omitempty"`
}
// ExternalRef links step execution to an external operation.
type ExternalRef struct {
GatewayInstanceID string
Kind string
Ref string
GatewayInstanceID string `bson:"gatewayInstanceId,omitempty" json:"gatewayInstanceId,omitempty"`
Kind string `bson:"kind" json:"kind"`
Ref string `bson:"ref" json:"ref"`
}
// Input defines payload for creating an initial payment aggregate.
type Input struct {
OrganizationRef bson.ObjectID
IdempotencyKey string
QuotationRef string
ClientPaymentRef string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
Steps []StepShell
OrganizationRef bson.ObjectID `bson:"organizationRef" json:"organizationRef"`
IdempotencyKey string `bson:"idempotencyKey" json:"idempotencyKey"`
QuotationRef string `bson:"quotationRef" json:"quotationRef"`
ClientPaymentRef string `bson:"clientPaymentRef,omitempty" json:"clientPaymentRef,omitempty"`
IntentSnapshot model.PaymentIntent `bson:"intentSnapshot" json:"intentSnapshot"`
QuoteSnapshot *model.PaymentQuoteSnapshot `bson:"quoteSnapshot" json:"quoteSnapshot"`
Steps []StepShell `bson:"steps,omitempty" json:"steps,omitempty"`
}
// Payment is orchestration-v2 runtime aggregate.
type Payment struct {
storable.Base
pm.OrganizationBoundBase
PaymentRef string
IdempotencyKey string
QuotationRef string
ClientPaymentRef string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
State State
Version uint64
StepExecutions []StepExecution
storable.Base `bson:",inline" json:",inline"`
pm.OrganizationBoundBase `bson:",inline" json:",inline"`
PaymentRef string `bson:"paymentRef" json:"paymentRef"`
IdempotencyKey string `bson:"idempotencyKey" json:"idempotencyKey"`
QuotationRef string `bson:"quotationRef" json:"quotationRef"`
ClientPaymentRef string `bson:"clientPaymentRef,omitempty" json:"clientPaymentRef,omitempty"`
IntentSnapshot model.PaymentIntent `bson:"intentSnapshot" json:"intentSnapshot"`
QuoteSnapshot *model.PaymentQuoteSnapshot `bson:"quoteSnapshot" json:"quoteSnapshot"`
State State `bson:"state" json:"state"`
Version uint64 `bson:"version" json:"version"`
StepExecutions []StepExecution `bson:"stepExecutions,omitempty" json:"stepExecutions,omitempty"`
}
// Dependencies configures aggregate factory integrations.
@@ -108,7 +108,7 @@ func New(deps ...Dependencies) Factory {
logger = zap.NewNop()
}
return &svc{
logger: logger.Named("agg"),
logger: logger.Named("aggregator"),
now: func() time.Time { return time.Now().UTC() },
newID: func() bson.ObjectID {
return bson.NewObjectID()

View File

@@ -9,15 +9,23 @@ import (
)
type normalizedEvent struct {
stepRef string
matchRefs []agg.ExternalRef
appendRefs []agg.ExternalRef
targetState agg.StepState
failureCode string
failureMsg string
occurredAt *time.Time
forceAggregateFailed bool
forceAggregateNeedsAttention bool
stepRef string `bson:"stepRef"`
matchRefs []agg.ExternalRef `bson:"matchRefs"`
appendRefs []agg.ExternalRef `bson:"appendRefs"`
targetState agg.StepState `bson:"targetState"`
failureInfo *failureInfo `bson:"failure,omitempty"`
forceAggregate *forceAggregate `bson:"forceAggregate,omitempty"`
}
type failureInfo struct {
code string `bson:"code"`
msg string `bson:"message"`
occurredAt *time.Time `bson:"occurredAt,omitempty"`
}
type forceAggregate struct {
failed bool `bson:"failed"`
needsAttention bool `bson:"needsAttention"`
}
func normalizeEvent(event Event) (*normalizedEvent, error) {
@@ -48,6 +56,56 @@ func countPayloads(event Event) int {
return count
}
func (e *normalizedEvent) failureCodeValue() string {
if e == nil || e.failureInfo == nil {
return ""
}
return strings.TrimSpace(e.failureInfo.code)
}
func (e *normalizedEvent) failureMsgValue() string {
if e == nil || e.failureInfo == nil {
return ""
}
return strings.TrimSpace(e.failureInfo.msg)
}
func (e *normalizedEvent) occurredAtValue() *time.Time {
if e == nil || e.failureInfo == nil {
return nil
}
return e.failureInfo.occurredAt
}
func (e *normalizedEvent) forceAggregateFailedValue() bool {
return e != nil && e.forceAggregate != nil && e.forceAggregate.failed
}
func (e *normalizedEvent) forceAggregateNeedsAttentionValue() bool {
return e != nil && e.forceAggregate != nil && e.forceAggregate.needsAttention
}
func buildFailureInfo(code, msg string, occurredAt *time.Time) *failureInfo {
if code == "" && msg == "" && occurredAt == nil {
return nil
}
return &failureInfo{
code: code,
msg: msg,
occurredAt: occurredAt,
}
}
func buildForceAggregate(failed, needsAttention bool) *forceAggregate {
if !failed && !needsAttention {
return nil
}
return &forceAggregate{
failed: failed,
needsAttention: needsAttention,
}
}
func normalizeGatewayEvent(src GatewayEvent) (*normalizedEvent, error) {
status, ok := normalizeGatewayStatus(src.Status)
if !ok {
@@ -55,14 +113,16 @@ func normalizeGatewayEvent(src GatewayEvent) (*normalizedEvent, error) {
}
target, needsAttention := mapFailureTarget(status, src.Retryable)
failureCode := strings.TrimSpace(src.FailureCode)
failureMsg := strings.TrimSpace(src.FailureMsg)
if target == agg.StepStateFailed && failureMsg == "" {
failureMsg = "gateway operation failed"
}
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureCode: strings.TrimSpace(src.FailureCode),
failureMsg: strings.TrimSpace(src.FailureMsg),
occurredAt: normalizeTimePtr(src.OccurredAt),
forceAggregateFailed: src.TerminalFailure,
forceAggregateNeedsAttention: needsAttention,
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
{
@@ -81,9 +141,6 @@ func normalizeGatewayEvent(src GatewayEvent) (*normalizedEvent, error) {
if ev.stepRef == "" && len(ev.matchRefs) == 0 {
return nil, merrors.InvalidArgument("gateway event must include step_ref or operation/transfer reference")
}
if ev.targetState == agg.StepStateFailed && ev.failureMsg == "" {
ev.failureMsg = "gateway operation failed"
}
return ev, nil
}
@@ -94,14 +151,16 @@ func normalizeLedgerEvent(src LedgerEvent) (*normalizedEvent, error) {
}
target, needsAttention := mapFailureTarget(status, src.Retryable)
failureCode := strings.TrimSpace(src.FailureCode)
failureMsg := strings.TrimSpace(src.FailureMsg)
if target == agg.StepStateFailed && failureMsg == "" {
failureMsg = "ledger operation failed"
}
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureCode: strings.TrimSpace(src.FailureCode),
failureMsg: strings.TrimSpace(src.FailureMsg),
occurredAt: normalizeTimePtr(src.OccurredAt),
forceAggregateFailed: src.TerminalFailure,
forceAggregateNeedsAttention: needsAttention,
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
{
@@ -114,9 +173,6 @@ func normalizeLedgerEvent(src LedgerEvent) (*normalizedEvent, error) {
if ev.stepRef == "" && len(ev.matchRefs) == 0 {
return nil, merrors.InvalidArgument("ledger event must include step_ref or entry_ref")
}
if ev.targetState == agg.StepStateFailed && ev.failureMsg == "" {
ev.failureMsg = "ledger operation failed"
}
return ev, nil
}
@@ -127,14 +183,16 @@ func normalizeCardEvent(src CardEvent) (*normalizedEvent, error) {
}
target, needsAttention := mapFailureTarget(status, src.Retryable)
failureCode := strings.TrimSpace(src.FailureCode)
failureMsg := strings.TrimSpace(src.FailureMsg)
if target == agg.StepStateFailed && failureMsg == "" {
failureMsg = "card payout failed"
}
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureCode: strings.TrimSpace(src.FailureCode),
failureMsg: strings.TrimSpace(src.FailureMsg),
occurredAt: normalizeTimePtr(src.OccurredAt),
forceAggregateFailed: src.TerminalFailure,
forceAggregateNeedsAttention: needsAttention,
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
{
@@ -148,9 +206,6 @@ func normalizeCardEvent(src CardEvent) (*normalizedEvent, error) {
if ev.stepRef == "" && len(ev.matchRefs) == 0 {
return nil, merrors.InvalidArgument("card event must include step_ref or payout_ref")
}
if ev.targetState == agg.StepStateFailed && ev.failureMsg == "" {
ev.failureMsg = "card payout failed"
}
return ev, nil
}

View File

@@ -140,7 +140,7 @@ func New(deps ...Dependencies) Reconciler {
now = defaultNow
}
return &svc{
logger: logger.Named("erecon"),
logger: logger.Named("reconciler"),
now: now,
}
}

View File

@@ -18,7 +18,7 @@ func deriveAggregateTarget(payment *agg.Payment, event *normalizedEvent, sm osta
if payment == nil {
return agg.StateUnspecified
}
if event != nil && event.forceAggregateFailed {
if event != nil && event.forceAggregateFailedValue() {
return agg.StateFailed
}
@@ -48,7 +48,7 @@ func deriveAggregateTarget(payment *agg.Payment, event *normalizedEvent, sm osta
if allTerminalSuccessOrSkipped {
return agg.StateSettled
}
if hasNeedsAttention || (event != nil && event.forceAggregateNeedsAttention) {
if hasNeedsAttention || (event != nil && event.forceAggregateNeedsAttentionValue()) {
return agg.StateNeedsAttention
}
if hasWork {

View File

@@ -177,8 +177,8 @@ func (s *svc) applyStepDiagnostics(step *agg.StepExecution, event *normalizedEve
now := s.now().UTC()
at := now
if event.occurredAt != nil {
at = event.occurredAt.UTC()
if eventAt := event.occurredAtValue(); eventAt != nil {
at = eventAt.UTC()
}
changed := false
@@ -222,8 +222,8 @@ func (s *svc) applyStepDiagnostics(step *agg.StepExecution, event *normalizedEve
step.CompletedAt = &at
changed = true
}
fc := strings.TrimSpace(event.failureCode)
fm := strings.TrimSpace(event.failureMsg)
fc := event.failureCodeValue()
fm := event.failureMsgValue()
if step.FailureCode != fc || step.FailureMsg != fm {
step.FailureCode = fc
step.FailureMsg = fm

View File

@@ -57,5 +57,5 @@ func New(deps ...Dependencies) Service {
if logger == nil {
logger = zap.NewNop()
}
return &svc{logger: logger.Named("idem")}
return &svc{logger: logger.Named("idempotency")}
}

View File

@@ -29,7 +29,7 @@ func newService(deps Dependencies) (Observer, error) {
if logger == nil {
logger = zap.NewNop()
}
logger = logger.Named("oobs")
logger = logger.Named("observer")
metrics := deps.Metrics
if metrics == nil {

View File

@@ -31,5 +31,5 @@ func New(deps ...Dependencies) StateMachine {
if logger == nil {
logger = zap.NewNop()
}
return &svc{logger: logger.Named("ostate")}
return &svc{logger: logger.Named("state_machine")}
}

View File

@@ -53,7 +53,7 @@ func newService(deps Dependencies) (Service, error) {
logger = zap.NewNop()
}
return &svc{
logger: logger.Named("pquery"),
logger: logger.Named("repository"),
repo: deps.Repository,
}, nil
}

View File

@@ -15,6 +15,11 @@ func requiredIndexes() []*indexDefinition {
},
Unique: true,
},
{
Keys: []ri.Key{
{Field: "paymentRef", Sort: ri.Asc},
},
},
{
Keys: []ri.Key{
{Field: "organizationRef", Sort: ri.Asc},
@@ -36,5 +41,11 @@ func requiredIndexes() []*indexDefinition {
{Field: "createdAt", Sort: ri.Desc},
},
},
{
Keys: []ri.Key{
{Field: "state", Sort: ri.Asc},
{Field: "createdAt", Sort: ri.Desc},
},
},
}
}

View File

@@ -15,9 +15,11 @@ type Repository interface {
Create(ctx context.Context, payment *agg.Payment) error
UpdateCAS(ctx context.Context, payment *agg.Payment, expectedVersion uint64) error
GetByPaymentRef(ctx context.Context, orgRef bson.ObjectID, paymentRef string) (*agg.Payment, error)
GetByPaymentRefGlobal(ctx context.Context, paymentRef string) (*agg.Payment, error)
GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, idempotencyKey string) (*agg.Payment, error)
ListByQuotationRef(ctx context.Context, in ListByQuotationRefInput) (*ListOutput, error)
ListByState(ctx context.Context, in ListByStateInput) (*ListOutput, error)
ListByStateGlobal(ctx context.Context, in ListByStateGlobalInput) (*ListOutput, error)
}
// ListCursor is a stable pagination cursor sorted by created_at desc then id desc.
@@ -48,6 +50,13 @@ type ListByStateInput struct {
Cursor *ListCursor
}
// ListByStateGlobalInput defines global listing scope by aggregate state.
type ListByStateGlobalInput struct {
State agg.State
Limit int32
Cursor *ListCursor
}
// Dependencies configures repository integrations.
type Dependencies struct {
Logger mlogger.Logger

View File

@@ -122,6 +122,12 @@ func (s *mongoStore) GetByPaymentRef(ctx context.Context, orgRef bson.ObjectID,
})
}
func (s *mongoStore) GetByPaymentRefGlobal(ctx context.Context, paymentRef string) (*paymentDocument, error) {
return s.findOne(ctx, bson.D{
{Key: "paymentRef", Value: paymentRef},
})
}
func (s *mongoStore) GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, idempotencyKey string) (*paymentDocument, error) {
return s.findOne(ctx, bson.D{
{Key: "organizationRef", Value: orgRef},
@@ -152,6 +158,13 @@ func (s *mongoStore) ListByState(ctx context.Context, orgRef bson.ObjectID, stat
return s.list(ctx, filter, cursor, limit)
}
func (s *mongoStore) ListByStateGlobal(ctx context.Context, state agg.State, cursor *listCursor, limit int64) ([]*paymentDocument, error) {
filter := bson.D{
{Key: "state", Value: state},
}
return s.list(ctx, filter, cursor, limit)
}
func (s *mongoStore) findOne(ctx context.Context, filter bson.D) (*paymentDocument, error) {
if s.collection == nil {
return nil, merrors.InvalidArgument("payment repository v2: mongo collection is required")

View File

@@ -10,6 +10,7 @@ import (
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mutil/mzap"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
@@ -29,10 +30,12 @@ type paymentStore interface {
Create(ctx context.Context, doc *paymentDocument) error
UpdateCAS(ctx context.Context, doc *paymentDocument, expectedVersion uint64) (bool, error)
GetByPaymentRef(ctx context.Context, orgRef bson.ObjectID, paymentRef string) (*paymentDocument, error)
GetByPaymentRefGlobal(ctx context.Context, paymentRef string) (*paymentDocument, error)
GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, idempotencyKey string) (*paymentDocument, error)
GetByID(ctx context.Context, orgRef bson.ObjectID, id bson.ObjectID) (*paymentDocument, error)
ListByQuotationRef(ctx context.Context, orgRef bson.ObjectID, quotationRef string, cursor *listCursor, limit int64) ([]*paymentDocument, error)
ListByState(ctx context.Context, orgRef bson.ObjectID, state agg.State, cursor *listCursor, limit int64) ([]*paymentDocument, error)
ListByStateGlobal(ctx context.Context, state agg.State, cursor *listCursor, limit int64) ([]*paymentDocument, error)
}
type svc struct {
@@ -56,7 +59,7 @@ func newWithStoreLogger(store paymentStore, logger mlogger.Logger) (Repository,
return nil, err
}
return &svc{
logger: logger.Named("prepo"),
logger: logger.Named("repository"),
store: store,
now: func() time.Time {
return time.Now().UTC()
@@ -209,6 +212,43 @@ func (s *svc) GetByPaymentRef(ctx context.Context, orgRef bson.ObjectID, payment
return payment, err
}
func (s *svc) GetByPaymentRefGlobal(ctx context.Context, paymentRef string) (payment *agg.Payment, err error) {
logger := s.logger
requestPaymentRef := strings.TrimSpace(paymentRef)
logger.Debug("Starting Get by payment ref global",
zap.String("payment_ref", requestPaymentRef),
)
defer func(start time.Time) {
fields := []zap.Field{
zap.Int64("duration_ms", time.Since(start).Milliseconds()),
zap.String("payment_ref", requestPaymentRef),
}
if payment != nil {
fields = append(fields,
zap.String("organization_ref", payment.OrganizationRef.Hex()),
zap.String("state", string(payment.State)),
zap.Uint64("version", payment.Version),
)
}
if err != nil {
logger.Warn("Failed to get by payment ref global", append(fields, zap.Error(err))...)
return
}
logger.Debug("Completed Get by payment ref global", fields...)
}(time.Now())
paymentRef = strings.TrimSpace(paymentRef)
if paymentRef == "" {
return nil, merrors.InvalidArgument("payment_ref is required")
}
doc, err := s.store.GetByPaymentRefGlobal(ctx, paymentRef)
if err != nil {
return nil, err
}
payment, err = fromDocument(doc)
return payment, err
}
func (s *svc) GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, idempotencyKey string) (payment *agg.Payment, err error) {
logger := s.logger
hasKey := strings.TrimSpace(idempotencyKey) != ""
@@ -219,8 +259,8 @@ func (s *svc) GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, ide
defer func(start time.Time) {
fields := []zap.Field{
zap.Int64("duration_ms", time.Since(start).Milliseconds()),
zap.String("organization_ref", orgRef.Hex()),
zap.Bool("has_idempotency_key", hasKey),
mzap.ObjRef("organization_ref", orgRef),
zap.String("idempotency_key", idempotencyKey),
}
if payment != nil {
fields = append(fields,
@@ -230,10 +270,14 @@ func (s *svc) GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, ide
)
}
if err != nil {
if errors.Is(err, ErrPaymentNotFound) {
logger.Debug("Completed Get by idempotency key", append(fields, zap.Bool("found", false))...)
return
}
logger.Warn("Failed to get by idempotency key", append(fields, zap.Error(err))...)
return
}
logger.Debug("Completed Get by idempotency key", fields...)
logger.Debug("Completed Get by idempotency key", append(fields, zap.Bool("found", true))...)
}(time.Now())
if orgRef.IsZero() {
@@ -329,6 +373,41 @@ func (s *svc) ListByState(ctx context.Context, in ListByStateInput) (out *ListOu
return out, err
}
func (s *svc) ListByStateGlobal(ctx context.Context, in ListByStateGlobalInput) (out *ListOutput, err error) {
logger := s.logger
logger.Debug("Starting List by state global",
zap.String("state", string(in.State)),
zap.Int32("limit", in.Limit),
)
defer func(start time.Time) {
fields := []zap.Field{zap.Int64("duration_ms", time.Since(start).Milliseconds())}
if out != nil {
fields = append(fields, zap.Int("items_count", len(out.Items)))
}
if err != nil {
logger.Warn("Failed to list by state global", append(fields, zap.Error(err))...)
return
}
logger.Debug("Completed List by state global", fields...)
}(time.Now())
state, ok := normalizeAggregateState(in.State)
if !ok {
return nil, merrors.InvalidArgument("state is invalid")
}
cursor, err := normalizeCursor(in.Cursor)
if err != nil {
return nil, err
}
out, err = s.list(ctx, listQuery{
limit: sanitizeLimit(in.Limit),
run: func(limit int64) ([]*paymentDocument, error) {
return s.store.ListByStateGlobal(ctx, state, cursor, limit)
},
})
return out, err
}
type listQuery struct {
limit int64
run func(limit int64) ([]*paymentDocument, error)
@@ -453,7 +532,7 @@ func normalizePayment(payment *agg.Payment, requirePaymentRef bool) (*paymentDoc
step.FailureCode = strings.TrimSpace(step.FailureCode)
step.FailureMsg = strings.TrimSpace(step.FailureMsg)
if step.StepRef == "" {
return nil, merrors.InvalidArgument("step_executions[" + itoa(i) + "].step_ref is required")
return nil, merrors.InvalidArgument("stepExecutions[" + itoa(i) + "].step_ref is required")
}
if step.StepCode == "" {
step.StepCode = step.StepRef
@@ -463,7 +542,7 @@ func normalizePayment(payment *agg.Payment, requirePaymentRef bool) (*paymentDoc
}
ss, ok := normalizeStepState(step.State)
if !ok {
return nil, merrors.InvalidArgument("step_executions[" + itoa(i) + "].state is invalid")
return nil, merrors.InvalidArgument("stepExecutions[" + itoa(i) + "].state is invalid")
}
step.State = ss
}

View File

@@ -15,6 +15,9 @@ import (
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)
func TestNewWithStore_EnsuresRequiredIndexes(t *testing.T) {
@@ -24,14 +27,16 @@ func TestNewWithStore_EnsuresRequiredIndexes(t *testing.T) {
t.Fatalf("newWithStore returned error: %v", err)
}
if len(store.indexes) != 4 {
t.Fatalf("index count mismatch: got=%d want=4", len(store.indexes))
if len(store.indexes) != 6 {
t.Fatalf("index count mismatch: got=%d want=6", len(store.indexes))
}
assertIndex(t, store.indexes[0], []string{"organizationRef", "paymentRef"}, true)
assertIndex(t, store.indexes[1], []string{"organizationRef", "idempotencyKey"}, true)
assertIndex(t, store.indexes[2], []string{"organizationRef", "quotationRef", "createdAt"}, false)
assertIndex(t, store.indexes[3], []string{"organizationRef", "state", "createdAt"}, false)
assertIndex(t, store.indexes[1], []string{"paymentRef"}, false)
assertIndex(t, store.indexes[2], []string{"organizationRef", "idempotencyKey"}, true)
assertIndex(t, store.indexes[3], []string{"organizationRef", "quotationRef", "createdAt"}, false)
assertIndex(t, store.indexes[4], []string{"organizationRef", "state", "createdAt"}, false)
assertIndex(t, store.indexes[5], []string{"state", "createdAt"}, false)
}
func TestCreateAndGet(t *testing.T) {
@@ -89,6 +94,28 @@ func TestCreateAndGet(t *testing.T) {
}
}
func TestGetByIdempotencyKey_NotFoundDoesNotWarn(t *testing.T) {
store := newFakeStore()
core, observed := observer.New(zapcore.DebugLevel)
repo, err := newWithStoreLogger(store, zap.New(core))
if err != nil {
t.Fatalf("newWithStoreLogger returned error: %v", err)
}
org := bson.NewObjectID()
_, err = repo.GetByIdempotencyKey(context.Background(), org, "missing-idempotency-key")
if !errors.Is(err, ErrPaymentNotFound) {
t.Fatalf("expected ErrPaymentNotFound, got %v", err)
}
if got := observed.FilterMessage("Failed to get by idempotency key").Len(); got != 0 {
t.Fatalf("expected no warning log for not found lookup, got=%d", got)
}
if got := observed.FilterMessage("Completed Get by idempotency key").Len(); got == 0 {
t.Fatal("expected completion debug log for not found lookup")
}
}
func TestCreate_Duplicate(t *testing.T) {
store := newFakeStore()
repo, err := newWithStore(store)
@@ -354,6 +381,15 @@ func (f *fakeStore) GetByPaymentRef(_ context.Context, orgRef bson.ObjectID, pay
return nil, ErrPaymentNotFound
}
func (f *fakeStore) GetByPaymentRefGlobal(_ context.Context, paymentRef string) (*paymentDocument, error) {
for _, doc := range f.docs {
if doc.PaymentRef == paymentRef {
return cloneDocument(doc)
}
}
return nil, ErrPaymentNotFound
}
func (f *fakeStore) GetByIdempotencyKey(_ context.Context, orgRef bson.ObjectID, idempotencyKey string) (*paymentDocument, error) {
for _, doc := range f.docs {
if doc.OrganizationRef == orgRef && doc.IdempotencyKey == idempotencyKey {
@@ -383,6 +419,12 @@ func (f *fakeStore) ListByState(_ context.Context, orgRef bson.ObjectID, state a
}, cursor, limit)
}
func (f *fakeStore) ListByStateGlobal(_ context.Context, state agg.State, cursor *listCursor, limit int64) ([]*paymentDocument, error) {
return f.list(func(doc *paymentDocument) bool {
return doc.State == state
}, cursor, limit)
}
func (f *fakeStore) list(match func(*paymentDocument) bool, cursor *listCursor, limit int64) ([]*paymentDocument, error) {
items := make([]*paymentDocument, 0)
for _, doc := range f.docs {

View File

@@ -59,10 +59,10 @@ func validatePaymentInvariants(payment *agg.Payment) error {
func validateStepInvariants(step agg.StepExecution, index int) error {
if strings.TrimSpace(step.StepRef) == "" {
return merrors.InvalidArgument("payment.step_executions[" + itoa(index) + "].step_ref is required")
return merrors.InvalidArgument("payment.stepExecutions[" + itoa(index) + "].step_ref is required")
}
if _, ok := normalizeStepState(step.State); !ok {
return merrors.InvalidArgument("payment.step_executions[" + itoa(index) + "].state is invalid")
return merrors.InvalidArgument("payment.stepExecutions[" + itoa(index) + "].state is invalid")
}
return nil
}

View File

@@ -37,5 +37,5 @@ func New(deps ...Dependencies) Mapper {
if logger == nil {
logger = zap.NewNop()
}
return &svc{logger: logger.Named("prmap")}
return &svc{logger: logger.Named("mapper")}
}

View File

@@ -27,7 +27,7 @@ func mapStepExecutions(src []agg.StepExecution) ([]*orchestrationv2.StepExecutio
func mapStepExecution(step agg.StepExecution, index int) (*orchestrationv2.StepExecution, error) {
state, ok := normalizeStepState(step.State)
if !ok {
return nil, merrors.InvalidArgument("payment.step_executions[" + itoa(index) + "].state is invalid")
return nil, merrors.InvalidArgument("payment.stepExecutions[" + itoa(index) + "].state is invalid")
}
attempt := step.Attempt

View File

@@ -25,14 +25,14 @@ func (s *svc) recomputeAggregateState(ctx context.Context, payment *agg.Payment)
return false, nil
}
payment.State = next
logger.Debug("psvc.payment_state_changed",
logger.Debug("Recomputed payment state from step executions",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("from_state", string(current)),
zap.String("to_state", string(next)),
zap.Uint64("version", payment.Version),
)
if next == agg.StateSettled || next == agg.StateNeedsAttention || next == agg.StateFailed {
logger.Debug("psvc.payment_finalization_state",
logger.Debug("Payment entered finalization state",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("state", string(next)),
zap.Uint64("version", payment.Version),

View File

@@ -6,6 +6,9 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
type defaultLedgerExecutor struct{}
@@ -13,6 +16,43 @@ type defaultCryptoExecutor struct{}
type defaultProviderSettlementExecutor struct{}
type defaultCardPayoutExecutor struct{}
type defaultObserveConfirmExecutor struct{}
type defaultGuardExecutor struct{}
// NewDefaultExecutors constructs the baseline executor registry and applies any
// provided overrides.
func NewDefaultExecutors(logger mlogger.Logger, overrides sexec.Dependencies) sexec.Registry {
deps := sexec.Dependencies{
Logger: logger,
Ledger: defaultLedgerExecutor{},
Crypto: defaultCryptoExecutor{},
ProviderSettlement: defaultProviderSettlementExecutor{},
CardPayout: defaultCardPayoutExecutor{},
ObserveConfirm: defaultObserveConfirmExecutor{},
Guard: defaultGuardExecutor{},
}
if overrides.Logger != nil {
deps.Logger = overrides.Logger
}
if overrides.Ledger != nil {
deps.Ledger = overrides.Ledger
}
if overrides.Crypto != nil {
deps.Crypto = overrides.Crypto
}
if overrides.ProviderSettlement != nil {
deps.ProviderSettlement = overrides.ProviderSettlement
}
if overrides.CardPayout != nil {
deps.CardPayout = overrides.CardPayout
}
if overrides.ObserveConfirm != nil {
deps.ObserveConfirm = overrides.ObserveConfirm
}
if overrides.Guard != nil {
deps.Guard = overrides.Guard
}
return sexec.New(deps)
}
func (defaultLedgerExecutor) ExecuteLedger(_ context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
@@ -35,7 +75,37 @@ func (defaultCardPayoutExecutor) ExecuteCardPayout(_ context.Context, req sexec.
}
func (defaultObserveConfirmExecutor) ExecuteObserveConfirm(_ context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
return asyncOutput(req.StepExecution, "operation_ref", "observe:"+req.Step.StepRef), nil
refs := inheritedExternalRefs(req.Payment, req.Step, req.StepExecution)
if len(refs) == 0 {
refs = append(refs, agg.ExternalRef{
Kind: "operation_ref",
Ref: "observe:" + req.Step.StepRef,
})
}
step := req.StepExecution
step.State = agg.StepStateRunning
step.ExternalRefs = refs
step.FailureCode = ""
step.FailureMsg = ""
return &sexec.ExecuteOutput{
StepExecution: step,
Async: true,
}, nil
}
func (defaultGuardExecutor) ExecuteGuard(_ context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
conditions := quoteExecutionConditions(req.Payment)
switch guardKind(req.Step) {
case xplan.StepKindLiquidityCheck:
return executeLiquidityGuard(req.StepExecution, conditions), nil
case xplan.StepKindPrefunding:
return executePrefundingGuard(req.StepExecution, conditions), nil
default:
return failedOutput(req.StepExecution,
"guard.unsupported_step",
"unsupported guard step: step_code="+strings.TrimSpace(req.Step.StepCode),
), nil
}
}
func asyncOutput(step agg.StepExecution, kind, ref string) *sexec.ExecuteOutput {
@@ -51,3 +121,120 @@ func asyncOutput(step agg.StepExecution, kind, ref string) *sexec.ExecuteOutput
Async: true,
}
}
func completedOutput(step agg.StepExecution) *sexec.ExecuteOutput {
step.State = agg.StepStateCompleted
step.FailureCode = ""
step.FailureMsg = ""
return &sexec.ExecuteOutput{StepExecution: step}
}
func failedOutput(step agg.StepExecution, code, msg string) *sexec.ExecuteOutput {
step.State = agg.StepStateFailed
step.FailureCode = strings.TrimSpace(code)
step.FailureMsg = strings.TrimSpace(msg)
return &sexec.ExecuteOutput{StepExecution: step}
}
func executeLiquidityGuard(
step agg.StepExecution,
conditions *paymenttypes.QuoteExecutionConditions,
) *sexec.ExecuteOutput {
if conditions == nil {
return failedOutput(step, "guard.conditions_missing", "liquidity guard requires execution conditions")
}
switch conditions.Readiness {
case paymenttypes.QuoteExecutionReadinessIndicative:
return failedOutput(step, "guard.indicative_quote", "liquidity guard cannot execute indicative quotes")
case paymenttypes.QuoteExecutionReadinessLiquidityObtainable:
return failedOutput(step, "guard.liquidity_not_ready", "liquidity is not yet available at execution time")
case paymenttypes.QuoteExecutionReadinessUnspecified:
return failedOutput(step, "guard.readiness_unspecified", "liquidity guard requires explicit readiness")
default:
return completedOutput(step)
}
}
func executePrefundingGuard(
step agg.StepExecution,
conditions *paymenttypes.QuoteExecutionConditions,
) *sexec.ExecuteOutput {
if conditions == nil {
return failedOutput(step, "guard.conditions_missing", "prefunding guard requires execution conditions")
}
if conditions.Readiness == paymenttypes.QuoteExecutionReadinessIndicative {
return failedOutput(step, "guard.indicative_quote", "prefunding guard cannot execute indicative quotes")
}
// Prefunding confirmation is handled by upstream funding flows; this guard
// currently validates quote executability semantics only.
return completedOutput(step)
}
func quoteExecutionConditions(payment *agg.Payment) *paymenttypes.QuoteExecutionConditions {
if payment == nil || payment.QuoteSnapshot == nil {
return nil
}
return payment.QuoteSnapshot.ExecutionConditions
}
func guardKind(step xplan.Step) xplan.StepKind {
return xplan.GuardStepKind(step)
}
func inheritedExternalRefs(payment *agg.Payment, step xplan.Step, current agg.StepExecution) []agg.ExternalRef {
refs := appendExternalRefs(nil, current.ExternalRefs...)
if payment == nil || len(step.DependsOn) == 0 {
return refs
}
index := stepIndexByRef(payment.StepExecutions)
for i := range step.DependsOn {
idx, ok := index[strings.TrimSpace(step.DependsOn[i])]
if !ok || idx < 0 || idx >= len(payment.StepExecutions) {
continue
}
refs = appendExternalRefs(refs, payment.StepExecutions[idx].ExternalRefs...)
}
return refs
}
func appendExternalRefs(existing []agg.ExternalRef, additions ...agg.ExternalRef) []agg.ExternalRef {
out := append([]agg.ExternalRef{}, existing...)
seen := map[string]struct{}{}
for i := range out {
key := externalRefKey(out[i])
if key == "" {
continue
}
seen[key] = struct{}{}
}
for i := range additions {
ref := agg.ExternalRef{
GatewayInstanceID: strings.TrimSpace(additions[i].GatewayInstanceID),
Kind: strings.TrimSpace(additions[i].Kind),
Ref: strings.TrimSpace(additions[i].Ref),
}
key := externalRefKey(ref)
if key == "" {
continue
}
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, ref)
}
if len(out) == 0 {
return nil
}
return out
}
func externalRefKey(ref agg.ExternalRef) string {
if strings.TrimSpace(ref.Kind) == "" || strings.TrimSpace(ref.Ref) == "" {
return ""
}
return strings.ToLower(strings.TrimSpace(ref.GatewayInstanceID)) + "|" +
strings.ToLower(strings.TrimSpace(ref.Kind)) + "|" +
strings.ToLower(strings.TrimSpace(ref.Ref))
}

View File

@@ -0,0 +1,164 @@
package psvc
import (
"context"
"testing"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func TestDefaultGuardExecutor_LiquidityReadyCompletes(t *testing.T) {
exec := defaultGuardExecutor{}
out, err := exec.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: &agg.Payment{
QuoteSnapshot: &model.PaymentQuoteSnapshot{
ExecutionConditions: &paymenttypes.QuoteExecutionConditions{
Readiness: paymenttypes.QuoteExecutionReadinessLiquidityReady,
LiquidityCheckRequiredAtExecution: true,
},
},
},
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.State, agg.StepStateCompleted; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
}
func TestDefaultGuardExecutor_LiquidityObtainableFails(t *testing.T) {
exec := defaultGuardExecutor{}
out, err := exec.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: &agg.Payment{
QuoteSnapshot: &model.PaymentQuoteSnapshot{
ExecutionConditions: &paymenttypes.QuoteExecutionConditions{
Readiness: paymenttypes.QuoteExecutionReadinessLiquidityObtainable,
LiquidityCheckRequiredAtExecution: true,
},
},
},
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.State, agg.StepStateFailed; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.FailureCode, "guard.liquidity_not_ready"; got != want {
t.Fatalf("failure code mismatch: got=%q want=%q", got, want)
}
}
func TestDefaultGuardExecutor_UnknownGuardFails(t *testing.T) {
exec := defaultGuardExecutor{}
out, err := exec.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: &agg.Payment{
QuoteSnapshot: &model.PaymentQuoteSnapshot{
ExecutionConditions: &paymenttypes.QuoteExecutionConditions{
Readiness: paymenttypes.QuoteExecutionReadinessLiquidityReady,
},
},
},
Step: xplan.Step{
StepRef: "guard_unknown",
StepCode: "guard.custom",
Kind: xplan.StepKindUnspecified,
},
StepExecution: agg.StepExecution{
StepRef: "guard_unknown",
StepCode: "guard.custom",
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.State, agg.StepStateFailed; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.FailureCode, "guard.unsupported_step"; got != want {
t.Fatalf("failure code mismatch: got=%q want=%q", got, want)
}
}
func TestDefaultObserveConfirmExecutor_InheritsDependencyRefs(t *testing.T) {
exec := defaultObserveConfirmExecutor{}
out, err := exec.ExecuteObserveConfirm(context.Background(), sexec.StepRequest{
Payment: &agg.Payment{
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_send",
ExternalRefs: []agg.ExternalRef{
{GatewayInstanceID: "crypto-gw", Kind: "operation_ref", Ref: "op-1"},
{GatewayInstanceID: "crypto-gw", Kind: "transfer_ref", Ref: "trf-1"},
},
},
},
},
Step: xplan.Step{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
DependsOn: []string{"hop_1_crypto_send"},
},
StepExecution: agg.StepExecution{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteObserveConfirm returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if !out.Async {
t.Fatal("expected async observe output")
}
if got, want := out.StepExecution.State, agg.StepStateRunning; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
if got, want := len(out.StepExecution.ExternalRefs), 2; got != want {
t.Fatalf("external refs count mismatch: got=%d want=%d", got, want)
}
if got, want := out.StepExecution.ExternalRefs[0].Kind, "operation_ref"; got != want {
t.Fatalf("first external ref kind mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.ExternalRefs[0].Ref, "op-1"; got != want {
t.Fatalf("first external ref value mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -63,7 +63,7 @@ func (s *svc) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePa
}
}
if payment != nil {
logger.Debug("psvc.payment_started",
logger.Debug("Loaded payment for execution",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.Bool("reused", reused),
zap.String("state", string(payment.State)),
@@ -75,7 +75,7 @@ func (s *svc) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePa
return nil, err
}
if payment != nil {
logger.Debug("psvc.payment_execution_progressed",
logger.Debug("Completed runtime loop for payment execution",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("state", string(payment.State)),
zap.Uint64("version", payment.Version),
@@ -86,7 +86,7 @@ func (s *svc) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePa
return nil, err
}
if payment != nil {
logger.Debug("psvc.payment_finalized",
logger.Debug("Prepared finalized payment response",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("state", string(payment.State)),
zap.Uint64("version", payment.Version),

View File

@@ -27,7 +27,7 @@ func (s *svc) runRuntime(ctx context.Context, payment *agg.Payment) (*agg.Paymen
state = payment.State
stepCount = len(payment.StepExecutions)
}
logger.Debug("Starting Run runtime",
logger.Debug("Starting payment runtime loop",
zap.String("payment_ref", paymentRef),
zap.String("state", string(state)),
zap.Int("steps_count", stepCount),
@@ -37,7 +37,10 @@ func (s *svc) runRuntime(ctx context.Context, payment *agg.Payment) (*agg.Paymen
return nil, merrors.InvalidArgument("payment is required")
}
if s.state.IsAggregateTerminal(payment.State) {
logger.Debug("psvc.run_runtime.terminal", zap.String("payment_ref", paymentRef), zap.String("state", string(payment.State)))
logger.Debug("Skipping runtime loop because payment is already terminal",
zap.String("payment_ref", paymentRef),
zap.String("state", string(payment.State)),
)
return payment, nil
}
@@ -52,7 +55,7 @@ func (s *svc) runRuntime(ctx context.Context, payment *agg.Payment) (*agg.Paymen
if err != nil {
return nil, err
}
logger.Debug("psvc.run_runtime.tick",
logger.Debug("Processed runtime tick for payment",
zap.String("payment_ref", paymentRef),
zap.Int("tick", tick),
zap.Bool("changed", changed),
@@ -72,7 +75,7 @@ func (s *svc) runRuntime(ctx context.Context, payment *agg.Payment) (*agg.Paymen
}
current = updated
}
logger.Debug("psvc.run_runtime.max_ticks_reached",
logger.Debug("Stopped runtime loop after reaching max ticks",
zap.String("payment_ref", paymentRef),
zap.Int("max_ticks", s.maxTicks),
zap.String("state", string(current.State)),
@@ -93,7 +96,7 @@ func (s *svc) runTick(ctx context.Context, payment *agg.Payment, graph *xplan.Gr
if err != nil {
return nil, false, false, err
}
logger.Debug("psvc.run_tick.scheduled",
logger.Debug("Calculated runnable, blocked, and skipped steps for tick",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.Int("runnable_count", len(scheduled.Runnable)),
zap.Int("blocked_count", len(scheduled.Blocked)),
@@ -126,7 +129,7 @@ func (s *svc) runTick(ctx context.Context, payment *agg.Payment, graph *xplan.Gr
if err := s.repository.UpdateCAS(ctx, payment, expectedVersion); err != nil {
if errors.Is(err, prepo.ErrVersionConflict) {
logger.Debug("psvc.run_tick.cas_conflict",
logger.Debug("Detected version conflict while persisting tick; reloading payment",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.Uint64("expected_version", expectedVersion),
)
@@ -138,7 +141,7 @@ func (s *svc) runTick(ctx context.Context, payment *agg.Payment, graph *xplan.Gr
}
return nil, false, false, err
}
logger.Debug("psvc.run_tick.persisted",
logger.Debug("Persisted tick updates to payment",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.Uint64("version", payment.Version),
zap.String("state", string(payment.State)),
@@ -200,7 +203,7 @@ func (s *svc) recordScheduleTransitions(ctx context.Context, payment *agg.Paymen
func (s *svc) executeRunnable(ctx context.Context, payment *agg.Payment, graph *xplan.Graph, runnable ssched.RunnableStep) (bool, error) {
logger := s.logger
logger.Debug("Starting Step execution",
logger.Debug("Starting step execution attempt",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", strings.TrimSpace(runnable.StepRef)),
zap.String("step_code", strings.TrimSpace(runnable.StepCode)),
@@ -244,7 +247,7 @@ func (s *svc) executeRunnable(ctx context.Context, payment *agg.Payment, graph *
StepExecution: stepExecution,
})
if err != nil {
logger.Warn("psvc.step_execution.executor_error",
logger.Warn("Step executor returned error; marking step as failed",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", strings.TrimSpace(runnable.StepRef)),
zap.Uint32("attempt", runnable.Attempt),
@@ -264,7 +267,7 @@ func (s *svc) executeRunnable(ctx context.Context, payment *agg.Payment, graph *
next := normalizeExecutorOutput(stepExecution, out, s.nowUTC())
payment.StepExecutions[idx] = next
logger.Debug("Completed Step execution",
logger.Debug("Completed step execution attempt",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", strings.TrimSpace(next.StepRef)),
zap.String("state", string(next.State)),

View File

@@ -62,12 +62,12 @@ func newService(deps Dependencies) (Service, error) {
if logger == nil {
logger = zap.NewNop()
}
logger = logger.Named("psvc")
logger = logger.Named("imp")
observer := deps.Observer
if observer == nil {
var err error
observer, err = oobs.New(oobs.Dependencies{Logger: logger.Named("oobs")})
observer, err = oobs.New(oobs.Dependencies{Logger: logger})
if err != nil {
return nil, err
}
@@ -78,7 +78,7 @@ func newService(deps Dependencies) (Service, error) {
var err error
query, err = pquery.New(pquery.Dependencies{
Repository: deps.Repository,
Logger: logger.Named("pquery"),
Logger: logger,
})
if err != nil {
return nil, err
@@ -90,18 +90,18 @@ func newService(deps Dependencies) (Service, error) {
quoteStore: deps.QuoteStore,
validator: firstValidator(deps.Validator, logger.Named("reqval")),
idempotency: firstIdempotency(deps.Idempotency, logger.Named("idem")),
quote: firstQuoteResolver(deps.Quote, logger.Named("qsnap")),
aggregate: firstAggregateFactory(deps.Aggregate, logger.Named("agg")),
planner: firstPlanCompiler(deps.Planner, logger.Named("xplan")),
state: firstStateMachine(deps.State, logger.Named("ostate")),
scheduler: firstScheduler(deps.Scheduler, logger.Named("ssched")),
executors: firstExecutors(deps.Executors, logger.Named("sexec")),
reconciler: firstReconciler(deps.Reconciler, logger.Named("erecon")),
validator: firstValidator(deps.Validator, logger),
idempotency: firstIdempotency(deps.Idempotency, logger),
quote: firstQuoteResolver(deps.Quote, logger),
aggregate: firstAggregateFactory(deps.Aggregate, logger),
planner: firstPlanCompiler(deps.Planner, logger),
state: firstStateMachine(deps.State, logger),
scheduler: firstScheduler(deps.Scheduler, logger),
executors: firstExecutors(deps.Executors, logger),
reconciler: firstReconciler(deps.Reconciler, logger),
repository: deps.Repository,
query: query,
mapper: firstMapper(deps.Mapper, logger.Named("prmap")),
mapper: firstMapper(deps.Mapper, logger),
observer: observer,
retryPolicy: deps.RetryPolicy,
@@ -173,14 +173,7 @@ func firstExecutors(v sexec.Registry, logger mlogger.Logger) sexec.Registry {
if v != nil {
return v
}
return sexec.New(sexec.Dependencies{
Logger: logger,
Ledger: defaultLedgerExecutor{},
Crypto: defaultCryptoExecutor{},
ProviderSettlement: defaultProviderSettlementExecutor{},
CardPayout: defaultCardPayoutExecutor{},
ObserveConfirm: defaultObserveConfirmExecutor{},
})
return NewDefaultExecutors(logger, sexec.Dependencies{})
}
func firstReconciler(v erecon.Reconciler, logger mlogger.Logger) erecon.Reconciler {

View File

@@ -287,6 +287,7 @@ func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (
ProviderSettlement: script,
CardPayout: script,
ObserveConfirm: script,
Guard: script,
})
svc, err := New(Dependencies{
@@ -328,6 +329,9 @@ func (s *scriptedExecutors) ExecuteCardPayout(_ context.Context, req sexec.StepR
func (s *scriptedExecutors) ExecuteObserveConfirm(_ context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
return s.handler("observe_confirm", req)
}
func (s *scriptedExecutors) ExecuteGuard(_ context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
return s.handler("guard", req)
}
type memoryQuoteStore struct {
mu sync.Mutex
@@ -456,6 +460,19 @@ func (r *memoryRepo) GetByPaymentRef(_ context.Context, orgRef bson.ObjectID, pa
return clonePayment(r.byID[id]), nil
}
func (r *memoryRepo) GetByPaymentRefGlobal(_ context.Context, paymentRef string) (*agg.Payment, error) {
r.mu.Lock()
defer r.mu.Unlock()
ref := strings.TrimSpace(paymentRef)
for _, payment := range r.byID {
if strings.TrimSpace(payment.PaymentRef) != ref {
continue
}
return clonePayment(payment), nil
}
return nil, prepo.ErrPaymentNotFound
}
func (r *memoryRepo) GetByIdempotencyKey(_ context.Context, orgRef bson.ObjectID, idempotencyKey string) (*agg.Payment, error) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -504,6 +521,22 @@ func (r *memoryRepo) ListByState(_ context.Context, in prepo.ListByStateInput) (
return paginatePayments(items, in.Limit), nil
}
func (r *memoryRepo) ListByStateGlobal(_ context.Context, in prepo.ListByStateGlobalInput) (*prepo.ListOutput, error) {
r.mu.Lock()
defer r.mu.Unlock()
items := make([]*agg.Payment, 0)
for _, payment := range r.byID {
if payment.State != in.State {
continue
}
if !isBeforeCursor(payment, in.Cursor) {
continue
}
items = append(items, clonePayment(payment))
}
return paginatePayments(items, in.Limit), nil
}
func repoPaymentRefKey(orgRef bson.ObjectID, paymentRef string) string {
return orgRef.Hex() + "|" + strings.TrimSpace(paymentRef)
}

View File

@@ -55,7 +55,7 @@ func New(deps ...Dependencies) Resolver {
now = time.Now
}
return &svc{
logger: logger.Named("qsnap"),
logger: logger.Named("quote_resolver"),
now: now,
}
}

View File

@@ -55,5 +55,5 @@ func New(deps ...Dependencies) Validator {
if logger == nil {
logger = zap.NewNop()
}
return &svc{logger: logger.Named("reqval")}
return &svc{logger: logger.Named("request_validator")}
}

View File

@@ -60,6 +60,11 @@ type ObserveConfirmExecutor interface {
ExecuteObserveConfirm(ctx context.Context, req StepRequest) (*ExecuteOutput, error)
}
// GuardExecutor handles non-rail guard steps (liquidity/prefunding).
type GuardExecutor interface {
ExecuteGuard(ctx context.Context, req StepRequest) (*ExecuteOutput, error)
}
// Dependencies defines concrete executors used by the registry.
type Dependencies struct {
Logger mlogger.Logger
@@ -68,6 +73,7 @@ type Dependencies struct {
ProviderSettlement ProviderSettlementExecutor
CardPayout CardPayoutExecutor
ObserveConfirm ObserveConfirmExecutor
Guard GuardExecutor
}
func New(deps Dependencies) Registry {
@@ -76,7 +82,7 @@ func New(deps Dependencies) Registry {
logger = zap.NewNop()
}
return &svc{
logger: logger.Named("sexec"),
logger: logger.Named("executor"),
deps: deps,
}
}

View File

@@ -14,9 +14,14 @@ const (
routeProviderSettlement
routeCardPayout
routeObserveConfirm
routeGuard
)
func classifyRoute(step xplan.Step) route {
if isGuardStep(step) {
return routeGuard
}
action := normalizeAction(step.Action)
rail := normalizeRail(step.Rail)
@@ -54,6 +59,10 @@ func classifyRoute(step xplan.Step) route {
}
}
func isGuardStep(step xplan.Step) bool {
return xplan.GuardStepKind(step) != xplan.StepKindUnspecified
}
func isLedgerAction(action model.RailOperation) bool {
switch action {
case model.RailOperationDebit,

View File

@@ -79,6 +79,12 @@ func (s *svc) Execute(ctx context.Context, in ExecuteInput) (out *ExecuteOutput,
}
out, err = s.deps.ObserveConfirm.ExecuteObserveConfirm(ctx, req)
return out, err
case routeGuard:
if s.deps.Guard == nil {
return nil, missingExecutorError("guard")
}
out, err = s.deps.Guard.ExecuteGuard(ctx, req)
return out, err
default:
return nil, unsupportedStepError(req.Step)
}
@@ -140,6 +146,20 @@ func missingExecutorError(kind string) error {
}
func unsupportedStepError(step xplan.Step) error {
msg := "action=" + strings.TrimSpace(string(step.Action)) + " rail=" + strings.TrimSpace(string(step.Rail))
msg := strings.Join([]string{
"step_ref=" + stepField(step.StepRef),
"step_code=" + stepField(step.StepCode),
"step_kind=" + stepField(string(step.Kind)),
"action=" + stepField(string(step.Action)),
"rail=" + stepField(string(step.Rail)),
}, " ")
return xerr.Wrapf(ErrUnsupportedStep, "%s", msg)
}
func stepField(value string) string {
clean := strings.TrimSpace(value)
if clean == "" {
return "UNSPECIFIED"
}
return clean
}

View File

@@ -3,6 +3,7 @@ package sexec
import (
"context"
"errors"
"strings"
"testing"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
@@ -131,6 +132,40 @@ func TestExecute_DispatchRailsAndObserve(t *testing.T) {
}
}
func TestExecute_DispatchGuard(t *testing.T) {
guard := &fakeGuardExecutor{}
registry := New(Dependencies{Guard: guard})
out, err := registry.Execute(context.Background(), ExecuteInput{
Payment: &agg.Payment{PaymentRef: "p1"},
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
Action: model.RailOperationUnspecified,
Rail: model.RailUnspecified,
DependsOn: nil,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if guard.calls != 1 {
t.Fatalf("expected guard executor to be called once, got %d", guard.calls)
}
if got, want := guard.lastReq.Step.StepRef, xplan.QuoteReadinessGuardStepRef; got != want {
t.Fatalf("step_ref mismatch: got=%q want=%q", got, want)
}
}
func TestExecute_UnsupportedStep(t *testing.T) {
registry := New(Dependencies{})
@@ -142,6 +177,21 @@ func TestExecute_UnsupportedStep(t *testing.T) {
if !errors.Is(err, ErrUnsupportedStep) {
t.Fatalf("expected ErrUnsupportedStep, got %v", err)
}
if err == nil {
t.Fatal("expected non-nil error")
}
msg := err.Error()
for _, token := range []string{
"step_ref=s1",
"step_code=bad.send",
"step_kind=UNSPECIFIED",
"action=SEND",
"rail=LEDGER",
} {
if !strings.Contains(msg, token) {
t.Fatalf("expected error message to include %q, got %q", token, msg)
}
}
}
func TestExecute_UnsupportedProviderSettlementSend(t *testing.T) {
@@ -280,3 +330,17 @@ func (f *fakeObserveConfirmExecutor) ExecuteObserveConfirm(_ context.Context, re
Async: true,
}, nil
}
type fakeGuardExecutor struct {
calls int
lastReq StepRequest
}
func (f *fakeGuardExecutor) ExecuteGuard(_ context.Context, req StepRequest) (*ExecuteOutput, error) {
f.calls++
f.lastReq = req
return &ExecuteOutput{
StepExecution: req.StepExecution,
Async: false,
}, nil
}

View File

@@ -127,10 +127,10 @@ func (s *svc) normalizeStepExecutions(
}
stepRef := exec.StepRef
if _, ok := stepsByRef[stepRef]; !ok {
return nil, merrors.InvalidArgument("step_executions[" + itoa(i) + "].step_ref is unknown: " + stepRef)
return nil, merrors.InvalidArgument("stepExecutions[" + itoa(i) + "].step_ref is unknown: " + stepRef)
}
if _, exists := out[stepRef]; exists {
return nil, merrors.InvalidArgument("step_executions[" + itoa(i) + "].step_ref must be unique")
return nil, merrors.InvalidArgument("stepExecutions[" + itoa(i) + "].step_ref must be unique")
}
if exec.Attempt == 0 {
exec.Attempt = 1
@@ -156,16 +156,16 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste
exec.FailureMsg = strings.TrimSpace(exec.FailureMsg)
exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
if exec.StepRef == "" {
return agg.StepExecution{}, merrors.InvalidArgument("step_executions[" + itoa(index) + "].step_ref is required")
return agg.StepExecution{}, merrors.InvalidArgument("stepExecutions[" + itoa(index) + "].step_ref is required")
}
state, ok := normalizeStepState(exec.State)
if !ok {
return agg.StepExecution{}, merrors.InvalidArgument("step_executions[" + itoa(index) + "].state is invalid")
return agg.StepExecution{}, merrors.InvalidArgument("stepExecutions[" + itoa(index) + "].state is invalid")
}
exec.State = state
if err := s.stateMachine.EnsureStepTransition(exec.State, exec.State); err != nil {
return agg.StepExecution{}, merrors.InvalidArgument("step_executions[" + itoa(index) + "].state is invalid")
return agg.StepExecution{}, merrors.InvalidArgument("stepExecutions[" + itoa(index) + "].state is invalid")
}
return exec, nil
}

View File

@@ -78,9 +78,10 @@ func New(deps ...Dependencies) Runtime {
if logger == nil {
logger = zap.NewNop()
}
logger = logger.Named("scheduler")
stateMachine := dep.StateMachine
if stateMachine == nil {
stateMachine = ostate.New(ostate.Dependencies{Logger: logger.Named("ssched.ostate")})
stateMachine = ostate.New(ostate.Dependencies{Logger: logger})
}
now := dep.Now
if now == nil {
@@ -89,7 +90,7 @@ func New(deps ...Dependencies) Runtime {
}
}
return &svc{
logger: logger.Named("ssched"),
logger: logger,
stateMachine: stateMachine,
now: now,
}

View File

@@ -222,7 +222,7 @@ func evaluateGate(step xplan.Step, executionsByRef map[string]*agg.StepExecution
return evaluateTerminal(stepCommitTargets(step), executionsByRef, maxAttemptsByRef)
default:
for _, outcome := range depOutcomes {
if outcome == outcomeFailure {
if outcome == outcomeFailure || outcome == outcomeSkipped {
return gateImpossible
}
}

View File

@@ -172,6 +172,41 @@ func TestSchedule_FailedDependencySkipsImmediateDependents(t *testing.T) {
assertBlockedReason(t, out, "a", BlockedNeedsAttention)
}
func TestSchedule_SkippedDependencyAlsoSkipsDependent(t *testing.T) {
runtime := New()
out, err := runtime.Schedule(Input{
Steps: []xplan.Step{
step("guard", nil),
step("send", []string{"guard"}),
step("observe", []string{"send"}),
},
StepExecutions: []agg.StepExecution{
exec("guard", agg.StepStateNeedsAttention, 2),
exec("send", agg.StepStatePending, 1),
exec("observe", agg.StepStatePending, 1),
},
})
if err != nil {
t.Fatalf("Schedule returned error: %v", err)
}
if len(out.Runnable) != 0 {
t.Fatalf("expected no runnable steps, got %d", len(out.Runnable))
}
assertSkippedRefs(t, out, []string{"send", "observe"})
assertBlockedReason(t, out, "guard", BlockedNeedsAttention)
send := mustExecution(t, out, "send")
if send.State != agg.StepStateSkipped {
t.Fatalf("send state mismatch: got=%q want=%q", send.State, agg.StepStateSkipped)
}
observe := mustExecution(t, out, "observe")
if observe.State != agg.StepStateSkipped {
t.Fatalf("observe state mismatch: got=%q want=%q", observe.State, agg.StepStateSkipped)
}
}
func TestSchedule_ValidationErrors(t *testing.T) {
runtime := New()

View File

@@ -0,0 +1,52 @@
package xplan
import "strings"
// GuardOperationName identifies non-rail guard operation codes.
type GuardOperationName string
const (
GuardOperationUnspecified GuardOperationName = ""
GuardOperationQuoteReadinessGuard GuardOperationName = "quote.readiness.guard"
GuardOperationPrefundingEnsure GuardOperationName = "prefunding.ensure"
)
const (
QuoteReadinessGuardStepRef = "quote_readiness"
PrefundingGuardStepRef = "prefunding_ensure"
)
func ParseGuardOperationName(value string) GuardOperationName {
switch strings.ToLower(strings.TrimSpace(value)) {
case string(GuardOperationQuoteReadinessGuard):
return GuardOperationQuoteReadinessGuard
case string(GuardOperationPrefundingEnsure):
return GuardOperationPrefundingEnsure
default:
return GuardOperationUnspecified
}
}
func IsLiquidityGuardOperation(value string) bool {
return ParseGuardOperationName(value) == GuardOperationQuoteReadinessGuard
}
func IsPrefundingGuardOperation(value string) bool {
return ParseGuardOperationName(value) == GuardOperationPrefundingEnsure
}
func GuardStepKind(step Step) StepKind {
switch strings.ToLower(strings.TrimSpace(string(step.Kind))) {
case string(StepKindLiquidityCheck):
return StepKindLiquidityCheck
case string(StepKindPrefunding):
return StepKindPrefunding
}
if IsLiquidityGuardOperation(step.StepCode) {
return StepKindLiquidityCheck
}
if IsPrefundingGuardOperation(step.StepCode) {
return StepKindPrefunding
}
return StepKindUnspecified
}

View File

@@ -122,6 +122,6 @@ func New(deps ...Dependencies) Compiler {
logger = zap.NewNop()
}
return &svc{
logger: logger.Named("xplan"),
logger: logger.Named("plan_compiler"),
}
}

View File

@@ -122,7 +122,8 @@ func appendGuards(ex *expansion, conditions *paymenttypes.QuoteExecutionConditio
if conditions.LiquidityCheckRequiredAtExecution {
ex.appendMain(Step{
StepCode: "liquidity.check",
StepRef: QuoteReadinessGuardStepRef,
StepCode: string(GuardOperationQuoteReadinessGuard),
Kind: StepKindLiquidityCheck,
Action: model.RailOperationUnspecified,
Rail: model.RailUnspecified,
@@ -132,7 +133,8 @@ func appendGuards(ex *expansion, conditions *paymenttypes.QuoteExecutionConditio
if conditions.PrefundingRequired {
ex.appendMain(Step{
StepCode: "prefunding.ensure",
StepRef: PrefundingGuardStepRef,
StepCode: string(GuardOperationPrefundingEnsure),
Kind: StepKindPrefunding,
Action: model.RailOperationUnspecified,
Rail: model.RailUnspecified,

View File

@@ -0,0 +1,348 @@
package orchestrator
import (
"context"
"strings"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
)
type gatewayCryptoExecutor struct {
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute
}
func (e *gatewayCryptoExecutor) ExecuteCrypto(ctx context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
if req.Payment == nil {
return nil, merrors.InvalidArgument("crypto send: payment is required")
}
action := model.ParseRailOperation(string(req.Step.Action))
switch action {
case model.RailOperationSend, model.RailOperationFee:
default:
return nil, merrors.InvalidArgument("crypto send: unsupported action")
}
gateway, err := e.resolveGateway(ctx, req.Step)
if err != nil {
return nil, err
}
client, err := e.gatewayInvokeResolver.Resolve(ctx, gateway.InvokeURI)
if err != nil {
return nil, err
}
sourceWalletRef, err := sourceManagedWalletRef(req.Payment)
if err != nil {
return nil, err
}
destination, err := e.resolveDestination(req.Payment, action)
if err != nil {
return nil, err
}
amount, err := sourceAmount(req.Payment)
if err != nil {
return nil, err
}
stepRef := strings.TrimSpace(req.Step.StepRef)
operationRef := strings.TrimSpace(req.Payment.PaymentRef) + ":" + stepRef
idempotencyKey := strings.TrimSpace(req.Payment.IdempotencyKey)
if idempotencyKey == "" {
idempotencyKey = operationRef
}
idempotencyKey += ":" + stepRef
resp, err := client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: idempotencyKey,
OrganizationRef: req.Payment.OrganizationRef.Hex(),
SourceWalletRef: sourceWalletRef,
Destination: destination,
Amount: amount,
OperationRef: operationRef,
IntentRef: strings.TrimSpace(req.Payment.IntentSnapshot.Ref),
PaymentRef: strings.TrimSpace(req.Payment.PaymentRef),
Metadata: transferMetadata(req.Step),
})
if err != nil {
return nil, err
}
if resp == nil || resp.GetTransfer() == nil {
return nil, merrors.Internal("crypto send: transfer response is missing")
}
step := req.StepExecution
refs, refsErr := transferExternalRefs(resp.GetTransfer(), firstNonEmpty(
strings.TrimSpace(req.Step.InstanceID),
strings.TrimSpace(gateway.InstanceID),
strings.TrimSpace(req.Step.Gateway),
strings.TrimSpace(gateway.ID),
))
if refsErr != nil {
return nil, refsErr
}
step.ExternalRefs = refs
step.State = agg.StepStateCompleted
step.FailureCode = ""
step.FailureMsg = ""
return &sexec.ExecuteOutput{StepExecution: step}, nil
}
func (e *gatewayCryptoExecutor) resolveGateway(ctx context.Context, step xplan.Step) (*model.GatewayInstanceDescriptor, error) {
if e.gatewayRegistry == nil {
return nil, merrors.InvalidArgument("crypto send: gateway registry is required")
}
items, err := e.gatewayRegistry.List(ctx)
if err != nil {
return nil, err
}
stepGateway := strings.TrimSpace(step.Gateway)
stepInstance := strings.TrimSpace(step.InstanceID)
var byInstance *model.GatewayInstanceDescriptor
var byGateway *model.GatewayInstanceDescriptor
var single *model.GatewayInstanceDescriptor
cryptoCount := 0
for i := range items {
item := items[i]
if item == nil || model.ParseRail(string(item.Rail)) != model.RailCrypto || !item.IsEnabled {
continue
}
cryptoCount++
single = item
if stepInstance != "" && (strings.EqualFold(strings.TrimSpace(item.InstanceID), stepInstance) || strings.EqualFold(strings.TrimSpace(item.ID), stepInstance)) {
byInstance = item
break
}
if stepGateway != "" && (strings.EqualFold(strings.TrimSpace(item.ID), stepGateway) || strings.EqualFold(strings.TrimSpace(item.InstanceID), stepGateway)) {
byGateway = item
}
}
switch {
case byInstance != nil:
if strings.TrimSpace(byInstance.InvokeURI) == "" {
return nil, merrors.InvalidArgument("crypto send: gateway invoke uri is missing")
}
return byInstance, nil
case byGateway != nil:
if strings.TrimSpace(byGateway.InvokeURI) == "" {
return nil, merrors.InvalidArgument("crypto send: gateway invoke uri is missing")
}
return byGateway, nil
case stepGateway == "" && stepInstance == "" && cryptoCount == 1:
if strings.TrimSpace(single.InvokeURI) == "" {
return nil, merrors.InvalidArgument("crypto send: gateway invoke uri is missing")
}
return single, nil
default:
return nil, merrors.InvalidArgument("crypto send: gateway instance not found")
}
}
func sourceManagedWalletRef(payment *agg.Payment) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("crypto send: payment is required")
}
if payment.IntentSnapshot.Source.Type != model.EndpointTypeManagedWallet || payment.IntentSnapshot.Source.ManagedWallet == nil {
return "", merrors.InvalidArgument("crypto send: managed wallet source is required")
}
ref := strings.TrimSpace(payment.IntentSnapshot.Source.ManagedWallet.ManagedWalletRef)
if ref == "" {
return "", merrors.InvalidArgument("crypto send: source managed wallet ref is required")
}
return ref, nil
}
func sourceAmount(payment *agg.Payment) (*moneyv1.Money, error) {
if payment == nil {
return nil, merrors.InvalidArgument("crypto send: payment is required")
}
money := effectiveSourceAmount(payment)
if money == nil {
return nil, merrors.InvalidArgument("crypto send: source amount is required")
}
amount := strings.TrimSpace(money.Amount)
currency := strings.TrimSpace(money.Currency)
if amount == "" || currency == "" {
return nil, merrors.InvalidArgument("crypto send: source amount is invalid")
}
return &moneyv1.Money{
Amount: amount,
Currency: currency,
}, nil
}
func effectiveSourceAmount(payment *agg.Payment) *paymenttypes.Money {
if payment == nil {
return nil
}
if payment.QuoteSnapshot != nil && payment.QuoteSnapshot.DebitAmount != nil {
return payment.QuoteSnapshot.DebitAmount
}
return payment.IntentSnapshot.Amount
}
func (e *gatewayCryptoExecutor) resolveDestination(payment *agg.Payment, action model.RailOperation) (*chainv1.TransferDestination, error) {
if payment == nil {
return nil, merrors.InvalidArgument("crypto send: payment is required")
}
destination := payment.IntentSnapshot.Destination
switch destination.Type {
case model.EndpointTypeManagedWallet:
if destination.ManagedWallet == nil || strings.TrimSpace(destination.ManagedWallet.ManagedWalletRef) == "" {
return nil, merrors.InvalidArgument("crypto send: destination managed wallet ref is required")
}
return &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{
ManagedWalletRef: strings.TrimSpace(destination.ManagedWallet.ManagedWalletRef),
},
}, nil
case model.EndpointTypeExternalChain:
if destination.ExternalChain == nil || strings.TrimSpace(destination.ExternalChain.Address) == "" {
return nil, merrors.InvalidArgument("crypto send: destination external address is required")
}
return &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{
ExternalAddress: strings.TrimSpace(destination.ExternalChain.Address),
},
Memo: strings.TrimSpace(destination.ExternalChain.Memo),
}, nil
case model.EndpointTypeCard:
address, err := e.resolveCardFundingAddress(payment, action)
if err != nil {
return nil, err
}
return &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{
ExternalAddress: address,
},
}, nil
default:
return nil, merrors.InvalidArgument("crypto send: unsupported destination type")
}
}
func (e *gatewayCryptoExecutor) resolveCardFundingAddress(payment *agg.Payment, action model.RailOperation) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("crypto send: payment is required")
}
gatewayKey := destinationCardGatewayKey(payment)
if gatewayKey == "" {
return "", merrors.InvalidArgument("crypto send: destination card gateway is required")
}
route, ok := lookupCardGatewayRoute(e.cardGatewayRoutes, gatewayKey)
if !ok {
return "", merrors.InvalidArgument("crypto send: card gateway route is not configured")
}
switch action {
case model.RailOperationFee:
if feeAddress := strings.TrimSpace(route.FeeAddress); feeAddress != "" {
return feeAddress, nil
}
}
address := strings.TrimSpace(route.FundingAddress)
if address == "" {
return "", merrors.InvalidArgument("crypto send: card gateway funding address is required")
}
return address, nil
}
func destinationCardGatewayKey(payment *agg.Payment) string {
if payment == nil || payment.QuoteSnapshot == nil || payment.QuoteSnapshot.Route == nil {
return ""
}
hops := payment.QuoteSnapshot.Route.Hops
fallback := ""
for i := range hops {
hop := hops[i]
if hop == nil || model.ParseRail(hop.Rail) != model.RailCardPayout {
continue
}
key := firstNonEmpty(strings.TrimSpace(hop.Gateway), strings.TrimSpace(hop.InstanceID))
if key == "" {
continue
}
if hop.Role != paymenttypes.QuoteRouteHopRoleDestination {
fallback = key
continue
}
return key
}
return fallback
}
func lookupCardGatewayRoute(routes map[string]CardGatewayRoute, key string) (CardGatewayRoute, bool) {
if len(routes) == 0 {
return CardGatewayRoute{}, false
}
normalized := strings.TrimSpace(strings.ToLower(key))
if normalized == "" {
return CardGatewayRoute{}, false
}
route, ok := routes[normalized]
return route, ok
}
func transferExternalRefs(transfer *chainv1.Transfer, gatewayInstanceID string) ([]agg.ExternalRef, error) {
if transfer == nil {
return nil, merrors.InvalidArgument("crypto send: transfer is required")
}
refs := make([]agg.ExternalRef, 0, 2)
if operationRef := strings.TrimSpace(transfer.GetOperationRef()); operationRef != "" {
refs = append(refs, agg.ExternalRef{
GatewayInstanceID: strings.TrimSpace(gatewayInstanceID),
Kind: erecon.ExternalRefKindOperation,
Ref: operationRef,
})
}
if transferRef := strings.TrimSpace(transfer.GetTransferRef()); transferRef != "" {
refs = append(refs, agg.ExternalRef{
GatewayInstanceID: strings.TrimSpace(gatewayInstanceID),
Kind: erecon.ExternalRefKindTransfer,
Ref: transferRef,
})
}
if len(refs) == 0 {
return nil, merrors.Internal("crypto send: transfer response does not contain references")
}
return refs, nil
}
func transferMetadata(step xplan.Step) map[string]string {
items := map[string]string{
"step_ref": strings.TrimSpace(step.StepRef),
"step_code": strings.TrimSpace(step.StepCode),
"gateway": strings.TrimSpace(step.Gateway),
"rail": strings.TrimSpace(string(step.Rail)),
"action": strings.TrimSpace(string(step.Action)),
}
out := map[string]string{}
for key, value := range items {
if value == "" {
continue
}
out[key] = value
}
if len(out) == 0 {
return nil
}
return out
}
func firstNonEmpty(values ...string) string {
for i := range values {
if cleaned := strings.TrimSpace(values[i]); cleaned != "" {
return cleaned
}
}
return ""
}
var _ sexec.CryptoExecutor = (*gatewayCryptoExecutor)(nil)

View File

@@ -0,0 +1,221 @@
package orchestrator
import (
"context"
"testing"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestGatewayCryptoExecutor_ExecuteCrypto_SubmitsTransfer(t *testing.T) {
orgID := bson.NewObjectID()
var submitReq *chainv1.SubmitTransferRequest
client := &chainclient.Fake{
SubmitTransferFn: func(_ context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
submitReq = req
return &chainv1.SubmitTransferResponse{
Transfer: &chainv1.Transfer{
TransferRef: "trf-1",
OperationRef: "op-1",
},
}, nil
},
}
resolver := &fakeGatewayInvokeResolver{client: client}
registry := &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_rail_gateway_arbitrum_sepolia",
InstanceID: "crypto_rail_gateway_arbitrum_sepolia",
Rail: model.RailCrypto,
InvokeURI: "grpc://crypto-gateway",
IsEnabled: true,
},
},
}
executor := &gatewayCryptoExecutor{
gatewayInvokeResolver: resolver,
gatewayRegistry: registry,
cardGatewayRoutes: map[string]CardGatewayRoute{
"monetix": {FundingAddress: "TUA_DEST"},
},
}
req := sexec.StepRequest{
Payment: &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-1",
IdempotencyKey: "idem-1",
IntentSnapshot: model.PaymentIntent{
Ref: "intent-1",
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{Pan: "4111111111111111"},
},
Amount: &paymenttypes.Money{Amount: "1", Currency: "USDT"},
},
QuoteSnapshot: &model.PaymentQuoteSnapshot{
DebitAmount: &paymenttypes.Money{Amount: "1.000000", Currency: "USDT"},
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 1, Rail: "CRYPTO", Gateway: "crypto_rail_gateway_arbitrum_sepolia", InstanceID: "crypto_rail_gateway_arbitrum_sepolia", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 4, Rail: "CARD", Gateway: "monetix", InstanceID: "monetix", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
},
Step: xplan.Step{
StepRef: "hop_1_crypto_send",
StepCode: "hop.1.crypto.send",
Action: model.RailOperationSend,
Rail: model.RailCrypto,
Gateway: "crypto_rail_gateway_arbitrum_sepolia",
InstanceID: "crypto_rail_gateway_arbitrum_sepolia",
},
StepExecution: agg.StepExecution{
StepRef: "hop_1_crypto_send",
StepCode: "hop.1.crypto.send",
Attempt: 1,
},
}
out, err := executor.ExecuteCrypto(context.Background(), req)
if err != nil {
t.Fatalf("ExecuteCrypto returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if out.StepExecution.State != agg.StepStateCompleted {
t.Fatalf("expected completed state, got=%q", out.StepExecution.State)
}
if submitReq == nil {
t.Fatal("expected transfer submission request")
}
if got, want := submitReq.GetSourceWalletRef(), "wallet-src"; got != want {
t.Fatalf("source wallet mismatch: got=%q want=%q", got, want)
}
if got, want := submitReq.GetAmount().GetAmount(), "1.000000"; got != want {
t.Fatalf("amount mismatch: got=%q want=%q", got, want)
}
if got, want := submitReq.GetDestination().GetExternalAddress(), "TUA_DEST"; got != want {
t.Fatalf("destination mismatch: got=%q want=%q", got, want)
}
if got, want := resolver.lastInvokeURI, "grpc://crypto-gateway"; got != want {
t.Fatalf("invoke uri mismatch: got=%q want=%q", got, want)
}
if len(out.StepExecution.ExternalRefs) != 2 {
t.Fatalf("expected two external refs, got=%d", len(out.StepExecution.ExternalRefs))
}
if out.StepExecution.ExternalRefs[0].Kind != erecon.ExternalRefKindOperation {
t.Fatalf("unexpected first external ref kind: %q", out.StepExecution.ExternalRefs[0].Kind)
}
}
func TestGatewayCryptoExecutor_ExecuteCrypto_MissingCardRoute(t *testing.T) {
orgID := bson.NewObjectID()
executor := &gatewayCryptoExecutor{
gatewayInvokeResolver: &fakeGatewayInvokeResolver{
client: &chainclient.Fake{},
},
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_1",
InstanceID: "crypto_1",
Rail: model.RailCrypto,
InvokeURI: "grpc://crypto-gateway",
IsEnabled: true,
},
},
},
cardGatewayRoutes: map[string]CardGatewayRoute{},
}
_, err := executor.ExecuteCrypto(context.Background(), sexec.StepRequest{
Payment: &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-2",
IdempotencyKey: "idem-2",
IntentSnapshot: model.PaymentIntent{
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{Pan: "4111111111111111"},
},
Amount: &paymenttypes.Money{Amount: "1", Currency: "USDT"},
},
QuoteSnapshot: &model.PaymentQuoteSnapshot{
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 1, Rail: "CRYPTO", Gateway: "crypto_1", InstanceID: "crypto_1", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 4, Rail: "CARD", Gateway: "monetix", InstanceID: "monetix", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
},
Step: xplan.Step{
StepRef: "hop_1_crypto_send",
StepCode: "hop.1.crypto.send",
Action: model.RailOperationSend,
Rail: model.RailCrypto,
Gateway: "crypto_1",
InstanceID: "crypto_1",
},
StepExecution: agg.StepExecution{
StepRef: "hop_1_crypto_send",
StepCode: "hop.1.crypto.send",
Attempt: 1,
},
})
if err == nil {
t.Fatal("expected error for missing card route")
}
}
type fakeGatewayInvokeResolver struct {
lastInvokeURI string
client chainclient.Client
err error
}
func (f *fakeGatewayInvokeResolver) Resolve(_ context.Context, invokeURI string) (chainclient.Client, error) {
f.lastInvokeURI = invokeURI
if f.err != nil {
return nil, f.err
}
return f.client, nil
}
type fakeGatewayRegistry struct {
items []*model.GatewayInstanceDescriptor
err error
}
func (f *fakeGatewayRegistry) List(_ context.Context) ([]*model.GatewayInstanceDescriptor, error) {
if f.err != nil {
return nil, f.err
}
return f.items, nil
}

View File

@@ -0,0 +1,508 @@
package orchestrator
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
cons "github.com/tech/sendico/pkg/messaging/consumer"
paymentgatewaynotifications "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
)
const (
observePollInterval = 5 * time.Second
observePollPageLimit = int32(100)
observePollMaxPerTick = 200
observeStepCodeToken = ".observe"
failureCodeGatewayExecutionFailed = "gateway.execution_failed"
failureCodeGatewayExecutionCancel = "gateway.execution_cancelled"
failureCodeGatewayTransferFailed = "gateway.transfer_failed"
failureCodeGatewayTransferCanceled = "gateway.transfer_cancelled"
)
type runningObserveCandidate struct {
stepRef string
transferRef string
operationRef string
gatewayInstanceID string
}
func (s *Service) startExternalRuntime() {
if s == nil || s.v2 == nil || s.paymentRepo == nil {
return
}
runCtx, cancel := context.WithCancel(context.Background())
started := false
if s.paymentGatewayBroker != nil {
processor := paymentgatewaynotifications.NewPaymentGatewayExecutionProcessor(s.logger, s.onPaymentGatewayExecution)
consumer, err := cons.NewConsumer(s.logger, s.paymentGatewayBroker, processor.GetSubject())
if err != nil {
s.logger.Warn("Failed to start payment gateway execution consumer", zap.Error(err))
} else {
s.gatewayConsumers = append(s.gatewayConsumers, consumer)
go func() {
if err := consumer.ConsumeMessages(processor.Process); err != nil && !errors.Is(err, context.Canceled) {
s.logger.Warn("Payment gateway execution consumer stopped", zap.Error(err))
}
}()
started = true
}
}
if s.gatewayInvokeResolver != nil && s.gatewayRegistry != nil {
go s.observePollLoop(runCtx)
started = true
} else {
s.logger.Warn("Observe polling fallback disabled: gateway resolver or registry is missing")
}
if started {
s.stopExternalWorkers = cancel
return
}
cancel()
}
func (s *Service) onPaymentGatewayExecution(ctx context.Context, msg *pmodel.PaymentGatewayExecution) error {
if s == nil || s.v2 == nil || s.paymentRepo == nil || msg == nil {
return nil
}
paymentRef := strings.TrimSpace(msg.PaymentRef)
if paymentRef == "" {
s.logger.Debug("Skipping payment gateway execution event without payment_ref")
return nil
}
payment, err := s.paymentRepo.GetByPaymentRefGlobal(ctx, paymentRef)
if err != nil {
if errors.Is(err, prepo.ErrPaymentNotFound) {
s.logger.Debug("Skipping payment gateway execution event for unknown payment",
zap.String("payment_ref", paymentRef),
)
return nil
}
return err
}
event, ok := buildGatewayExecutionEvent(payment, msg)
if !ok {
s.logger.Debug("Skipping payment gateway execution event with unsupported status",
zap.String("payment_ref", paymentRef),
zap.String("status", strings.TrimSpace(string(msg.Status))),
)
return nil
}
s.logger.Debug("Reconciling payment from gateway execution event",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("organization_ref", payment.OrganizationRef.Hex()),
zap.String("step_ref", strings.TrimSpace(event.StepRef)),
zap.String("status", strings.TrimSpace(string(event.Status))),
zap.String("transfer_ref", strings.TrimSpace(event.TransferRef)),
zap.String("operation_ref", strings.TrimSpace(event.OperationRef)),
)
_, err = s.v2.ReconcileExternal(ctx, psvc.ReconcileExternalInput{
OrganizationRef: payment.OrganizationRef.Hex(),
PaymentRef: payment.PaymentRef,
Event: erecon.Event{Gateway: event},
})
return err
}
func buildGatewayExecutionEvent(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (*erecon.GatewayEvent, bool) {
if payment == nil || msg == nil {
return nil, false
}
status, ok := mapGatewayExecutionStatus(msg.Status)
if !ok {
return nil, false
}
stepRef, gatewayInstanceID := matchExecutionStep(payment, msg)
operationRef := strings.TrimSpace(msg.OperationRef)
transferRef := strings.TrimSpace(msg.TransferRef)
if stepRef == "" && operationRef == "" && transferRef == "" {
return nil, false
}
event := &erecon.GatewayEvent{
StepRef: stepRef,
OperationRef: operationRef,
TransferRef: transferRef,
GatewayInstanceID: gatewayInstanceID,
Status: status,
}
switch status {
case erecon.GatewayStatusFailed:
retryable := false
event.Retryable = &retryable
event.FailureCode = failureCodeGatewayExecutionFailed
event.FailureMsg = strings.TrimSpace(msg.Error)
case erecon.GatewayStatusCancelled:
retryable := false
event.Retryable = &retryable
event.FailureCode = failureCodeGatewayExecutionCancel
event.FailureMsg = strings.TrimSpace(msg.Error)
default:
event.FailureCode = ""
event.FailureMsg = ""
}
return event, true
}
func mapGatewayExecutionStatus(status rail.OperationResult) (erecon.GatewayStatus, bool) {
switch strings.ToLower(strings.TrimSpace(string(status))) {
case string(rail.OperationResultSuccess):
return erecon.GatewayStatusSuccess, true
case string(rail.OperationResultFailed):
return erecon.GatewayStatusFailed, true
case string(rail.OperationResultCancelled):
return erecon.GatewayStatusCancelled, true
default:
return erecon.GatewayStatusUnspecified, false
}
}
func matchExecutionStep(payment *agg.Payment, msg *pmodel.PaymentGatewayExecution) (stepRef string, gatewayInstanceID string) {
if payment == nil || msg == nil {
return "", ""
}
transferRef := strings.TrimSpace(msg.TransferRef)
if transferRef != "" {
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindTransfer, transferRef); ok {
return stepRef, gatewayInstanceID
}
}
operationRef := strings.TrimSpace(msg.OperationRef)
if operationRef != "" {
if stepRef, gatewayInstanceID, ok := findStepByExternalRef(payment, erecon.ExternalRefKindOperation, operationRef); ok {
return stepRef, gatewayInstanceID
}
}
candidates := runningObserveCandidates(payment)
if len(candidates) == 1 {
return candidates[0].stepRef, candidates[0].gatewayInstanceID
}
return "", ""
}
func findStepByExternalRef(payment *agg.Payment, kind, ref string) (stepRef string, gatewayInstanceID string, ok bool) {
if payment == nil {
return "", "", false
}
kind = strings.TrimSpace(kind)
ref = strings.TrimSpace(ref)
if kind == "" || ref == "" {
return "", "", false
}
type match struct {
stepRef string
state agg.StepState
gatewayInstanceID string
}
var matches []match
for i := range payment.StepExecutions {
step := payment.StepExecutions[i]
for j := range step.ExternalRefs {
externalRef := step.ExternalRefs[j]
if !strings.EqualFold(strings.TrimSpace(externalRef.Kind), kind) {
continue
}
if !strings.EqualFold(strings.TrimSpace(externalRef.Ref), ref) {
continue
}
matches = append(matches, match{
stepRef: strings.TrimSpace(step.StepRef),
state: step.State,
gatewayInstanceID: strings.TrimSpace(externalRef.GatewayInstanceID),
})
break
}
}
if len(matches) == 0 {
return "", "", false
}
for i := range matches {
if matches[i].state == agg.StepStateRunning {
return matches[i].stepRef, matches[i].gatewayInstanceID, true
}
}
return matches[0].stepRef, matches[0].gatewayInstanceID, true
}
func (s *Service) observePollLoop(ctx context.Context) {
ticker := time.NewTicker(observePollInterval)
defer ticker.Stop()
s.logger.Info("Started observe polling fallback",
zap.Duration("interval", observePollInterval),
zap.Int32("page_limit", observePollPageLimit),
zap.Int("max_per_tick", observePollMaxPerTick),
)
defer s.logger.Info("Stopped observe polling fallback")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.pollObserveCandidates(ctx); err != nil && !errors.Is(err, context.Canceled) {
s.logger.Warn("Observe polling fallback tick failed", zap.Error(err))
}
}
}
}
func (s *Service) pollObserveCandidates(ctx context.Context) error {
if s == nil || s.paymentRepo == nil {
return nil
}
cursor := (*prepo.ListCursor)(nil)
processed := 0
for processed < observePollMaxPerTick {
page, err := s.paymentRepo.ListByStateGlobal(ctx, prepo.ListByStateGlobalInput{
State: agg.StateExecuting,
Limit: observePollPageLimit,
Cursor: cursor,
})
if err != nil {
return err
}
if page == nil || len(page.Items) == 0 {
return nil
}
for i := range page.Items {
payment := page.Items[i]
candidates := runningObserveCandidates(payment)
for j := range candidates {
if processed >= observePollMaxPerTick {
return nil
}
processed++
s.pollObserveCandidate(ctx, payment, candidates[j])
}
}
if page.NextCursor == nil {
return nil
}
cursor = page.NextCursor
}
return nil
}
func runningObserveCandidates(payment *agg.Payment) []runningObserveCandidate {
if payment == nil || len(payment.StepExecutions) == 0 {
return nil
}
out := make([]runningObserveCandidate, 0, len(payment.StepExecutions))
for i := range payment.StepExecutions {
step := payment.StepExecutions[i]
if step.State != agg.StepStateRunning {
continue
}
if !isObserveStepCode(step.StepCode) {
continue
}
candidate, ok := buildObserveCandidate(step)
if !ok {
continue
}
out = append(out, candidate)
}
return out
}
func isObserveStepCode(stepCode string) bool {
code := strings.ToLower(strings.TrimSpace(stepCode))
return strings.Contains(code, observeStepCodeToken)
}
func buildObserveCandidate(step agg.StepExecution) (runningObserveCandidate, bool) {
candidate := runningObserveCandidate{
stepRef: strings.TrimSpace(step.StepRef),
}
for i := range step.ExternalRefs {
ref := step.ExternalRefs[i]
kind := strings.TrimSpace(ref.Kind)
value := strings.TrimSpace(ref.Ref)
if kind == "" || value == "" {
continue
}
switch {
case strings.EqualFold(kind, erecon.ExternalRefKindTransfer):
if candidate.transferRef == "" {
candidate.transferRef = value
candidate.gatewayInstanceID = strings.TrimSpace(ref.GatewayInstanceID)
}
case strings.EqualFold(kind, erecon.ExternalRefKindOperation):
if candidate.operationRef == "" {
candidate.operationRef = value
}
}
}
if candidate.stepRef == "" || candidate.transferRef == "" {
return runningObserveCandidate{}, false
}
return candidate, true
}
func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment, candidate runningObserveCandidate) {
if s == nil || payment == nil || s.v2 == nil {
return
}
gateway, err := s.resolveObserveGateway(ctx, payment, candidate)
if err != nil {
s.logger.Debug("Observe polling skipped: gateway resolution failed",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", candidate.stepRef),
zap.String("transfer_ref", candidate.transferRef),
zap.Error(err),
)
return
}
client, err := s.gatewayInvokeResolver.Resolve(ctx, strings.TrimSpace(gateway.InvokeURI))
if err != nil {
s.logger.Warn("Observe polling failed to resolve gateway client",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", candidate.stepRef),
zap.String("gateway_instance_id", strings.TrimSpace(gateway.InstanceID)),
zap.Error(err),
)
return
}
transferResp, err := client.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: candidate.transferRef})
if err != nil {
s.logger.Warn("Observe polling transfer status call failed",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", candidate.stepRef),
zap.String("transfer_ref", candidate.transferRef),
zap.String("gateway_instance_id", strings.TrimSpace(gateway.InstanceID)),
zap.Error(err),
)
return
}
transfer := transferResp.GetTransfer()
if transfer == nil {
s.logger.Warn("Observe polling transfer status response is empty",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", candidate.stepRef),
zap.String("transfer_ref", candidate.transferRef),
)
return
}
status, terminal, ok := mapTransferStatus(transfer.GetStatus())
if !ok || !terminal {
return
}
event := erecon.GatewayEvent{
StepRef: candidate.stepRef,
OperationRef: firstNonEmpty(strings.TrimSpace(transfer.GetOperationRef()), candidate.operationRef),
TransferRef: strings.TrimSpace(candidate.transferRef),
GatewayInstanceID: firstNonEmpty(candidate.gatewayInstanceID, strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)),
Status: status,
}
switch status {
case erecon.GatewayStatusFailed:
retryable := false
event.Retryable = &retryable
event.FailureCode = failureCodeGatewayTransferFailed
event.FailureMsg = strings.TrimSpace(transfer.GetFailureReason())
case erecon.GatewayStatusCancelled:
retryable := false
event.Retryable = &retryable
event.FailureCode = failureCodeGatewayTransferCanceled
event.FailureMsg = strings.TrimSpace(transfer.GetFailureReason())
}
s.logger.Debug("Reconciling payment from observe polling result",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("organization_ref", payment.OrganizationRef.Hex()),
zap.String("step_ref", candidate.stepRef),
zap.String("status", strings.TrimSpace(string(event.Status))),
zap.String("transfer_ref", candidate.transferRef),
zap.String("operation_ref", strings.TrimSpace(event.OperationRef)),
zap.String("gateway_instance_id", strings.TrimSpace(event.GatewayInstanceID)),
)
_, err = s.v2.ReconcileExternal(ctx, psvc.ReconcileExternalInput{
OrganizationRef: payment.OrganizationRef.Hex(),
PaymentRef: strings.TrimSpace(payment.PaymentRef),
Event: erecon.Event{Gateway: &event},
})
if err != nil {
s.logger.Warn("Observe polling reconciliation failed",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", candidate.stepRef),
zap.String("transfer_ref", candidate.transferRef),
zap.Error(err),
)
}
}
func (s *Service) resolveObserveGateway(ctx context.Context, payment *agg.Payment, candidate runningObserveCandidate) (*model.GatewayInstanceDescriptor, error) {
executor := gatewayCryptoExecutor{
gatewayRegistry: s.gatewayRegistry,
}
step := xplan.Step{
Rail: model.RailCrypto,
}
if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" {
step.InstanceID = gatewayID
step.Gateway = gatewayID
} else if gateway, instanceID, ok := sourceCryptoHop(payment); ok {
step.Gateway = strings.TrimSpace(gateway)
step.InstanceID = strings.TrimSpace(instanceID)
}
return executor.resolveGateway(ctx, step)
}
func mapTransferStatus(status chainv1.TransferStatus) (gatewayStatus erecon.GatewayStatus, terminal bool, ok bool) {
switch status {
case chainv1.TransferStatus_TRANSFER_CREATED:
return erecon.GatewayStatusCreated, false, true
case chainv1.TransferStatus_TRANSFER_PROCESSING:
return erecon.GatewayStatusProcessing, false, true
case chainv1.TransferStatus_TRANSFER_WAITING:
return erecon.GatewayStatusWaiting, false, true
case chainv1.TransferStatus_TRANSFER_SUCCESS:
return erecon.GatewayStatusSuccess, true, true
case chainv1.TransferStatus_TRANSFER_FAILED:
return erecon.GatewayStatusFailed, true, true
case chainv1.TransferStatus_TRANSFER_CANCELLED:
return erecon.GatewayStatusCancelled, true, true
default:
return erecon.GatewayStatusUnspecified, false, false
}
}

View File

@@ -0,0 +1,275 @@
package orchestrator
import (
"context"
"errors"
"testing"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
pm "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
func TestBuildGatewayExecutionEvent_MapsStatusAndMatchedStep(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-1",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{
GatewayInstanceID: "crypto_rail_gateway_tron_nile",
Kind: erecon.ExternalRefKindTransfer,
Ref: "trf-1",
},
},
},
},
}
event, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
PaymentRef: payment.PaymentRef,
Status: rail.OperationResultSuccess,
TransferRef: "trf-1",
})
if !ok {
t.Fatal("expected gateway execution event to be accepted")
}
if got, want := event.StepRef, "hop_1_crypto_observe"; got != want {
t.Fatalf("step_ref mismatch: got=%q want=%q", got, want)
}
if got, want := event.GatewayInstanceID, "crypto_rail_gateway_tron_nile"; got != want {
t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want)
}
if got, want := event.Status, erecon.GatewayStatusSuccess; got != want {
t.Fatalf("status mismatch: got=%q want=%q", got, want)
}
}
func TestBuildGatewayExecutionEvent_FailedSetsTerminalNeedsAttentionHint(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-2",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindTransfer, Ref: "trf-2"},
},
},
},
}
event, ok := buildGatewayExecutionEvent(payment, &pm.PaymentGatewayExecution{
PaymentRef: payment.PaymentRef,
Status: rail.OperationResultFailed,
TransferRef: "trf-2",
Error: "insufficient funds",
})
if !ok {
t.Fatal("expected failed gateway execution event to be accepted")
}
if event.Retryable == nil || *event.Retryable {
t.Fatal("expected retryable=false for failed gateway execution")
}
if got, want := event.FailureCode, "gateway.execution_failed"; got != want {
t.Fatalf("failure_code mismatch: got=%q want=%q", got, want)
}
if got, want := event.FailureMsg, "insufficient funds"; got != want {
t.Fatalf("failure_msg mismatch: got=%q want=%q", got, want)
}
}
func TestOnPaymentGatewayExecution_ReconcilesUsingGlobalPaymentLookup(t *testing.T) {
orgID := bson.NewObjectID()
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-3",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindTransfer, Ref: "trf-3"},
},
},
},
}
repo := &fakeExternalRuntimeRepo{payment: payment}
v2 := &fakeExternalRuntimeV2{}
svc := &Service{
v2: v2,
paymentRepo: repo,
logger: zap.NewNop(),
}
err := svc.onPaymentGatewayExecution(context.Background(), &pm.PaymentGatewayExecution{
PaymentRef: payment.PaymentRef,
Status: rail.OperationResultSuccess,
TransferRef: "trf-3",
})
if err != nil {
t.Fatalf("onPaymentGatewayExecution returned error: %v", err)
}
if v2.reconcileInput == nil {
t.Fatal("expected reconcile call")
}
if got, want := v2.reconcileInput.OrganizationRef, orgID.Hex(); got != want {
t.Fatalf("organization_ref mismatch: got=%q want=%q", got, want)
}
if got, want := v2.reconcileInput.PaymentRef, payment.PaymentRef; got != want {
t.Fatalf("payment_ref mismatch: got=%q want=%q", got, want)
}
if v2.reconcileInput.Event.Gateway == nil || v2.reconcileInput.Event.Gateway.Status != erecon.GatewayStatusSuccess {
t.Fatal("expected success gateway reconcile event")
}
}
type fakeExternalRuntimeRepo struct {
payment *agg.Payment
err error
}
func (f *fakeExternalRuntimeRepo) Create(context.Context, *agg.Payment) error { return nil }
func (f *fakeExternalRuntimeRepo) UpdateCAS(context.Context, *agg.Payment, uint64) error { return nil }
func (f *fakeExternalRuntimeRepo) GetByPaymentRef(_ context.Context, _ bson.ObjectID, _ string) (*agg.Payment, error) {
return nil, prepo.ErrPaymentNotFound
}
func (f *fakeExternalRuntimeRepo) GetByPaymentRefGlobal(_ context.Context, paymentRef string) (*agg.Payment, error) {
if f.err != nil {
return nil, f.err
}
if f.payment == nil || f.payment.PaymentRef != paymentRef {
return nil, prepo.ErrPaymentNotFound
}
return f.payment, nil
}
func (f *fakeExternalRuntimeRepo) GetByIdempotencyKey(context.Context, bson.ObjectID, string) (*agg.Payment, error) {
return nil, prepo.ErrPaymentNotFound
}
func (f *fakeExternalRuntimeRepo) ListByQuotationRef(context.Context, prepo.ListByQuotationRefInput) (*prepo.ListOutput, error) {
return &prepo.ListOutput{}, nil
}
func (f *fakeExternalRuntimeRepo) ListByState(context.Context, prepo.ListByStateInput) (*prepo.ListOutput, error) {
return &prepo.ListOutput{}, nil
}
func (f *fakeExternalRuntimeRepo) ListByStateGlobal(context.Context, prepo.ListByStateGlobalInput) (*prepo.ListOutput, error) {
return &prepo.ListOutput{}, nil
}
type fakeExternalRuntimeV2 struct {
reconcileInput *psvc.ReconcileExternalInput
}
func (f *fakeExternalRuntimeV2) ExecutePayment(context.Context, *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) {
return nil, errors.New("not implemented")
}
func (f *fakeExternalRuntimeV2) GetPayment(context.Context, *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) {
return nil, errors.New("not implemented")
}
func (f *fakeExternalRuntimeV2) ListPayments(context.Context, *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) {
return nil, errors.New("not implemented")
}
func (f *fakeExternalRuntimeV2) ReconcileExternal(_ context.Context, in psvc.ReconcileExternalInput) (*psvc.ReconcileExternalOutput, error) {
cloned := in
f.reconcileInput = &cloned
return &psvc.ReconcileExternalOutput{
Payment: &orchestrationv2.Payment{
PaymentRef: in.PaymentRef,
State: orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_EXECUTING,
Version: 1,
},
}, nil
}
func TestMapTransferStatus(t *testing.T) {
cases := []struct {
status chainv1.TransferStatus
wantStatus erecon.GatewayStatus
wantTerminal bool
wantSupported bool
}{
{status: chainv1.TransferStatus_TRANSFER_CREATED, wantStatus: erecon.GatewayStatusCreated, wantTerminal: false, wantSupported: true},
{status: chainv1.TransferStatus_TRANSFER_PROCESSING, wantStatus: erecon.GatewayStatusProcessing, wantTerminal: false, wantSupported: true},
{status: chainv1.TransferStatus_TRANSFER_WAITING, wantStatus: erecon.GatewayStatusWaiting, wantTerminal: false, wantSupported: true},
{status: chainv1.TransferStatus_TRANSFER_SUCCESS, wantStatus: erecon.GatewayStatusSuccess, wantTerminal: true, wantSupported: true},
{status: chainv1.TransferStatus_TRANSFER_FAILED, wantStatus: erecon.GatewayStatusFailed, wantTerminal: true, wantSupported: true},
{status: chainv1.TransferStatus_TRANSFER_CANCELLED, wantStatus: erecon.GatewayStatusCancelled, wantTerminal: true, wantSupported: true},
}
for _, tc := range cases {
gotStatus, gotTerminal, gotSupported := mapTransferStatus(tc.status)
if gotStatus != tc.wantStatus || gotTerminal != tc.wantTerminal || gotSupported != tc.wantSupported {
t.Fatalf("status mapping mismatch: status=%v got=(%q,%v,%v) want=(%q,%v,%v)",
tc.status, gotStatus, gotTerminal, gotSupported, tc.wantStatus, tc.wantTerminal, tc.wantSupported)
}
}
}
func TestRunningObserveCandidates(t *testing.T) {
payment := &agg.Payment{
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_observe",
StepCode: "hop.1.crypto.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindTransfer, Ref: "trf-running"},
},
},
{
StepRef: "hop_2_crypto_observe",
StepCode: "hop.2.crypto.observe",
State: agg.StepStateCompleted,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindTransfer, Ref: "trf-completed"},
},
},
{
StepRef: "hop_3_crypto_observe",
StepCode: "hop.3.crypto.observe",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindOperation, Ref: "op-only"},
},
},
},
}
candidates := runningObserveCandidates(payment)
if len(candidates) != 1 {
t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates))
}
if got, want := candidates[0].transferRef, "trf-running"; got != want {
t.Fatalf("transfer_ref mismatch: got=%q want=%q", got, want)
}
}
var _ prepo.Repository = (*fakeExternalRuntimeRepo)(nil)
var _ psvc.Service = (*fakeExternalRuntimeV2)(nil)

View File

@@ -0,0 +1,285 @@
package orchestrator
import (
"context"
"fmt"
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
)
type gatewayGuardExecutor struct {
logger mlogger.Logger
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
}
func (e *gatewayGuardExecutor) ExecuteGuard(ctx context.Context, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
conditions := quoteExecutionConditionsForGuard(req.Payment)
switch xplan.GuardStepKind(req.Step) {
case xplan.StepKindLiquidityCheck:
base := executeLiquidityGuardReadiness(req.StepExecution, conditions)
if base.StepExecution.State != agg.StepStateCompleted {
return base, nil
}
failCode, failMsg := e.probeLiquidity(ctx, req)
if failCode != "" {
if e.logger != nil {
e.logger.Warn("Liquidity preflight probe failed",
zap.String("payment_ref", paymentRef(req.Payment)),
zap.String("step_ref", strings.TrimSpace(req.Step.StepRef)),
zap.String("failure_code", failCode),
zap.String("failure_message", failMsg),
)
}
return failedOutputForGuard(req.StepExecution, failCode, failMsg), nil
}
return completedOutputForGuard(req.StepExecution), nil
case xplan.StepKindPrefunding:
return executePrefundingGuardReadiness(req.StepExecution, conditions), nil
default:
return failedOutputForGuard(
req.StepExecution,
"guard.unsupported_step",
"unsupported guard step: step_code="+strings.TrimSpace(req.Step.StepCode),
), nil
}
}
func (e *gatewayGuardExecutor) probeLiquidity(ctx context.Context, req sexec.StepRequest) (string, string) {
payment := req.Payment
if payment == nil {
return "guard.payment_missing", "liquidity probe requires payment context"
}
hopGateway, hopInstanceID, hasCryptoSource := sourceCryptoHop(payment)
if !hasCryptoSource {
if e.logger != nil {
e.logger.Debug("Liquidity preflight probe skipped for non-crypto source",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", strings.TrimSpace(req.Step.StepRef)),
)
}
return "", ""
}
if e.gatewayInvokeResolver == nil || e.gatewayRegistry == nil {
return "guard.liquidity_probe_unavailable", "liquidity probe dependencies are not configured"
}
walletRef, err := sourceManagedWalletRef(payment)
if err != nil {
return "guard.liquidity_probe_unsupported_source", err.Error()
}
requiredAmount, requiredCurrency, err := requiredLiquidity(payment)
if err != nil {
return "guard.liquidity_probe_error", err.Error()
}
resolver := gatewayCryptoExecutor{
gatewayRegistry: e.gatewayRegistry,
}
gateway, err := resolver.resolveGateway(ctx, xplan.Step{
Gateway: hopGateway,
InstanceID: hopInstanceID,
Rail: model.RailCrypto,
})
if err != nil {
return "guard.liquidity_probe_error", err.Error()
}
client, err := e.gatewayInvokeResolver.Resolve(ctx, strings.TrimSpace(gateway.InvokeURI))
if err != nil {
return "guard.liquidity_probe_error", err.Error()
}
balanceResp, err := client.GetWalletBalance(ctx, &chainv1.GetWalletBalanceRequest{WalletRef: walletRef})
if err != nil {
return "guard.liquidity_probe_error", err.Error()
}
if balanceResp == nil || balanceResp.GetBalance() == nil || balanceResp.GetBalance().GetAvailable() == nil {
return "guard.liquidity_probe_error", "wallet balance is missing from gateway response"
}
available := balanceResp.GetBalance().GetAvailable()
availableCurrency := strings.ToUpper(strings.TrimSpace(available.GetCurrency()))
availableAmount := strings.TrimSpace(available.GetAmount())
if availableCurrency == "" || availableAmount == "" {
return "guard.liquidity_probe_error", "wallet available balance is incomplete"
}
if !strings.EqualFold(availableCurrency, requiredCurrency) {
return "guard.liquidity_probe_currency_mismatch", fmt.Sprintf(
"wallet balance currency mismatch: available=%s required=%s",
availableCurrency,
requiredCurrency,
)
}
availableDec, err := decimal.NewFromString(availableAmount)
if err != nil {
return "guard.liquidity_probe_error", "wallet available amount is invalid"
}
requiredDec, err := decimal.NewFromString(requiredAmount)
if err != nil {
return "guard.liquidity_probe_error", "required liquidity amount is invalid"
}
if availableDec.Cmp(requiredDec) < 0 {
return "guard.liquidity_insufficient", fmt.Sprintf(
"insufficient liquidity: available=%s required=%s currency=%s wallet_ref=%s",
availableAmount,
requiredAmount,
requiredCurrency,
walletRef,
)
}
if e.logger != nil {
e.logger.Info("Liquidity preflight probe passed",
zap.String("payment_ref", strings.TrimSpace(payment.PaymentRef)),
zap.String("step_ref", strings.TrimSpace(req.Step.StepRef)),
zap.String("wallet_ref", walletRef),
zap.String("currency", requiredCurrency),
zap.String("required_amount", requiredAmount),
zap.String("available_amount", availableAmount),
zap.String("gateway_id", strings.TrimSpace(gateway.ID)),
zap.String("gateway_instance_id", strings.TrimSpace(gateway.InstanceID)),
)
}
return "", ""
}
func sourceCryptoHop(payment *agg.Payment) (gateway string, instanceID string, ok bool) {
if payment == nil || payment.QuoteSnapshot == nil || payment.QuoteSnapshot.Route == nil {
return "", "", false
}
hops := payment.QuoteSnapshot.Route.Hops
fallbackGateway := ""
fallbackInstance := ""
for i := range hops {
hop := hops[i]
if hop == nil || model.ParseRail(hop.Rail) != model.RailCrypto {
continue
}
gw := strings.TrimSpace(hop.Gateway)
inst := strings.TrimSpace(hop.InstanceID)
if gw == "" && inst == "" {
continue
}
if fallbackGateway == "" && fallbackInstance == "" {
fallbackGateway = gw
fallbackInstance = inst
}
if hop.Role == paymenttypes.QuoteRouteHopRoleSource {
return gw, inst, true
}
}
if fallbackGateway != "" || fallbackInstance != "" {
return fallbackGateway, fallbackInstance, true
}
return "", "", false
}
func requiredLiquidity(payment *agg.Payment) (amount string, currency string, err error) {
if payment == nil {
return "", "", merrors.InvalidArgument("payment is required")
}
base := effectiveSourceAmount(payment)
if base == nil {
return "", "", merrors.InvalidArgument("source amount is required")
}
amount = strings.TrimSpace(base.Amount)
currency = strings.ToUpper(strings.TrimSpace(base.Currency))
if amount == "" || currency == "" {
return "", "", merrors.InvalidArgument("source amount is invalid")
}
// If total cost is same-currency and greater, probe against total liquidity required.
if payment.QuoteSnapshot != nil && payment.QuoteSnapshot.TotalCost != nil {
total := payment.QuoteSnapshot.TotalCost
totalAmount := strings.TrimSpace(total.Amount)
totalCurrency := strings.ToUpper(strings.TrimSpace(total.Currency))
if totalAmount != "" && strings.EqualFold(totalCurrency, currency) {
totalDec, totalErr := decimal.NewFromString(totalAmount)
baseDec, baseErr := decimal.NewFromString(amount)
if totalErr == nil && baseErr == nil && totalDec.Cmp(baseDec) > 0 {
amount = totalAmount
}
}
}
return amount, currency, nil
}
func quoteExecutionConditionsForGuard(payment *agg.Payment) *paymenttypes.QuoteExecutionConditions {
if payment == nil || payment.QuoteSnapshot == nil {
return nil
}
return payment.QuoteSnapshot.ExecutionConditions
}
func executeLiquidityGuardReadiness(
step agg.StepExecution,
conditions *paymenttypes.QuoteExecutionConditions,
) *sexec.ExecuteOutput {
if conditions == nil {
return failedOutputForGuard(step, "guard.conditions_missing", "liquidity guard requires execution conditions")
}
switch conditions.Readiness {
case paymenttypes.QuoteExecutionReadinessIndicative:
return failedOutputForGuard(step, "guard.indicative_quote", "liquidity guard cannot execute indicative quotes")
case paymenttypes.QuoteExecutionReadinessLiquidityObtainable:
return failedOutputForGuard(step, "guard.liquidity_not_ready", "liquidity is not yet available at execution time")
case paymenttypes.QuoteExecutionReadinessUnspecified:
return failedOutputForGuard(step, "guard.readiness_unspecified", "liquidity guard requires explicit readiness")
default:
return completedOutputForGuard(step)
}
}
func executePrefundingGuardReadiness(
step agg.StepExecution,
conditions *paymenttypes.QuoteExecutionConditions,
) *sexec.ExecuteOutput {
if conditions == nil {
return failedOutputForGuard(step, "guard.conditions_missing", "prefunding guard requires execution conditions")
}
if conditions.Readiness == paymenttypes.QuoteExecutionReadinessIndicative {
return failedOutputForGuard(step, "guard.indicative_quote", "prefunding guard cannot execute indicative quotes")
}
return completedOutputForGuard(step)
}
func completedOutputForGuard(step agg.StepExecution) *sexec.ExecuteOutput {
step.State = agg.StepStateCompleted
step.FailureCode = ""
step.FailureMsg = ""
return &sexec.ExecuteOutput{StepExecution: step}
}
func failedOutputForGuard(step agg.StepExecution, code, msg string) *sexec.ExecuteOutput {
step.State = agg.StepStateFailed
step.FailureCode = strings.TrimSpace(code)
step.FailureMsg = strings.TrimSpace(msg)
return &sexec.ExecuteOutput{StepExecution: step}
}
func paymentRef(payment *agg.Payment) string {
if payment == nil {
return ""
}
return strings.TrimSpace(payment.PaymentRef)
}
var _ sexec.GuardExecutor = (*gatewayGuardExecutor)(nil)

View File

@@ -0,0 +1,239 @@
package orchestrator
import (
"context"
"strings"
"testing"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestGatewayGuardExecutor_ExecuteGuard_LiquidityProbePasses(t *testing.T) {
orgID := bson.NewObjectID()
var walletReq *chainv1.GetWalletBalanceRequest
executor := &gatewayGuardExecutor{
gatewayInvokeResolver: &fakeGatewayInvokeResolver{
client: &chainclient.Fake{
GetWalletBalanceFn: func(_ context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
walletReq = req
return &chainv1.GetWalletBalanceResponse{
Balance: &chainv1.WalletBalance{
Available: &moneyv1.Money{Amount: "5", Currency: "USDT"},
},
}, nil
},
},
},
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_1",
InstanceID: "crypto_1",
Rail: model.RailCrypto,
InvokeURI: "grpc://crypto-gateway",
IsEnabled: true,
},
},
},
}
out, err := executor.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: testLiquidityProbePayment(orgID, "wallet-src", "1.00", "USDT", paymenttypes.QuoteExecutionReadinessLiquidityReady),
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.State, agg.StepStateCompleted; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
if walletReq == nil {
t.Fatal("expected wallet balance request")
}
if got, want := walletReq.GetWalletRef(), "wallet-src"; got != want {
t.Fatalf("wallet_ref mismatch: got=%q want=%q", got, want)
}
}
func TestGatewayGuardExecutor_ExecuteGuard_InsufficientLiquidity(t *testing.T) {
orgID := bson.NewObjectID()
executor := &gatewayGuardExecutor{
gatewayInvokeResolver: &fakeGatewayInvokeResolver{
client: &chainclient.Fake{
GetWalletBalanceFn: func(_ context.Context, _ *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
return &chainv1.GetWalletBalanceResponse{
Balance: &chainv1.WalletBalance{
Available: &moneyv1.Money{Amount: "0.5", Currency: "USDT"},
},
}, nil
},
},
},
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_1",
InstanceID: "crypto_1",
Rail: model.RailCrypto,
InvokeURI: "grpc://crypto-gateway",
IsEnabled: true,
},
},
},
}
out, err := executor.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: testLiquidityProbePayment(orgID, "wallet-src", "1.00", "USDT", paymenttypes.QuoteExecutionReadinessLiquidityReady),
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.State, agg.StepStateFailed; got != want {
t.Fatalf("state mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.FailureCode, "guard.liquidity_insufficient"; got != want {
t.Fatalf("failure code mismatch: got=%q want=%q", got, want)
}
if !strings.Contains(out.StepExecution.FailureMsg, "available=0.5") {
t.Fatalf("expected failure message to include available balance, got=%q", out.StepExecution.FailureMsg)
}
}
func TestGatewayGuardExecutor_ExecuteGuard_ReadinessStopsBeforeProbe(t *testing.T) {
orgID := bson.NewObjectID()
probeCalls := 0
executor := &gatewayGuardExecutor{
gatewayInvokeResolver: &fakeGatewayInvokeResolver{
client: &chainclient.Fake{
GetWalletBalanceFn: func(_ context.Context, _ *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
probeCalls++
return &chainv1.GetWalletBalanceResponse{}, nil
},
},
},
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_1",
InstanceID: "crypto_1",
Rail: model.RailCrypto,
InvokeURI: "grpc://crypto-gateway",
IsEnabled: true,
},
},
},
}
out, err := executor.ExecuteGuard(context.Background(), sexec.StepRequest{
Payment: testLiquidityProbePayment(orgID, "wallet-src", "1.00", "USDT", paymenttypes.QuoteExecutionReadinessLiquidityObtainable),
Step: xplan.Step{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Kind: xplan.StepKindLiquidityCheck,
},
StepExecution: agg.StepExecution{
StepRef: xplan.QuoteReadinessGuardStepRef,
StepCode: string(xplan.GuardOperationQuoteReadinessGuard),
Attempt: 1,
},
})
if err != nil {
t.Fatalf("ExecuteGuard returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.StepExecution.FailureCode, "guard.liquidity_not_ready"; got != want {
t.Fatalf("failure code mismatch: got=%q want=%q", got, want)
}
if probeCalls != 0 {
t.Fatalf("expected no probe calls when readiness is not ready, got=%d", probeCalls)
}
}
func testLiquidityProbePayment(
orgID bson.ObjectID,
walletRef string,
amount string,
currency string,
readiness paymenttypes.QuoteExecutionReadiness,
) *agg.Payment {
return &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "payment-guard",
IntentSnapshot: model.PaymentIntent{
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: walletRef,
},
},
Amount: &paymenttypes.Money{
Amount: amount,
Currency: currency,
},
},
QuoteSnapshot: &model.PaymentQuoteSnapshot{
DebitAmount: &paymenttypes.Money{
Amount: amount,
Currency: currency,
},
ExecutionConditions: &paymenttypes.QuoteExecutionConditions{
Readiness: readiness,
LiquidityCheckRequiredAtExecution: true,
},
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{
Index: 1,
Rail: "CRYPTO",
Gateway: "crypto_1",
InstanceID: "crypto_1",
Role: paymenttypes.QuoteRouteHopRoleSource,
},
{
Index: 2,
Rail: "CARD",
Gateway: "monetix",
InstanceID: "monetix",
Role: paymenttypes.QuoteRouteHopRoleDestination,
},
},
},
},
}
}

View File

@@ -54,9 +54,14 @@ func WithMntxGateway(_ mntxclient.Client) Option {
return func(*Service) {}
}
// WithPaymentGatewayBroker is retained for backward-compatible wiring and is currently a no-op.
func WithPaymentGatewayBroker(_ mb.Broker) Option {
return func(*Service) {}
// WithPaymentGatewayBroker wires broker subscription for payment gateway execution events.
func WithPaymentGatewayBroker(broker mb.Broker) Option {
return func(s *Service) {
if s == nil || broker == nil {
return
}
s.paymentGatewayBroker = broker
}
}
// WithClock is retained for backward-compatible wiring and is currently a no-op.
@@ -69,14 +74,24 @@ func WithMaxFXQuoteTTLMillis(_ int64) Option {
return func(*Service) {}
}
// WithGatewayInvokeResolver is retained for backward-compatible wiring and is currently a no-op.
func WithGatewayInvokeResolver(_ GatewayInvokeResolver) Option {
return func(*Service) {}
// WithGatewayInvokeResolver configures invoke-URI-to-chain-client resolution.
func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option {
return func(s *Service) {
if s == nil {
return
}
s.gatewayInvokeResolver = resolver
}
}
// WithCardGatewayRoutes is retained for backward-compatible wiring and is currently a no-op.
func WithCardGatewayRoutes(_ map[string]CardGatewayRoute) Option {
return func(*Service) {}
// WithCardGatewayRoutes configures card gateway funding/fee route metadata.
func WithCardGatewayRoutes(routes map[string]CardGatewayRoute) Option {
return func(s *Service) {
if s == nil {
return
}
s.cardGatewayRoutes = cloneCardGatewayRoutes(routes)
}
}
// WithFeeLedgerAccounts is retained for backward-compatible wiring and is currently a no-op.
@@ -84,9 +99,14 @@ func WithFeeLedgerAccounts(_ map[string]string) Option {
return func(*Service) {}
}
// WithGatewayRegistry is retained for backward-compatible wiring and is currently a no-op.
func WithGatewayRegistry(_ GatewayRegistry) Option {
return func(*Service) {}
// WithGatewayRegistry configures runtime gateway descriptor discovery.
func WithGatewayRegistry(registry GatewayRegistry) Option {
return func(s *Service) {
if s == nil {
return
}
s.gatewayRegistry = registry
}
}
type discoveryGatewayRegistry struct {
@@ -205,3 +225,25 @@ func limitsFromDiscovery(src *discovery.Limits) model.Limits {
return limits
}
func cloneCardGatewayRoutes(src map[string]CardGatewayRoute) map[string]CardGatewayRoute {
if len(src) == 0 {
return nil
}
out := make(map[string]CardGatewayRoute, len(src))
for key, route := range src {
normalizedKey := strings.TrimSpace(strings.ToLower(key))
if normalizedKey == "" {
continue
}
out[normalizedKey] = CardGatewayRoute{
FundingAddress: strings.TrimSpace(route.FundingAddress),
FeeAddress: strings.TrimSpace(route.FeeAddress),
FeeWalletRef: strings.TrimSpace(route.FeeWalletRef),
}
}
if len(out) == 0 {
return nil
}
return out
}

View File

@@ -1,9 +1,14 @@
package orchestrator
import (
"context"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/storage"
"github.com/tech/sendico/pkg/api/routers"
msg "github.com/tech/sendico/pkg/messaging"
mb "github.com/tech/sendico/pkg/messaging/broker"
"github.com/tech/sendico/pkg/mlogger"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.uber.org/zap"
@@ -12,19 +17,27 @@ import (
// Service is a v2-only payment orchestrator gRPC adapter.
type Service struct {
logger mlogger.Logger
repo storage.Repository
v2 psvc.Service
logger mlogger.Logger
repo storage.Repository
v2 psvc.Service
paymentRepo prepo.Repository
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute
paymentGatewayBroker mb.Broker
gatewayConsumers []msg.Consumer
stopExternalWorkers context.CancelFunc
}
// NewService constructs the v2 orchestrator service.
func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) *Service {
func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) (*Service, error) {
if logger == nil {
logger = zap.NewNop()
}
svc := &Service{
logger: logger.Named("payment_orchestrator"),
logger: logger.Named("service"),
repo: repo,
}
@@ -34,8 +47,17 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
}
}
svc.v2 = newOrchestrationV2Service(svc.logger, repo)
return svc
var err error
svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{
GatewayInvokeResolver: svc.gatewayInvokeResolver,
GatewayRegistry: svc.gatewayRegistry,
CardGatewayRoutes: svc.cardGatewayRoutes,
})
svc.startExternalRuntime()
if err != nil {
svc.logger.Error("Failed to initialize", zap.Error(err))
}
return svc, err
}
// Register attaches the service to the supplied gRPC router.
@@ -48,5 +70,19 @@ func (s *Service) Register(router routers.GRPC) error {
})
}
// Shutdown releases runtime resources. Orchestration v2 currently has no background workers.
func (s *Service) Shutdown() {}
// Shutdown releases runtime resources.
func (s *Service) Shutdown() {
if s == nil {
return
}
if s.stopExternalWorkers != nil {
s.stopExternalWorkers()
s.stopExternalWorkers = nil
}
for i := range s.gatewayConsumers {
if s.gatewayConsumers[i] != nil {
s.gatewayConsumers[i].Close()
}
}
s.gatewayConsumers = nil
}

View File

@@ -7,7 +7,9 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/pquery"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/storage"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
@@ -19,54 +21,75 @@ type v2MongoDBProvider interface {
MongoDatabase() *mongo.Database
}
func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository) psvc.Service {
type v2RuntimeDeps struct {
GatewayInvokeResolver GatewayInvokeResolver
GatewayRegistry GatewayRegistry
CardGatewayRoutes map[string]CardGatewayRoute
}
func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, runtimeDeps v2RuntimeDeps) (psvc.Service, prepo.Repository, error) {
if logger == nil {
logger = zap.NewNop()
}
if repo == nil {
return nil
return nil, nil, merrors.Internal("No repo for orchestrator v2 provided")
}
paymentRepo := buildPaymentRepositoryV2(repo, logger)
if paymentRepo == nil {
if logger != nil {
logger.Warn("Orchestration v2 disabled: mongo database not available")
}
return nil
logger.Error("Orchestration v2 disabled: database not available")
return nil, nil, merrors.Internal("database is not available")
}
query, err := pquery.New(pquery.Dependencies{
Repository: paymentRepo,
Logger: logger.Named("orchestration_v2_pquery"),
Logger: logger,
})
if err != nil {
if logger != nil {
logger.Warn("Orchestration v2 disabled: query service init failed", zap.Error(err))
}
return nil
logger.Error("Orchestration v2 disabled: query service init failed", zap.Error(err))
return nil, paymentRepo, err
}
observer, err := oobs.New(oobs.Dependencies{Logger: logger.Named("orchestration_v2_observer")})
observer, err := oobs.New(oobs.Dependencies{Logger: logger})
if err != nil {
if logger != nil {
logger.Warn("Orchestration v2 disabled: observer init failed", zap.Error(err))
}
return nil
logger.Error("Orchestration v2 disabled: observer init failed", zap.Error(err))
return nil, paymentRepo, err
}
executors := buildOrchestrationV2Executors(logger, runtimeDeps)
svc, err := psvc.New(psvc.Dependencies{
Logger: logger.Named("orchestration_v2_psvc"),
Logger: logger.Named("v2"),
QuoteStore: repo.Quotes(),
Repository: paymentRepo,
Query: query,
Observer: observer,
Executors: executors,
})
if err != nil {
if logger != nil {
logger.Warn("Orchestration v2 disabled: service init failed", zap.Error(err))
}
logger.Error("Orchestration v2 disabled: service init failed", zap.Error(err))
return nil, paymentRepo, err
}
return svc, paymentRepo, err
}
func buildOrchestrationV2Executors(logger mlogger.Logger, runtimeDeps v2RuntimeDeps) sexec.Registry {
if runtimeDeps.GatewayInvokeResolver == nil || runtimeDeps.GatewayRegistry == nil {
return nil
}
return svc
execLogger := logger.Named("v2")
cryptoExecutor := &gatewayCryptoExecutor{
gatewayInvokeResolver: runtimeDeps.GatewayInvokeResolver,
gatewayRegistry: runtimeDeps.GatewayRegistry,
cardGatewayRoutes: cloneCardGatewayRoutes(runtimeDeps.CardGatewayRoutes),
}
guardExecutor := &gatewayGuardExecutor{
logger: execLogger.Named("guard"),
gatewayInvokeResolver: runtimeDeps.GatewayInvokeResolver,
gatewayRegistry: runtimeDeps.GatewayRegistry,
}
return psvc.NewDefaultExecutors(execLogger, sexec.Dependencies{
Crypto: cryptoExecutor,
Guard: guardExecutor,
})
}
func buildPaymentRepositoryV2(repo storage.Repository, logger mlogger.Logger) prepo.Repository {
@@ -83,7 +106,7 @@ func buildPaymentRepositoryV2(repo storage.Repository, logger mlogger.Logger) pr
}
paymentRepo, err := prepo.NewMongo(
db.Collection(mservice.Payments),
prepo.Dependencies{Logger: logger.Named("orchestration_v2_prepo")},
prepo.Dependencies{Logger: logger},
)
if err != nil {
return nil