Merge pull request 'fixed po <-> tgsettle contract' (#688) from po-687 into main
All checks were successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful

Reviewed-on: #688
This commit was merged in pull request #688.
This commit is contained in:
2026-03-06 14:12:46 +00:00
15 changed files with 540 additions and 76 deletions

View File

@@ -136,13 +136,22 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
}
transfer := resp.GetTransfer()
operationID := strings.TrimSpace(transfer.GetOperationRef())
if operationID == "" {
s.logger.Warn("Submit operation transfer response missing operation_ref", append(logFields,
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
)...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{
Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op, ""),
}}, nil
}
s.logger.Info("Submit operation transfer submitted", append(logFields,
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
zap.String("status", transfer.GetStatus().String()),
)...)
return &connectorv1.SubmitOperationResponse{
Receipt: &connectorv1.OperationReceipt{
OperationId: strings.TrimSpace(transfer.GetTransferRef()),
OperationId: operationID,
Status: transferStatusToOperation(transfer.GetStatus()),
ProviderRef: strings.TrimSpace(transfer.GetTransferRef()),
},
@@ -224,7 +233,7 @@ func transferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation {
return nil
}
op := &connectorv1.Operation{
OperationId: strings.TrimSpace(transfer.GetTransferRef()),
OperationId: strings.TrimSpace(transfer.GetOperationRef()),
Type: connectorv1.OperationType_TRANSFER,
Status: transferStatusToOperation(transfer.GetStatus()),
Money: transfer.GetRequestedAmount(),

View File

@@ -0,0 +1,119 @@
package gateway
import (
"context"
"testing"
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestSubmitOperation_UsesOperationRefAsOperationID(t *testing.T) {
svc, _, _ := newTestService(t)
svc.chatID = "1"
req := &connectorv1.SubmitOperationRequest{
Operation: &connectorv1.Operation{
Type: connectorv1.OperationType_TRANSFER,
IdempotencyKey: "idem-settlement-1",
OperationRef: "payment-1:hop_2_settlement_fx_convert",
IntentRef: "intent-1",
Money: &moneyv1.Money{Amount: "1.00", Currency: "USDT"},
From: &connectorv1.OperationParty{
Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
ConnectorId: tgsettleConnectorID,
AccountId: "wallet-src",
}},
},
To: &connectorv1.OperationParty{
Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
ConnectorId: tgsettleConnectorID,
AccountId: "wallet-dst",
}},
},
Params: structFromMap(map[string]interface{}{
"payment_ref": "payment-1",
"organization_ref": "org-1",
}),
},
}
resp, err := svc.SubmitOperation(context.Background(), req)
if err != nil {
t.Fatalf("SubmitOperation returned error: %v", err)
}
if resp.GetReceipt() == nil {
t.Fatal("expected receipt")
}
if got := resp.GetReceipt().GetError(); got != nil {
t.Fatalf("expected no connector error, got: %v", got)
}
if got, want := resp.GetReceipt().GetOperationId(), "payment-1:hop_2_settlement_fx_convert"; got != want {
t.Fatalf("operation_id mismatch: got=%q want=%q", got, want)
}
if got, want := resp.GetReceipt().GetProviderRef(), "idem-settlement-1"; got != want {
t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want)
}
}
func TestGetOperation_UsesOperationRefIdentity(t *testing.T) {
svc, repo, _ := newTestService(t)
record := &storagemodel.PaymentRecord{
IdempotencyKey: "idem-settlement-2",
OperationRef: "payment-2:hop_2_settlement_fx_convert",
PaymentIntentID: "pi-2",
PaymentRef: "payment-2",
RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"},
Status: storagemodel.PaymentStatusSuccess,
}
if err := repo.payments.Upsert(context.Background(), record); err != nil {
t.Fatalf("failed to seed payment record: %v", err)
}
resp, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{
OperationId: "payment-2:hop_2_settlement_fx_convert",
})
if err != nil {
t.Fatalf("GetOperation returned error: %v", err)
}
if resp.GetOperation() == nil {
t.Fatal("expected operation")
}
if got, want := resp.GetOperation().GetOperationId(), "payment-2:hop_2_settlement_fx_convert"; got != want {
t.Fatalf("operation_id mismatch: got=%q want=%q", got, want)
}
if got, want := resp.GetOperation().GetProviderRef(), "idem-settlement-2"; got != want {
t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want)
}
}
func TestGetOperation_DoesNotResolveByIdempotencyKey(t *testing.T) {
svc, repo, _ := newTestService(t)
record := &storagemodel.PaymentRecord{
IdempotencyKey: "idem-settlement-3",
OperationRef: "payment-3:hop_2_settlement_fx_convert",
PaymentIntentID: "pi-3",
PaymentRef: "payment-3",
RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"},
Status: storagemodel.PaymentStatusSuccess,
}
if err := repo.payments.Upsert(context.Background(), record); err != nil {
t.Fatalf("failed to seed payment record: %v", err)
}
_, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{
OperationId: "idem-settlement-3",
})
if err == nil {
t.Fatal("expected not found error")
}
if status.Code(err) != codes.NotFound {
t.Fatalf("unexpected error code: got=%s want=%s", status.Code(err), codes.NotFound)
}
}

View File

@@ -20,22 +20,25 @@ const (
)
type PaymentRecord struct {
storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"`
PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
ConfirmationRef string `bson:"confirmationRef,omitempty" json:"confirmation_ref,omitempty"`
ConfirmationMessageID string `bson:"confirmationMessageId,omitempty" json:"confirmation_message_id,omitempty"`
ConfirmationReplyMessageID string `bson:"confirmationReplyMessageId,omitempty" json:"confirmation_reply_message_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"`
PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
}
type TelegramConfirmation struct {

View File

@@ -103,14 +103,13 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
return merrors.InvalidArgument("payment record is nil", "record")
}
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(record.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
record.IntentRef = strings.TrimSpace(record.IntentRef)
record.OperationRef = strings.TrimSpace(record.OperationRef)
if record.PaymentIntentID == "" {
return merrors.InvalidArgument("intention reference is required", "payment_intent_ref")
if record.IntentRef == "" {
return merrors.InvalidArgument("intention reference is required", "intent_ref")
}
if record.IdempotencyKey == "" {
return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
@@ -148,7 +147,7 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert payment record",
zap.String("idempotency_key", record.IdempotencyKey),
zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("intent_ref", record.IntentRef),
zap.String("quote_ref", record.QuoteRef),
zap.Error(err))
}

View File

@@ -112,9 +112,8 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) {
CreatedAt: existingCreatedAt,
UpdatedAt: existingCreatedAt,
},
IdempotencyKey: key,
PaymentIntentID: "pi-old",
IntentRef: "intent-old",
IdempotencyKey: key,
IntentRef: "pi-old",
},
},
duplicateWhenZeroID: true,
@@ -122,10 +121,9 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) {
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
PaymentIntentID: "pi-new",
QuoteRef: "quote-new",
IntentRef: "intent-new",
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
if err := store.Upsert(context.Background(), record); err != nil {
@@ -155,9 +153,8 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) {
CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
},
IdempotencyKey: key,
PaymentIntentID: "pi-existing",
IntentRef: "intent-existing",
IdempotencyKey: key,
IntentRef: "pi-existing",
},
},
findErrByCall: map[int]error{
@@ -168,10 +165,9 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) {
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
PaymentIntentID: "pi-new",
QuoteRef: "quote-new",
IntentRef: "intent-new",
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
if err := store.Upsert(context.Background(), record); err != nil {
@@ -203,9 +199,8 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T)
CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
},
IdempotencyKey: key,
PaymentIntentID: "pi-existing",
IntentRef: "intent-existing",
IdempotencyKey: key,
IntentRef: "pi-existing",
},
},
findErrByCall: map[int]error{
@@ -221,10 +216,9 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T)
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
PaymentIntentID: "pi-new",
QuoteRef: "quote-new",
IntentRef: "intent-new",
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
err := store.Upsert(context.Background(), record)

View File

@@ -27,7 +27,7 @@ require (
github.com/tech/sendico/pkg v0.1.0
go.mongodb.org/mongo-driver/v2 v2.5.0
go.uber.org/zap v1.27.1
google.golang.org/grpc v1.79.1
google.golang.org/grpc v1.79.2
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
)

View File

@@ -213,8 +213,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU=
google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -45,6 +45,9 @@ const (
type StepShell struct {
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
}
@@ -53,6 +56,9 @@ type StepShell struct {
type StepExecution struct {
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
State StepState `bson:"state" json:"state"`

View File

@@ -143,10 +143,16 @@ func buildInitialStepTelemetry(shell []StepShell) ([]StepExecution, error) {
return nil, merrors.InvalidArgument("steps[" + itoa(i) + "].report_visibility is invalid")
}
userLabel := strings.TrimSpace(shell[i].UserLabel)
railValue := model.ParseRail(string(shell[i].Rail))
gatewayID := strings.TrimSpace(shell[i].Gateway)
instanceID := strings.TrimSpace(shell[i].InstanceID)
out = append(out, StepExecution{
StepRef: stepRef,
StepCode: stepCode,
Rail: railValue,
Gateway: gatewayID,
InstanceID: instanceID,
ReportVisibility: visibility,
UserLabel: userLabel,
State: StepStatePending,

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
@@ -42,7 +43,15 @@ func TestCreate_OK(t *testing.T) {
QuoteSnapshot: quote,
Steps: []StepShell{
{StepRef: " s1 ", StepCode: " reserve_funds ", ReportVisibility: model.ReportVisibilityHidden},
{StepRef: "s2", StepCode: "submit_gateway", ReportVisibility: model.ReportVisibilityUser, UserLabel: " Card payout "},
{
StepRef: "s2",
StepCode: "submit_gateway",
Rail: discovery.RailProviderSettlement,
Gateway: "payment_gateway_settlement",
InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
ReportVisibility: model.ReportVisibilityUser,
UserLabel: " Card payout ",
},
},
})
if err != nil {
@@ -111,6 +120,15 @@ func TestCreate_OK(t *testing.T) {
if got, want := payment.StepExecutions[1].UserLabel, "Card payout"; got != want {
t.Fatalf("unexpected second step user label: got=%q want=%q", got, want)
}
if got, want := payment.StepExecutions[1].Rail, model.Rail(discovery.RailProviderSettlement); got != want {
t.Fatalf("unexpected second step rail: got=%q want=%q", got, want)
}
if got, want := payment.StepExecutions[1].Gateway, "payment_gateway_settlement"; got != want {
t.Fatalf("unexpected second step gateway: got=%q want=%q", got, want)
}
if got, want := payment.StepExecutions[1].InstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want {
t.Fatalf("unexpected second step instance_id: got=%q want=%q", got, want)
}
// Verify immutable snapshot semantics by ensuring clones were created.
payment.IntentSnapshot.Ref = "changed"

View File

@@ -221,6 +221,9 @@ func toStepShells(graph *xplan.Graph) []agg.StepShell {
out = append(out, agg.StepShell{
StepRef: graph.Steps[i].StepRef,
StepCode: graph.Steps[i].StepCode,
Rail: graph.Steps[i].Rail,
Gateway: graph.Steps[i].Gateway,
InstanceID: graph.Steps[i].InstanceID,
ReportVisibility: graph.Steps[i].Visibility,
UserLabel: graph.Steps[i].UserLabel,
})

View File

@@ -408,6 +408,15 @@ func stepExecutionEqual(left, right agg.StepExecution) bool {
if left.StepRef != right.StepRef || left.StepCode != right.StepCode {
return false
}
if left.Rail != right.Rail {
return false
}
if strings.TrimSpace(left.Gateway) != strings.TrimSpace(right.Gateway) {
return false
}
if strings.TrimSpace(left.InstanceID) != strings.TrimSpace(right.InstanceID) {
return false
}
if left.State != right.State || left.Attempt != right.Attempt {
return false
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
)
@@ -144,6 +145,16 @@ func (s *svc) normalizeStepExecutions(
stepCode = stepsByRef[stepRef].StepCode
}
exec.StepCode = stepCode
step := stepsByRef[stepRef]
if exec.Rail == discovery.RailUnspecified {
exec.Rail = step.Rail
}
if strings.TrimSpace(exec.Gateway) == "" {
exec.Gateway = strings.TrimSpace(step.Gateway)
}
if strings.TrimSpace(exec.InstanceID) == "" {
exec.InstanceID = strings.TrimSpace(step.InstanceID)
}
exec.ReportVisibility = effectiveStepVisibility(exec.ReportVisibility, stepsByRef[stepRef].Visibility)
exec.UserLabel = firstNonEmpty(exec.UserLabel, stepsByRef[stepRef].UserLabel)
cloned := cloneStepExecution(exec)
@@ -158,6 +169,9 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste
exec.FailureCode = strings.TrimSpace(exec.FailureCode)
exec.FailureMsg = strings.TrimSpace(exec.FailureMsg)
exec.UserLabel = strings.TrimSpace(exec.UserLabel)
exec.Gateway = strings.TrimSpace(exec.Gateway)
exec.InstanceID = strings.TrimSpace(exec.InstanceID)
exec.Rail = model.ParseRail(string(exec.Rail))
exec.ReportVisibility = model.NormalizeReportVisibility(exec.ReportVisibility)
exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
if exec.StepRef == "" {
@@ -197,6 +211,9 @@ func seedMissingExecutions(
executionsByRef[stepRef] = &agg.StepExecution{
StepRef: step.StepRef,
StepCode: step.StepCode,
Rail: step.Rail,
Gateway: strings.TrimSpace(step.Gateway),
InstanceID: strings.TrimSpace(step.InstanceID),
ReportVisibility: effectiveStepVisibility(model.ReportVisibilityUnspecified, step.Visibility),
UserLabel: strings.TrimSpace(step.UserLabel),
State: agg.StepStatePending,

View File

@@ -13,7 +13,6 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
cons "github.com/tech/sendico/pkg/messaging/consumer"
paymentgatewaynotifications "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway"
@@ -412,6 +411,9 @@ func buildObserveCandidate(step agg.StepExecution) (runningObserveCandidate, boo
}
}
}
if candidate.gatewayInstanceID == "" {
candidate.gatewayInstanceID = strings.TrimSpace(step.InstanceID)
}
if candidate.stepRef == "" || candidate.transferRef == "" {
return runningObserveCandidate{}, false
}
@@ -475,7 +477,7 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment
StepRef: candidate.stepRef,
OperationRef: firstNonEmpty(strings.TrimSpace(transfer.GetOperationRef()), candidate.operationRef),
TransferRef: strings.TrimSpace(candidate.transferRef),
GatewayInstanceID: firstNonEmpty(candidate.gatewayInstanceID, strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)),
GatewayInstanceID: resolvedObserveGatewayID(candidate.gatewayInstanceID, gateway),
Status: status,
}
switch status {
@@ -517,39 +519,106 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment
}
func (s *Service) resolveObserveGateway(ctx context.Context, payment *agg.Payment, candidate runningObserveCandidate) (*model.GatewayInstanceDescriptor, error) {
if s == nil || s.gatewayRegistry == nil {
return nil, errors.New("observe polling: gateway registry is unavailable")
}
items, err := s.gatewayRegistry.List(ctx)
if err != nil {
return nil, err
}
hint, hasHint := observeStepGatewayHint(payment, candidate.stepRef)
expectedRail := model.Rail(discovery.RailUnspecified)
if hasHint {
expectedRail = hint.rail
}
if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" {
items, err := s.gatewayRegistry.List(ctx)
if err == nil {
for i := range items {
item := items[i]
if item == nil || !item.IsEnabled {
continue
}
if !strings.EqualFold(strings.TrimSpace(item.ID), gatewayID) && !strings.EqualFold(strings.TrimSpace(item.InstanceID), gatewayID) {
continue
}
if strings.TrimSpace(item.InvokeURI) == "" {
continue
}
return item, nil
}
if item := findEnabledGatewayDescriptor(items, gatewayID, expectedRail); item != nil {
return item, nil
}
}
if hasHint {
if item := findEnabledGatewayDescriptor(items, hint.instanceID, hint.rail); item != nil {
return item, nil
}
if item := findEnabledGatewayDescriptor(items, hint.gatewayID, hint.rail); item != nil {
return item, nil
}
}
return nil, errors.New("observe polling: gateway instance not found")
}
executor := gatewayCryptoExecutor{
gatewayRegistry: s.gatewayRegistry,
type observeStepHint struct {
rail model.Rail
gatewayID string
instanceID string
}
func observeStepGatewayHint(payment *agg.Payment, stepRef string) (observeStepHint, bool) {
if payment == nil {
return observeStepHint{}, false
}
step := xplan.Step{
Rail: discovery.RailCrypto,
key := strings.TrimSpace(stepRef)
if key == "" {
return observeStepHint{}, false
}
if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" {
step.InstanceID = gatewayID
step.Gateway = gatewayID
} else if gateway, instanceID, ok := sourceCryptoHop(payment); ok {
step.Gateway = strings.TrimSpace(gateway)
step.InstanceID = strings.TrimSpace(instanceID)
for i := range payment.StepExecutions {
step := payment.StepExecutions[i]
if !strings.EqualFold(strings.TrimSpace(step.StepRef), key) {
continue
}
hint := observeStepHint{
rail: model.ParseRail(string(step.Rail)),
gatewayID: strings.TrimSpace(step.Gateway),
instanceID: strings.TrimSpace(step.InstanceID),
}
if hint.gatewayID == "" && hint.instanceID == "" {
return observeStepHint{}, false
}
return hint, true
}
return executor.resolveGateway(ctx, step)
return observeStepHint{}, false
}
func findEnabledGatewayDescriptor(items []*model.GatewayInstanceDescriptor, identifier string, rail model.Rail) *model.GatewayInstanceDescriptor {
key := strings.TrimSpace(identifier)
if key == "" {
return nil
}
for i := range items {
item := items[i]
if item == nil || !item.IsEnabled || strings.TrimSpace(item.InvokeURI) == "" {
continue
}
if rail != model.Rail(discovery.RailUnspecified) && model.ParseRail(string(item.Rail)) != rail {
continue
}
if strings.EqualFold(strings.TrimSpace(item.ID), key) || strings.EqualFold(strings.TrimSpace(item.InstanceID), key) {
return item
}
}
return nil
}
func resolvedObserveGatewayID(candidateGatewayID string, gateway *model.GatewayInstanceDescriptor) string {
candidateID := strings.TrimSpace(candidateGatewayID)
if candidateID != "" && gatewayIdentifierMatches(gateway, candidateID) {
return candidateID
}
if gateway == nil {
return ""
}
return firstNonEmpty(strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID))
}
func gatewayIdentifierMatches(gateway *model.GatewayInstanceDescriptor, identifier string) bool {
if gateway == nil {
return false
}
key := strings.TrimSpace(identifier)
if key == "" {
return false
}
return strings.EqualFold(strings.TrimSpace(gateway.ID), key) || strings.EqualFold(strings.TrimSpace(gateway.InstanceID), key)
}
func mapTransferStatus(status chainv1.TransferStatus) (gatewayStatus erecon.GatewayStatus, terminal bool, ok bool) {

View File

@@ -3,14 +3,15 @@ package orchestrator
import (
"context"
"errors"
"github.com/tech/sendico/pkg/discovery"
"testing"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
pm "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
@@ -412,6 +413,30 @@ func TestRunningObserveCandidates_UsesCardPayoutRefAsTransfer(t *testing.T) {
}
}
func TestRunningObserveCandidates_UsesPlannedStepInstanceWhenExternalRefGatewayMissing(t *testing.T) {
payment := &agg.Payment{
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_2_settlement_observe",
StepCode: "hop.2.settlement.observe",
InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{Kind: erecon.ExternalRefKindTransfer, Ref: "trf-2"},
},
},
},
}
candidates := runningObserveCandidates(payment)
if len(candidates) != 1 {
t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates))
}
if got, want := candidates[0].gatewayInstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want {
t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want)
}
}
func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *testing.T) {
svc := &Service{
gatewayRegistry: &fakeGatewayRegistry{
@@ -466,5 +491,192 @@ func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *test
}
}
func TestResolveObserveGateway_UsesPlannedStepGatewayWhenExternalRefInstanceIsStale(t *testing.T) {
svc := &Service{
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "payment_gateway_settlement",
InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4",
Rail: discovery.RailProviderSettlement,
InvokeURI: "grpc://tgsettle-gateway-new",
IsEnabled: true,
},
{
ID: "crypto_rail_gateway_tron_mainnet",
InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855",
Rail: discovery.RailCrypto,
InvokeURI: "grpc://tron-gateway",
IsEnabled: true,
},
},
},
}
payment := &agg.Payment{
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_2_settlement_observe",
StepCode: "hop.2.settlement.observe",
Rail: discovery.RailProviderSettlement,
Gateway: "payment_gateway_settlement",
InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
},
},
}
gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{
stepRef: "hop_2_settlement_observe",
transferRef: "trf-1",
gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
})
if err != nil {
t.Fatalf("resolveObserveGateway returned error: %v", err)
}
if gateway == nil {
t.Fatal("expected gateway")
}
if got, want := gateway.ID, "payment_gateway_settlement"; got != want {
t.Fatalf("gateway id mismatch: got=%q want=%q", got, want)
}
if got, want := gateway.InstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want {
t.Fatalf("gateway instance mismatch: got=%q want=%q", got, want)
}
}
func TestResolveObserveGateway_FailsWhenPlannedGatewayMetadataIsMissing(t *testing.T) {
svc := &Service{
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "crypto_rail_gateway_tron_mainnet",
InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855",
Rail: discovery.RailCrypto,
InvokeURI: "grpc://tron-gateway",
IsEnabled: true,
},
},
},
}
payment := &agg.Payment{
QuoteSnapshot: &model.PaymentQuoteSnapshot{
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{
Index: 1,
Rail: "CRYPTO",
Gateway: "crypto_rail_gateway_tron_mainnet",
InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855",
Role: paymenttypes.QuoteRouteHopRoleSource,
},
},
},
},
}
gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{
stepRef: "hop_2_settlement_observe",
transferRef: "trf-1",
gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
})
if err == nil {
t.Fatal("expected gateway resolution error")
}
if gateway != nil {
t.Fatal("expected nil gateway on resolution failure")
}
}
func TestPollObserveCandidate_UsesResolvedGatewayAfterInstanceRotation(t *testing.T) {
orgID := bson.NewObjectID()
transferRef := "b6874b55-20b0-425d-9e47-d430964b1616:hop_2_settlement_fx_convert"
operationRef := "69aabf823555e083d23b2964:hop_2_settlement_fx_convert"
var requestedTransferRef string
client := &chainclient.Fake{
GetTransferFn: func(_ context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
requestedTransferRef = req.GetTransferRef()
return &chainv1.GetTransferResponse{
Transfer: &chainv1.Transfer{
TransferRef: req.GetTransferRef(),
OperationRef: operationRef,
Status: chainv1.TransferStatus_TRANSFER_SUCCESS,
},
}, nil
},
}
resolver := &fakeGatewayInvokeResolver{client: client}
v2 := &fakeExternalRuntimeV2{}
svc := &Service{
logger: zap.NewNop(),
v2: v2,
gatewayInvokeResolver: resolver,
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: "payment_gateway_settlement",
InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4",
Rail: discovery.RailProviderSettlement,
InvokeURI: "grpc://tgsettle-gateway-new",
IsEnabled: true,
},
},
},
}
payment := &agg.Payment{
OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID},
PaymentRef: "69aabf823555e083d23b2964",
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_2_settlement_observe",
StepCode: "hop.2.settlement.observe",
Rail: discovery.RailProviderSettlement,
Gateway: "payment_gateway_settlement",
InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
State: agg.StepStateRunning,
ExternalRefs: []agg.ExternalRef{
{
GatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12",
Kind: erecon.ExternalRefKindTransfer,
Ref: transferRef,
},
},
},
},
}
candidates := runningObserveCandidates(payment)
if len(candidates) != 1 {
t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates))
}
svc.pollObserveCandidate(context.Background(), payment, candidates[0])
if got, want := resolver.lastInvokeURI, "grpc://tgsettle-gateway-new"; got != want {
t.Fatalf("invoke uri mismatch: got=%q want=%q", got, want)
}
if got, want := requestedTransferRef, transferRef; got != want {
t.Fatalf("transfer_ref lookup mismatch: got=%q want=%q", got, want)
}
if v2.reconcileInput == nil || v2.reconcileInput.Event.Gateway == nil {
t.Fatal("expected reconcile gateway event")
}
gw := v2.reconcileInput.Event.Gateway
if got, want := gw.StepRef, "hop_2_settlement_observe"; got != want {
t.Fatalf("step_ref mismatch: got=%q want=%q", got, want)
}
if got, want := gw.Status, erecon.GatewayStatusSuccess; got != want {
t.Fatalf("status mismatch: got=%q want=%q", got, want)
}
if got, want := gw.OperationRef, operationRef; got != want {
t.Fatalf("operation_ref mismatch: got=%q want=%q", got, want)
}
if got, want := gw.GatewayInstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want {
t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want)
}
}
var _ prepo.Repository = (*fakeExternalRuntimeRepo)(nil)
var _ psvc.Service = (*fakeExternalRuntimeV2)(nil)