billing-637 #638

Merged
tech merged 2 commits from billing-637 into main 2026-03-04 14:42:40 +00:00
22 changed files with 1077 additions and 255 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"path/filepath"
"strings"
@@ -148,18 +147,17 @@ func (s *Service) Shutdown() {
func (s *Service) BatchResolveDocuments(ctx context.Context, req *documentsv1.BatchResolveDocumentsRequest) (resp *documentsv1.BatchResolveDocumentsResponse, err error) {
start := time.Now()
var paymentRefs []string
paymentRefs := 0
if req != nil {
paymentRefs = req.GetPaymentRefs()
paymentRefs = len(req.GetPaymentRefs())
}
logger := s.logger.With(zap.Int("payment_refs", len(paymentRefs)))
logger := s.logger.With(zap.Int("payment_refs", paymentRefs))
defer func() {
statusLabel := statusFromError(err)
observeRequest("batch_resolve", documentsv1.DocumentType_DOCUMENT_TYPE_UNSPECIFIED, statusLabel, time.Since(start))
observeBatchSize(len(paymentRefs))
observeBatchSize(paymentRefs)
itemsCount := 0
if resp != nil {
@@ -181,80 +179,16 @@ func (s *Service) BatchResolveDocuments(ctx context.Context, req *documentsv1.Ba
logger.Info("BatchResolveDocuments finished", fields...)
}()
if len(paymentRefs) == 0 {
resp = &documentsv1.BatchResolveDocumentsResponse{}
return resp, nil
}
if s.storage == nil {
err = status.Error(codes.Unavailable, errStorageUnavailable.Error())
_ = ctx
err = status.Error(codes.Unimplemented, "payment-level document flow removed; use GetOperationDocument")
return nil, err
}
refs := make([]string, 0, len(paymentRefs))
for _, ref := range paymentRefs {
clean := strings.TrimSpace(ref)
if clean == "" {
continue
}
refs = append(refs, clean)
}
if len(refs) == 0 {
resp = &documentsv1.BatchResolveDocumentsResponse{}
return resp, nil
}
records, err := s.storage.Documents().ListByPaymentRefs(ctx, refs)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
recordByRef := map[string]*model.DocumentRecord{}
for _, record := range records {
if record == nil {
continue
}
recordByRef[record.PaymentRef] = record
}
items := make([]*documentsv1.DocumentMeta, 0, len(refs))
for _, ref := range refs {
meta := &documentsv1.DocumentMeta{PaymentRef: ref}
if record := recordByRef[ref]; record != nil {
record.Normalize()
available := []model.DocumentType{model.DocumentTypeAct}
ready := make([]model.DocumentType, 0, 1)
if path, ok := record.StoragePaths[model.DocumentTypeAct]; ok && path != "" {
ready = append(ready, model.DocumentTypeAct)
}
meta.AvailableTypes = toProtoTypes(available)
meta.ReadyTypes = toProtoTypes(ready)
}
items = append(items, meta)
}
resp = &documentsv1.BatchResolveDocumentsResponse{Items: items}
return resp, nil
}
func (s *Service) GetDocument(ctx context.Context, req *documentsv1.GetDocumentRequest) (resp *documentsv1.GetDocumentResponse, err error) {
start := time.Now()
docType := documentsv1.DocumentType_DOCUMENT_TYPE_UNSPECIFIED
paymentRef := ""
if req != nil {
docType = req.GetType()
paymentRef = strings.TrimSpace(req.GetPaymentRef())
@@ -293,92 +227,94 @@ func (s *Service) GetDocument(ctx context.Context, req *documentsv1.GetDocumentR
logger.Info("GetDocument finished", fields...)
}()
if paymentRef == "" {
err = status.Error(codes.InvalidArgument, "payment_ref is required")
_ = ctx
err = status.Error(codes.Unimplemented, "payment-level document flow removed; use GetOperationDocument")
return nil, err
}
if docType == documentsv1.DocumentType_DOCUMENT_TYPE_UNSPECIFIED {
err = status.Error(codes.InvalidArgument, "document type is required")
func (s *Service) GetOperationDocument(_ context.Context, req *documentsv1.GetOperationDocumentRequest) (resp *documentsv1.GetDocumentResponse, err error) {
start := time.Now()
organizationRef := ""
gatewayService := ""
operationRef := ""
return nil, err
if req != nil {
organizationRef = strings.TrimSpace(req.GetOrganizationRef())
gatewayService = strings.TrimSpace(req.GetGatewayService())
operationRef = strings.TrimSpace(req.GetOperationRef())
}
if s.storage == nil {
err = status.Error(codes.Unavailable, errStorageUnavailable.Error())
logger := s.logger.With(
zap.String("organization_ref", organizationRef),
zap.String("gateway_service", gatewayService),
zap.String("operation_ref", operationRef),
)
return nil, err
defer func() {
statusLabel := statusFromError(err)
docType := documentsv1.DocumentType_DOCUMENT_TYPE_UNSPECIFIED
observeRequest("get_operation_document", docType, statusLabel, time.Since(start))
if resp != nil {
observeDocumentBytes(docType, len(resp.GetContent()))
}
if s.docStore == nil {
err = status.Error(codes.Unavailable, errDocStoreUnavailable.Error())
return nil, err
contentBytes := 0
if resp != nil {
contentBytes = len(resp.GetContent())
}
if s.template == nil {
err = status.Error(codes.FailedPrecondition, errTemplateUnavailable.Error())
return nil, err
fields := []zap.Field{
zap.String("status", statusLabel),
zap.Duration("duration", time.Since(start)),
zap.Int("content_bytes", contentBytes),
}
record, err := s.storage.Documents().GetByPaymentRef(ctx, paymentRef)
if err != nil {
if errors.Is(err, storage.ErrDocumentNotFound) {
return nil, status.Error(codes.NotFound, "document record not found")
logger.Warn("GetOperationDocument failed", append(fields, zap.Error(err))...)
return
}
return nil, status.Error(codes.Internal, err.Error())
logger.Info("GetOperationDocument finished", fields...)
}()
if req == nil {
err = status.Error(codes.InvalidArgument, "request is required")
return nil, err
}
record.Normalize()
if organizationRef == "" {
err = status.Error(codes.InvalidArgument, "organization_ref is required")
targetType := model.DocumentTypeFromProto(docType)
if docType != documentsv1.DocumentType_DOCUMENT_TYPE_ACT {
return nil, status.Error(codes.Unimplemented, "document type not implemented")
return nil, err
}
if path, ok := record.StoragePaths[targetType]; ok && path != "" {
content, loadErr := s.docStore.Load(ctx, path)
if loadErr != nil {
return nil, status.Error(codes.Internal, loadErr.Error())
if gatewayService == "" {
err = status.Error(codes.InvalidArgument, "gateway_service is required")
return nil, err
}
return &documentsv1.GetDocumentResponse{
Content: content,
Filename: documentFilename(docType, paymentRef),
MimeType: "application/pdf",
}, nil
if operationRef == "" {
err = status.Error(codes.InvalidArgument, "operation_ref is required")
return nil, err
}
content, hash, genErr := s.generateActPDF(record.Snapshot)
snapshot := operationSnapshotFromRequest(req)
content, _, genErr := s.generateOperationPDF(snapshot)
if genErr != nil {
logger.Warn("Failed to generate document", zap.Error(genErr))
err = status.Error(codes.Internal, genErr.Error())
return nil, status.Error(codes.Internal, genErr.Error())
}
path := documentStoragePath(paymentRef, docType)
if saveErr := s.docStore.Save(ctx, path, content); saveErr != nil {
logger.Warn("Failed to store document", zap.Error(saveErr))
return nil, status.Error(codes.Internal, saveErr.Error())
}
record.StoragePaths[targetType] = path
record.Hashes[targetType] = hash
if updateErr := s.storage.Documents().Update(ctx, record); updateErr != nil {
logger.Warn("Failed to update document record", zap.Error(updateErr))
return nil, status.Error(codes.Internal, updateErr.Error())
return nil, err
}
resp = &documentsv1.GetDocumentResponse{
Content: content,
Filename: documentFilename(docType, paymentRef),
Filename: operationDocumentFilename(operationRef),
MimeType: "application/pdf",
}
@@ -392,7 +328,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{
Service: mservice.BillingDocuments,
Operations: []string{discovery.OperationDocumentsBatchResolve, discovery.OperationDocumentsGet},
Operations: []string{discovery.OperationDocumentsGet},
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
@@ -418,10 +354,19 @@ func (s *Service) generateActPDF(snapshot model.ActSnapshot) ([]byte, string, er
return nil, "", err
}
return s.renderPDFWithIntegrity(blocks)
}
func (s *Service) generateOperationPDF(snapshot operationSnapshot) ([]byte, string, error) {
return s.renderPDFWithIntegrity(buildOperationBlocks(snapshot))
}
func (s *Service) renderPDFWithIntegrity(blocks []renderer.Block) ([]byte, string, error) {
generated := renderer.Renderer{
Issuer: s.config.Issuer,
OwnerPassword: s.config.Protection.OwnerPassword,
}
placeholder := strings.Repeat("0", 64)
firstPass, err := generated.Render(blocks, placeholder)
@@ -440,6 +385,157 @@ func (s *Service) generateActPDF(snapshot model.ActSnapshot) ([]byte, string, er
return finalBytes, footerHex, nil
}
type operationSnapshot struct {
OrganizationRef string
GatewayService string
OperationRef string
PaymentRef string
OperationCode string
OperationLabel string
OperationState string
FailureCode string
FailureReason string
Amount string
Currency string
StartedAt time.Time
CompletedAt time.Time
}
func operationSnapshotFromRequest(req *documentsv1.GetOperationDocumentRequest) operationSnapshot {
snapshot := operationSnapshot{
OrganizationRef: strings.TrimSpace(req.GetOrganizationRef()),
GatewayService: strings.TrimSpace(req.GetGatewayService()),
OperationRef: strings.TrimSpace(req.GetOperationRef()),
PaymentRef: strings.TrimSpace(req.GetPaymentRef()),
OperationCode: strings.TrimSpace(req.GetOperationCode()),
OperationLabel: strings.TrimSpace(req.GetOperationLabel()),
OperationState: strings.TrimSpace(req.GetOperationState()),
FailureCode: strings.TrimSpace(req.GetFailureCode()),
FailureReason: strings.TrimSpace(req.GetFailureReason()),
Amount: strings.TrimSpace(req.GetAmount()),
Currency: strings.TrimSpace(req.GetCurrency()),
}
if ts := req.GetStartedAtUnixMs(); ts > 0 {
snapshot.StartedAt = time.UnixMilli(ts).UTC()
}
if ts := req.GetCompletedAtUnixMs(); ts > 0 {
snapshot.CompletedAt = time.UnixMilli(ts).UTC()
}
return snapshot
}
func buildOperationBlocks(snapshot operationSnapshot) []renderer.Block {
rows := [][]string{
{"Organization", snapshot.OrganizationRef},
{"Gateway Service", snapshot.GatewayService},
{"Operation Ref", snapshot.OperationRef},
{"Payment Ref", safeValue(snapshot.PaymentRef)},
{"Code", safeValue(snapshot.OperationCode)},
{"State", safeValue(snapshot.OperationState)},
{"Label", safeValue(snapshot.OperationLabel)},
{"Started At (UTC)", formatSnapshotTime(snapshot.StartedAt)},
{"Completed At (UTC)", formatSnapshotTime(snapshot.CompletedAt)},
}
if snapshot.Amount != "" || snapshot.Currency != "" {
rows = append(rows, []string{"Amount", strings.TrimSpace(strings.TrimSpace(snapshot.Amount) + " " + strings.TrimSpace(snapshot.Currency))})
}
blocks := []renderer.Block{
{
Tag: renderer.TagTitle,
Lines: []string{"OPERATION BILLING DOCUMENT"},
},
{
Tag: renderer.TagSubtitle,
Lines: []string{"Gateway operation statement"},
},
{
Tag: renderer.TagMeta,
Lines: []string{
"Document Type: Operation",
},
},
{
Tag: renderer.TagSection,
Lines: []string{"OPERATION DETAILS"},
},
{
Tag: renderer.TagKV,
Rows: rows,
},
}
if snapshot.FailureCode != "" || snapshot.FailureReason != "" {
blocks = append(blocks,
renderer.Block{Tag: renderer.TagSection, Lines: []string{"FAILURE DETAILS"}},
renderer.Block{
Tag: renderer.TagKV,
Rows: [][]string{
{"Failure Code", safeValue(snapshot.FailureCode)},
{"Failure Reason", safeValue(snapshot.FailureReason)},
},
},
)
}
return blocks
}
func formatSnapshotTime(value time.Time) string {
if value.IsZero() {
return "n/a"
}
return value.UTC().Format(time.RFC3339)
}
func safeValue(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return "n/a"
}
return trimmed
}
func operationDocumentFilename(operationRef string) string {
clean := sanitizeFilenameComponent(operationRef)
if clean == "" {
clean = "operation"
}
return fmt.Sprintf("operation_%s.pdf", clean)
}
func sanitizeFilenameComponent(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return ""
}
var b strings.Builder
b.Grow(len(trimmed))
for _, r := range trimmed {
switch {
case r >= 'a' && r <= 'z':
b.WriteRune(r)
case r >= 'A' && r <= 'Z':
b.WriteRune(r)
case r >= '0' && r <= '9':
b.WriteRune(r)
case r == '-', r == '_':
b.WriteRune(r)
default:
b.WriteRune('_')
}
}
return strings.Trim(b.String(), "_")
}
func toProtoTypes(types []model.DocumentType) []documentsv1.DocumentType {
if len(types) == 0 {
return nil

View File

@@ -12,6 +12,8 @@ import (
"github.com/tech/sendico/billing/documents/storage/model"
documentsv1 "github.com/tech/sendico/pkg/proto/billing/documents/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type stubRepo struct {
@@ -94,9 +96,7 @@ func (s *stubTemplate) Render(_ model.ActSnapshot) ([]renderer.Block, error) {
return s.blocks, nil
}
func TestGetDocument_IdempotentAndHashed(t *testing.T) {
ctx := context.Background()
func TestGenerateActPDF_IdempotentAndHashed(t *testing.T) {
snapshot := model.ActSnapshot{
PaymentID: "PAY-123",
Date: time.Date(2026, 1, 30, 0, 0, 0, 0, time.UTC),
@@ -105,14 +105,6 @@ func TestGetDocument_IdempotentAndHashed(t *testing.T) {
Currency: "USD",
}
record := &model.DocumentRecord{
PaymentRef: "PAY-123",
Snapshot: snapshot,
}
documentsStore := &stubDocumentsStore{record: record}
repo := &stubRepo{store: documentsStore}
store := newMemDocStore()
tmpl := &stubTemplate{
blocks: []renderer.Block{
{Tag: renderer.TagTitle, Lines: []string{"ACT"}},
@@ -127,62 +119,47 @@ func TestGetDocument_IdempotentAndHashed(t *testing.T) {
},
}
svc := NewService(zap.NewNop(), repo, nil,
svc := NewService(zap.NewNop(), nil, nil,
WithConfig(cfg),
WithDocumentStore(store),
WithTemplateRenderer(tmpl),
)
resp1, err := svc.GetDocument(ctx, &documentsv1.GetDocumentRequest{
PaymentRef: "PAY-123",
Type: documentsv1.DocumentType_DOCUMENT_TYPE_ACT,
})
pdf1, hash1, err := svc.generateActPDF(snapshot)
if err != nil {
t.Fatalf("GetDocument first call: %v", err)
t.Fatalf("generateActPDF first call: %v", err)
}
if len(resp1.GetContent()) == 0 {
if len(pdf1) == 0 {
t.Fatalf("expected content on first call")
}
stored := record.Hashes[model.DocumentTypeAct]
if stored == "" {
t.Fatalf("expected stored hash")
if hash1 == "" {
t.Fatalf("expected non-empty hash on first call")
}
footerHash := extractFooterHash(resp1.GetContent())
footerHash := extractFooterHash(pdf1)
if footerHash == "" {
t.Fatalf("expected footer hash in PDF")
}
if stored != footerHash {
t.Fatalf("stored hash mismatch: got %s", stored)
if hash1 != footerHash {
t.Fatalf("stored hash mismatch: got %s", hash1)
}
resp2, err := svc.GetDocument(ctx, &documentsv1.GetDocumentRequest{
PaymentRef: "PAY-123",
Type: documentsv1.DocumentType_DOCUMENT_TYPE_ACT,
})
pdf2, hash2, err := svc.generateActPDF(snapshot)
if err != nil {
t.Fatalf("GetDocument second call: %v", err)
t.Fatalf("generateActPDF second call: %v", err)
}
if !bytes.Equal(resp1.GetContent(), resp2.GetContent()) {
t.Fatalf("expected identical PDF bytes on second call")
if hash2 == "" {
t.Fatalf("expected non-empty hash on second call")
}
if tmpl.calls != 1 {
t.Fatalf("expected template to be rendered once, got %d", tmpl.calls)
footerHash2 := extractFooterHash(pdf2)
if footerHash2 == "" {
t.Fatalf("expected footer hash in second PDF")
}
if store.saveCount != 1 {
t.Fatalf("expected document save once, got %d", store.saveCount)
}
if store.loadCount == 0 {
t.Fatalf("expected document load on second call")
if footerHash2 != hash2 {
t.Fatalf("second hash mismatch: got=%s want=%s", footerHash2, hash2)
}
}
@@ -212,3 +189,48 @@ func extractFooterHash(pdf []byte) string {
func isHexDigit(b byte) bool {
return (b >= '0' && b <= '9') || (b >= 'a' && b <= 'f') || (b >= 'A' && b <= 'F')
}
func TestGetOperationDocument_GeneratesPDF(t *testing.T) {
svc := NewService(zap.NewNop(), nil, nil, WithConfig(Config{
Issuer: renderer.Issuer{
LegalName: "Sendico Ltd",
},
}))
resp, err := svc.GetOperationDocument(context.Background(), &documentsv1.GetOperationDocumentRequest{
OrganizationRef: "org-1",
GatewayService: "chain_gateway",
OperationRef: "pay-1:step-1",
PaymentRef: "pay-1",
OperationCode: "crypto.transfer",
OperationLabel: "Outbound transfer",
OperationState: "completed",
Amount: "100.50",
Currency: "USDT",
StartedAtUnixMs: time.Date(2026, 3, 4, 10, 0, 0, 0, time.UTC).UnixMilli(),
})
if err != nil {
t.Fatalf("GetOperationDocument failed: %v", err)
}
if len(resp.GetContent()) == 0 {
t.Fatalf("expected non-empty PDF content")
}
if got, want := resp.GetMimeType(), "application/pdf"; got != want {
t.Fatalf("mime_type mismatch: got=%q want=%q", got, want)
}
if got, want := resp.GetFilename(), "operation_pay-1_step-1.pdf"; got != want {
t.Fatalf("filename mismatch: got=%q want=%q", got, want)
}
}
func TestGetOperationDocument_RequiresOperationRef(t *testing.T) {
svc := NewService(zap.NewNop(), nil, nil)
_, err := svc.GetOperationDocument(context.Background(), &documentsv1.GetOperationDocumentRequest{
OrganizationRef: "org-1",
GatewayService: "chain_gateway",
})
if status.Code(err) != codes.InvalidArgument {
t.Fatalf("expected InvalidArgument, got=%v err=%v", status.Code(err), err)
}
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/tech/sendico/pkg/mservice"
"github.com/tech/sendico/pkg/mutil/mzap"
documentsv1 "github.com/tech/sendico/pkg/proto/billing/documents/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/v2/bson"
@@ -23,43 +24,90 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
)
const (
documentsServiceName = "BILLING_DOCUMENTS"
documentsOperationGet = discovery.OperationDocumentsGet
documentsDialTimeout = 5 * time.Second
documentsCallTimeout = 10 * time.Second
gatewayCallTimeout = 10 * time.Second
)
func (a *PaymentAPI) getActDocument(r *http.Request, account *model.Account, _ *sresponse.TokenData) http.HandlerFunc {
var allowedOperationGatewayServices = map[mservice.Type]struct{}{
mservice.ChainGateway: {},
mservice.TronGateway: {},
mservice.MntxGateway: {},
mservice.PaymentGateway: {},
mservice.TgSettle: {},
}
func (a *PaymentAPI) getOperationDocument(r *http.Request, account *model.Account, _ *sresponse.TokenData) http.HandlerFunc {
orgRef, denied := a.authorizeDocumentDownload(r, account)
if denied != nil {
return denied
}
query := r.URL.Query()
gatewayService := normalizeGatewayService(query.Get("gateway_service"))
if gatewayService == "" {
return response.BadRequest(a.logger, a.Name(), "missing_parameter", "gateway_service is required")
}
if _, ok := allowedOperationGatewayServices[gatewayService]; !ok {
return response.BadRequest(a.logger, a.Name(), "invalid_parameter", "unsupported gateway_service")
}
operationRef := strings.TrimSpace(query.Get("operation_ref"))
if operationRef == "" {
return response.BadRequest(a.logger, a.Name(), "missing_parameter", "operation_ref is required")
}
service, gateway, h := a.resolveOperationDocumentDeps(r.Context(), gatewayService)
if h != nil {
return h
}
op, err := a.fetchGatewayOperation(r.Context(), gateway.InvokeURI, operationRef)
if err != nil {
a.logger.Warn("Failed to fetch gateway operation for document generation", zap.Error(err), mzap.ObjRef("organization_ref", orgRef), zap.String("gateway_service", string(gatewayService)), zap.String("operation_ref", operationRef))
return documentErrorResponse(a.logger, a.Name(), err)
}
req := operationDocumentRequest(orgRef.Hex(), gatewayService, operationRef, op)
docResp, err := a.fetchOperationDocument(r.Context(), service.InvokeURI, req)
if err != nil {
a.logger.Warn("Failed to fetch operation document", zap.Error(err), mzap.ObjRef("organization_ref", orgRef), zap.String("gateway_service", string(gatewayService)), zap.String("operation_ref", operationRef))
return documentErrorResponse(a.logger, a.Name(), err)
}
return operationDocumentResponse(a.logger, a.Name(), docResp, fmt.Sprintf("operation_%s.pdf", sanitizeFilenameComponent(operationRef)))
}
func (a *PaymentAPI) authorizeDocumentDownload(r *http.Request, account *model.Account) (bson.ObjectID, http.HandlerFunc) {
orgRef, err := a.oph.GetRef(r)
if err != nil {
a.logger.Warn("Failed to parse organization reference for document request", zap.Error(err), mutil.PLog(a.oph, r))
return response.BadReference(a.logger, a.Name(), a.oph.Name(), a.oph.GetID(r), err)
return bson.NilObjectID, response.BadReference(a.logger, a.Name(), a.oph.Name(), a.oph.GetID(r), err)
}
ctx := r.Context()
allowed, err := a.enf.Enforce(ctx, a.permissionRef, account.ID, orgRef, bson.NilObjectID, model.ActionRead)
if err != nil {
a.logger.Warn("Failed to check payments access permissions", zap.Error(err), mutil.PLog(a.oph, r))
return response.Auto(a.logger, a.Name(), err)
return bson.NilObjectID, response.Auto(a.logger, a.Name(), err)
}
if !allowed {
a.logger.Debug("Access denied when downloading act", mutil.PLog(a.oph, r))
return response.AccessDenied(a.logger, a.Name(), "payments read permission denied")
a.logger.Debug("Access denied when downloading document", mutil.PLog(a.oph, r))
return bson.NilObjectID, response.AccessDenied(a.logger, a.Name(), "payments read permission denied")
}
paymentRef := strings.TrimSpace(r.URL.Query().Get("payment_ref"))
if paymentRef == "" {
paymentRef = strings.TrimSpace(r.URL.Query().Get("paymentRef"))
}
if paymentRef == "" {
return response.BadRequest(a.logger, a.Name(), "missing_parameter", "payment_ref is required")
return orgRef, nil
}
func (a *PaymentAPI) resolveOperationDocumentDeps(ctx context.Context, gatewayService mservice.Type) (*discovery.ServiceSummary, *discovery.GatewaySummary, http.HandlerFunc) {
if a.discovery == nil {
return response.Error(a.logger, a.Name(), http.StatusServiceUnavailable, "service_unavailable", "discovery client is not configured")
return nil, nil, response.Error(a.logger, a.Name(), http.StatusServiceUnavailable, "service_unavailable", "discovery client is not configured")
}
lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
@@ -68,27 +116,35 @@ func (a *PaymentAPI) getActDocument(r *http.Request, account *model.Account, _ *
lookupResp, err := a.discovery.Lookup(lookupCtx)
if err != nil {
a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
return response.Auto(a.logger, a.Name(), err)
return nil, nil, response.Auto(a.logger, a.Name(), err)
}
service := findDocumentsService(lookupResp.Services)
if service == nil {
return response.Error(a.logger, a.Name(), http.StatusServiceUnavailable, "service_unavailable", "billing documents service unavailable")
return nil, nil, response.Error(a.logger, a.Name(), http.StatusServiceUnavailable, "service_unavailable", "billing documents service unavailable")
}
docResp, err := a.fetchActDocument(ctx, service.InvokeURI, paymentRef)
if err != nil {
a.logger.Warn("Failed to fetch act document", zap.Error(err), mzap.ObjRef("organization_ref", orgRef))
return documentErrorResponse(a.logger, a.Name(), err)
gateway := findGatewayForService(lookupResp.Gateways, gatewayService)
if gateway == nil {
return nil, nil, response.Error(a.logger, a.Name(), http.StatusServiceUnavailable, "service_unavailable", "gateway service unavailable")
}
if len(docResp.GetContent()) == 0 {
return response.Error(a.logger, a.Name(), http.StatusInternalServerError, "empty_document", "document service returned empty payload")
return service, gateway, nil
}
func operationDocumentResponse(logger mlogger.Logger, source mservice.Type, docResp *documentsv1.GetDocumentResponse, fallbackFilename string) http.HandlerFunc {
if docResp == nil || len(docResp.GetContent()) == 0 {
return response.Error(logger, source, http.StatusInternalServerError, "empty_document", "document service returned empty payload")
}
filename := strings.TrimSpace(docResp.GetFilename())
if filename == "" {
filename = fmt.Sprintf("act_%s.pdf", paymentRef)
filename = strings.TrimSpace(fallbackFilename)
}
if filename == "" {
filename = "document.pdf"
}
mimeType := strings.TrimSpace(docResp.GetMimeType())
if mimeType == "" {
mimeType = "application/pdf"
@@ -98,13 +154,67 @@ func (a *PaymentAPI) getActDocument(r *http.Request, account *model.Account, _ *
w.Header().Set("Content-Type", mimeType)
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename))
w.WriteHeader(http.StatusOK)
if _, writeErr := w.Write(docResp.GetContent()); writeErr != nil {
a.logger.Warn("Failed to write document response", zap.Error(writeErr))
if _, err := w.Write(docResp.GetContent()); err != nil {
logger.Warn("Failed to write document response", zap.Error(err))
}
}
}
func (a *PaymentAPI) fetchActDocument(ctx context.Context, invokeURI, paymentRef string) (*documentsv1.GetDocumentResponse, error) {
func normalizeGatewayService(raw string) mservice.Type {
value := strings.ToLower(strings.TrimSpace(raw))
if value == "" {
return ""
}
switch value {
case string(mservice.ChainGateway):
return mservice.ChainGateway
case string(mservice.TronGateway):
return mservice.TronGateway
case string(mservice.MntxGateway):
return mservice.MntxGateway
case string(mservice.PaymentGateway):
return mservice.PaymentGateway
case string(mservice.TgSettle):
return mservice.TgSettle
default:
return ""
}
}
func sanitizeFilenameComponent(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return ""
}
var b strings.Builder
b.Grow(len(trimmed))
for _, r := range trimmed {
switch {
case r >= 'a' && r <= 'z':
b.WriteRune(r)
case r >= 'A' && r <= 'Z':
b.WriteRune(r)
case r >= '0' && r <= '9':
b.WriteRune(r)
case r == '-', r == '_':
b.WriteRune(r)
default:
b.WriteRune('_')
}
}
clean := strings.Trim(b.String(), "_")
if clean == "" {
return "operation"
}
return clean
}
func (a *PaymentAPI) fetchOperationDocument(ctx context.Context, invokeURI string, req *documentsv1.GetOperationDocumentRequest) (*documentsv1.GetDocumentResponse, error) {
conn, err := grpc.NewClient(invokeURI, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, merrors.InternalWrap(err, "dial billing documents")
@@ -116,10 +226,160 @@ func (a *PaymentAPI) fetchActDocument(ctx context.Context, invokeURI, paymentRef
callCtx, callCancel := context.WithTimeout(ctx, documentsCallTimeout)
defer callCancel()
return client.GetDocument(callCtx, &documentsv1.GetDocumentRequest{
PaymentRef: paymentRef,
Type: documentsv1.DocumentType_DOCUMENT_TYPE_ACT,
})
return client.GetOperationDocument(callCtx, req)
}
func (a *PaymentAPI) fetchGatewayOperation(ctx context.Context, invokeURI, operationRef string) (*connectorv1.Operation, error) {
conn, err := grpc.NewClient(invokeURI, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, merrors.InternalWrap(err, "dial gateway connector")
}
defer conn.Close()
client := connectorv1.NewConnectorServiceClient(conn)
callCtx, callCancel := context.WithTimeout(ctx, gatewayCallTimeout)
defer callCancel()
resp, err := client.GetOperation(callCtx, &connectorv1.GetOperationRequest{OperationId: strings.TrimSpace(operationRef)})
if err != nil {
return nil, err
}
op := resp.GetOperation()
if op == nil {
return nil, merrors.NoData("gateway returned empty operation payload")
}
return op, nil
}
func findGatewayForService(gateways []discovery.GatewaySummary, gatewayService mservice.Type) *discovery.GatewaySummary {
candidates := make([]discovery.GatewaySummary, 0, len(gateways))
for _, gw := range gateways {
if !gw.Healthy || strings.TrimSpace(gw.InvokeURI) == "" {
continue
}
rail := discovery.NormalizeRail(gw.Rail)
network := strings.ToLower(strings.TrimSpace(gw.Network))
switch gatewayService {
case mservice.MntxGateway:
if rail == discovery.NormalizeRail(discovery.RailCardPayout) {
candidates = append(candidates, gw)
}
case mservice.PaymentGateway, mservice.TgSettle:
if rail == discovery.NormalizeRail(discovery.RailProviderSettlement) {
candidates = append(candidates, gw)
}
case mservice.TronGateway:
if rail == discovery.NormalizeRail(discovery.RailCrypto) && strings.Contains(network, "tron") {
candidates = append(candidates, gw)
}
case mservice.ChainGateway:
if rail == discovery.NormalizeRail(discovery.RailCrypto) && !strings.Contains(network, "tron") {
candidates = append(candidates, gw)
}
}
}
if len(candidates) == 0 && gatewayService == mservice.ChainGateway {
for _, gw := range gateways {
if gw.Healthy && strings.TrimSpace(gw.InvokeURI) != "" && discovery.NormalizeRail(gw.Rail) == discovery.NormalizeRail(discovery.RailCrypto) {
candidates = append(candidates, gw)
}
}
}
if len(candidates) == 0 {
return nil
}
best := candidates[0]
for _, candidate := range candidates[1:] {
if candidate.RoutingPriority > best.RoutingPriority {
best = candidate
}
}
return &best
}
func operationDocumentRequest(organizationRef string, gatewayService mservice.Type, requestedOperationRef string, op *connectorv1.Operation) *documentsv1.GetOperationDocumentRequest {
req := &documentsv1.GetOperationDocumentRequest{
OrganizationRef: strings.TrimSpace(organizationRef),
GatewayService: string(gatewayService),
OperationRef: firstNonEmpty(strings.TrimSpace(op.GetOperationRef()), strings.TrimSpace(requestedOperationRef)),
OperationCode: strings.TrimSpace(op.GetType().String()),
OperationLabel: operationLabel(op.GetType()),
OperationState: strings.TrimSpace(op.GetStatus().String()),
Amount: strings.TrimSpace(op.GetMoney().GetAmount()),
Currency: strings.TrimSpace(op.GetMoney().GetCurrency()),
}
if ts := op.GetCreatedAt(); ts != nil {
req.StartedAtUnixMs = ts.AsTime().UnixMilli()
}
if ts := op.GetUpdatedAt(); ts != nil {
req.CompletedAtUnixMs = ts.AsTime().UnixMilli()
}
req.PaymentRef = operationParamValue(op.GetParams(), "payment_ref", "parent_payment_ref", "paymentRef", "parentPaymentRef")
req.FailureCode = firstNonEmpty(
operationParamValue(op.GetParams(), "failure_code", "provider_code", "error_code"),
failureCodeFromStatus(op.GetStatus()),
)
req.FailureReason = operationParamValue(op.GetParams(), "failure_reason", "provider_message", "error", "message")
return req
}
func operationLabel(opType connectorv1.OperationType) string {
switch opType {
case connectorv1.OperationType_CREDIT:
return "Credit"
case connectorv1.OperationType_DEBIT:
return "Debit"
case connectorv1.OperationType_TRANSFER:
return "Transfer"
case connectorv1.OperationType_PAYOUT:
return "Payout"
case connectorv1.OperationType_FEE_ESTIMATE:
return "Fee Estimate"
case connectorv1.OperationType_FX:
return "FX"
case connectorv1.OperationType_GAS_TOPUP:
return "Gas Top Up"
default:
return strings.TrimSpace(opType.String())
}
}
func failureCodeFromStatus(status connectorv1.OperationStatus) string {
switch status {
case connectorv1.OperationStatus_OPERATION_FAILED, connectorv1.OperationStatus_OPERATION_CANCELLED:
return strings.TrimSpace(status.String())
default:
return ""
}
}
func operationParamValue(params *structpb.Struct, keys ...string) string {
if params == nil {
return ""
}
values := params.AsMap()
for _, key := range keys {
raw, ok := values[key]
if !ok {
continue
}
if text := strings.TrimSpace(fmt.Sprint(raw)); text != "" && text != "<nil>" {
return text
}
}
return ""
}
func findDocumentsService(services []discovery.ServiceSummary) *discovery.ServiceSummary {

View File

@@ -106,7 +106,7 @@ func CreateAPI(apiCtx eapi.API) (*PaymentAPI, error) {
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-quote"), api.Post, p.initiateByQuote)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-multiquote"), api.Post, p.initiatePaymentsByQuote)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/"), api.Get, p.listPayments)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/documents/act"), api.Get, p.getActDocument)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/documents/operation"), api.Get, p.getOperationDocument)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry"), api.Get, p.listDiscoveryRegistry)
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry/refresh"), api.Get, p.getDiscoveryRefresh)

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/gateway/chain/internal/appversion"
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
chainstoragemodel "github.com/tech/sendico/gateway/chain/storage/model"
chainasset "github.com/tech/sendico/pkg/chain"
"github.com/tech/sendico/pkg/connector/params"
"github.com/tech/sendico/pkg/merrors"
@@ -17,6 +18,7 @@ import (
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const chainConnectorID = "chain"
@@ -293,11 +295,21 @@ func (s *Service) GetOperation(ctx context.Context, req *connectorv1.GetOperatio
if req == nil || strings.TrimSpace(req.GetOperationId()) == "" {
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
resp, err := s.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: strings.TrimSpace(req.GetOperationId())})
operationRef := strings.TrimSpace(req.GetOperationId())
if s.storage == nil || s.storage.Transfers() == nil {
return nil, merrors.Internal("get_operation: storage is not configured")
}
transfer, err := s.storage.Transfers().FindByOperationRef(ctx, "", operationRef)
if err != nil {
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: chainTransferToOperation(resp.GetTransfer())}, nil
if transfer == nil {
return nil, merrors.NoData("transfer not found")
}
return &connectorv1.GetOperationResponse{Operation: chainTransferToOperation(storageTransferToProto(transfer))}, nil
}
func (s *Service) ListOperations(ctx context.Context, req *connectorv1.ListOperationsRequest) (*connectorv1.ListOperationsResponse, error) {
@@ -493,6 +505,61 @@ func feeEstimateResult(resp *chainv1.EstimateTransferFeeResponse) *structpb.Stru
return result
}
func storageTransferToProto(transfer *chainstoragemodel.Transfer) *chainv1.Transfer {
if transfer == nil {
return nil
}
destination := &chainv1.TransferDestination{Memo: strings.TrimSpace(transfer.Destination.Memo)}
if managedWalletRef := strings.TrimSpace(transfer.Destination.ManagedWalletRef); managedWalletRef != "" {
destination.Destination = &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: managedWalletRef}
} else if externalAddress := strings.TrimSpace(transfer.Destination.ExternalAddress); externalAddress != "" {
destination.Destination = &chainv1.TransferDestination_ExternalAddress{ExternalAddress: externalAddress}
}
fees := make([]*chainv1.ServiceFeeBreakdown, 0, len(transfer.Fees))
for _, fee := range transfer.Fees {
fees = append(fees, &chainv1.ServiceFeeBreakdown{
FeeCode: strings.TrimSpace(fee.FeeCode),
Amount: fee.Amount,
Description: strings.TrimSpace(fee.Description),
})
}
asset := &chainv1.Asset{
Chain: shared.ChainEnumFromName(transfer.Network),
TokenSymbol: strings.TrimSpace(transfer.TokenSymbol),
ContractAddress: strings.TrimSpace(transfer.ContractAddress),
}
protoTransfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(transfer.TransferRef),
IdempotencyKey: strings.TrimSpace(transfer.IdempotencyKey),
IntentRef: strings.TrimSpace(transfer.IntentRef),
OperationRef: strings.TrimSpace(transfer.OperationRef),
OrganizationRef: strings.TrimSpace(transfer.OrganizationRef),
SourceWalletRef: strings.TrimSpace(transfer.SourceWalletRef),
Destination: destination,
Asset: asset,
RequestedAmount: shared.MonenyToProto(transfer.RequestedAmount),
NetAmount: shared.MonenyToProto(transfer.NetAmount),
Fees: fees,
Status: shared.TransferStatusToProto(transfer.Status),
TransactionHash: strings.TrimSpace(transfer.TxHash),
FailureReason: strings.TrimSpace(transfer.FailureReason),
PaymentRef: strings.TrimSpace(transfer.PaymentRef),
}
if !transfer.CreatedAt.IsZero() {
protoTransfer.CreatedAt = timestamppb.New(transfer.CreatedAt.UTC())
}
if !transfer.UpdatedAt.IsZero() {
protoTransfer.UpdatedAt = timestamppb.New(transfer.UpdatedAt.UTC())
}
return protoTransfer
}
func gasTopUpResult(amount *moneyv1.Money, capHit bool, transferRef string) *structpb.Struct {
payload := map[string]interface{}{
"cap_hit": capHit,
@@ -523,6 +590,8 @@ func chainTransferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation
Status: chainTransferStatusToOperation(transfer.GetStatus()),
Money: transfer.GetRequestedAmount(),
ProviderRef: strings.TrimSpace(transfer.GetTransactionHash()),
IntentRef: strings.TrimSpace(transfer.GetIntentRef()),
OperationRef: strings.TrimSpace(transfer.GetOperationRef()),
CreatedAt: transfer.GetCreatedAt(),
UpdatedAt: transfer.GetUpdatedAt(),
From: &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
@@ -530,6 +599,19 @@ func chainTransferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation
AccountId: strings.TrimSpace(transfer.GetSourceWalletRef()),
}}},
}
params := map[string]interface{}{}
if paymentRef := strings.TrimSpace(transfer.GetPaymentRef()); paymentRef != "" {
params["payment_ref"] = paymentRef
}
if organizationRef := strings.TrimSpace(transfer.GetOrganizationRef()); organizationRef != "" {
params["organization_ref"] = organizationRef
}
if failureReason := strings.TrimSpace(transfer.GetFailureReason()); failureReason != "" {
params["failure_reason"] = failureReason
}
if len(params) > 0 {
op.Params = structFromMap(params)
}
if dest := transfer.GetDestination(); dest != nil {
switch d := dest.GetDestination().(type) {
case *chainv1.TransferDestination_ManagedWalletRef:
@@ -629,6 +711,17 @@ func operationAccountID(party *connectorv1.OperationParty) string {
return ""
}
func structFromMap(values map[string]interface{}) *structpb.Struct {
if len(values) == 0 {
return nil
}
result, err := structpb.NewStruct(values)
if err != nil {
return nil
}
return result
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,

View File

@@ -500,6 +500,32 @@ func (t *inMemoryTransfers) Get(ctx context.Context, transferRef string) (*model
return transfer, nil
}
func (t *inMemoryTransfers) FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error) {
t.mu.Lock()
defer t.mu.Unlock()
org := strings.TrimSpace(organizationRef)
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty operationRef")
}
for _, transfer := range t.items {
if transfer == nil {
continue
}
if !strings.EqualFold(strings.TrimSpace(transfer.OperationRef), opRef) {
continue
}
if org != "" && !strings.EqualFold(strings.TrimSpace(transfer.OrganizationRef), org) {
continue
}
return transfer, nil
}
return nil, merrors.NoData("transfer not found")
}
func (t *inMemoryTransfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) {
t.mu.Lock()
defer t.mu.Unlock()

View File

@@ -40,6 +40,9 @@ func NewTransfers(logger mlogger.Logger, db *mongo.Database) (*Transfers, error)
Keys: []ri.Key{{Field: "transferRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}, {Field: "operationRef", Sort: ri.Asc}},
},
{
Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}},
Unique: true,
@@ -110,6 +113,25 @@ func (t *Transfers) Get(ctx context.Context, transferRef string) (*model.Transfe
return transfer, nil
}
func (t *Transfers) FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error) {
operationRef = strings.TrimSpace(operationRef)
if operationRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty operationRef")
}
query := repository.Query().Filter(repository.Field("operationRef"), operationRef)
if org := strings.TrimSpace(organizationRef); org != "" {
query = query.Filter(repository.Field("organizationRef"), org)
}
transfer := &model.Transfer{}
if err := t.repo.FindOneByFilter(ctx, query, transfer); err != nil {
return nil, err
}
return transfer, nil
}
func (t *Transfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) {
query := repository.Query()
if src := strings.TrimSpace(filter.SourceWalletRef); src != "" {

View File

@@ -42,6 +42,7 @@ type WalletsStore interface {
type TransfersStore interface {
Create(ctx context.Context, transfer *model.Transfer) (*model.Transfer, error)
Get(ctx context.Context, transferRef string) (*model.Transfer, error)
FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error)
List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error)
UpdateStatus(ctx context.Context, transferRef string, status model.TransferStatus, failureReason string, txHash string) (*model.Transfer, error)
}

View File

@@ -12,6 +12,7 @@ import (
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
"google.golang.org/protobuf/types/known/structpb"
)
const mntxConnectorID = "mntx"
@@ -92,11 +93,21 @@ func (s *Service) GetOperation(ctx context.Context, req *connectorv1.GetOperatio
if req == nil || strings.TrimSpace(req.GetOperationId()) == "" {
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
resp, err := s.GetCardPayoutStatus(ctx, &mntxv1.GetCardPayoutStatusRequest{PayoutId: strings.TrimSpace(req.GetOperationId())})
operationRef := strings.TrimSpace(req.GetOperationId())
if s.storage == nil || s.storage.Payouts() == nil {
return nil, merrors.Internal("get_operation: storage is not configured")
}
payout, err := s.storage.Payouts().FindByOperationRef(ctx, operationRef)
if err != nil {
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: payoutToOperation(resp.GetPayout())}, nil
if payout == nil {
return nil, merrors.NoData("payout not found")
}
return &connectorv1.GetOperationResponse{Operation: payoutToOperation(StateToProto(payout))}, nil
}
func (s *Service) ListOperations(_ context.Context, _ *connectorv1.ListOperationsRequest) (*connectorv1.ListOperationsResponse, error) {
@@ -274,7 +285,7 @@ func payoutToOperation(state *mntxv1.CardPayoutState) *connectorv1.Operation {
if state == nil {
return nil
}
return &connectorv1.Operation{
op := &connectorv1.Operation{
OperationId: firstNonEmpty(strings.TrimSpace(state.GetOperationRef()), strings.TrimSpace(state.GetPayoutId())),
Type: connectorv1.OperationType_PAYOUT,
Status: payoutStatusToOperation(state.GetStatus()),
@@ -283,9 +294,29 @@ func payoutToOperation(state *mntxv1.CardPayoutState) *connectorv1.Operation {
Currency: strings.ToUpper(strings.TrimSpace(state.GetCurrency())),
},
ProviderRef: strings.TrimSpace(state.GetProviderPaymentId()),
IntentRef: strings.TrimSpace(state.GetIntentRef()),
OperationRef: strings.TrimSpace(state.GetOperationRef()),
CreatedAt: state.GetCreatedAt(),
UpdatedAt: state.GetUpdatedAt(),
}
params := map[string]interface{}{}
if paymentRef := strings.TrimSpace(state.GetParentPaymentRef()); paymentRef != "" {
params["payment_ref"] = paymentRef
params["parent_payment_ref"] = paymentRef
}
if providerCode := strings.TrimSpace(state.GetProviderCode()); providerCode != "" {
params["provider_code"] = providerCode
}
if providerMessage := strings.TrimSpace(state.GetProviderMessage()); providerMessage != "" {
params["provider_message"] = providerMessage
params["failure_reason"] = providerMessage
}
if len(params) > 0 {
op.Params = structFromMap(params)
}
return op
}
func minorToDecimal(amount int64) string {
@@ -316,6 +347,17 @@ func payoutStatusToOperation(status mntxv1.PayoutStatus) connectorv1.OperationSt
}
}
func structFromMap(values map[string]interface{}) *structpb.Struct {
if len(values) == 0 {
return nil
}
result, err := structpb.NewStruct(values)
if err != nil {
return nil
}
return result
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,

View File

@@ -11,6 +11,9 @@ import (
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
)
const tgsettleConnectorID = "tgsettle"
@@ -152,12 +155,22 @@ func (s *Service) GetOperation(ctx context.Context, req *connectorv1.GetOperatio
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
operationID := strings.TrimSpace(req.GetOperationId())
resp, err := s.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: operationID})
if s.repo == nil || s.repo.Payments() == nil {
s.logger.Warn("Get operation storage unavailable", zap.String("operation_id", operationID))
return nil, merrors.Internal("get_operation: storage is not configured")
}
record, err := s.repo.Payments().FindByOperationRef(ctx, operationID)
if err != nil {
s.logger.Warn("Get operation failed", zap.String("operation_id", operationID), zap.Error(err))
s.logger.Warn("Get operation lookup by operation_ref failed", zap.String("operation_id", operationID), zap.Error(err))
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: transferToOperation(resp.GetTransfer())}, nil
if record == nil {
return nil, status.Error(codes.NotFound, "operation not found")
}
return &connectorv1.GetOperationResponse{Operation: transferToOperation(transferFromPayment(record, nil))}, nil
}
func (s *Service) ListOperations(_ context.Context, _ *connectorv1.ListOperationsRequest) (*connectorv1.ListOperationsResponse, error) {
@@ -221,6 +234,19 @@ func transferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation {
CreatedAt: transfer.GetCreatedAt(),
UpdatedAt: transfer.GetUpdatedAt(),
}
params := map[string]interface{}{}
if paymentRef := strings.TrimSpace(transfer.GetPaymentRef()); paymentRef != "" {
params["payment_ref"] = paymentRef
}
if organizationRef := strings.TrimSpace(transfer.GetOrganizationRef()); organizationRef != "" {
params["organization_ref"] = organizationRef
}
if failureReason := strings.TrimSpace(transfer.GetFailureReason()); failureReason != "" {
params["failure_reason"] = failureReason
}
if len(params) > 0 {
op.Params = structFromMap(params)
}
if source := strings.TrimSpace(transfer.GetSourceWalletRef()); source != "" {
op.From = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
ConnectorId: tgsettleConnectorID,
@@ -281,6 +307,17 @@ func operationAccountID(party *connectorv1.OperationParty) string {
return ""
}
func structFromMap(values map[string]interface{}) *structpb.Struct {
if len(values) == 0 {
return nil
}
result, err := structpb.NewStruct(values)
if err != nil {
return nil
}
return result
}
func operationLogFields(op *connectorv1.Operation) []zap.Field {
if op == nil {
return nil

View File

@@ -675,6 +675,9 @@ func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
SourceWalletRef: strings.TrimSpace(req.GetSourceWalletRef()),
Destination: req.GetDestination(),
RequestedAmount: req.GetAmount(),
IntentRef: strings.TrimSpace(req.GetIntentRef()),
OperationRef: strings.TrimSpace(req.GetOperationRef()),
PaymentRef: strings.TrimSpace(req.GetPaymentRef()),
Status: chainv1.TransferStatus_TRANSFER_CREATED,
}
}
@@ -714,6 +717,10 @@ func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.Submit
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
RequestedAmount: requested,
NetAmount: net,
IntentRef: strings.TrimSpace(record.IntentRef),
OperationRef: strings.TrimSpace(record.OperationRef),
PaymentRef: strings.TrimSpace(record.PaymentRef),
FailureReason: strings.TrimSpace(record.FailureReason),
Status: status,
}

View File

@@ -37,6 +37,20 @@ func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string)
return f.records[key], nil
}
func (f *fakePaymentsStore) FindByOperationRef(_ context.Context, key string) (*storagemodel.PaymentRecord, error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.records == nil {
return nil, nil
}
for _, record := range f.records {
if record != nil && record.OperationRef == key {
return record, nil
}
}
return nil, nil
}
func (f *fakePaymentsStore) Upsert(_ context.Context, record *storagemodel.PaymentRecord) error {
f.mu.Lock()
defer f.mu.Unlock()

View File

@@ -20,6 +20,7 @@ import (
const (
paymentsCollection = "payments"
fieldIdempotencyKey = "idempotencyKey"
fieldOperationRef = "operationRef"
)
type Payments struct {
@@ -44,6 +45,14 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
logger.Error("Failed to create payments idempotency index", zap.Error(err), zap.String("index_field", fieldIdempotencyKey))
return nil, err
}
if err := repo.CreateIndex(&ri.Definition{
Keys: []ri.Key{{Field: fieldOperationRef, Sort: ri.Asc}},
Unique: true,
Sparse: true,
}); err != nil {
logger.Error("Failed to create payments operation index", zap.Error(err), zap.String("index_field", fieldOperationRef))
return nil, err
}
p := &Payments{
logger: logger,
@@ -72,6 +81,25 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model
return &result, nil
}
func (p *Payments) FindByOperationRef(ctx context.Context, key string) (*model.PaymentRecord, error) {
key = strings.TrimSpace(key)
if key == "" {
return nil, merrors.InvalidArgument("operation reference is required", "operation_ref")
}
var result model.PaymentRecord
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldOperationRef, key), &result)
if errors.Is(err, merrors.ErrNoData) {
return nil, nil
}
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Payment record lookup by operation ref failed", zap.String("operation_ref", key), zap.Error(err))
}
return nil, err
}
return &result, nil
}
func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error {
if record == nil {
return merrors.InvalidArgument("payment record is nil", "record")
@@ -82,6 +110,7 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
record.IntentRef = strings.TrimSpace(record.IntentRef)
record.OperationRef = strings.TrimSpace(record.OperationRef)
if record.PaymentIntentID == "" {
return merrors.InvalidArgument("intention reference is required", "payment_intent_ref")
}

View File

@@ -18,6 +18,7 @@ type Repository interface {
type PaymentsStore interface {
FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error)
FindByOperationRef(ctx context.Context, key string) (*model.PaymentRecord, error)
Upsert(ctx context.Context, record *model.PaymentRecord) error
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/gateway/tron/internal/appversion"
"github.com/tech/sendico/gateway/tron/shared"
tronstoragemodel "github.com/tech/sendico/gateway/tron/storage/model"
chainasset "github.com/tech/sendico/pkg/chain"
"github.com/tech/sendico/pkg/connector/params"
"github.com/tech/sendico/pkg/merrors"
@@ -17,6 +18,7 @@ import (
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const chainConnectorID = "chain"
@@ -293,11 +295,21 @@ func (s *Service) GetOperation(ctx context.Context, req *connectorv1.GetOperatio
if req == nil || strings.TrimSpace(req.GetOperationId()) == "" {
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
resp, err := s.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: strings.TrimSpace(req.GetOperationId())})
operationRef := strings.TrimSpace(req.GetOperationId())
if s.storage == nil || s.storage.Transfers() == nil {
return nil, merrors.Internal("get_operation: storage is not configured")
}
transfer, err := s.storage.Transfers().FindByOperationRef(ctx, "", operationRef)
if err != nil {
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: chainTransferToOperation(resp.GetTransfer())}, nil
if transfer == nil {
return nil, merrors.NoData("transfer not found")
}
return &connectorv1.GetOperationResponse{Operation: chainTransferToOperation(storageTransferToProto(transfer))}, nil
}
func (s *Service) ListOperations(ctx context.Context, req *connectorv1.ListOperationsRequest) (*connectorv1.ListOperationsResponse, error) {
@@ -493,6 +505,61 @@ func feeEstimateResult(resp *chainv1.EstimateTransferFeeResponse) *structpb.Stru
return result
}
func storageTransferToProto(transfer *tronstoragemodel.Transfer) *chainv1.Transfer {
if transfer == nil {
return nil
}
destination := &chainv1.TransferDestination{Memo: strings.TrimSpace(transfer.Destination.Memo)}
if managedWalletRef := strings.TrimSpace(transfer.Destination.ManagedWalletRef); managedWalletRef != "" {
destination.Destination = &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: managedWalletRef}
} else if externalAddress := strings.TrimSpace(transfer.Destination.ExternalAddress); externalAddress != "" {
destination.Destination = &chainv1.TransferDestination_ExternalAddress{ExternalAddress: externalAddress}
}
fees := make([]*chainv1.ServiceFeeBreakdown, 0, len(transfer.Fees))
for _, fee := range transfer.Fees {
fees = append(fees, &chainv1.ServiceFeeBreakdown{
FeeCode: strings.TrimSpace(fee.FeeCode),
Amount: fee.Amount,
Description: strings.TrimSpace(fee.Description),
})
}
asset := &chainv1.Asset{
Chain: shared.ChainEnumFromName(transfer.Network),
TokenSymbol: strings.TrimSpace(transfer.TokenSymbol),
ContractAddress: strings.TrimSpace(transfer.ContractAddress),
}
protoTransfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(transfer.TransferRef),
IdempotencyKey: strings.TrimSpace(transfer.IdempotencyKey),
IntentRef: strings.TrimSpace(transfer.IntentRef),
OperationRef: strings.TrimSpace(transfer.OperationRef),
OrganizationRef: strings.TrimSpace(transfer.OrganizationRef),
SourceWalletRef: strings.TrimSpace(transfer.SourceWalletRef),
Destination: destination,
Asset: asset,
RequestedAmount: shared.MonenyToProto(transfer.RequestedAmount),
NetAmount: shared.MonenyToProto(transfer.NetAmount),
Fees: fees,
Status: shared.TransferStatusToProto(transfer.Status),
TransactionHash: strings.TrimSpace(transfer.TxHash),
FailureReason: strings.TrimSpace(transfer.FailureReason),
PaymentRef: strings.TrimSpace(transfer.PaymentRef),
}
if !transfer.CreatedAt.IsZero() {
protoTransfer.CreatedAt = timestamppb.New(transfer.CreatedAt.UTC())
}
if !transfer.UpdatedAt.IsZero() {
protoTransfer.UpdatedAt = timestamppb.New(transfer.UpdatedAt.UTC())
}
return protoTransfer
}
func gasTopUpResult(amount *moneyv1.Money, capHit bool, transferRef string) *structpb.Struct {
payload := map[string]interface{}{
"cap_hit": capHit,
@@ -523,6 +590,8 @@ func chainTransferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation
Status: chainTransferStatusToOperation(transfer.GetStatus()),
Money: transfer.GetRequestedAmount(),
ProviderRef: strings.TrimSpace(transfer.GetTransactionHash()),
IntentRef: strings.TrimSpace(transfer.GetIntentRef()),
OperationRef: strings.TrimSpace(transfer.GetOperationRef()),
CreatedAt: transfer.GetCreatedAt(),
UpdatedAt: transfer.GetUpdatedAt(),
From: &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
@@ -530,6 +599,19 @@ func chainTransferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation
AccountId: strings.TrimSpace(transfer.GetSourceWalletRef()),
}}},
}
params := map[string]interface{}{}
if paymentRef := strings.TrimSpace(transfer.GetPaymentRef()); paymentRef != "" {
params["payment_ref"] = paymentRef
}
if organizationRef := strings.TrimSpace(transfer.GetOrganizationRef()); organizationRef != "" {
params["organization_ref"] = organizationRef
}
if failureReason := strings.TrimSpace(transfer.GetFailureReason()); failureReason != "" {
params["failure_reason"] = failureReason
}
if len(params) > 0 {
op.Params = structFromMap(params)
}
if dest := transfer.GetDestination(); dest != nil {
switch d := dest.GetDestination().(type) {
case *chainv1.TransferDestination_ManagedWalletRef:
@@ -629,6 +711,17 @@ func operationAccountID(party *connectorv1.OperationParty) string {
return ""
}
func structFromMap(values map[string]interface{}) *structpb.Struct {
if len(values) == 0 {
return nil
}
result, err := structpb.NewStruct(values)
if err != nil {
return nil
}
return result
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,

View File

@@ -554,6 +554,32 @@ func (t *inMemoryTransfers) Get(ctx context.Context, transferRef string) (*model
return transfer, nil
}
func (t *inMemoryTransfers) FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error) {
t.mu.Lock()
defer t.mu.Unlock()
org := strings.TrimSpace(organizationRef)
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty operationRef")
}
for _, transfer := range t.items {
if transfer == nil {
continue
}
if !strings.EqualFold(strings.TrimSpace(transfer.OperationRef), opRef) {
continue
}
if org != "" && !strings.EqualFold(strings.TrimSpace(transfer.OrganizationRef), org) {
continue
}
return transfer, nil
}
return nil, merrors.NoData("transfer not found")
}
func (t *inMemoryTransfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) {
t.mu.Lock()
defer t.mu.Unlock()

View File

@@ -40,6 +40,9 @@ func NewTransfers(logger mlogger.Logger, db *mongo.Database) (*Transfers, error)
Keys: []ri.Key{{Field: "transferRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}, {Field: "operationRef", Sort: ri.Asc}},
},
{
Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}},
Unique: true,
@@ -110,6 +113,25 @@ func (t *Transfers) Get(ctx context.Context, transferRef string) (*model.Transfe
return transfer, nil
}
func (t *Transfers) FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error) {
operationRef = strings.TrimSpace(operationRef)
if operationRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty operationRef")
}
query := repository.Query().Filter(repository.Field("operationRef"), operationRef)
if org := strings.TrimSpace(organizationRef); org != "" {
query = query.Filter(repository.Field("organizationRef"), org)
}
transfer := &model.Transfer{}
if err := t.repo.FindOneByFilter(ctx, query, transfer); err != nil {
return nil, err
}
return transfer, nil
}
func (t *Transfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) {
query := repository.Query()
if src := strings.TrimSpace(filter.SourceWalletRef); src != "" {

View File

@@ -42,6 +42,7 @@ type WalletsStore interface {
type TransfersStore interface {
Create(ctx context.Context, transfer *model.Transfer) (*model.Transfer, error)
Get(ctx context.Context, transferRef string) (*model.Transfer, error)
FindByOperationRef(ctx context.Context, organizationRef, operationRef string) (*model.Transfer, error)
List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error)
UpdateStatus(ctx context.Context, transferRef string, status model.TransferStatus, failureReason string, txHash string) (*model.Transfer, error)
}

View File

@@ -43,6 +43,11 @@ service DocumentService {
// generates it lazily, stores it, and returns it.
rpc GetDocument(GetDocumentRequest)
returns (GetDocumentResponse);
// GetOperationDocument returns a generated PDF file for
// a gateway operation snapshot provided by the caller.
rpc GetOperationDocument(GetOperationDocumentRequest)
returns (GetDocumentResponse);
}
@@ -99,3 +104,24 @@ message GetDocumentResponse {
// MIME type, typically "application/pdf"
string mime_type = 3;
}
// GetOperationDocumentRequest requests a document for a
// single gateway operation.
message GetOperationDocumentRequest {
string organization_ref = 1;
string gateway_service = 2;
string operation_ref = 3;
string payment_ref = 4;
string operation_code = 5;
string operation_label = 6;
string operation_state = 7;
string failure_code = 8;
string failure_reason = 9;
string amount = 10;
string currency = 11;
int64 started_at_unix_ms = 12;
int64 completed_at_unix_ms = 13;
}

View File

@@ -95,6 +95,8 @@ paths:
$ref: ./api/payments/by_multiquote.yaml
/payments/{organizations_ref}:
$ref: ./api/payments/list.yaml
/payments/documents/operation/{organizations_ref}:
$ref: ./api/payments/documents_operation.yaml
components:
securitySchemes:

View File

@@ -1,27 +1,29 @@
get:
tags: [Payments]
summary: Download act document by payment reference
description: Returns the billing act document as binary content.
operationId: paymentsGetActDocument
summary: Download billing document by operation reference
description: |
Returns operation-level billing document as binary content.
The request is resolved by gateway service and operation reference.
operationId: paymentsGetOperationDocument
security:
- bearerAuth: []
parameters:
- $ref: ../parameters/organizations_ref.yaml#/components/parameters/OrganizationsRef
- name: payment_ref
- name: gateway_service
in: query
required: false
description: Payment reference for which to fetch the act document.
required: true
description: Gateway service identifier (`chain_gateway`, `tron_gateway`, `mntx_gateway`, `payment_gateway`, `tgsettle_gateway`).
schema:
type: string
- name: paymentRef
- name: operation_ref
in: query
required: false
description: Alias of `payment_ref`.
required: true
description: Operation reference for which to fetch billing document.
schema:
type: string
responses:
'200':
description: Act document file
description: Operation billing document file
content:
application/pdf:
schema:

View File