Fixes + stable gateway ids

This commit is contained in:
Stephan D
2026-02-18 20:38:08 +01:00
parent 4dc182bfa2
commit 770c7b9da9
119 changed files with 3000 additions and 734 deletions

View File

@@ -47,10 +47,11 @@ func (r *compositeGatewayRegistry) List(ctx context.Context) ([]*model.GatewayIn
continue
}
for _, entry := range list {
if entry == nil || entry.ID == "" {
key := model.GatewayDescriptorIdentityKey(entry)
if key == "" {
continue
}
items[entry.ID] = entry
items[key] = entry
}
}
result := make([]*model.GatewayInstanceDescriptor, 0, len(items))
@@ -58,7 +59,7 @@ func (r *compositeGatewayRegistry) List(ctx context.Context) ([]*model.GatewayIn
result = append(result, entry)
}
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
return model.LessGatewayDescriptor(result[i], result[j])
})
return result, nil
}

View File

@@ -57,7 +57,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst
})
}
sort.Slice(items, func(i, j int) bool {
return items[i].ID < items[j].ID
return model.LessGatewayDescriptor(items[i], items[j])
})
return items, nil
}

View File

@@ -31,14 +31,11 @@ func NewGatewayRegistry(logger mlogger.Logger, static []*model.GatewayInstanceDe
func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDescriptor, error) {
items := map[string]*model.GatewayInstanceDescriptor{}
for _, gw := range r.static {
if gw == nil {
key := model.GatewayDescriptorIdentityKey(gw)
if key == "" {
continue
}
id := strings.TrimSpace(gw.ID)
if id == "" {
continue
}
items[id] = cloneGatewayDescriptor(gw)
items[key] = cloneGatewayDescriptor(gw)
}
result := make([]*model.GatewayInstanceDescriptor, 0, len(items))
@@ -46,7 +43,7 @@ func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDes
result = append(result, gw)
}
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
return model.LessGatewayDescriptor(result[i], result[j])
})
return result, nil
}

View File

@@ -0,0 +1,72 @@
package orchestrator
import (
"context"
"testing"
"github.com/tech/sendico/payments/storage/model"
)
type identityGatewayRegistryStub struct {
items []*model.GatewayInstanceDescriptor
}
func (s identityGatewayRegistryStub) List(context.Context) ([]*model.GatewayInstanceDescriptor, error) {
return s.items, nil
}
func TestGatewayRegistry_ListKeepsDistinctInstancesPerGatewayID(t *testing.T) {
registry := NewGatewayRegistry(nil, []*model.GatewayInstanceDescriptor{
{ID: "crypto_rail_gateway_tron", InstanceID: "inst-b", InvokeURI: "grpc://b"},
{ID: "crypto_rail_gateway_tron", InstanceID: "inst-a", InvokeURI: "grpc://a"},
{ID: "crypto_rail_gateway_tron", InstanceID: "inst-a", InvokeURI: "grpc://a-new"},
})
if registry == nil {
t.Fatalf("expected registry to be created")
}
items, err := registry.List(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := len(items), 2; got != want {
t.Fatalf("unexpected items count: got=%d want=%d", got, want)
}
if got, want := items[0].InstanceID, "inst-a"; got != want {
t.Fatalf("unexpected first instance id: got=%q want=%q", got, want)
}
if got, want := items[0].InvokeURI, "grpc://a-new"; got != want {
t.Fatalf("expected latest duplicate to win for same gateway+instance: got=%q want=%q", got, want)
}
if got, want := items[1].InstanceID, "inst-b"; got != want {
t.Fatalf("unexpected second instance id: got=%q want=%q", got, want)
}
}
func TestCompositeGatewayRegistry_ListKeepsDistinctInstancesPerGatewayID(t *testing.T) {
registry := NewCompositeGatewayRegistry(nil,
identityGatewayRegistryStub{items: []*model.GatewayInstanceDescriptor{
{ID: "crypto_rail_gateway_tron", InstanceID: "inst-b", InvokeURI: "grpc://b"},
}},
identityGatewayRegistryStub{items: []*model.GatewayInstanceDescriptor{
{ID: "crypto_rail_gateway_tron", InstanceID: "inst-a", InvokeURI: "grpc://a"},
}},
)
if registry == nil {
t.Fatalf("expected registry to be created")
}
items, err := registry.List(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := len(items), 2; got != want {
t.Fatalf("unexpected items count: got=%d want=%d", got, want)
}
if got, want := items[0].InstanceID, "inst-a"; got != want {
t.Fatalf("unexpected first instance id: got=%q want=%q", got, want)
}
if got, want := items[1].InstanceID, "inst-b"; got != want {
t.Fatalf("unexpected second instance id: got=%q want=%q", got, want)
}
}

View File

@@ -90,9 +90,6 @@ func selectGatewayForActions(ctx context.Context, registry GatewayRegistry, rail
if entry.Rail != rail {
continue
}
if instanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(instanceID)) {
continue
}
ok := true
for _, action := range actions {
if err := isGatewayEligible(entry, rail, network, currency, action, dir, amt); err != nil {
@@ -116,6 +113,13 @@ func selectGatewayForActions(ctx context.Context, registry GatewayRegistry, rail
sort.Slice(eligible, func(i, j int) bool {
return eligible[i].ID < eligible[j].ID
})
if instanceID != "" {
for _, entry := range eligible {
if strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(instanceID)) {
return entry, nil
}
}
}
return eligible[0], nil
}

View File

@@ -131,12 +131,6 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
if entry.Rail != step.Rail {
continue
}
if step.GatewayID != "" && entry.ID != step.GatewayID {
continue
}
if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) {
continue
}
if step.Action != model.RailOperationUnspecified {
if err := isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount); err != nil {
lastErr = err
@@ -152,13 +146,38 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].ID < candidates[j].ID
return model.LessGatewayDescriptor(candidates[i], candidates[j])
})
entry := candidates[0]
entry, selectionMode := model.SelectGatewayByPreference(
candidates,
step.GatewayID,
step.InstanceID,
step.GatewayInvokeURI,
)
if entry == nil {
entry = candidates[0]
selectionMode = "rail_fallback"
}
invokeURI := strings.TrimSpace(entry.InvokeURI)
if invokeURI == "" {
return nil, merrors.InvalidArgument("rail gateway: invoke uri is required")
}
originalGatewayID := strings.TrimSpace(step.GatewayID)
originalInstanceID := strings.TrimSpace(step.InstanceID)
originalInvokeURI := strings.TrimSpace(step.GatewayInvokeURI)
step.GatewayID = strings.TrimSpace(entry.ID)
step.InstanceID = strings.TrimSpace(entry.InstanceID)
step.GatewayInvokeURI = invokeURI
g.logger.Debug("Rail gateway candidate selected",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("selection_mode", selectionMode),
zap.String("requested_gateway_id", originalGatewayID),
zap.String("requested_instance_id", originalInstanceID),
zap.String("requested_invoke_uri", originalInvokeURI),
zap.String("resolved_gateway_id", step.GatewayID),
zap.String("resolved_instance_id", step.InstanceID),
zap.String("resolved_invoke_uri", step.GatewayInvokeURI),
)
cfg := chainclient.RailGatewayConfig{
Rail: string(entry.Rail),
@@ -174,9 +193,22 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
},
}
if selectionMode != "exact" && (originalGatewayID != "" || originalInstanceID != "" || originalInvokeURI != "") {
g.logger.Warn("Rail gateway identity fallback applied",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("selection_mode", selectionMode),
zap.String("requested_gateway_id", originalGatewayID),
zap.String("requested_instance_id", originalInstanceID),
zap.String("requested_invoke_uri", originalInvokeURI),
zap.String("resolved_gateway_id", step.GatewayID),
zap.String("resolved_instance_id", step.InstanceID),
zap.String("resolved_invoke_uri", step.GatewayInvokeURI),
)
}
g.logger.Info("Rail gateway resolved",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("action", string(step.Action)),
zap.String("selection_mode", selectionMode),
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),

View File

@@ -0,0 +1,145 @@
package orchestrator
import (
"context"
"testing"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/storage/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
type optionsGatewayRegistryStub struct {
items []*model.GatewayInstanceDescriptor
}
func (s optionsGatewayRegistryStub) List(context.Context) ([]*model.GatewayInstanceDescriptor, error) {
return s.items, nil
}
type optionsInvokeResolverStub struct {
uris []string
}
func (s *optionsInvokeResolverStub) Resolve(_ context.Context, invokeURI string) (chainclient.Client, error) {
s.uris = append(s.uris, invokeURI)
return &chainclient.Fake{}, nil
}
func TestResolveDynamicGateway_FallsBackToInvokeURI(t *testing.T) {
resolver := &optionsInvokeResolverStub{}
deps := railGatewayDependency{
registry: optionsGatewayRegistryStub{items: []*model.GatewayInstanceDescriptor{
{
ID: "aaa",
InstanceID: "inst-a",
Rail: model.RailCrypto,
Network: "TRON",
InvokeURI: "grpc://gw-a:50051",
Currencies: []string{"USDT"},
Capabilities: model.RailCapabilities{
CanPayOut: true,
},
IsEnabled: true,
},
{
ID: "bbb",
InstanceID: "inst-b",
Rail: model.RailCrypto,
Network: "TRON",
InvokeURI: "grpc://gw-b:50051",
Currencies: []string{"USDT"},
Capabilities: model.RailCapabilities{
CanPayOut: true,
},
IsEnabled: true,
},
}},
chainResolver: resolver,
logger: zap.NewNop(),
}
step := &model.PaymentStep{
StepID: "crypto.send",
Rail: model.RailCrypto,
Action: model.RailOperationSend,
GatewayID: "legacy-id",
InstanceID: "legacy-instance",
GatewayInvokeURI: "grpc://gw-b:50051",
Amount: &paymenttypes.Money{Currency: "USDT", Amount: "1"},
}
if _, err := deps.resolveDynamic(context.Background(), step); err != nil {
t.Fatalf("resolveDynamic returned error: %v", err)
}
if got, want := step.GatewayID, "bbb"; got != want {
t.Fatalf("unexpected gateway_id: got=%q want=%q", got, want)
}
if got, want := step.InstanceID, "inst-b"; got != want {
t.Fatalf("unexpected instance_id: got=%q want=%q", got, want)
}
if got, want := step.GatewayInvokeURI, "grpc://gw-b:50051"; got != want {
t.Fatalf("unexpected gateway_invoke_uri: got=%q want=%q", got, want)
}
if len(resolver.uris) != 1 || resolver.uris[0] != "grpc://gw-b:50051" {
t.Fatalf("unexpected resolver invocations: %#v", resolver.uris)
}
}
func TestResolveDynamicGateway_FallsBackToGatewayIDWhenInstanceChanges(t *testing.T) {
resolver := &optionsInvokeResolverStub{}
deps := railGatewayDependency{
registry: optionsGatewayRegistryStub{items: []*model.GatewayInstanceDescriptor{
{
ID: "aaa",
InstanceID: "inst-a",
Rail: model.RailCrypto,
Network: "TRON",
InvokeURI: "grpc://gw-a:50051",
Currencies: []string{"USDT"},
Capabilities: model.RailCapabilities{
CanPayOut: true,
},
IsEnabled: true,
},
{
ID: "crypto_rail_gateway_tron",
InstanceID: "inst-new",
Rail: model.RailCrypto,
Network: "TRON",
InvokeURI: "grpc://gw-tron:50051",
Currencies: []string{"USDT"},
Capabilities: model.RailCapabilities{
CanPayOut: true,
},
IsEnabled: true,
},
}},
chainResolver: resolver,
logger: zap.NewNop(),
}
step := &model.PaymentStep{
StepID: "crypto.send",
Rail: model.RailCrypto,
Action: model.RailOperationSend,
GatewayID: "crypto_rail_gateway_tron",
InstanceID: "inst-old",
Amount: &paymenttypes.Money{Currency: "USDT", Amount: "1"},
}
if _, err := deps.resolveDynamic(context.Background(), step); err != nil {
t.Fatalf("resolveDynamic returned error: %v", err)
}
if got, want := step.GatewayID, "crypto_rail_gateway_tron"; got != want {
t.Fatalf("unexpected gateway_id: got=%q want=%q", got, want)
}
if got, want := step.InstanceID, "inst-new"; got != want {
t.Fatalf("unexpected instance_id: got=%q want=%q", got, want)
}
if got, want := step.GatewayInvokeURI, "grpc://gw-tron:50051"; got != want {
t.Fatalf("unexpected gateway_invoke_uri: got=%q want=%q", got, want)
}
if len(resolver.uris) != 1 || resolver.uris[0] != "grpc://gw-tron:50051" {
t.Fatalf("unexpected resolver invocations: %#v", resolver.uris)
}
}

View File

@@ -43,17 +43,19 @@ func cloneStoredPaymentPlan(src *model.PaymentPlan) *model.PaymentPlan {
continue
}
stepClone := &model.PaymentStep{
StepID: strings.TrimSpace(step.StepID),
Rail: step.Rail,
GatewayID: strings.TrimSpace(step.GatewayID),
InstanceID: strings.TrimSpace(step.InstanceID),
Action: step.Action,
DependsOn: cloneStringList(step.DependsOn),
CommitPolicy: step.CommitPolicy,
CommitAfter: cloneStringList(step.CommitAfter),
Amount: cloneMoney(step.Amount),
FromRole: cloneAccountRole(step.FromRole),
ToRole: cloneAccountRole(step.ToRole),
StepID: strings.TrimSpace(step.StepID),
Rail: step.Rail,
GatewayID: strings.TrimSpace(step.GatewayID),
InstanceID: strings.TrimSpace(step.InstanceID),
GatewayInvokeURI: strings.TrimSpace(step.GatewayInvokeURI),
Action: step.Action,
ReportVisibility: step.ReportVisibility,
DependsOn: cloneStringList(step.DependsOn),
CommitPolicy: step.CommitPolicy,
CommitAfter: cloneStringList(step.CommitAfter),
Amount: cloneMoney(step.Amount),
FromRole: cloneAccountRole(step.FromRole),
ToRole: cloneAccountRole(step.ToRole),
}
clone.Steps = append(clone.Steps, stepClone)
}

View File

@@ -107,9 +107,6 @@ func selectGateway(ctx context.Context, registry GatewayRegistry, rail model.Rai
eligible := make([]*model.GatewayInstanceDescriptor, 0)
var lastErr error
for _, gw := range all {
if instanceID != "" && !strings.EqualFold(strings.TrimSpace(gw.InstanceID), instanceID) {
continue
}
if err := isGatewayEligible(gw, rail, network, currency, action, dir, amt); err != nil {
lastErr = err
continue
@@ -125,6 +122,13 @@ func selectGateway(ctx context.Context, registry GatewayRegistry, rail model.Rai
sort.Slice(eligible, func(i, j int) bool {
return eligible[i].ID < eligible[j].ID
})
if instanceID != "" {
for _, gw := range eligible {
if strings.EqualFold(strings.TrimSpace(gw.InstanceID), instanceID) {
return gw, nil
}
}
}
return eligible[0], nil
}

View File

@@ -14,11 +14,12 @@ func buildFXConversionPlan(payment *model.Payment) (*model.PaymentPlan, error) {
return nil, merrors.InvalidArgument("plan builder: payment is required")
}
step := &model.PaymentStep{
StepID: "fx_convert",
Rail: model.RailLedger,
Action: model.RailOperationFXConvert,
CommitPolicy: model.CommitPolicyImmediate,
Amount: cloneMoney(payment.Intent.Amount),
StepID: "fx_convert",
Rail: model.RailLedger,
Action: model.RailOperationFXConvert,
ReportVisibility: model.ReportVisibilityUser,
CommitPolicy: model.CommitPolicyImmediate,
Amount: cloneMoney(payment.Intent.Amount),
}
return &model.PaymentPlan{
ID: payment.PaymentRef,

View File

@@ -131,6 +131,7 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
StepID: stepID,
Rail: tpl.Rail,
Action: action,
ReportVisibility: tpl.ReportVisibility,
DependsOn: cloneStringList(tpl.DependsOn),
CommitPolicy: policy,
CommitAfter: cloneStringList(tpl.CommitAfter),
@@ -178,6 +179,7 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
}
step.GatewayID = strings.TrimSpace(gw.ID)
step.InstanceID = strings.TrimSpace(gw.InstanceID)
step.GatewayInvokeURI = strings.TrimSpace(gw.InvokeURI)
}
logger.Debug("Plan step added",

View File

@@ -155,6 +155,12 @@ func validatePlanTemplate(logger mlogger.Logger, template *model.PaymentPlanTemp
zap.Int("step_index", idx))
return merrors.InvalidArgument("plan builder: plan template operation is required")
}
if !model.IsValidReportVisibility(step.ReportVisibility) {
logger.Warn("Plan template step has invalid report visibility",
zap.String("step_id", id),
zap.String("report_visibility", string(step.ReportVisibility)))
return merrors.InvalidArgument("plan builder: plan template report visibility is invalid")
}
action, err := actionForOperation(step.Operation)
if err != nil {
logger.Warn("Plan template step has invalid operation", zap.String("step_id", id),