Orchestration / payments v2 #554

Merged
tech merged 23 commits from pqpov2-547 into main 2026-02-26 22:45:55 +00:00
16 changed files with 891 additions and 78 deletions
Showing only changes of commit 4949c4ffe0 - Show all commits

View File

@@ -17,6 +17,7 @@ import (
// Client exposes typed helpers around the payment orchestration and quotation gRPC APIs. // Client exposes typed helpers around the payment orchestration and quotation gRPC APIs.
type Client interface { type Client interface {
ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error)
ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error)
GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error)
ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error)
Close() error Close() error
@@ -24,6 +25,7 @@ type Client interface {
type grpcOrchestratorClient interface { type grpcOrchestratorClient interface {
ExecutePayment(ctx context.Context, in *orchestrationv2.ExecutePaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.ExecutePaymentResponse, error) ExecutePayment(ctx context.Context, in *orchestrationv2.ExecutePaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.ExecutePaymentResponse, error)
ExecuteBatchPayment(ctx context.Context, in *orchestrationv2.ExecuteBatchPaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.ExecuteBatchPaymentResponse, error)
GetPayment(ctx context.Context, in *orchestrationv2.GetPaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.GetPaymentResponse, error) GetPayment(ctx context.Context, in *orchestrationv2.GetPaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.GetPaymentResponse, error)
ListPayments(ctx context.Context, in *orchestrationv2.ListPaymentsRequest, opts ...grpc.CallOption) (*orchestrationv2.ListPaymentsResponse, error) ListPayments(ctx context.Context, in *orchestrationv2.ListPaymentsRequest, opts ...grpc.CallOption) (*orchestrationv2.ListPaymentsResponse, error)
} }
@@ -99,6 +101,12 @@ func (c *orchestratorClient) ExecutePayment(ctx context.Context, req *orchestrat
return c.client.ExecutePayment(ctx, req) return c.client.ExecutePayment(ctx, req)
} }
func (c *orchestratorClient) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
ctx, cancel := c.callContext(ctx)
defer cancel()
return c.client.ExecuteBatchPayment(ctx, req)
}
func (c *orchestratorClient) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { func (c *orchestratorClient) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) {
ctx, cancel := c.callContext(ctx) ctx, cancel := c.callContext(ctx)
defer cancel() defer cancel()

View File

@@ -8,10 +8,11 @@ import (
// Fake implements Client for tests. // Fake implements Client for tests.
type Fake struct { type Fake struct {
ExecutePaymentFn func(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) ExecutePaymentFn func(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error)
GetPaymentFn func(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) ExecuteBatchPaymentFn func(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error)
ListPaymentsFn func(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) GetPaymentFn func(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error)
CloseFn func() error ListPaymentsFn func(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error)
CloseFn func() error
} }
func (f *Fake) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) { func (f *Fake) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) {
@@ -21,6 +22,13 @@ func (f *Fake) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecuteP
return &orchestrationv2.ExecutePaymentResponse{}, nil return &orchestrationv2.ExecutePaymentResponse{}, nil
} }
func (f *Fake) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
if f.ExecuteBatchPaymentFn != nil {
return f.ExecuteBatchPaymentFn(ctx, req)
}
return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil
}
func (f *Fake) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { func (f *Fake) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) {
if f.GetPaymentFn != nil { if f.GetPaymentFn != nil {
return f.GetPaymentFn(ctx, req) return f.GetPaymentFn(ctx, req)

View File

@@ -0,0 +1,262 @@
package psvc
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"sort"
"strings"
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/reqval"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/pkg/merrors"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.uber.org/zap"
)
func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) {
logger := s.logger
orgRef := ""
if req != nil && req.GetMeta() != nil {
orgRef = strings.TrimSpace(req.GetMeta().GetOrganizationRef())
}
logger.Debug("Starting ExecuteBatchPayment",
zap.String("organization_ref", orgRef),
zap.String("quotation_ref", strings.TrimSpace(req.GetQuotationRef())),
zap.Bool("has_client_payment_ref", strings.TrimSpace(req.GetClientPaymentRef()) != ""),
)
defer func(start time.Time) {
fields := []zap.Field{zap.Int64("duration_ms", time.Since(start).Milliseconds())}
if resp != nil {
fields = append(fields, zap.Int("payments_count", len(resp.GetPayments())))
}
if err != nil {
logger.Warn("Failed to execute batch payment", append(fields, zap.Error(err))...)
return
}
logger.Debug("Completed ExecuteBatchPayment", fields...)
}(time.Now())
requestCtx, err := s.prepareBatchExecute(req)
if err != nil {
return nil, err
}
resolved, err := s.quote.ResolveAll(ctx, s.quoteStore, qsnap.ResolveAllInput{
OrganizationID: requestCtx.OrganizationID,
QuotationRef: requestCtx.QuotationRef,
})
if err != nil {
return nil, remapResolveError(err)
}
if resolved == nil || len(resolved.Items) == 0 {
return nil, merrors.InvalidArgument("quotation has no executable items")
}
aggItems := make([]opagg.Item, 0, len(resolved.Items))
for _, item := range resolved.Items {
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: item.IntentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
})
}
aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems})
if err != nil {
return nil, err
}
payments := make([]*agg.Payment, 0, len(aggOutput.Groups))
for _, group := range aggOutput.Groups {
payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group)
if err != nil {
return nil, err
}
payments = append(payments, payment)
}
protoPayments, err := s.mapPayments(payments)
if err != nil {
return nil, err
}
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: protoPayments}, nil
}
func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) {
return s.validator.Validate(mapBatchExecuteReq(req))
}
func mapBatchExecuteReq(req *orchestrationv2.ExecuteBatchPaymentRequest) *reqval.Req {
if req == nil {
return nil
}
out := &reqval.Req{
QuotationRef: req.GetQuotationRef(),
ClientPaymentRef: req.GetClientPaymentRef(),
}
meta := req.GetMeta()
if meta == nil {
return out
}
out.Meta = &reqval.Meta{OrganizationRef: meta.GetOrganizationRef()}
if meta.GetTrace() != nil {
out.Meta.Trace = &reqval.Trace{IdempotencyKey: meta.GetTrace().GetIdempotencyKey()}
}
return out
}
func (s *svc) executeGroup(ctx context.Context, requestCtx *reqval.Ctx, quotationRef string, group opagg.Group) (*agg.Payment, error) {
normalizedIntentRefs := normalizeIntentRefs(group.IntentRefs)
if len(normalizedIntentRefs) == 0 {
return nil, merrors.InvalidArgument("aggregated group has no intent refs")
}
groupIdempotencyKey := deriveGroupIdempotencyKey(requestCtx.IdempotencyKey, normalizedIntentRefs)
intentRefJoined := strings.Join(normalizedIntentRefs, ",")
fingerprint, err := s.idempotency.Fingerprint(idem.FPInput{
OrganizationRef: requestCtx.OrganizationRef,
QuotationRef: requestCtx.QuotationRef,
IntentRef: intentRefJoined,
ClientPaymentRef: requestCtx.ClientPaymentRef,
})
if err != nil {
return nil, err
}
existing, err := s.repository.GetByIdempotencyKey(ctx, requestCtx.OrganizationID, groupIdempotencyKey)
if err != nil && !errors.Is(err, prepo.ErrPaymentNotFound) && !errors.Is(err, merrors.ErrNoData) {
return nil, err
}
var payment *agg.Payment
if existing != nil {
existingFP, err := s.idempotency.Fingerprint(idem.FPInput{
OrganizationRef: requestCtx.OrganizationRef,
QuotationRef: existing.QuotationRef,
IntentRef: strings.TrimSpace(existing.IntentSnapshot.Ref),
ClientPaymentRef: existing.ClientPaymentRef,
})
if err != nil {
return nil, err
}
if strings.TrimSpace(existingFP) != strings.TrimSpace(fingerprint) {
return nil, idem.ErrIdempotencyParamMismatch
}
payment = existing
} else {
payment, err = s.createGroupPayment(ctx, requestCtx, quotationRef, groupIdempotencyKey, intentRefJoined, fingerprint, group)
if err != nil {
return nil, remapIdempotencyError(err)
}
}
payment, err = s.runRuntime(ctx, payment)
if err != nil {
return nil, err
}
return payment, nil
}
func (s *svc) createGroupPayment(
ctx context.Context,
requestCtx *reqval.Ctx,
quotationRef string,
groupIdempotencyKey string,
intentRefJoined string,
expectedFingerprint string,
group opagg.Group,
) (*agg.Payment, error) {
intentSnapshot := group.IntentSnapshot
intentSnapshot.Ref = intentRefJoined
graph, err := s.planner.Compile(xplan.Input{
IntentSnapshot: intentSnapshot,
QuoteSnapshot: group.QuoteSnapshot,
})
if err != nil {
return nil, err
}
payment, err := s.aggregate.Create(agg.Input{
OrganizationRef: requestCtx.OrganizationID,
IdempotencyKey: groupIdempotencyKey,
QuotationRef: quotationRef,
ClientPaymentRef: requestCtx.ClientPaymentRef,
IntentSnapshot: intentSnapshot,
QuoteSnapshot: group.QuoteSnapshot,
Steps: toStepShells(graph),
})
if err != nil {
return nil, err
}
if err := s.repository.Create(ctx, payment); err != nil {
if !errors.Is(err, prepo.ErrDuplicatePayment) {
return nil, err
}
existing, getErr := s.repository.GetByIdempotencyKey(ctx, requestCtx.OrganizationID, groupIdempotencyKey)
if getErr != nil {
return nil, getErr
}
if existing != nil {
existingFP, fpErr := s.idempotency.Fingerprint(idem.FPInput{
OrganizationRef: requestCtx.OrganizationRef,
QuotationRef: existing.QuotationRef,
IntentRef: strings.TrimSpace(existing.IntentSnapshot.Ref),
ClientPaymentRef: existing.ClientPaymentRef,
})
if fpErr != nil {
return nil, fpErr
}
if strings.TrimSpace(existingFP) != strings.TrimSpace(expectedFingerprint) {
return nil, idem.ErrIdempotencyParamMismatch
}
return existing, nil
}
return nil, err
}
if err := s.recordPaymentCreated(ctx, payment, graph); err != nil {
return nil, err
}
return payment, nil
}
func deriveGroupIdempotencyKey(baseKey string, intentRefs []string) string {
normalized := normalizeIntentRefs(intentRefs)
h := sha256.New()
h.Write([]byte(strings.TrimSpace(baseKey)))
h.Write([]byte{0x1f})
h.Write([]byte(strings.Join(normalized, ",")))
return hex.EncodeToString(h.Sum(nil))
}
func normalizeIntentRefs(values []string) []string {
if len(values) == 0 {
return nil
}
seen := make(map[string]struct{}, len(values))
out := make([]string, 0, len(values))
for i := range values {
token := strings.TrimSpace(values[i])
if token == "" {
continue
}
if _, exists := seen[token]; exists {
continue
}
seen[token] = struct{}{}
out = append(out, token)
}
sort.Strings(out)
if len(out) == 0 {
return nil
}
return out
}

View File

@@ -0,0 +1,185 @@
package psvc
import (
"context"
"testing"
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/storage/model"
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchQuote(env.orgID, "quote-batch-merge", []string{"intent-a", "intent-b"}, buildLedgerRoute())
env.quotes.Put(quote)
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-merge"),
QuotationRef: "quote-batch-merge",
ClientPaymentRef: "client-batch-merge",
})
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if resp == nil {
t.Fatal("expected response")
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got)
}
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("state mismatch: got=%s want=%s", got, want)
}
}
func TestExecuteBatchPayment_DifferentDestinationsCreatesSeparate(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchQuoteDiffDest(env.orgID, "quote-batch-diff")
env.quotes.Put(quote)
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-diff"),
QuotationRef: "quote-batch-diff",
ClientPaymentRef: "client-batch-diff",
})
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payments for different destinations, got=%d", want, got)
}
for i, p := range resp.GetPayments() {
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
}
}
}
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchQuote(env.orgID, "quote-batch-idem", []string{"intent-a", "intent-b"}, buildLedgerRoute())
env.quotes.Put(quote)
req := &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-idem"),
QuotationRef: "quote-batch-idem",
ClientPaymentRef: "client-batch-idem",
}
resp1, err := env.svc.ExecuteBatchPayment(context.Background(), req)
if err != nil {
t.Fatalf("first ExecuteBatchPayment returned error: %v", err)
}
resp2, err := env.svc.ExecuteBatchPayment(context.Background(), req)
if err != nil {
t.Fatalf("second ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want {
t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want)
}
if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want {
t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want)
}
}
func TestExecuteBatchPayment_EmptyQuotationRefFails(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
_, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-empty"),
QuotationRef: "",
})
if err == nil {
t.Fatal("expected error for empty quotation_ref")
}
}
func TestExecuteBatchPayment_QuoteNotFoundFails(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
_, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-notfound"),
QuotationRef: "nonexistent-quote",
})
if err == nil {
t.Fatal("expected error for non-existent quote")
}
}
func newExecutableBatchQuoteDiffDest(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord {
now := time.Now().UTC()
route := buildLedgerRoute()
return &model.PaymentQuoteRecord{
Base: modelBase(now),
OrganizationBoundBase: pm.OrganizationBoundBase{
OrganizationRef: orgRef,
},
QuoteRef: quoteRef,
RequestShape: model.QuoteRequestShapeBatch,
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{
Ref: "intent-a",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testLedgerEndpoint("ledger-dst-1"),
Amount: &paymenttypes.Money{Amount: "10", Currency: "USD"},
SettlementCurrency: "USD",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "10", Currency: "USD"},
Route: route,
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
{
Intent: &model.PaymentIntent{
Ref: "intent-b",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testLedgerEndpoint("ledger-dst-2"),
Amount: &paymenttypes.Money{Amount: "15", Currency: "USD"},
SettlementCurrency: "USD",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "15", Currency: "USD"},
Route: route,
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
ExpiresAt: now.Add(1 * time.Hour),
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/pquery" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/pquery"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
@@ -24,6 +25,7 @@ import (
// Service orchestrates execute/query/reconcile payment runtime operations. // Service orchestrates execute/query/reconcile payment runtime operations.
type Service interface { type Service interface {
ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error)
ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error)
GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error)
ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error)
@@ -51,6 +53,7 @@ type Dependencies struct {
Validator reqval.Validator Validator reqval.Validator
Idempotency idem.Service Idempotency idem.Service
Quote qsnap.Resolver Quote qsnap.Resolver
Aggregator opagg.Aggregator
Aggregate agg.Factory Aggregate agg.Factory
Planner xplan.Compiler Planner xplan.Compiler
State ostate.StateMachine State ostate.StateMachine

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate"
@@ -34,6 +35,7 @@ type svc struct {
validator reqval.Validator validator reqval.Validator
idempotency idem.Service idempotency idem.Service
quote qsnap.Resolver quote qsnap.Resolver
aggregator opagg.Aggregator
aggregate agg.Factory aggregate agg.Factory
planner xplan.Compiler planner xplan.Compiler
state ostate.StateMachine state ostate.StateMachine
@@ -93,6 +95,7 @@ func newService(deps Dependencies) (Service, error) {
validator: firstValidator(deps.Validator, logger), validator: firstValidator(deps.Validator, logger),
idempotency: firstIdempotency(deps.Idempotency, logger), idempotency: firstIdempotency(deps.Idempotency, logger),
quote: firstQuoteResolver(deps.Quote, logger), quote: firstQuoteResolver(deps.Quote, logger),
aggregator: firstAggregator(deps.Aggregator, logger),
aggregate: firstAggregateFactory(deps.Aggregate, logger), aggregate: firstAggregateFactory(deps.Aggregate, logger),
planner: firstPlanCompiler(deps.Planner, logger), planner: firstPlanCompiler(deps.Planner, logger),
state: firstStateMachine(deps.State, logger), state: firstStateMachine(deps.State, logger),
@@ -148,6 +151,13 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory {
return agg.New(agg.Dependencies{Logger: logger}) return agg.New(agg.Dependencies{Logger: logger})
} }
func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator {
if v != nil {
return v
}
return opagg.New(opagg.Dependencies{Logger: logger})
}
func firstPlanCompiler(v xplan.Compiler, logger mlogger.Logger) xplan.Compiler { func firstPlanCompiler(v xplan.Compiler, logger mlogger.Logger) xplan.Compiler {
if v != nil { if v != nil {
return v return v

View File

@@ -18,6 +18,7 @@ type Store interface {
// Resolver resolves a quotation reference into canonical execution snapshots. // Resolver resolves a quotation reference into canonical execution snapshots.
type Resolver interface { type Resolver interface {
Resolve(ctx context.Context, store Store, in Input) (*Output, error) Resolve(ctx context.Context, store Store, in Input) (*Output, error)
ResolveAll(ctx context.Context, store Store, in ResolveAllInput) (*ResolveAllOutput, error)
} }
// Input defines lookup scope for quotation resolution. // Input defines lookup scope for quotation resolution.
@@ -35,6 +36,25 @@ type Output struct {
QuoteSnapshot *model.PaymentQuoteSnapshot QuoteSnapshot *model.PaymentQuoteSnapshot
} }
// ResolveAllInput defines lookup scope for resolving all items in a batch quotation.
type ResolveAllInput struct {
OrganizationID bson.ObjectID
QuotationRef string
}
// ResolveAllOutput contains all resolved items from a batch quotation.
type ResolveAllOutput struct {
QuotationRef string
Items []ResolvedItem
}
// ResolvedItem is one resolved intent-quote pair from a batch quotation.
type ResolvedItem struct {
IntentRef string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
}
// Dependencies configures quote resolver integrations. // Dependencies configures quote resolver integrations.
type Dependencies struct { type Dependencies struct {
Logger mlogger.Logger Logger mlogger.Logger

View File

@@ -0,0 +1,246 @@
package qsnap
import (
"context"
"errors"
"testing"
"time"
"github.com/tech/sendico/payments/storage/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
func TestResolveAll_BatchReturnsAllItems(t *testing.T) {
now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC)
orgID := bson.NewObjectID()
record := &model.PaymentQuoteRecord{
QuoteRef: "batch-quote-ref",
RequestShape: model.QuoteRequestShapeBatch,
ExpiresAt: now.Add(time.Minute),
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{Ref: "intent-a", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "100", Currency: "USDT"}},
Quote: &model.PaymentQuoteSnapshot{QuoteRef: "batch-quote-ref", DebitAmount: &paymenttypes.Money{Amount: "100", Currency: "USDT"}},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
{
Intent: &model.PaymentIntent{Ref: "intent-b", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "200", Currency: "USDT"}},
Quote: &model.PaymentQuoteSnapshot{QuoteRef: "batch-quote-ref", DebitAmount: &paymenttypes.Money{Amount: "200", Currency: "USDT"}},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
}
resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
out, err := resolver.ResolveAll(context.Background(), &fakeStore{
getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) {
return record, nil
},
}, ResolveAllInput{
OrganizationID: orgID,
QuotationRef: "batch-quote-ref",
})
if err != nil {
t.Fatalf("ResolveAll returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := out.QuotationRef, "batch-quote-ref"; got != want {
t.Fatalf("quotation_ref mismatch: got=%q want=%q", got, want)
}
if got, want := len(out.Items), 2; got != want {
t.Fatalf("items count mismatch: got=%d want=%d", got, want)
}
if got, want := out.Items[0].IntentRef, "intent-a"; got != want {
t.Fatalf("items[0].intent_ref mismatch: got=%q want=%q", got, want)
}
if got, want := out.Items[1].IntentRef, "intent-b"; got != want {
t.Fatalf("items[1].intent_ref mismatch: got=%q want=%q", got, want)
}
if got, want := out.Items[0].QuoteSnapshot.DebitAmount.Amount, "100"; got != want {
t.Fatalf("items[0].quote debit amount mismatch: got=%q want=%q", got, want)
}
if got, want := out.Items[1].QuoteSnapshot.DebitAmount.Amount, "200"; got != want {
t.Fatalf("items[1].quote debit amount mismatch: got=%q want=%q", got, want)
}
}
func TestResolveAll_SingleShapeReturnsOneItem(t *testing.T) {
now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC)
orgID := bson.NewObjectID()
record := &model.PaymentQuoteRecord{
QuoteRef: "single-quote-ref",
RequestShape: model.QuoteRequestShapeSingle,
ExpiresAt: now.Add(time.Minute),
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "50", Currency: "USD"}},
Quote: &model.PaymentQuoteSnapshot{QuoteRef: "single-quote-ref"},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
}
resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
out, err := resolver.ResolveAll(context.Background(), &fakeStore{
getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) {
return record, nil
},
}, ResolveAllInput{
OrganizationID: orgID,
QuotationRef: "single-quote-ref",
})
if err != nil {
t.Fatalf("ResolveAll returned error: %v", err)
}
if got, want := len(out.Items), 1; got != want {
t.Fatalf("items count mismatch: got=%d want=%d", got, want)
}
if got, want := out.Items[0].IntentRef, "intent-1"; got != want {
t.Fatalf("items[0].intent_ref mismatch: got=%q want=%q", got, want)
}
}
func TestResolveAll_NonExecutableItemFails(t *testing.T) {
now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC)
orgID := bson.NewObjectID()
record := &model.PaymentQuoteRecord{
QuoteRef: "batch-mixed",
RequestShape: model.QuoteRequestShapeBatch,
ExpiresAt: now.Add(time.Minute),
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{Ref: "intent-ok", Kind: model.PaymentKindPayout},
Quote: &model.PaymentQuoteSnapshot{},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
{
Intent: &model.PaymentIntent{Ref: "intent-blocked", Kind: model.PaymentKindPayout},
Quote: &model.PaymentQuoteSnapshot{},
Status: &model.QuoteStatusV2{State: model.QuoteStateBlocked, BlockReason: model.QuoteBlockReasonInsufficientLiquidity},
},
},
}
resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
_, err := resolver.ResolveAll(context.Background(), &fakeStore{
getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) {
return record, nil
},
}, ResolveAllInput{
OrganizationID: orgID,
QuotationRef: "batch-mixed",
})
if err == nil {
t.Fatal("expected error for non-executable item")
}
if !errors.Is(err, ErrQuoteNotExecutable) {
t.Fatalf("expected ErrQuoteNotExecutable, got %v", err)
}
}
func TestResolveAll_ExpiredQuoteFails(t *testing.T) {
now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC)
orgID := bson.NewObjectID()
record := &model.PaymentQuoteRecord{
QuoteRef: "expired-quote",
RequestShape: model.QuoteRequestShapeBatch,
ExpiresAt: now.Add(-time.Minute),
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout},
Quote: &model.PaymentQuoteSnapshot{},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
}
resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
_, err := resolver.ResolveAll(context.Background(), &fakeStore{
getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) {
return record, nil
},
}, ResolveAllInput{
OrganizationID: orgID,
QuotationRef: "expired-quote",
})
if err == nil {
t.Fatal("expected error for expired quote")
}
if !errors.Is(err, ErrQuoteExpired) {
t.Fatalf("expected ErrQuoteExpired, got %v", err)
}
}
func TestResolveAll_EmptyQuotationRefFails(t *testing.T) {
resolver := New(Dependencies{Logger: zap.NewNop()})
_, err := resolver.ResolveAll(context.Background(), &fakeStore{}, ResolveAllInput{
OrganizationID: bson.NewObjectID(),
QuotationRef: "",
})
if err == nil {
t.Fatal("expected error for empty quotation_ref")
}
}
func TestResolveAll_QuoteNotFoundFails(t *testing.T) {
resolver := New(Dependencies{Logger: zap.NewNop()})
_, err := resolver.ResolveAll(context.Background(), &fakeStore{}, ResolveAllInput{
OrganizationID: bson.NewObjectID(),
QuotationRef: "nonexistent",
})
if err == nil {
t.Fatal("expected error for not-found quote")
}
if !errors.Is(err, ErrQuoteNotFound) {
t.Fatalf("expected ErrQuoteNotFound, got %v", err)
}
}
func TestResolveAll_SetsQuoteRefWhenEmpty(t *testing.T) {
now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC)
orgID := bson.NewObjectID()
record := &model.PaymentQuoteRecord{
QuoteRef: "batch-ref",
RequestShape: model.QuoteRequestShapeBatch,
ExpiresAt: now.Add(time.Minute),
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout},
Quote: &model.PaymentQuoteSnapshot{QuoteRef: ""},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
}
resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
out, err := resolver.ResolveAll(context.Background(), &fakeStore{
getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) {
return record, nil
},
}, ResolveAllInput{
OrganizationID: orgID,
QuotationRef: "batch-ref",
})
if err != nil {
t.Fatalf("ResolveAll returned error: %v", err)
}
if got, want := out.Items[0].QuoteSnapshot.QuoteRef, "batch-ref"; got != want {
t.Fatalf("quote_ref should be back-filled: got=%q want=%q", got, want)
}
}

View File

@@ -102,6 +102,91 @@ func (s *svc) Resolve(
return out, nil return out, nil
} }
func (s *svc) ResolveAll(
ctx context.Context,
store Store,
in ResolveAllInput,
) (out *ResolveAllOutput, err error) {
logger := s.logger
logger.Debug("Starting ResolveAll",
zap.String("organization_ref", in.OrganizationID.Hex()),
zap.String("quotation_ref", strings.TrimSpace(in.QuotationRef)),
)
defer func(start time.Time) {
fields := []zap.Field{zap.Int64("duration_ms", time.Since(start).Milliseconds())}
if out != nil {
fields = append(fields,
zap.String("quotation_ref", strings.TrimSpace(out.QuotationRef)),
zap.Int("items_count", len(out.Items)),
)
}
if err != nil {
logger.Warn("Failed to resolve all", append(fields, zap.Error(err))...)
return
}
logger.Debug("Completed ResolveAll", fields...)
}(time.Now())
if store == nil {
return nil, merrors.InvalidArgument("quotes store is required")
}
if in.OrganizationID.IsZero() {
return nil, merrors.InvalidArgument("organization_id is required")
}
quoteRef := strings.TrimSpace(in.QuotationRef)
if quoteRef == "" {
return nil, merrors.InvalidArgument("quotation_ref is required")
}
record, err := store.GetByRef(ctx, in.OrganizationID, quoteRef)
if err != nil {
if errors.Is(err, quotestorage.ErrQuoteNotFound) || errors.Is(err, merrors.ErrNoData) {
return nil, ErrQuoteNotFound
}
return nil, err
}
if record == nil {
return nil, ErrQuoteNotFound
}
if len(record.Items) == 0 {
return nil, xerr.Wrapf(ErrQuoteShapeMismatch, "items are empty")
}
outputRef := strings.TrimSpace(record.QuoteRef)
if outputRef == "" {
outputRef = quoteRef
}
items := make([]ResolvedItem, 0, len(record.Items))
for i, item := range record.Items {
if item == nil {
return nil, xerr.Wrapf(ErrQuoteShapeMismatch, "items[%d] is nil", i)
}
if err := ensureExecutable(record, item.Status, s.now().UTC()); err != nil {
return nil, err
}
resolved, err := resolveItem(item, "")
if err != nil {
return nil, xerr.Wrapf(err, "items[%d]", i)
}
intentRef := strings.TrimSpace(resolved.Intent.Ref)
if resolved.Quote != nil && strings.TrimSpace(resolved.Quote.QuoteRef) == "" {
resolved.Quote.QuoteRef = outputRef
}
items = append(items, ResolvedItem{
IntentRef: intentRef,
IntentSnapshot: resolved.Intent,
QuoteSnapshot: resolved.Quote,
})
}
out = &ResolveAllOutput{
QuotationRef: outputRef,
Items: items,
}
return out, nil
}
func ensureExecutable( func ensureExecutable(
record *model.PaymentQuoteRecord, record *model.PaymentQuoteRecord,
status *model.QuoteStatusV2, status *model.QuoteStatusV2,

View File

@@ -247,6 +247,10 @@ func (f *fakeExternalRuntimeV2) ReconcileExternal(_ context.Context, in psvc.Rec
}, nil }, nil
} }
func (f *fakeExternalRuntimeV2) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
return nil, errors.New("not implemented")
}
func TestMapTransferStatus(t *testing.T) { func TestMapTransferStatus(t *testing.T) {
cases := []struct { cases := []struct {
status chainv1.TransferStatus status chainv1.TransferStatus

View File

@@ -46,6 +46,10 @@ func (fakeOrchestrationV2Service) ReconcileExternal(context.Context, psvc.Reconc
return &psvc.ReconcileExternalOutput{}, nil return &psvc.ReconcileExternalOutput{}, nil
} }
func (fakeOrchestrationV2Service) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil
}
type grpcCaptureRouterV2 struct { type grpcCaptureRouterV2 struct {
server *grpc.Server server *grpc.Server
done chan error done chan error

View File

@@ -168,6 +168,14 @@ func (s *v2GRPCServer) ExecutePayment(ctx context.Context, req *orchestrationv2.
return resp, nil return resp, nil
} }
func (s *v2GRPCServer) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
resp, err := s.svc.ExecuteBatchPayment(ctx, req)
if err != nil {
return gsresponse.Execute(ctx, gsresponse.Auto[orchestrationv2.ExecuteBatchPaymentResponse](s.logger, mservice.PaymentOrchestrator, err))
}
return resp, nil
}
func (s *v2GRPCServer) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { func (s *v2GRPCServer) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) {
resp, err := s.svc.GetPayment(ctx, req) resp, err := s.svc.GetPayment(ctx, req)
if err != nil { if err != nil {

View File

@@ -49,3 +49,7 @@ func (fakeV2Service) ListPayments(context.Context, *orchestrationv2.ListPayments
func (fakeV2Service) ReconcileExternal(context.Context, psvc.ReconcileExternalInput) (*psvc.ReconcileExternalOutput, error) { func (fakeV2Service) ReconcileExternal(context.Context, psvc.ReconcileExternalInput) (*psvc.ReconcileExternalOutput, error) {
return &psvc.ReconcileExternalOutput{}, nil return &psvc.ReconcileExternalOutput{}, nil
} }
func (fakeV2Service) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil
}

View File

@@ -1,8 +1,6 @@
package paymentapiimp package paymentapiimp
import ( import (
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"net/http" "net/http"
"strings" "strings"
@@ -18,11 +16,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
fanoutIdempotencyHashLen = 16
maxExecuteIdempotencyKey = 256
)
func (a *PaymentAPI) initiatePaymentsByQuote(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc { func (a *PaymentAPI) initiatePaymentsByQuote(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
orgRef, err := a.oph.GetRef(r) orgRef, err := a.oph.GetRef(r)
if err != nil { if err != nil {
@@ -68,18 +61,30 @@ func (a *PaymentAPI) initiatePaymentsByQuote(r *http.Request, account *model.Acc
return resp.GetPayment(), nil return resp.GetPayment(), nil
} }
executeBatch := func(idempotencyKey string) ([]*orchestrationv2.Payment, error) {
req := &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: requestMeta(orgRef.Hex(), idempotencyKey),
QuotationRef: quotationRef,
ClientPaymentRef: clientPaymentRef,
}
resp, executeErr := a.execution.ExecuteBatchPayment(ctx, req)
if executeErr != nil {
return nil, executeErr
}
if resp == nil {
return nil, nil
}
return resp.GetPayments(), nil
}
payments := make([]*orchestrationv2.Payment, 0, max(1, len(intentSelectors))) payments := make([]*orchestrationv2.Payment, 0, max(1, len(intentSelectors)))
if len(payload.IntentRefs) > 0 { if len(payload.IntentRefs) > 0 {
for _, intentRef := range payload.IntentRefs { executed, executeErr := executeBatch(baseIdempotencyKey)
payment, executeErr := executeOne(deriveFanoutIdempotencyKey(baseIdempotencyKey, intentRef), intentRef) if executeErr != nil {
if executeErr != nil { a.logger.Warn("Failed to initiate batch payments", zap.Error(executeErr), zap.String("organization_ref", orgRef.Hex()))
a.logger.Warn("Failed to initiate batch payments", zap.Error(executeErr), zap.String("organization_ref", orgRef.Hex())) return grpcErrorResponse(a.logger, a.Name(), executeErr)
return grpcErrorResponse(a.logger, a.Name(), executeErr)
}
if payment != nil {
payments = append(payments, payment)
}
} }
payments = append(payments, executed...)
return sresponse.PaymentsResponse(a.logger, payments, token) return sresponse.PaymentsResponse(a.logger, payments, token)
} }
@@ -118,28 +123,6 @@ func resolveExecutionIntentSelectors(payload *srequest.InitiatePayments, allowLe
return nil, merrors.InvalidArgument("metadata.intent_ref is no longer supported; use intentRef or intentRefs", "metadata.intent_ref") return nil, merrors.InvalidArgument("metadata.intent_ref is no longer supported; use intentRef or intentRefs", "metadata.intent_ref")
} }
func deriveFanoutIdempotencyKey(baseIdempotencyKey, intentRef string) string {
baseIdempotencyKey = strings.TrimSpace(baseIdempotencyKey)
intentRef = strings.TrimSpace(intentRef)
if baseIdempotencyKey == "" || intentRef == "" {
return baseIdempotencyKey
}
sum := sha256.Sum256([]byte(intentRef))
hash := hex.EncodeToString(sum[:])
if len(hash) > fanoutIdempotencyHashLen {
hash = hash[:fanoutIdempotencyHashLen]
}
suffix := ":i:" + hash
if len(baseIdempotencyKey)+len(suffix) <= maxExecuteIdempotencyKey {
return baseIdempotencyKey + suffix
}
if len(suffix) >= maxExecuteIdempotencyKey {
return suffix[:maxExecuteIdempotencyKey]
}
prefixLen := maxExecuteIdempotencyKey - len(suffix)
return baseIdempotencyKey[:prefixLen] + suffix
}
func decodeInitiatePaymentsPayload(r *http.Request) (*srequest.InitiatePayments, error) { func decodeInitiatePaymentsPayload(r *http.Request) (*srequest.InitiatePayments, error) {
defer r.Body.Close() defer r.Body.Close()

View File

@@ -6,7 +6,6 @@ import (
"errors" "errors"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings"
"testing" "testing"
"time" "time"
@@ -24,7 +23,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func TestInitiatePaymentsByQuote_FansOutByIntentRefs(t *testing.T) { func TestInitiatePaymentsByQuote_PassesIntentRefsInSingleExecuteCall(t *testing.T) {
orgRef := bson.NewObjectID() orgRef := bson.NewObjectID()
exec := &fakeExecutionClientForBatch{} exec := &fakeExecutionClientForBatch{}
api := newBatchAPI(exec) api := newBatchAPI(exec)
@@ -35,20 +34,17 @@ func TestInitiatePaymentsByQuote_FansOutByIntentRefs(t *testing.T) {
t.Fatalf("status mismatch: got=%d want=%d body=%s", got, want, rr.Body.String()) t.Fatalf("status mismatch: got=%d want=%d body=%s", got, want, rr.Body.String())
} }
if got, want := len(exec.executeReqs), 2; got != want { if got, want := len(exec.executeBatchReqs), 1; got != want {
t.Fatalf("execute calls mismatch: got=%d want=%d", got, want) t.Fatalf("execute batch calls mismatch: got=%d want=%d", got, want)
} }
if got, want := exec.executeReqs[0].GetIntentRef(), "intent-a"; got != want { if got := len(exec.executeReqs); got != 0 {
t.Fatalf("intent_ref[0] mismatch: got=%q want=%q", got, want) t.Fatalf("expected no execute calls, got=%d", got)
} }
if got, want := exec.executeReqs[1].GetIntentRef(), "intent-b"; got != want { if got, want := exec.executeBatchReqs[0].GetQuotationRef(), "quote-1"; got != want {
t.Fatalf("intent_ref[1] mismatch: got=%q want=%q", got, want) t.Fatalf("quotation_ref mismatch: got=%q want=%q", got, want)
} }
if got, want := exec.executeReqs[0].GetMeta().GetTrace().GetIdempotencyKey(), deriveFanoutIdempotencyKey("idem-batch", "intent-a"); got != want { if got, want := exec.executeBatchReqs[0].GetMeta().GetTrace().GetIdempotencyKey(), "idem-batch"; got != want {
t.Fatalf("idempotency[0] mismatch: got=%q want=%q", got, want) t.Fatalf("idempotency mismatch: got=%q want=%q", got, want)
}
if got, want := exec.executeReqs[1].GetMeta().GetTrace().GetIdempotencyKey(), deriveFanoutIdempotencyKey("idem-batch", "intent-b"); got != want {
t.Fatalf("idempotency[1] mismatch: got=%q want=%q", got, want)
} }
} }
@@ -125,28 +121,6 @@ func TestInitiatePaymentsByQuote_RejectsLegacyMetadataIntentRefWhenDateGateExpir
} }
} }
func TestDeriveFanoutIdempotencyKey_IsDeterministicAndBounded(t *testing.T) {
a := deriveFanoutIdempotencyKey("idem-1", "intent-a")
b := deriveFanoutIdempotencyKey("idem-1", "intent-a")
if got, want := a, b; got != want {
t.Fatalf("determinism mismatch: got=%q want=%q", got, want)
}
if a == "idem-1" {
t.Fatalf("expected derived key to differ from base")
}
c := deriveFanoutIdempotencyKey("idem-1", "intent-b")
if c == a {
t.Fatalf("expected different derived keys for different intents")
}
longBase := strings.Repeat("a", 400)
long := deriveFanoutIdempotencyKey(longBase, "intent-a")
if got, want := len(long), maxExecuteIdempotencyKey; got != want {
t.Fatalf("length mismatch: got=%d want=%d", got, want)
}
}
func TestResolveExecutionIntentSelectors_PrefersExplicitSelectors(t *testing.T) { func TestResolveExecutionIntentSelectors_PrefersExplicitSelectors(t *testing.T) {
payload := &srequest.InitiatePayments{ payload := &srequest.InitiatePayments{
IntentRefs: []string{"intent-a", "intent-b"}, IntentRefs: []string{"intent-a", "intent-b"},
@@ -229,7 +203,8 @@ func invokeInitiatePaymentsByQuote(t *testing.T, api *PaymentAPI, orgRef bson.Ob
} }
type fakeExecutionClientForBatch struct { type fakeExecutionClientForBatch struct {
executeReqs []*orchestrationv2.ExecutePaymentRequest executeReqs []*orchestrationv2.ExecutePaymentRequest
executeBatchReqs []*orchestrationv2.ExecuteBatchPaymentRequest
} }
func (f *fakeExecutionClientForBatch) ExecutePayment(_ context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) { func (f *fakeExecutionClientForBatch) ExecutePayment(_ context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) {
@@ -239,6 +214,13 @@ func (f *fakeExecutionClientForBatch) ExecutePayment(_ context.Context, req *orc
}, nil }, nil
} }
func (f *fakeExecutionClientForBatch) ExecuteBatchPayment(_ context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) {
f.executeBatchReqs = append(f.executeBatchReqs, req)
return &orchestrationv2.ExecuteBatchPaymentResponse{
Payments: []*orchestrationv2.Payment{{PaymentRef: bson.NewObjectID().Hex()}},
}, nil
}
func (*fakeExecutionClientForBatch) ListPayments(context.Context, *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) { func (*fakeExecutionClientForBatch) ListPayments(context.Context, *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) {
return &orchestrationv2.ListPaymentsResponse{}, nil return &orchestrationv2.ListPaymentsResponse{}, nil
} }

View File

@@ -37,6 +37,7 @@ const (
type executionClient interface { type executionClient interface {
ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error)
ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error)
ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error)
Close() error Close() error
} }