From 4949c4ffe0b5eed673885dfc587229f2b0d8c0ad Mon Sep 17 00:00:00 2001 From: Stephan D Date: Thu, 26 Feb 2026 22:12:32 +0100 Subject: [PATCH] Batch payment execution + got rid of intent references --- api/payments/orchestrator/client/client.go | 8 + api/payments/orchestrator/client/fake.go | 16 +- .../orchestrationv2/psvc/execute_batch.go | 262 ++++++++++++++++++ .../psvc/execute_batch_test.go | 185 +++++++++++++ .../service/orchestrationv2/psvc/module.go | 3 + .../service/orchestrationv2/psvc/service.go | 10 + .../service/orchestrationv2/qsnap/module.go | 20 ++ .../orchestrationv2/qsnap/resolve_all_test.go | 246 ++++++++++++++++ .../service/orchestrationv2/qsnap/service.go | 85 ++++++ .../orchestrator/external_runtime_test.go | 4 + .../orchestrator/service_registration_test.go | 4 + .../service/orchestrator/service_v2.go | 8 + .../service/orchestrator/service_v2_test.go | 4 + .../internal/server/paymentapiimp/paybatch.go | 59 ++-- .../server/paymentapiimp/paybatch_test.go | 54 ++-- .../internal/server/paymentapiimp/service.go | 1 + 16 files changed, 891 insertions(+), 78 deletions(-) create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/qsnap/resolve_all_test.go diff --git a/api/payments/orchestrator/client/client.go b/api/payments/orchestrator/client/client.go index f0536176..b1b404b4 100644 --- a/api/payments/orchestrator/client/client.go +++ b/api/payments/orchestrator/client/client.go @@ -17,6 +17,7 @@ import ( // Client exposes typed helpers around the payment orchestration and quotation gRPC APIs. type Client interface { ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) + ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) Close() error @@ -24,6 +25,7 @@ type Client interface { type grpcOrchestratorClient interface { ExecutePayment(ctx context.Context, in *orchestrationv2.ExecutePaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.ExecutePaymentResponse, error) + ExecuteBatchPayment(ctx context.Context, in *orchestrationv2.ExecuteBatchPaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.ExecuteBatchPaymentResponse, error) GetPayment(ctx context.Context, in *orchestrationv2.GetPaymentRequest, opts ...grpc.CallOption) (*orchestrationv2.GetPaymentResponse, error) ListPayments(ctx context.Context, in *orchestrationv2.ListPaymentsRequest, opts ...grpc.CallOption) (*orchestrationv2.ListPaymentsResponse, error) } @@ -99,6 +101,12 @@ func (c *orchestratorClient) ExecutePayment(ctx context.Context, req *orchestrat return c.client.ExecutePayment(ctx, req) } +func (c *orchestratorClient) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + ctx, cancel := c.callContext(ctx) + defer cancel() + return c.client.ExecuteBatchPayment(ctx, req) +} + func (c *orchestratorClient) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { ctx, cancel := c.callContext(ctx) defer cancel() diff --git a/api/payments/orchestrator/client/fake.go b/api/payments/orchestrator/client/fake.go index 9edde998..6ade3672 100644 --- a/api/payments/orchestrator/client/fake.go +++ b/api/payments/orchestrator/client/fake.go @@ -8,10 +8,11 @@ import ( // Fake implements Client for tests. type Fake struct { - ExecutePaymentFn func(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) - GetPaymentFn func(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) - ListPaymentsFn func(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) - CloseFn func() error + ExecutePaymentFn func(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) + ExecuteBatchPaymentFn func(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) + GetPaymentFn func(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) + ListPaymentsFn func(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) + CloseFn func() error } func (f *Fake) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) { @@ -21,6 +22,13 @@ func (f *Fake) ExecutePayment(ctx context.Context, req *orchestrationv2.ExecuteP return &orchestrationv2.ExecutePaymentResponse{}, nil } +func (f *Fake) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + if f.ExecuteBatchPaymentFn != nil { + return f.ExecuteBatchPaymentFn(ctx, req) + } + return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil +} + func (f *Fake) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { if f.GetPaymentFn != nil { return f.GetPaymentFn(ctx, req) diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go new file mode 100644 index 00000000..c97919e5 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go @@ -0,0 +1,262 @@ +package psvc + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "sort" + "strings" + "time" + + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/reqval" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan" + "github.com/tech/sendico/pkg/merrors" + orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2" + "go.uber.org/zap" +) + +func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) { + logger := s.logger + orgRef := "" + if req != nil && req.GetMeta() != nil { + orgRef = strings.TrimSpace(req.GetMeta().GetOrganizationRef()) + } + logger.Debug("Starting ExecuteBatchPayment", + zap.String("organization_ref", orgRef), + zap.String("quotation_ref", strings.TrimSpace(req.GetQuotationRef())), + zap.Bool("has_client_payment_ref", strings.TrimSpace(req.GetClientPaymentRef()) != ""), + ) + defer func(start time.Time) { + fields := []zap.Field{zap.Int64("duration_ms", time.Since(start).Milliseconds())} + if resp != nil { + fields = append(fields, zap.Int("payments_count", len(resp.GetPayments()))) + } + if err != nil { + logger.Warn("Failed to execute batch payment", append(fields, zap.Error(err))...) + return + } + logger.Debug("Completed ExecuteBatchPayment", fields...) + }(time.Now()) + + requestCtx, err := s.prepareBatchExecute(req) + if err != nil { + return nil, err + } + + resolved, err := s.quote.ResolveAll(ctx, s.quoteStore, qsnap.ResolveAllInput{ + OrganizationID: requestCtx.OrganizationID, + QuotationRef: requestCtx.QuotationRef, + }) + if err != nil { + return nil, remapResolveError(err) + } + if resolved == nil || len(resolved.Items) == 0 { + return nil, merrors.InvalidArgument("quotation has no executable items") + } + + aggItems := make([]opagg.Item, 0, len(resolved.Items)) + for _, item := range resolved.Items { + aggItems = append(aggItems, opagg.Item{ + IntentRef: item.IntentRef, + IntentSnapshot: item.IntentSnapshot, + QuoteSnapshot: item.QuoteSnapshot, + }) + } + aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems}) + if err != nil { + return nil, err + } + + payments := make([]*agg.Payment, 0, len(aggOutput.Groups)) + for _, group := range aggOutput.Groups { + payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group) + if err != nil { + return nil, err + } + payments = append(payments, payment) + } + + protoPayments, err := s.mapPayments(payments) + if err != nil { + return nil, err + } + return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: protoPayments}, nil +} + +func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) { + return s.validator.Validate(mapBatchExecuteReq(req)) +} + +func mapBatchExecuteReq(req *orchestrationv2.ExecuteBatchPaymentRequest) *reqval.Req { + if req == nil { + return nil + } + out := &reqval.Req{ + QuotationRef: req.GetQuotationRef(), + ClientPaymentRef: req.GetClientPaymentRef(), + } + meta := req.GetMeta() + if meta == nil { + return out + } + out.Meta = &reqval.Meta{OrganizationRef: meta.GetOrganizationRef()} + if meta.GetTrace() != nil { + out.Meta.Trace = &reqval.Trace{IdempotencyKey: meta.GetTrace().GetIdempotencyKey()} + } + return out +} + +func (s *svc) executeGroup(ctx context.Context, requestCtx *reqval.Ctx, quotationRef string, group opagg.Group) (*agg.Payment, error) { + normalizedIntentRefs := normalizeIntentRefs(group.IntentRefs) + if len(normalizedIntentRefs) == 0 { + return nil, merrors.InvalidArgument("aggregated group has no intent refs") + } + groupIdempotencyKey := deriveGroupIdempotencyKey(requestCtx.IdempotencyKey, normalizedIntentRefs) + intentRefJoined := strings.Join(normalizedIntentRefs, ",") + + fingerprint, err := s.idempotency.Fingerprint(idem.FPInput{ + OrganizationRef: requestCtx.OrganizationRef, + QuotationRef: requestCtx.QuotationRef, + IntentRef: intentRefJoined, + ClientPaymentRef: requestCtx.ClientPaymentRef, + }) + if err != nil { + return nil, err + } + + existing, err := s.repository.GetByIdempotencyKey(ctx, requestCtx.OrganizationID, groupIdempotencyKey) + if err != nil && !errors.Is(err, prepo.ErrPaymentNotFound) && !errors.Is(err, merrors.ErrNoData) { + return nil, err + } + var payment *agg.Payment + if existing != nil { + existingFP, err := s.idempotency.Fingerprint(idem.FPInput{ + OrganizationRef: requestCtx.OrganizationRef, + QuotationRef: existing.QuotationRef, + IntentRef: strings.TrimSpace(existing.IntentSnapshot.Ref), + ClientPaymentRef: existing.ClientPaymentRef, + }) + if err != nil { + return nil, err + } + if strings.TrimSpace(existingFP) != strings.TrimSpace(fingerprint) { + return nil, idem.ErrIdempotencyParamMismatch + } + payment = existing + } else { + payment, err = s.createGroupPayment(ctx, requestCtx, quotationRef, groupIdempotencyKey, intentRefJoined, fingerprint, group) + if err != nil { + return nil, remapIdempotencyError(err) + } + } + + payment, err = s.runRuntime(ctx, payment) + if err != nil { + return nil, err + } + return payment, nil +} + +func (s *svc) createGroupPayment( + ctx context.Context, + requestCtx *reqval.Ctx, + quotationRef string, + groupIdempotencyKey string, + intentRefJoined string, + expectedFingerprint string, + group opagg.Group, +) (*agg.Payment, error) { + intentSnapshot := group.IntentSnapshot + intentSnapshot.Ref = intentRefJoined + + graph, err := s.planner.Compile(xplan.Input{ + IntentSnapshot: intentSnapshot, + QuoteSnapshot: group.QuoteSnapshot, + }) + if err != nil { + return nil, err + } + + payment, err := s.aggregate.Create(agg.Input{ + OrganizationRef: requestCtx.OrganizationID, + IdempotencyKey: groupIdempotencyKey, + QuotationRef: quotationRef, + ClientPaymentRef: requestCtx.ClientPaymentRef, + IntentSnapshot: intentSnapshot, + QuoteSnapshot: group.QuoteSnapshot, + Steps: toStepShells(graph), + }) + if err != nil { + return nil, err + } + + if err := s.repository.Create(ctx, payment); err != nil { + if !errors.Is(err, prepo.ErrDuplicatePayment) { + return nil, err + } + existing, getErr := s.repository.GetByIdempotencyKey(ctx, requestCtx.OrganizationID, groupIdempotencyKey) + if getErr != nil { + return nil, getErr + } + if existing != nil { + existingFP, fpErr := s.idempotency.Fingerprint(idem.FPInput{ + OrganizationRef: requestCtx.OrganizationRef, + QuotationRef: existing.QuotationRef, + IntentRef: strings.TrimSpace(existing.IntentSnapshot.Ref), + ClientPaymentRef: existing.ClientPaymentRef, + }) + if fpErr != nil { + return nil, fpErr + } + if strings.TrimSpace(existingFP) != strings.TrimSpace(expectedFingerprint) { + return nil, idem.ErrIdempotencyParamMismatch + } + return existing, nil + } + return nil, err + } + + if err := s.recordPaymentCreated(ctx, payment, graph); err != nil { + return nil, err + } + return payment, nil +} + +func deriveGroupIdempotencyKey(baseKey string, intentRefs []string) string { + normalized := normalizeIntentRefs(intentRefs) + h := sha256.New() + h.Write([]byte(strings.TrimSpace(baseKey))) + h.Write([]byte{0x1f}) + h.Write([]byte(strings.Join(normalized, ","))) + return hex.EncodeToString(h.Sum(nil)) +} + +func normalizeIntentRefs(values []string) []string { + if len(values) == 0 { + return nil + } + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for i := range values { + token := strings.TrimSpace(values[i]) + if token == "" { + continue + } + if _, exists := seen[token]; exists { + continue + } + seen[token] = struct{}{} + out = append(out, token) + } + sort.Strings(out) + if len(out) == 0 { + return nil + } + return out +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go new file mode 100644 index 00000000..70abf825 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go @@ -0,0 +1,185 @@ +package psvc + +import ( + "context" + "testing" + "time" + + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec" + "github.com/tech/sendico/payments/storage/model" + pm "github.com/tech/sendico/pkg/model" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2" + "go.mongodb.org/mongo-driver/v2/bson" +) + +func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) { + env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + quote := newExecutableBatchQuote(env.orgID, "quote-batch-merge", []string{"intent-a", "intent-b"}, buildLedgerRoute()) + env.quotes.Put(quote) + + resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-batch-merge"), + QuotationRef: "quote-batch-merge", + ClientPaymentRef: "client-batch-merge", + }) + if err != nil { + t.Fatalf("ExecuteBatchPayment returned error: %v", err) + } + if resp == nil { + t.Fatal("expected response") + } + if got, want := len(resp.GetPayments()), 1; got != want { + t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got) + } + if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { + t.Fatalf("state mismatch: got=%s want=%s", got, want) + } +} + +func TestExecuteBatchPayment_DifferentDestinationsCreatesSeparate(t *testing.T) { + env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + quote := newExecutableBatchQuoteDiffDest(env.orgID, "quote-batch-diff") + env.quotes.Put(quote) + + resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-batch-diff"), + QuotationRef: "quote-batch-diff", + ClientPaymentRef: "client-batch-diff", + }) + if err != nil { + t.Fatalf("ExecuteBatchPayment returned error: %v", err) + } + if got, want := len(resp.GetPayments()), 2; got != want { + t.Fatalf("expected %d payments for different destinations, got=%d", want, got) + } + for i, p := range resp.GetPayments() { + if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { + t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want) + } + } +} + +func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) { + env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + quote := newExecutableBatchQuote(env.orgID, "quote-batch-idem", []string{"intent-a", "intent-b"}, buildLedgerRoute()) + env.quotes.Put(quote) + + req := &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-batch-idem"), + QuotationRef: "quote-batch-idem", + ClientPaymentRef: "client-batch-idem", + } + resp1, err := env.svc.ExecuteBatchPayment(context.Background(), req) + if err != nil { + t.Fatalf("first ExecuteBatchPayment returned error: %v", err) + } + + resp2, err := env.svc.ExecuteBatchPayment(context.Background(), req) + if err != nil { + t.Fatalf("second ExecuteBatchPayment returned error: %v", err) + } + + if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want { + t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want) + } + if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want { + t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want) + } +} + +func TestExecuteBatchPayment_EmptyQuotationRefFails(t *testing.T) { + env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + _, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-empty"), + QuotationRef: "", + }) + if err == nil { + t.Fatal("expected error for empty quotation_ref") + } +} + +func TestExecuteBatchPayment_QuoteNotFoundFails(t *testing.T) { + env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + _, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-notfound"), + QuotationRef: "nonexistent-quote", + }) + if err == nil { + t.Fatal("expected error for non-existent quote") + } +} + +func newExecutableBatchQuoteDiffDest(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord { + now := time.Now().UTC() + route := buildLedgerRoute() + return &model.PaymentQuoteRecord{ + Base: modelBase(now), + OrganizationBoundBase: pm.OrganizationBoundBase{ + OrganizationRef: orgRef, + }, + QuoteRef: quoteRef, + RequestShape: model.QuoteRequestShapeBatch, + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{ + Ref: "intent-a", + Kind: model.PaymentKindPayout, + Source: testLedgerEndpoint("ledger-src"), + Destination: testLedgerEndpoint("ledger-dst-1"), + Amount: &paymenttypes.Money{Amount: "10", Currency: "USD"}, + SettlementCurrency: "USD", + }, + Quote: &model.PaymentQuoteSnapshot{ + QuoteRef: quoteRef, + DebitAmount: &paymenttypes.Money{Amount: "10", Currency: "USD"}, + Route: route, + }, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + { + Intent: &model.PaymentIntent{ + Ref: "intent-b", + Kind: model.PaymentKindPayout, + Source: testLedgerEndpoint("ledger-src"), + Destination: testLedgerEndpoint("ledger-dst-2"), + Amount: &paymenttypes.Money{Amount: "15", Currency: "USD"}, + SettlementCurrency: "USD", + }, + Quote: &model.PaymentQuoteSnapshot{ + QuoteRef: quoteRef, + DebitAmount: &paymenttypes.Money{Amount: "15", Currency: "USD"}, + Route: route, + }, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + ExpiresAt: now.Add(1 * time.Hour), + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go index d3ce501a..f7e9f493 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go @@ -8,6 +8,7 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/pquery" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" @@ -24,6 +25,7 @@ import ( // Service orchestrates execute/query/reconcile payment runtime operations. type Service interface { ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) + ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) @@ -51,6 +53,7 @@ type Dependencies struct { Validator reqval.Validator Idempotency idem.Service Quote qsnap.Resolver + Aggregator opagg.Aggregator Aggregate agg.Factory Planner xplan.Compiler State ostate.StateMachine diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go index c6e46403..02c4f68e 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go @@ -6,6 +6,7 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/idem" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ostate" @@ -34,6 +35,7 @@ type svc struct { validator reqval.Validator idempotency idem.Service quote qsnap.Resolver + aggregator opagg.Aggregator aggregate agg.Factory planner xplan.Compiler state ostate.StateMachine @@ -93,6 +95,7 @@ func newService(deps Dependencies) (Service, error) { validator: firstValidator(deps.Validator, logger), idempotency: firstIdempotency(deps.Idempotency, logger), quote: firstQuoteResolver(deps.Quote, logger), + aggregator: firstAggregator(deps.Aggregator, logger), aggregate: firstAggregateFactory(deps.Aggregate, logger), planner: firstPlanCompiler(deps.Planner, logger), state: firstStateMachine(deps.State, logger), @@ -148,6 +151,13 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory { return agg.New(agg.Dependencies{Logger: logger}) } +func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator { + if v != nil { + return v + } + return opagg.New(opagg.Dependencies{Logger: logger}) +} + func firstPlanCompiler(v xplan.Compiler, logger mlogger.Logger) xplan.Compiler { if v != nil { return v diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/module.go index 1bf26c96..a3d65a3e 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/module.go @@ -18,6 +18,7 @@ type Store interface { // Resolver resolves a quotation reference into canonical execution snapshots. type Resolver interface { Resolve(ctx context.Context, store Store, in Input) (*Output, error) + ResolveAll(ctx context.Context, store Store, in ResolveAllInput) (*ResolveAllOutput, error) } // Input defines lookup scope for quotation resolution. @@ -35,6 +36,25 @@ type Output struct { QuoteSnapshot *model.PaymentQuoteSnapshot } +// ResolveAllInput defines lookup scope for resolving all items in a batch quotation. +type ResolveAllInput struct { + OrganizationID bson.ObjectID + QuotationRef string +} + +// ResolveAllOutput contains all resolved items from a batch quotation. +type ResolveAllOutput struct { + QuotationRef string + Items []ResolvedItem +} + +// ResolvedItem is one resolved intent-quote pair from a batch quotation. +type ResolvedItem struct { + IntentRef string + IntentSnapshot model.PaymentIntent + QuoteSnapshot *model.PaymentQuoteSnapshot +} + // Dependencies configures quote resolver integrations. type Dependencies struct { Logger mlogger.Logger diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/resolve_all_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/resolve_all_test.go new file mode 100644 index 00000000..4341a793 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/resolve_all_test.go @@ -0,0 +1,246 @@ +package qsnap + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/tech/sendico/payments/storage/model" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + "go.mongodb.org/mongo-driver/v2/bson" + "go.uber.org/zap" +) + +func TestResolveAll_BatchReturnsAllItems(t *testing.T) { + now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC) + orgID := bson.NewObjectID() + + record := &model.PaymentQuoteRecord{ + QuoteRef: "batch-quote-ref", + RequestShape: model.QuoteRequestShapeBatch, + ExpiresAt: now.Add(time.Minute), + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{Ref: "intent-a", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "100", Currency: "USDT"}}, + Quote: &model.PaymentQuoteSnapshot{QuoteRef: "batch-quote-ref", DebitAmount: &paymenttypes.Money{Amount: "100", Currency: "USDT"}}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + { + Intent: &model.PaymentIntent{Ref: "intent-b", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "200", Currency: "USDT"}}, + Quote: &model.PaymentQuoteSnapshot{QuoteRef: "batch-quote-ref", DebitAmount: &paymenttypes.Money{Amount: "200", Currency: "USDT"}}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + } + + resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }}) + + out, err := resolver.ResolveAll(context.Background(), &fakeStore{ + getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) { + return record, nil + }, + }, ResolveAllInput{ + OrganizationID: orgID, + QuotationRef: "batch-quote-ref", + }) + if err != nil { + t.Fatalf("ResolveAll returned error: %v", err) + } + if out == nil { + t.Fatal("expected output") + } + if got, want := out.QuotationRef, "batch-quote-ref"; got != want { + t.Fatalf("quotation_ref mismatch: got=%q want=%q", got, want) + } + if got, want := len(out.Items), 2; got != want { + t.Fatalf("items count mismatch: got=%d want=%d", got, want) + } + if got, want := out.Items[0].IntentRef, "intent-a"; got != want { + t.Fatalf("items[0].intent_ref mismatch: got=%q want=%q", got, want) + } + if got, want := out.Items[1].IntentRef, "intent-b"; got != want { + t.Fatalf("items[1].intent_ref mismatch: got=%q want=%q", got, want) + } + if got, want := out.Items[0].QuoteSnapshot.DebitAmount.Amount, "100"; got != want { + t.Fatalf("items[0].quote debit amount mismatch: got=%q want=%q", got, want) + } + if got, want := out.Items[1].QuoteSnapshot.DebitAmount.Amount, "200"; got != want { + t.Fatalf("items[1].quote debit amount mismatch: got=%q want=%q", got, want) + } +} + +func TestResolveAll_SingleShapeReturnsOneItem(t *testing.T) { + now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC) + orgID := bson.NewObjectID() + + record := &model.PaymentQuoteRecord{ + QuoteRef: "single-quote-ref", + RequestShape: model.QuoteRequestShapeSingle, + ExpiresAt: now.Add(time.Minute), + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout, Amount: &paymenttypes.Money{Amount: "50", Currency: "USD"}}, + Quote: &model.PaymentQuoteSnapshot{QuoteRef: "single-quote-ref"}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + } + + resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }}) + + out, err := resolver.ResolveAll(context.Background(), &fakeStore{ + getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) { + return record, nil + }, + }, ResolveAllInput{ + OrganizationID: orgID, + QuotationRef: "single-quote-ref", + }) + if err != nil { + t.Fatalf("ResolveAll returned error: %v", err) + } + if got, want := len(out.Items), 1; got != want { + t.Fatalf("items count mismatch: got=%d want=%d", got, want) + } + if got, want := out.Items[0].IntentRef, "intent-1"; got != want { + t.Fatalf("items[0].intent_ref mismatch: got=%q want=%q", got, want) + } +} + +func TestResolveAll_NonExecutableItemFails(t *testing.T) { + now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC) + orgID := bson.NewObjectID() + + record := &model.PaymentQuoteRecord{ + QuoteRef: "batch-mixed", + RequestShape: model.QuoteRequestShapeBatch, + ExpiresAt: now.Add(time.Minute), + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{Ref: "intent-ok", Kind: model.PaymentKindPayout}, + Quote: &model.PaymentQuoteSnapshot{}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + { + Intent: &model.PaymentIntent{Ref: "intent-blocked", Kind: model.PaymentKindPayout}, + Quote: &model.PaymentQuoteSnapshot{}, + Status: &model.QuoteStatusV2{State: model.QuoteStateBlocked, BlockReason: model.QuoteBlockReasonInsufficientLiquidity}, + }, + }, + } + + resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }}) + + _, err := resolver.ResolveAll(context.Background(), &fakeStore{ + getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) { + return record, nil + }, + }, ResolveAllInput{ + OrganizationID: orgID, + QuotationRef: "batch-mixed", + }) + if err == nil { + t.Fatal("expected error for non-executable item") + } + if !errors.Is(err, ErrQuoteNotExecutable) { + t.Fatalf("expected ErrQuoteNotExecutable, got %v", err) + } +} + +func TestResolveAll_ExpiredQuoteFails(t *testing.T) { + now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC) + orgID := bson.NewObjectID() + + record := &model.PaymentQuoteRecord{ + QuoteRef: "expired-quote", + RequestShape: model.QuoteRequestShapeBatch, + ExpiresAt: now.Add(-time.Minute), + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout}, + Quote: &model.PaymentQuoteSnapshot{}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + } + + resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }}) + + _, err := resolver.ResolveAll(context.Background(), &fakeStore{ + getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) { + return record, nil + }, + }, ResolveAllInput{ + OrganizationID: orgID, + QuotationRef: "expired-quote", + }) + if err == nil { + t.Fatal("expected error for expired quote") + } + if !errors.Is(err, ErrQuoteExpired) { + t.Fatalf("expected ErrQuoteExpired, got %v", err) + } +} + +func TestResolveAll_EmptyQuotationRefFails(t *testing.T) { + resolver := New(Dependencies{Logger: zap.NewNop()}) + + _, err := resolver.ResolveAll(context.Background(), &fakeStore{}, ResolveAllInput{ + OrganizationID: bson.NewObjectID(), + QuotationRef: "", + }) + if err == nil { + t.Fatal("expected error for empty quotation_ref") + } +} + +func TestResolveAll_QuoteNotFoundFails(t *testing.T) { + resolver := New(Dependencies{Logger: zap.NewNop()}) + + _, err := resolver.ResolveAll(context.Background(), &fakeStore{}, ResolveAllInput{ + OrganizationID: bson.NewObjectID(), + QuotationRef: "nonexistent", + }) + if err == nil { + t.Fatal("expected error for not-found quote") + } + if !errors.Is(err, ErrQuoteNotFound) { + t.Fatalf("expected ErrQuoteNotFound, got %v", err) + } +} + +func TestResolveAll_SetsQuoteRefWhenEmpty(t *testing.T) { + now := time.Date(2026, time.January, 2, 3, 4, 5, 0, time.UTC) + orgID := bson.NewObjectID() + + record := &model.PaymentQuoteRecord{ + QuoteRef: "batch-ref", + RequestShape: model.QuoteRequestShapeBatch, + ExpiresAt: now.Add(time.Minute), + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{Ref: "intent-1", Kind: model.PaymentKindPayout}, + Quote: &model.PaymentQuoteSnapshot{QuoteRef: ""}, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + } + + resolver := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }}) + + out, err := resolver.ResolveAll(context.Background(), &fakeStore{ + getByRefFn: func(context.Context, bson.ObjectID, string) (*model.PaymentQuoteRecord, error) { + return record, nil + }, + }, ResolveAllInput{ + OrganizationID: orgID, + QuotationRef: "batch-ref", + }) + if err != nil { + t.Fatalf("ResolveAll returned error: %v", err) + } + if got, want := out.Items[0].QuoteSnapshot.QuoteRef, "batch-ref"; got != want { + t.Fatalf("quote_ref should be back-filled: got=%q want=%q", got, want) + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/service.go index 6adbf5bc..1b9413b1 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/qsnap/service.go @@ -102,6 +102,91 @@ func (s *svc) Resolve( return out, nil } +func (s *svc) ResolveAll( + ctx context.Context, + store Store, + in ResolveAllInput, +) (out *ResolveAllOutput, err error) { + logger := s.logger + logger.Debug("Starting ResolveAll", + zap.String("organization_ref", in.OrganizationID.Hex()), + zap.String("quotation_ref", strings.TrimSpace(in.QuotationRef)), + ) + defer func(start time.Time) { + fields := []zap.Field{zap.Int64("duration_ms", time.Since(start).Milliseconds())} + if out != nil { + fields = append(fields, + zap.String("quotation_ref", strings.TrimSpace(out.QuotationRef)), + zap.Int("items_count", len(out.Items)), + ) + } + if err != nil { + logger.Warn("Failed to resolve all", append(fields, zap.Error(err))...) + return + } + logger.Debug("Completed ResolveAll", fields...) + }(time.Now()) + + if store == nil { + return nil, merrors.InvalidArgument("quotes store is required") + } + if in.OrganizationID.IsZero() { + return nil, merrors.InvalidArgument("organization_id is required") + } + quoteRef := strings.TrimSpace(in.QuotationRef) + if quoteRef == "" { + return nil, merrors.InvalidArgument("quotation_ref is required") + } + + record, err := store.GetByRef(ctx, in.OrganizationID, quoteRef) + if err != nil { + if errors.Is(err, quotestorage.ErrQuoteNotFound) || errors.Is(err, merrors.ErrNoData) { + return nil, ErrQuoteNotFound + } + return nil, err + } + if record == nil { + return nil, ErrQuoteNotFound + } + if len(record.Items) == 0 { + return nil, xerr.Wrapf(ErrQuoteShapeMismatch, "items are empty") + } + + outputRef := strings.TrimSpace(record.QuoteRef) + if outputRef == "" { + outputRef = quoteRef + } + + items := make([]ResolvedItem, 0, len(record.Items)) + for i, item := range record.Items { + if item == nil { + return nil, xerr.Wrapf(ErrQuoteShapeMismatch, "items[%d] is nil", i) + } + if err := ensureExecutable(record, item.Status, s.now().UTC()); err != nil { + return nil, err + } + resolved, err := resolveItem(item, "") + if err != nil { + return nil, xerr.Wrapf(err, "items[%d]", i) + } + intentRef := strings.TrimSpace(resolved.Intent.Ref) + if resolved.Quote != nil && strings.TrimSpace(resolved.Quote.QuoteRef) == "" { + resolved.Quote.QuoteRef = outputRef + } + items = append(items, ResolvedItem{ + IntentRef: intentRef, + IntentSnapshot: resolved.Intent, + QuoteSnapshot: resolved.Quote, + }) + } + + out = &ResolveAllOutput{ + QuotationRef: outputRef, + Items: items, + } + return out, nil +} + func ensureExecutable( record *model.PaymentQuoteRecord, status *model.QuoteStatusV2, diff --git a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go index eca5fb22..f3902af0 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go @@ -247,6 +247,10 @@ func (f *fakeExternalRuntimeV2) ReconcileExternal(_ context.Context, in psvc.Rec }, nil } +func (f *fakeExternalRuntimeV2) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + return nil, errors.New("not implemented") +} + func TestMapTransferStatus(t *testing.T) { cases := []struct { status chainv1.TransferStatus diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_registration_test.go b/api/payments/orchestrator/internal/service/orchestrator/service_registration_test.go index 0d3ffdb2..336bdd52 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_registration_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_registration_test.go @@ -46,6 +46,10 @@ func (fakeOrchestrationV2Service) ReconcileExternal(context.Context, psvc.Reconc return &psvc.ReconcileExternalOutput{}, nil } +func (fakeOrchestrationV2Service) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil +} + type grpcCaptureRouterV2 struct { server *grpc.Server done chan error diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_v2.go b/api/payments/orchestrator/internal/service/orchestrator/service_v2.go index 53c1df88..b633abd9 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_v2.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_v2.go @@ -168,6 +168,14 @@ func (s *v2GRPCServer) ExecutePayment(ctx context.Context, req *orchestrationv2. return resp, nil } +func (s *v2GRPCServer) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + resp, err := s.svc.ExecuteBatchPayment(ctx, req) + if err != nil { + return gsresponse.Execute(ctx, gsresponse.Auto[orchestrationv2.ExecuteBatchPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)) + } + return resp, nil +} + func (s *v2GRPCServer) GetPayment(ctx context.Context, req *orchestrationv2.GetPaymentRequest) (*orchestrationv2.GetPaymentResponse, error) { resp, err := s.svc.GetPayment(ctx, req) if err != nil { diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_v2_test.go b/api/payments/orchestrator/internal/service/orchestrator/service_v2_test.go index cac2220d..7002542a 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_v2_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_v2_test.go @@ -49,3 +49,7 @@ func (fakeV2Service) ListPayments(context.Context, *orchestrationv2.ListPayments func (fakeV2Service) ReconcileExternal(context.Context, psvc.ReconcileExternalInput) (*psvc.ReconcileExternalOutput, error) { return &psvc.ReconcileExternalOutput{}, nil } + +func (fakeV2Service) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + return &orchestrationv2.ExecuteBatchPaymentResponse{}, nil +} diff --git a/api/server/internal/server/paymentapiimp/paybatch.go b/api/server/internal/server/paymentapiimp/paybatch.go index b98d9b44..9a3cbc69 100644 --- a/api/server/internal/server/paymentapiimp/paybatch.go +++ b/api/server/internal/server/paymentapiimp/paybatch.go @@ -1,8 +1,6 @@ package paymentapiimp import ( - "crypto/sha256" - "encoding/hex" "encoding/json" "net/http" "strings" @@ -18,11 +16,6 @@ import ( "go.uber.org/zap" ) -const ( - fanoutIdempotencyHashLen = 16 - maxExecuteIdempotencyKey = 256 -) - func (a *PaymentAPI) initiatePaymentsByQuote(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc { orgRef, err := a.oph.GetRef(r) if err != nil { @@ -68,18 +61,30 @@ func (a *PaymentAPI) initiatePaymentsByQuote(r *http.Request, account *model.Acc return resp.GetPayment(), nil } + executeBatch := func(idempotencyKey string) ([]*orchestrationv2.Payment, error) { + req := &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: requestMeta(orgRef.Hex(), idempotencyKey), + QuotationRef: quotationRef, + ClientPaymentRef: clientPaymentRef, + } + resp, executeErr := a.execution.ExecuteBatchPayment(ctx, req) + if executeErr != nil { + return nil, executeErr + } + if resp == nil { + return nil, nil + } + return resp.GetPayments(), nil + } + payments := make([]*orchestrationv2.Payment, 0, max(1, len(intentSelectors))) if len(payload.IntentRefs) > 0 { - for _, intentRef := range payload.IntentRefs { - payment, executeErr := executeOne(deriveFanoutIdempotencyKey(baseIdempotencyKey, intentRef), intentRef) - if executeErr != nil { - a.logger.Warn("Failed to initiate batch payments", zap.Error(executeErr), zap.String("organization_ref", orgRef.Hex())) - return grpcErrorResponse(a.logger, a.Name(), executeErr) - } - if payment != nil { - payments = append(payments, payment) - } + executed, executeErr := executeBatch(baseIdempotencyKey) + if executeErr != nil { + a.logger.Warn("Failed to initiate batch payments", zap.Error(executeErr), zap.String("organization_ref", orgRef.Hex())) + return grpcErrorResponse(a.logger, a.Name(), executeErr) } + payments = append(payments, executed...) return sresponse.PaymentsResponse(a.logger, payments, token) } @@ -118,28 +123,6 @@ func resolveExecutionIntentSelectors(payload *srequest.InitiatePayments, allowLe return nil, merrors.InvalidArgument("metadata.intent_ref is no longer supported; use intentRef or intentRefs", "metadata.intent_ref") } -func deriveFanoutIdempotencyKey(baseIdempotencyKey, intentRef string) string { - baseIdempotencyKey = strings.TrimSpace(baseIdempotencyKey) - intentRef = strings.TrimSpace(intentRef) - if baseIdempotencyKey == "" || intentRef == "" { - return baseIdempotencyKey - } - sum := sha256.Sum256([]byte(intentRef)) - hash := hex.EncodeToString(sum[:]) - if len(hash) > fanoutIdempotencyHashLen { - hash = hash[:fanoutIdempotencyHashLen] - } - suffix := ":i:" + hash - if len(baseIdempotencyKey)+len(suffix) <= maxExecuteIdempotencyKey { - return baseIdempotencyKey + suffix - } - if len(suffix) >= maxExecuteIdempotencyKey { - return suffix[:maxExecuteIdempotencyKey] - } - prefixLen := maxExecuteIdempotencyKey - len(suffix) - return baseIdempotencyKey[:prefixLen] + suffix -} - func decodeInitiatePaymentsPayload(r *http.Request) (*srequest.InitiatePayments, error) { defer r.Body.Close() diff --git a/api/server/internal/server/paymentapiimp/paybatch_test.go b/api/server/internal/server/paymentapiimp/paybatch_test.go index bfdadfa2..d21ba552 100644 --- a/api/server/internal/server/paymentapiimp/paybatch_test.go +++ b/api/server/internal/server/paymentapiimp/paybatch_test.go @@ -6,7 +6,6 @@ import ( "errors" "net/http" "net/http/httptest" - "strings" "testing" "time" @@ -24,7 +23,7 @@ import ( "go.uber.org/zap" ) -func TestInitiatePaymentsByQuote_FansOutByIntentRefs(t *testing.T) { +func TestInitiatePaymentsByQuote_PassesIntentRefsInSingleExecuteCall(t *testing.T) { orgRef := bson.NewObjectID() exec := &fakeExecutionClientForBatch{} api := newBatchAPI(exec) @@ -35,20 +34,17 @@ func TestInitiatePaymentsByQuote_FansOutByIntentRefs(t *testing.T) { t.Fatalf("status mismatch: got=%d want=%d body=%s", got, want, rr.Body.String()) } - if got, want := len(exec.executeReqs), 2; got != want { - t.Fatalf("execute calls mismatch: got=%d want=%d", got, want) + if got, want := len(exec.executeBatchReqs), 1; got != want { + t.Fatalf("execute batch calls mismatch: got=%d want=%d", got, want) } - if got, want := exec.executeReqs[0].GetIntentRef(), "intent-a"; got != want { - t.Fatalf("intent_ref[0] mismatch: got=%q want=%q", got, want) + if got := len(exec.executeReqs); got != 0 { + t.Fatalf("expected no execute calls, got=%d", got) } - if got, want := exec.executeReqs[1].GetIntentRef(), "intent-b"; got != want { - t.Fatalf("intent_ref[1] mismatch: got=%q want=%q", got, want) + if got, want := exec.executeBatchReqs[0].GetQuotationRef(), "quote-1"; got != want { + t.Fatalf("quotation_ref mismatch: got=%q want=%q", got, want) } - if got, want := exec.executeReqs[0].GetMeta().GetTrace().GetIdempotencyKey(), deriveFanoutIdempotencyKey("idem-batch", "intent-a"); got != want { - t.Fatalf("idempotency[0] mismatch: got=%q want=%q", got, want) - } - if got, want := exec.executeReqs[1].GetMeta().GetTrace().GetIdempotencyKey(), deriveFanoutIdempotencyKey("idem-batch", "intent-b"); got != want { - t.Fatalf("idempotency[1] mismatch: got=%q want=%q", got, want) + if got, want := exec.executeBatchReqs[0].GetMeta().GetTrace().GetIdempotencyKey(), "idem-batch"; got != want { + t.Fatalf("idempotency mismatch: got=%q want=%q", got, want) } } @@ -125,28 +121,6 @@ func TestInitiatePaymentsByQuote_RejectsLegacyMetadataIntentRefWhenDateGateExpir } } -func TestDeriveFanoutIdempotencyKey_IsDeterministicAndBounded(t *testing.T) { - a := deriveFanoutIdempotencyKey("idem-1", "intent-a") - b := deriveFanoutIdempotencyKey("idem-1", "intent-a") - if got, want := a, b; got != want { - t.Fatalf("determinism mismatch: got=%q want=%q", got, want) - } - if a == "idem-1" { - t.Fatalf("expected derived key to differ from base") - } - - c := deriveFanoutIdempotencyKey("idem-1", "intent-b") - if c == a { - t.Fatalf("expected different derived keys for different intents") - } - - longBase := strings.Repeat("a", 400) - long := deriveFanoutIdempotencyKey(longBase, "intent-a") - if got, want := len(long), maxExecuteIdempotencyKey; got != want { - t.Fatalf("length mismatch: got=%d want=%d", got, want) - } -} - func TestResolveExecutionIntentSelectors_PrefersExplicitSelectors(t *testing.T) { payload := &srequest.InitiatePayments{ IntentRefs: []string{"intent-a", "intent-b"}, @@ -229,7 +203,8 @@ func invokeInitiatePaymentsByQuote(t *testing.T, api *PaymentAPI, orgRef bson.Ob } type fakeExecutionClientForBatch struct { - executeReqs []*orchestrationv2.ExecutePaymentRequest + executeReqs []*orchestrationv2.ExecutePaymentRequest + executeBatchReqs []*orchestrationv2.ExecuteBatchPaymentRequest } func (f *fakeExecutionClientForBatch) ExecutePayment(_ context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) { @@ -239,6 +214,13 @@ func (f *fakeExecutionClientForBatch) ExecutePayment(_ context.Context, req *orc }, nil } +func (f *fakeExecutionClientForBatch) ExecuteBatchPayment(_ context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) { + f.executeBatchReqs = append(f.executeBatchReqs, req) + return &orchestrationv2.ExecuteBatchPaymentResponse{ + Payments: []*orchestrationv2.Payment{{PaymentRef: bson.NewObjectID().Hex()}}, + }, nil +} + func (*fakeExecutionClientForBatch) ListPayments(context.Context, *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) { return &orchestrationv2.ListPaymentsResponse{}, nil } diff --git a/api/server/internal/server/paymentapiimp/service.go b/api/server/internal/server/paymentapiimp/service.go index beb9ea42..ea2d2415 100644 --- a/api/server/internal/server/paymentapiimp/service.go +++ b/api/server/internal/server/paymentapiimp/service.go @@ -37,6 +37,7 @@ const ( type executionClient interface { ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) + ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) Close() error }