diff --git a/api/gateway/tgsettle/internal/service/gateway/connector.go b/api/gateway/tgsettle/internal/service/gateway/connector.go index f287fd9d..8a7d24e4 100644 --- a/api/gateway/tgsettle/internal/service/gateway/connector.go +++ b/api/gateway/tgsettle/internal/service/gateway/connector.go @@ -136,13 +136,22 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil } transfer := resp.GetTransfer() + operationID := strings.TrimSpace(transfer.GetOperationRef()) + if operationID == "" { + s.logger.Warn("Submit operation transfer response missing operation_ref", append(logFields, + zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())), + )...) + return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{ + Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op, ""), + }}, nil + } s.logger.Info("Submit operation transfer submitted", append(logFields, zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())), zap.String("status", transfer.GetStatus().String()), )...) return &connectorv1.SubmitOperationResponse{ Receipt: &connectorv1.OperationReceipt{ - OperationId: strings.TrimSpace(transfer.GetTransferRef()), + OperationId: operationID, Status: transferStatusToOperation(transfer.GetStatus()), ProviderRef: strings.TrimSpace(transfer.GetTransferRef()), }, @@ -224,7 +233,7 @@ func transferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation { return nil } op := &connectorv1.Operation{ - OperationId: strings.TrimSpace(transfer.GetTransferRef()), + OperationId: strings.TrimSpace(transfer.GetOperationRef()), Type: connectorv1.OperationType_TRANSFER, Status: transferStatusToOperation(transfer.GetStatus()), Money: transfer.GetRequestedAmount(), diff --git a/api/gateway/tgsettle/internal/service/gateway/connector_test.go b/api/gateway/tgsettle/internal/service/gateway/connector_test.go new file mode 100644 index 00000000..4916e606 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/gateway/connector_test.go @@ -0,0 +1,119 @@ +package gateway + +import ( + "context" + "testing" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestSubmitOperation_UsesOperationRefAsOperationID(t *testing.T) { + svc, _, _ := newTestService(t) + svc.chatID = "1" + + req := &connectorv1.SubmitOperationRequest{ + Operation: &connectorv1.Operation{ + Type: connectorv1.OperationType_TRANSFER, + IdempotencyKey: "idem-settlement-1", + OperationRef: "payment-1:hop_2_settlement_fx_convert", + IntentRef: "intent-1", + Money: &moneyv1.Money{Amount: "1.00", Currency: "USDT"}, + From: &connectorv1.OperationParty{ + Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ + ConnectorId: tgsettleConnectorID, + AccountId: "wallet-src", + }}, + }, + To: &connectorv1.OperationParty{ + Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ + ConnectorId: tgsettleConnectorID, + AccountId: "wallet-dst", + }}, + }, + Params: structFromMap(map[string]interface{}{ + "payment_ref": "payment-1", + "organization_ref": "org-1", + }), + }, + } + + resp, err := svc.SubmitOperation(context.Background(), req) + if err != nil { + t.Fatalf("SubmitOperation returned error: %v", err) + } + if resp.GetReceipt() == nil { + t.Fatal("expected receipt") + } + if got := resp.GetReceipt().GetError(); got != nil { + t.Fatalf("expected no connector error, got: %v", got) + } + if got, want := resp.GetReceipt().GetOperationId(), "payment-1:hop_2_settlement_fx_convert"; got != want { + t.Fatalf("operation_id mismatch: got=%q want=%q", got, want) + } + if got, want := resp.GetReceipt().GetProviderRef(), "idem-settlement-1"; got != want { + t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want) + } +} + +func TestGetOperation_UsesOperationRefIdentity(t *testing.T) { + svc, repo, _ := newTestService(t) + + record := &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-settlement-2", + OperationRef: "payment-2:hop_2_settlement_fx_convert", + PaymentIntentID: "pi-2", + PaymentRef: "payment-2", + RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"}, + Status: storagemodel.PaymentStatusSuccess, + } + if err := repo.payments.Upsert(context.Background(), record); err != nil { + t.Fatalf("failed to seed payment record: %v", err) + } + + resp, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{ + OperationId: "payment-2:hop_2_settlement_fx_convert", + }) + if err != nil { + t.Fatalf("GetOperation returned error: %v", err) + } + if resp.GetOperation() == nil { + t.Fatal("expected operation") + } + if got, want := resp.GetOperation().GetOperationId(), "payment-2:hop_2_settlement_fx_convert"; got != want { + t.Fatalf("operation_id mismatch: got=%q want=%q", got, want) + } + if got, want := resp.GetOperation().GetProviderRef(), "idem-settlement-2"; got != want { + t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want) + } +} + +func TestGetOperation_DoesNotResolveByIdempotencyKey(t *testing.T) { + svc, repo, _ := newTestService(t) + + record := &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-settlement-3", + OperationRef: "payment-3:hop_2_settlement_fx_convert", + PaymentIntentID: "pi-3", + PaymentRef: "payment-3", + RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"}, + Status: storagemodel.PaymentStatusSuccess, + } + if err := repo.payments.Upsert(context.Background(), record); err != nil { + t.Fatalf("failed to seed payment record: %v", err) + } + + _, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{ + OperationId: "idem-settlement-3", + }) + if err == nil { + t.Fatal("expected not found error") + } + if status.Code(err) != codes.NotFound { + t.Fatalf("unexpected error code: got=%s want=%s", status.Code(err), codes.NotFound) + } +} diff --git a/api/gateway/tgsettle/storage/model/execution.go b/api/gateway/tgsettle/storage/model/execution.go index fea1ffc2..194d7a19 100644 --- a/api/gateway/tgsettle/storage/model/execution.go +++ b/api/gateway/tgsettle/storage/model/execution.go @@ -20,22 +20,25 @@ const ( ) type PaymentRecord struct { - storable.Base `bson:",inline" json:",inline"` - OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` - IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` - PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` - QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` - IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"` - PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"` - OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"` - TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` - RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"` - ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` - Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` - FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` - ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` - ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` - ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` + storable.Base `bson:",inline" json:",inline"` + OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` + IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` + ConfirmationRef string `bson:"confirmationRef,omitempty" json:"confirmation_ref,omitempty"` + ConfirmationMessageID string `bson:"confirmationMessageId,omitempty" json:"confirmation_message_id,omitempty"` + ConfirmationReplyMessageID string `bson:"confirmationReplyMessageId,omitempty" json:"confirmation_reply_message_id,omitempty"` + PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` + QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` + IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"` + PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"` + OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"` + TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` + RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"` + ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` + Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` + FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` + ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` + ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` + ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` } type TelegramConfirmation struct { diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 11759af5..3a8f83e6 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -103,14 +103,13 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro return merrors.InvalidArgument("payment record is nil", "record") } record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) - record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID) record.QuoteRef = strings.TrimSpace(record.QuoteRef) record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg) record.TargetChatID = strings.TrimSpace(record.TargetChatID) record.IntentRef = strings.TrimSpace(record.IntentRef) record.OperationRef = strings.TrimSpace(record.OperationRef) - if record.PaymentIntentID == "" { - return merrors.InvalidArgument("intention reference is required", "payment_intent_ref") + if record.IntentRef == "" { + return merrors.InvalidArgument("intention reference is required", "intent_ref") } if record.IdempotencyKey == "" { return merrors.InvalidArgument("idempotency key is required", "idempotency_key") @@ -148,7 +147,7 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert payment record", zap.String("idempotency_key", record.IdempotencyKey), - zap.String("payment_intent_id", record.PaymentIntentID), + zap.String("intent_ref", record.IntentRef), zap.String("quote_ref", record.QuoteRef), zap.Error(err)) } diff --git a/api/gateway/tgsettle/storage/mongo/store/payments_test.go b/api/gateway/tgsettle/storage/mongo/store/payments_test.go index 6097c434..d11d810e 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments_test.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments_test.go @@ -112,9 +112,8 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { CreatedAt: existingCreatedAt, UpdatedAt: existingCreatedAt, }, - IdempotencyKey: key, - PaymentIntentID: "pi-old", - IntentRef: "intent-old", + IdempotencyKey: key, + IntentRef: "pi-old", }, }, duplicateWhenZeroID: true, @@ -122,10 +121,9 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } if err := store.Upsert(context.Background(), record); err != nil { @@ -155,9 +153,8 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), }, - IdempotencyKey: key, - PaymentIntentID: "pi-existing", - IntentRef: "intent-existing", + IdempotencyKey: key, + IntentRef: "pi-existing", }, }, findErrByCall: map[int]error{ @@ -168,10 +165,9 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } if err := store.Upsert(context.Background(), record); err != nil { @@ -203,9 +199,8 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), }, - IdempotencyKey: key, - PaymentIntentID: "pi-existing", - IntentRef: "intent-existing", + IdempotencyKey: key, + IntentRef: "pi-existing", }, }, findErrByCall: map[int]error{ @@ -221,10 +216,9 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } err := store.Upsert(context.Background(), record) diff --git a/api/payments/orchestrator/go.mod b/api/payments/orchestrator/go.mod index 8552be8d..d4fe9d97 100644 --- a/api/payments/orchestrator/go.mod +++ b/api/payments/orchestrator/go.mod @@ -27,7 +27,7 @@ require ( github.com/tech/sendico/pkg v0.1.0 go.mongodb.org/mongo-driver/v2 v2.5.0 go.uber.org/zap v1.27.1 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.2 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/api/payments/orchestrator/go.sum b/api/payments/orchestrator/go.sum index 4fb8cd3c..edeb912c 100644 --- a/api/payments/orchestrator/go.sum +++ b/api/payments/orchestrator/go.sum @@ -213,8 +213,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go index 2402ee11..0a60069a 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go @@ -45,6 +45,9 @@ const ( type StepShell struct { StepRef string `bson:"stepRef" json:"stepRef"` StepCode string `bson:"stepCode" json:"stepCode"` + Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"` + Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"` + InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"` UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"` } @@ -53,6 +56,9 @@ type StepShell struct { type StepExecution struct { StepRef string `bson:"stepRef" json:"stepRef"` StepCode string `bson:"stepCode" json:"stepCode"` + Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"` + Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"` + InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"` UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"` State StepState `bson:"state" json:"state"` diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go index b3affe03..a6104db6 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go @@ -143,10 +143,16 @@ func buildInitialStepTelemetry(shell []StepShell) ([]StepExecution, error) { return nil, merrors.InvalidArgument("steps[" + itoa(i) + "].report_visibility is invalid") } userLabel := strings.TrimSpace(shell[i].UserLabel) + railValue := model.ParseRail(string(shell[i].Rail)) + gatewayID := strings.TrimSpace(shell[i].Gateway) + instanceID := strings.TrimSpace(shell[i].InstanceID) out = append(out, StepExecution{ StepRef: stepRef, StepCode: stepCode, + Rail: railValue, + Gateway: gatewayID, + InstanceID: instanceID, ReportVisibility: visibility, UserLabel: userLabel, State: StepStatePending, diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go index fd551e90..e4946fd5 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" paymenttypes "github.com/tech/sendico/pkg/payments/types" "go.mongodb.org/mongo-driver/v2/bson" @@ -42,7 +43,15 @@ func TestCreate_OK(t *testing.T) { QuoteSnapshot: quote, Steps: []StepShell{ {StepRef: " s1 ", StepCode: " reserve_funds ", ReportVisibility: model.ReportVisibilityHidden}, - {StepRef: "s2", StepCode: "submit_gateway", ReportVisibility: model.ReportVisibilityUser, UserLabel: " Card payout "}, + { + StepRef: "s2", + StepCode: "submit_gateway", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + ReportVisibility: model.ReportVisibilityUser, + UserLabel: " Card payout ", + }, }, }) if err != nil { @@ -111,6 +120,15 @@ func TestCreate_OK(t *testing.T) { if got, want := payment.StepExecutions[1].UserLabel, "Card payout"; got != want { t.Fatalf("unexpected second step user label: got=%q want=%q", got, want) } + if got, want := payment.StepExecutions[1].Rail, model.Rail(discovery.RailProviderSettlement); got != want { + t.Fatalf("unexpected second step rail: got=%q want=%q", got, want) + } + if got, want := payment.StepExecutions[1].Gateway, "payment_gateway_settlement"; got != want { + t.Fatalf("unexpected second step gateway: got=%q want=%q", got, want) + } + if got, want := payment.StepExecutions[1].InstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want { + t.Fatalf("unexpected second step instance_id: got=%q want=%q", got, want) + } // Verify immutable snapshot semantics by ensuring clones were created. payment.IntentSnapshot.Ref = "changed" diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go index 273da8bd..0d834db6 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go @@ -221,6 +221,9 @@ func toStepShells(graph *xplan.Graph) []agg.StepShell { out = append(out, agg.StepShell{ StepRef: graph.Steps[i].StepRef, StepCode: graph.Steps[i].StepCode, + Rail: graph.Steps[i].Rail, + Gateway: graph.Steps[i].Gateway, + InstanceID: graph.Steps[i].InstanceID, ReportVisibility: graph.Steps[i].Visibility, UserLabel: graph.Steps[i].UserLabel, }) diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go index 98ad3b5c..39fd00b3 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go @@ -408,6 +408,15 @@ func stepExecutionEqual(left, right agg.StepExecution) bool { if left.StepRef != right.StepRef || left.StepCode != right.StepCode { return false } + if left.Rail != right.Rail { + return false + } + if strings.TrimSpace(left.Gateway) != strings.TrimSpace(right.Gateway) { + return false + } + if strings.TrimSpace(left.InstanceID) != strings.TrimSpace(right.InstanceID) { + return false + } if left.State != right.State || left.Attempt != right.Attempt { return false } diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go b/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go index 0def2253..edc87c4f 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go @@ -6,6 +6,7 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" ) @@ -144,6 +145,16 @@ func (s *svc) normalizeStepExecutions( stepCode = stepsByRef[stepRef].StepCode } exec.StepCode = stepCode + step := stepsByRef[stepRef] + if exec.Rail == discovery.RailUnspecified { + exec.Rail = step.Rail + } + if strings.TrimSpace(exec.Gateway) == "" { + exec.Gateway = strings.TrimSpace(step.Gateway) + } + if strings.TrimSpace(exec.InstanceID) == "" { + exec.InstanceID = strings.TrimSpace(step.InstanceID) + } exec.ReportVisibility = effectiveStepVisibility(exec.ReportVisibility, stepsByRef[stepRef].Visibility) exec.UserLabel = firstNonEmpty(exec.UserLabel, stepsByRef[stepRef].UserLabel) cloned := cloneStepExecution(exec) @@ -158,6 +169,9 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste exec.FailureCode = strings.TrimSpace(exec.FailureCode) exec.FailureMsg = strings.TrimSpace(exec.FailureMsg) exec.UserLabel = strings.TrimSpace(exec.UserLabel) + exec.Gateway = strings.TrimSpace(exec.Gateway) + exec.InstanceID = strings.TrimSpace(exec.InstanceID) + exec.Rail = model.ParseRail(string(exec.Rail)) exec.ReportVisibility = model.NormalizeReportVisibility(exec.ReportVisibility) exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs) if exec.StepRef == "" { @@ -197,6 +211,9 @@ func seedMissingExecutions( executionsByRef[stepRef] = &agg.StepExecution{ StepRef: step.StepRef, StepCode: step.StepCode, + Rail: step.Rail, + Gateway: strings.TrimSpace(step.Gateway), + InstanceID: strings.TrimSpace(step.InstanceID), ReportVisibility: effectiveStepVisibility(model.ReportVisibilityUnspecified, step.Visibility), UserLabel: strings.TrimSpace(step.UserLabel), State: agg.StepStatePending, diff --git a/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go b/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go index b9770c38..add5f3c1 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go +++ b/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go @@ -13,7 +13,6 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" - "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan" "github.com/tech/sendico/payments/storage/model" cons "github.com/tech/sendico/pkg/messaging/consumer" paymentgatewaynotifications "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway" @@ -412,6 +411,9 @@ func buildObserveCandidate(step agg.StepExecution) (runningObserveCandidate, boo } } } + if candidate.gatewayInstanceID == "" { + candidate.gatewayInstanceID = strings.TrimSpace(step.InstanceID) + } if candidate.stepRef == "" || candidate.transferRef == "" { return runningObserveCandidate{}, false } @@ -475,7 +477,7 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment StepRef: candidate.stepRef, OperationRef: firstNonEmpty(strings.TrimSpace(transfer.GetOperationRef()), candidate.operationRef), TransferRef: strings.TrimSpace(candidate.transferRef), - GatewayInstanceID: firstNonEmpty(candidate.gatewayInstanceID, strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)), + GatewayInstanceID: resolvedObserveGatewayID(candidate.gatewayInstanceID, gateway), Status: status, } switch status { @@ -517,39 +519,106 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment } func (s *Service) resolveObserveGateway(ctx context.Context, payment *agg.Payment, candidate runningObserveCandidate) (*model.GatewayInstanceDescriptor, error) { + if s == nil || s.gatewayRegistry == nil { + return nil, errors.New("observe polling: gateway registry is unavailable") + } + items, err := s.gatewayRegistry.List(ctx) + if err != nil { + return nil, err + } + hint, hasHint := observeStepGatewayHint(payment, candidate.stepRef) + expectedRail := model.Rail(discovery.RailUnspecified) + if hasHint { + expectedRail = hint.rail + } if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" { - items, err := s.gatewayRegistry.List(ctx) - if err == nil { - for i := range items { - item := items[i] - if item == nil || !item.IsEnabled { - continue - } - if !strings.EqualFold(strings.TrimSpace(item.ID), gatewayID) && !strings.EqualFold(strings.TrimSpace(item.InstanceID), gatewayID) { - continue - } - if strings.TrimSpace(item.InvokeURI) == "" { - continue - } - return item, nil - } + if item := findEnabledGatewayDescriptor(items, gatewayID, expectedRail); item != nil { + return item, nil } } + if hasHint { + if item := findEnabledGatewayDescriptor(items, hint.instanceID, hint.rail); item != nil { + return item, nil + } + if item := findEnabledGatewayDescriptor(items, hint.gatewayID, hint.rail); item != nil { + return item, nil + } + } + return nil, errors.New("observe polling: gateway instance not found") +} - executor := gatewayCryptoExecutor{ - gatewayRegistry: s.gatewayRegistry, +type observeStepHint struct { + rail model.Rail + gatewayID string + instanceID string +} + +func observeStepGatewayHint(payment *agg.Payment, stepRef string) (observeStepHint, bool) { + if payment == nil { + return observeStepHint{}, false } - step := xplan.Step{ - Rail: discovery.RailCrypto, + key := strings.TrimSpace(stepRef) + if key == "" { + return observeStepHint{}, false } - if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" { - step.InstanceID = gatewayID - step.Gateway = gatewayID - } else if gateway, instanceID, ok := sourceCryptoHop(payment); ok { - step.Gateway = strings.TrimSpace(gateway) - step.InstanceID = strings.TrimSpace(instanceID) + for i := range payment.StepExecutions { + step := payment.StepExecutions[i] + if !strings.EqualFold(strings.TrimSpace(step.StepRef), key) { + continue + } + hint := observeStepHint{ + rail: model.ParseRail(string(step.Rail)), + gatewayID: strings.TrimSpace(step.Gateway), + instanceID: strings.TrimSpace(step.InstanceID), + } + if hint.gatewayID == "" && hint.instanceID == "" { + return observeStepHint{}, false + } + return hint, true } - return executor.resolveGateway(ctx, step) + return observeStepHint{}, false +} + +func findEnabledGatewayDescriptor(items []*model.GatewayInstanceDescriptor, identifier string, rail model.Rail) *model.GatewayInstanceDescriptor { + key := strings.TrimSpace(identifier) + if key == "" { + return nil + } + for i := range items { + item := items[i] + if item == nil || !item.IsEnabled || strings.TrimSpace(item.InvokeURI) == "" { + continue + } + if rail != model.Rail(discovery.RailUnspecified) && model.ParseRail(string(item.Rail)) != rail { + continue + } + if strings.EqualFold(strings.TrimSpace(item.ID), key) || strings.EqualFold(strings.TrimSpace(item.InstanceID), key) { + return item + } + } + return nil +} + +func resolvedObserveGatewayID(candidateGatewayID string, gateway *model.GatewayInstanceDescriptor) string { + candidateID := strings.TrimSpace(candidateGatewayID) + if candidateID != "" && gatewayIdentifierMatches(gateway, candidateID) { + return candidateID + } + if gateway == nil { + return "" + } + return firstNonEmpty(strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)) +} + +func gatewayIdentifierMatches(gateway *model.GatewayInstanceDescriptor, identifier string) bool { + if gateway == nil { + return false + } + key := strings.TrimSpace(identifier) + if key == "" { + return false + } + return strings.EqualFold(strings.TrimSpace(gateway.ID), key) || strings.EqualFold(strings.TrimSpace(gateway.InstanceID), key) } func mapTransferStatus(status chainv1.TransferStatus) (gatewayStatus erecon.GatewayStatus, terminal bool, ok bool) { diff --git a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go index fa0ad19f..ba0e19a8 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go @@ -3,14 +3,15 @@ package orchestrator import ( "context" "errors" - "github.com/tech/sendico/pkg/discovery" "testing" + chainclient "github.com/tech/sendico/gateway/chain/client" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" pm "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/payments/rail" paymenttypes "github.com/tech/sendico/pkg/payments/types" @@ -412,6 +413,30 @@ func TestRunningObserveCandidates_UsesCardPayoutRefAsTransfer(t *testing.T) { } } +func TestRunningObserveCandidates_UsesPlannedStepInstanceWhenExternalRefGatewayMissing(t *testing.T) { + payment := &agg.Payment{ + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + State: agg.StepStateRunning, + ExternalRefs: []agg.ExternalRef{ + {Kind: erecon.ExternalRefKindTransfer, Ref: "trf-2"}, + }, + }, + }, + } + + candidates := runningObserveCandidates(payment) + if len(candidates) != 1 { + t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates)) + } + if got, want := candidates[0].gatewayInstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want { + t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want) + } +} + func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *testing.T) { svc := &Service{ gatewayRegistry: &fakeGatewayRegistry{ @@ -466,5 +491,192 @@ func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *test } } +func TestResolveObserveGateway_UsesPlannedStepGatewayWhenExternalRefInstanceIsStale(t *testing.T) { + svc := &Service{ + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "payment_gateway_settlement", + InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4", + Rail: discovery.RailProviderSettlement, + InvokeURI: "grpc://tgsettle-gateway-new", + IsEnabled: true, + }, + { + ID: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Rail: discovery.RailCrypto, + InvokeURI: "grpc://tron-gateway", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }, + }, + } + + gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{ + stepRef: "hop_2_settlement_observe", + transferRef: "trf-1", + gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }) + if err != nil { + t.Fatalf("resolveObserveGateway returned error: %v", err) + } + if gateway == nil { + t.Fatal("expected gateway") + } + if got, want := gateway.ID, "payment_gateway_settlement"; got != want { + t.Fatalf("gateway id mismatch: got=%q want=%q", got, want) + } + if got, want := gateway.InstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want { + t.Fatalf("gateway instance mismatch: got=%q want=%q", got, want) + } +} + +func TestResolveObserveGateway_FailsWhenPlannedGatewayMetadataIsMissing(t *testing.T) { + svc := &Service{ + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Rail: discovery.RailCrypto, + InvokeURI: "grpc://tron-gateway", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + QuoteSnapshot: &model.PaymentQuoteSnapshot{ + Route: &paymenttypes.QuoteRouteSpecification{ + Hops: []*paymenttypes.QuoteRouteHop{ + { + Index: 1, + Rail: "CRYPTO", + Gateway: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Role: paymenttypes.QuoteRouteHopRoleSource, + }, + }, + }, + }, + } + + gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{ + stepRef: "hop_2_settlement_observe", + transferRef: "trf-1", + gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }) + if err == nil { + t.Fatal("expected gateway resolution error") + } + if gateway != nil { + t.Fatal("expected nil gateway on resolution failure") + } +} + +func TestPollObserveCandidate_UsesResolvedGatewayAfterInstanceRotation(t *testing.T) { + orgID := bson.NewObjectID() + transferRef := "b6874b55-20b0-425d-9e47-d430964b1616:hop_2_settlement_fx_convert" + operationRef := "69aabf823555e083d23b2964:hop_2_settlement_fx_convert" + + var requestedTransferRef string + client := &chainclient.Fake{ + GetTransferFn: func(_ context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) { + requestedTransferRef = req.GetTransferRef() + return &chainv1.GetTransferResponse{ + Transfer: &chainv1.Transfer{ + TransferRef: req.GetTransferRef(), + OperationRef: operationRef, + Status: chainv1.TransferStatus_TRANSFER_SUCCESS, + }, + }, nil + }, + } + resolver := &fakeGatewayInvokeResolver{client: client} + v2 := &fakeExternalRuntimeV2{} + svc := &Service{ + logger: zap.NewNop(), + v2: v2, + gatewayInvokeResolver: resolver, + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "payment_gateway_settlement", + InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4", + Rail: discovery.RailProviderSettlement, + InvokeURI: "grpc://tgsettle-gateway-new", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID}, + PaymentRef: "69aabf823555e083d23b2964", + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + State: agg.StepStateRunning, + ExternalRefs: []agg.ExternalRef{ + { + GatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + Kind: erecon.ExternalRefKindTransfer, + Ref: transferRef, + }, + }, + }, + }, + } + + candidates := runningObserveCandidates(payment) + if len(candidates) != 1 { + t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates)) + } + + svc.pollObserveCandidate(context.Background(), payment, candidates[0]) + + if got, want := resolver.lastInvokeURI, "grpc://tgsettle-gateway-new"; got != want { + t.Fatalf("invoke uri mismatch: got=%q want=%q", got, want) + } + if got, want := requestedTransferRef, transferRef; got != want { + t.Fatalf("transfer_ref lookup mismatch: got=%q want=%q", got, want) + } + if v2.reconcileInput == nil || v2.reconcileInput.Event.Gateway == nil { + t.Fatal("expected reconcile gateway event") + } + gw := v2.reconcileInput.Event.Gateway + if got, want := gw.StepRef, "hop_2_settlement_observe"; got != want { + t.Fatalf("step_ref mismatch: got=%q want=%q", got, want) + } + if got, want := gw.Status, erecon.GatewayStatusSuccess; got != want { + t.Fatalf("status mismatch: got=%q want=%q", got, want) + } + if got, want := gw.OperationRef, operationRef; got != want { + t.Fatalf("operation_ref mismatch: got=%q want=%q", got, want) + } + if got, want := gw.GatewayInstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want { + t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want) + } +} + var _ prepo.Repository = (*fakeExternalRuntimeRepo)(nil) var _ psvc.Service = (*fakeExternalRuntimeV2)(nil)