Files
sendico/api/billing/fees/internal/service/fees/service.go
2025-12-11 22:25:04 +01:00

466 lines
14 KiB
Go

package fees
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"strings"
"time"
internalcalculator "github.com/tech/sendico/billing/fees/internal/service/fees/internal/calculator"
"github.com/tech/sendico/billing/fees/internal/service/fees/internal/resolver"
"github.com/tech/sendico/billing/fees/storage"
"github.com/tech/sendico/billing/fees/storage/model"
oracleclient "github.com/tech/sendico/fx/oracle/client"
"github.com/tech/sendico/pkg/api/routers"
clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
tracev1 "github.com/tech/sendico/pkg/proto/common/trace/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Service is the fee engine gRPC server. It resolves the fee rule that applies
// to an intent, runs the calculator over it, and issues or validates fee quote
// tokens.
type Service struct {
// logger is the base logger; NewService scopes it with the "fees" name.
logger mlogger.Logger
// storage is the repository handed to NewService; its Plans() store backs
// the default resolver.
storage storage.Repository
// producer is the messaging producer handed to NewService. It is not used
// by the handlers visible in this file.
producer msg.Producer
// clock supplies "now" for request timing, booking time and token expiry.
clock clockpkg.Clock
// calculator turns a resolved plan and an intent into posting lines.
calculator Calculator
// oracle is passed to the default calculator; presumably it is set through
// an Option and may be nil — TODO confirm.
oracle oracleclient.Client
// resolver picks the fee plan and rule that apply to an intent.
resolver FeeResolver
// Embedded generated base type for the FeeEngine server interface.
feesv1.UnimplementedFeeEngineServer
}
// NewService wires a fee Service around the given repository and producer.
//
// The logger is scoped with the "fees" name and the system clock is installed
// before the options run, so options may override either. After the options
// have been applied, any of clock, calculator or resolver that is still nil
// is replaced with its default implementation.
func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, opts ...Option) *Service {
	s := &Service{
		logger:   logger.Named("fees"),
		storage:  repo,
		producer: producer,
		clock:    clockpkg.NewSystem(),
	}

	initMetrics()

	for i := range opts {
		opts[i](s)
	}

	// An option may have cleared the clock; fall back to the system clock.
	if s.clock == nil {
		s.clock = clockpkg.NewSystem()
	}
	// The default calculator is built after the options so it sees the
	// oracle client an option may have installed.
	if s.calculator == nil {
		s.calculator = internalcalculator.New(s.logger, s.oracle)
	}
	if s.resolver == nil {
		s.resolver = resolver.New(repo.Plans(), s.logger)
	}

	return s
}
// Register attaches the service to the given gRPC router as the FeeEngine
// server implementation and returns whatever error the router reports.
func (s *Service) Register(router routers.GRPC) error {
	attach := func(reg grpc.ServiceRegistrar) {
		feesv1.RegisterFeeEngineServer(reg, s)
	}
	return router.Register(attach)
}
// QuoteFees computes the fee posting lines for a single intent without
// issuing a quote token.
//
// The request must pass validateQuoteRequest and carry a hex ObjectID in
// meta.organization_ref; otherwise an InvalidArgument status is returned.
// Errors from computeQuote are returned unchanged. On success the response
// echoes the request trace and carries the derived lines, the applied rules
// and the FX information produced by the computation.
//
// The results are named because a deferred hook inspects them to record
// metrics and a summary log line on every exit path.
func (s *Service) QuoteFees(ctx context.Context, req *feesv1.QuoteFeesRequest) (resp *feesv1.QuoteFeesResponse, err error) {
	var (
		meta   *feesv1.RequestMeta
		intent *feesv1.Intent
	)
	if req != nil {
		meta = req.GetMeta()
		intent = req.GetIntent()
	}
	logger := s.logger.With(requestLogFields(meta, intent)...)
	start := s.clock.Now()
	trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED
	if intent != nil {
		trigger = intent.GetTrigger()
	}

	defer func() {
		// Measure with the same clock that produced start, and only once, so
		// the metric and the log line agree and an injected clock is honoured.
		elapsed := s.clock.Now().Sub(start)
		statusLabel := statusFromError(err)

		var (
			fxUsed       bool
			linesCount   int
			appliedCount int
		)
		if err == nil && resp != nil {
			fxUsed = resp.GetFxUsed() != nil
			linesCount = len(resp.GetLines())
			appliedCount = len(resp.GetApplied())
		}

		observeMetrics("quote", trigger, statusLabel, fxUsed, elapsed)

		logFields := []zap.Field{
			zap.String("status", statusLabel),
			zap.Duration("duration", elapsed),
			zap.Bool("fx_used", fxUsed),
			zap.String("trigger", trigger.String()),
			zap.Int("lines", linesCount),
			zap.Int("applied_rules", appliedCount),
		}
		if err != nil {
			logger.Warn("QuoteFees finished", append(logFields, zap.Error(err))...)
			return
		}
		logger.Info("QuoteFees finished", logFields...)
	}()

	logger.Debug("QuoteFees request received")

	if err = s.validateQuoteRequest(req); err != nil {
		return nil, err
	}

	orgRef, parseErr := primitive.ObjectIDFromHex(req.GetMeta().GetOrganizationRef())
	if parseErr != nil {
		logger.Warn("QuoteFees invalid organization_ref", zap.Error(parseErr))
		err = status.Error(codes.InvalidArgument, "invalid organization_ref")
		return nil, err
	}

	// NOTE(review): the policy overrides are forwarded, but the computation in
	// this file does not appear to read them — confirm whether that is intended.
	lines, applied, fx, computeErr := s.computeQuote(ctx, orgRef, req.GetIntent(), req.GetPolicy(), req.GetMeta().GetTrace())
	if computeErr != nil {
		err = computeErr
		return nil, err
	}

	resp = &feesv1.QuoteFeesResponse{
		Meta:    &feesv1.ResponseMeta{Trace: req.GetMeta().GetTrace()},
		Lines:   lines,
		Applied: applied,
		FxUsed:  fx,
	}
	return resp, nil
}
// PrecomputeFees computes the fees for an intent and returns them together
// with a fee quote token that ValidateFeeToken can later re-check.
//
// The request must pass validatePrecomputeRequest and carry a hex ObjectID in
// meta.organization_ref; otherwise an InvalidArgument status is returned.
// Computation errors are returned unchanged; a token that cannot be encoded
// yields an Internal status. The token expires ttl_ms after the moment the
// quote was computed; a non-positive ttl_ms falls back to 60 seconds.
//
// The results are named because a deferred hook inspects them to record
// metrics and a summary log line on every exit path.
func (s *Service) PrecomputeFees(ctx context.Context, req *feesv1.PrecomputeFeesRequest) (resp *feesv1.PrecomputeFeesResponse, err error) {
	// fallbackTTLMs is the token lifetime used when the request does not
	// carry a positive ttl_ms.
	const fallbackTTLMs = 60000

	var (
		meta   *feesv1.RequestMeta
		intent *feesv1.Intent
	)
	if req != nil {
		meta = req.GetMeta()
		intent = req.GetIntent()
	}
	logger := s.logger.With(requestLogFields(meta, intent)...)
	start := s.clock.Now()
	trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED
	if intent != nil {
		trigger = intent.GetTrigger()
	}

	defer func() {
		// Measure with the same clock that produced start, and only once, so
		// the metric and the log line agree and an injected clock is honoured.
		elapsed := s.clock.Now().Sub(start)
		statusLabel := statusFromError(err)

		var (
			fxUsed       bool
			linesCount   int
			appliedCount int
			expiresAt    time.Time
		)
		// The expiry is read from the response only, so a failed call never
		// logs an expiry for a token that was not issued.
		if err == nil && resp != nil {
			fxUsed = resp.GetFxUsed() != nil
			linesCount = len(resp.GetLines())
			appliedCount = len(resp.GetApplied())
			if ts := resp.GetExpiresAt(); ts != nil {
				expiresAt = ts.AsTime()
			}
		}

		observeMetrics("precompute", trigger, statusLabel, fxUsed, elapsed)

		logFields := []zap.Field{
			zap.String("status", statusLabel),
			zap.Duration("duration", elapsed),
			zap.Bool("fx_used", fxUsed),
			zap.String("trigger", trigger.String()),
			zap.Int("lines", linesCount),
			zap.Int("applied_rules", appliedCount),
		}
		if !expiresAt.IsZero() {
			logFields = append(logFields, zap.Time("expires_at", expiresAt))
		}
		if err != nil {
			logger.Warn("PrecomputeFees finished", append(logFields, zap.Error(err))...)
			return
		}
		logger.Info("PrecomputeFees finished", logFields...)
	}()

	logger.Debug("PrecomputeFees request received")

	if err = s.validatePrecomputeRequest(req); err != nil {
		return nil, err
	}

	// One timestamp anchors both the computation and the token expiry.
	now := s.clock.Now()

	orgRef, parseErr := primitive.ObjectIDFromHex(req.GetMeta().GetOrganizationRef())
	if parseErr != nil {
		logger.Warn("PrecomputeFees invalid organization_ref", zap.Error(parseErr))
		err = status.Error(codes.InvalidArgument, "invalid organization_ref")
		return nil, err
	}

	lines, applied, fx, computeErr := s.computeQuoteWithTime(ctx, orgRef, req.GetIntent(), nil, req.GetMeta().GetTrace(), now)
	if computeErr != nil {
		err = computeErr
		return nil, err
	}

	ttl := req.GetTtlMs()
	if ttl <= 0 {
		ttl = fallbackTTLMs
	}
	expiry := now.Add(time.Duration(ttl) * time.Millisecond)

	token, encodeErr := encodeTokenPayload(feeQuoteTokenPayload{
		OrganizationRef: req.GetMeta().GetOrganizationRef(),
		Intent:          req.GetIntent(),
		ExpiresAtUnixMs: expiry.UnixMilli(),
		Trace:           req.GetMeta().GetTrace(),
	})
	if encodeErr != nil {
		logger.Warn("failed to encode fee quote token", zap.Error(encodeErr))
		err = status.Error(codes.Internal, "failed to encode fee quote token")
		return nil, err
	}

	resp = &feesv1.PrecomputeFeesResponse{
		Meta:          &feesv1.ResponseMeta{Trace: req.GetMeta().GetTrace()},
		FeeQuoteToken: token,
		ExpiresAt:     timestamppb.New(expiry),
		Lines:         lines,
		Applied:       applied,
		FxUsed:        fx,
	}
	return resp, nil
}
// ValidateFeeToken decodes a fee quote token and, if it is still usable,
// recomputes the fees for the intent it carries.
//
// A missing or blank token yields an InvalidArgument status. A token that
// cannot be decoded, or whose organization reference is not a hex ObjectID,
// yields a successful response with Valid=false and Reason="invalid_token";
// a token past its expiry yields Valid=false and Reason="expired".
// Computation errors are returned unchanged. On success the response has
// Valid=true and carries the token's intent and trace plus the recomputed
// lines, applied rules and FX information.
//
// The results are named because a deferred hook inspects them to record
// metrics and a summary log line on every exit path.
func (s *Service) ValidateFeeToken(ctx context.Context, req *feesv1.ValidateFeeTokenRequest) (resp *feesv1.ValidateFeeTokenResponse, err error) {
	// Trim once so the required-check, the logged length and the decoder all
	// look at the same string; base64 decoding rejects surrounding spaces.
	var token string
	if req != nil {
		token = strings.TrimSpace(req.GetFeeQuoteToken())
	}

	logger := s.logger.With(zap.Int("token_length", len(token)))
	start := s.clock.Now()
	trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED
	var resultReason string

	defer func() {
		// Measure with the same clock that produced start, and only once, so
		// the metric and the log line agree and an injected clock is honoured.
		elapsed := s.clock.Now().Sub(start)
		statusLabel := statusFromError(err)

		var fxUsed bool
		if err == nil && resp != nil {
			if !resp.GetValid() {
				statusLabel = "invalid"
			}
			fxUsed = resp.GetFxUsed() != nil
			if resp.GetIntent() != nil {
				trigger = resp.GetIntent().GetTrigger()
			}
		}

		observeMetrics("validate", trigger, statusLabel, fxUsed, elapsed)

		logFields := []zap.Field{
			zap.String("status", statusLabel),
			zap.Duration("duration", elapsed),
			zap.Bool("fx_used", fxUsed),
			zap.String("trigger", trigger.String()),
			zap.Bool("valid", resp != nil && resp.GetValid()),
		}
		if resultReason != "" {
			logFields = append(logFields, zap.String("reason", resultReason))
		}
		if err != nil {
			logger.Warn("ValidateFeeToken finished", append(logFields, zap.Error(err))...)
			return
		}
		logger.Info("ValidateFeeToken finished", logFields...)
	}()

	// reject builds the "token is not valid" response and remembers the
	// reason for the summary log line.
	reject := func(reason string) (*feesv1.ValidateFeeTokenResponse, error) {
		resultReason = reason
		return &feesv1.ValidateFeeTokenResponse{Meta: &feesv1.ResponseMeta{}, Valid: false, Reason: reason}, nil
	}

	logger.Debug("ValidateFeeToken request received")

	if token == "" {
		resultReason = "missing_token"
		err = status.Error(codes.InvalidArgument, "fee_quote_token is required")
		return nil, err
	}

	now := s.clock.Now()

	payload, decodeErr := decodeTokenPayload(token)
	if decodeErr != nil {
		logger.Warn("failed to decode fee quote token", zap.Error(decodeErr))
		return reject("invalid_token")
	}

	logger = logger.With(logFieldsFromTokenPayload(&payload)...)
	if payload.Intent != nil {
		trigger = payload.Intent.GetTrigger()
	}

	if now.UnixMilli() > payload.ExpiresAtUnixMs {
		logger.Info("fee quote token expired")
		return reject("expired")
	}

	orgRef, parseErr := primitive.ObjectIDFromHex(payload.OrganizationRef)
	if parseErr != nil {
		logger.Warn("token contained invalid organization reference", zap.Error(parseErr))
		return reject("invalid_token")
	}

	lines, applied, fx, computeErr := s.computeQuoteWithTime(ctx, orgRef, payload.Intent, nil, payload.Trace, now)
	if computeErr != nil {
		err = computeErr
		return nil, err
	}

	resp = &feesv1.ValidateFeeTokenResponse{
		Meta:    &feesv1.ResponseMeta{Trace: payload.Trace},
		Valid:   true,
		Intent:  payload.Intent,
		Lines:   lines,
		Applied: applied,
		FxUsed:  fx,
	}
	return resp, nil
}
// validateQuoteRequest checks that a quote request carries everything the
// computation needs: a non-blank meta.organization_ref, an intent with a
// specified trigger, and a base amount with non-blank amount and currency.
// The first missing piece is reported as an InvalidArgument status; nil means
// the request is acceptable.
func (s *Service) validateQuoteRequest(req *feesv1.QuoteFeesRequest) error {
	invalid := func(text string) error {
		return status.Error(codes.InvalidArgument, text)
	}

	if req == nil {
		return invalid("request is required")
	}

	meta := req.GetMeta()
	intent := req.GetIntent()

	// Cases are evaluated top to bottom, so later cases may rely on the
	// earlier nil checks having passed.
	switch {
	case meta == nil || strings.TrimSpace(meta.GetOrganizationRef()) == "":
		return invalid("meta.organization_ref is required")
	case intent == nil:
		return invalid("intent is required")
	case intent.GetTrigger() == feesv1.Trigger_TRIGGER_UNSPECIFIED:
		return invalid("intent.trigger is required")
	case intent.GetBaseAmount() == nil:
		return invalid("intent.base_amount is required")
	case strings.TrimSpace(intent.GetBaseAmount().GetAmount()) == "":
		return invalid("intent.base_amount.amount is required")
	case strings.TrimSpace(intent.GetBaseAmount().GetCurrency()) == "":
		return invalid("intent.base_amount.currency is required")
	}
	return nil
}
// validatePrecomputeRequest applies the quote-request rules to a precompute
// request by checking its meta and intent through validateQuoteRequest.
// A nil request is rejected with an InvalidArgument status.
func (s *Service) validatePrecomputeRequest(req *feesv1.PrecomputeFeesRequest) error {
	if req == nil {
		return status.Error(codes.InvalidArgument, "request is required")
	}
	equivalent := &feesv1.QuoteFeesRequest{
		Meta:   req.GetMeta(),
		Intent: req.GetIntent(),
	}
	return s.validateQuoteRequest(equivalent)
}
// computeQuote runs computeQuoteWithTime using the service clock's current
// time as the reference instant.
func (s *Service) computeQuote(ctx context.Context, orgRef primitive.ObjectID, intent *feesv1.Intent, overrides *feesv1.PolicyOverrides, trace *tracev1.TraceContext) ([]*feesv1.DerivedPostingLine, []*feesv1.AppliedRule, *feesv1.FXUsed, error) {
	reference := s.clock.Now()
	return s.computeQuoteWithTime(ctx, orgRef, intent, overrides, trace, reference)
}
// computeQuoteWithTime resolves the fee rule that applies to the intent and
// runs the calculator over it.
//
// The booking time is the intent's booked_at when that is set and valid,
// otherwise now. A zero orgRef is passed to the resolver as nil.
//
// Resolver errors are mapped to gRPC statuses: missing rule or plan to
// NotFound, conflicting rules or plans to FailedPrecondition, anything else
// to Internal. Calculator errors matching merrors.ErrInvalidArg become
// InvalidArgument with the calculator's message; all others become Internal.
//
// NOTE(review): overrides is accepted but not read here — confirm whether
// policy overrides are meant to influence the computation.
func (s *Service) computeQuoteWithTime(ctx context.Context, orgRef primitive.ObjectID, intent *feesv1.Intent, overrides *feesv1.PolicyOverrides, trace *tracev1.TraceContext, now time.Time) ([]*feesv1.DerivedPostingLine, []*feesv1.AppliedRule, *feesv1.FXUsed, error) {
	bookedAt := now
	if ts := intent.GetBookedAt(); ts != nil && ts.IsValid() {
		bookedAt = ts.AsTime()
	}

	var orgPtr *primitive.ObjectID
	logFields := []zap.Field{
		zap.Time("booked_at_used", bookedAt),
	}
	if !orgRef.IsZero() {
		logFields = append(logFields, zap.String("organization_ref", orgRef.Hex()))
		orgPtr = &orgRef
	}
	logFields = append(logFields, logFieldsFromIntent(intent)...)
	logFields = append(logFields, logFieldsFromTrace(trace)...)
	logger := s.logger.With(logFields...)

	plan, rule, err := s.resolver.ResolveFeeRule(ctx, orgPtr, convertTrigger(intent.GetTrigger()), bookedAt, intent.GetAttributes())
	if err != nil {
		switch {
		case errors.Is(err, merrors.ErrNoData):
			return nil, nil, nil, status.Error(codes.NotFound, "fee rule not found")
		case errors.Is(err, merrors.ErrDataConflict):
			return nil, nil, nil, status.Error(codes.FailedPrecondition, "conflicting fee rules")
		case errors.Is(err, storage.ErrConflictingFeePlans):
			return nil, nil, nil, status.Error(codes.FailedPrecondition, "conflicting fee plans")
		case errors.Is(err, storage.ErrFeePlanNotFound):
			return nil, nil, nil, status.Error(codes.NotFound, "fee plan not found")
		default:
			logger.Warn("failed to resolve fee rule", zap.Error(err))
			return nil, nil, nil, status.Error(codes.Internal, "failed to resolve fee rule")
		}
	}
	// A resolver that reports success without a plan or rule would otherwise
	// cause a nil dereference below.
	if plan == nil || rule == nil {
		logger.Warn("fee resolver returned no plan or rule")
		return nil, nil, nil, status.Error(codes.Internal, "failed to resolve fee rule")
	}

	// Narrow a shallow copy of the plan to the resolved rule instead of
	// mutating the resolver's object in place: if the resolver hands out a
	// shared instance, an in-place swap would be visible to (and race with)
	// concurrent requests.
	scoped := *plan
	scoped.Rules = []model.FeeRule{*rule}

	result, calcErr := s.calculator.Compute(ctx, &scoped, intent, bookedAt, trace)
	if calcErr != nil {
		if errors.Is(calcErr, merrors.ErrInvalidArg) {
			return nil, nil, nil, status.Error(codes.InvalidArgument, calcErr.Error())
		}
		logger.Warn("failed to compute fee quote", zap.Error(calcErr))
		return nil, nil, nil, status.Error(codes.Internal, "failed to compute fee quote")
	}
	return result.Lines, result.Applied, result.FxUsed, nil
}
// feeQuoteTokenPayload is the content of a fee quote token. The token is this
// struct marshalled with encoding/json and then base64-encoded (standard
// alphabet); encoding adds no signature or encryption.
//
// NOTE(review): Intent and Trace are protobuf messages serialised through
// encoding/json rather than protojson — confirm every field round-trips.
type feeQuoteTokenPayload struct {
// OrganizationRef is the organization reference string copied from the
// request meta; validation parses it as a hex ObjectID.
OrganizationRef string `json:"organization_ref"`
// Intent is the intent the quote was computed for; validation recomputes
// the fees from it.
Intent *feesv1.Intent `json:"intent"`
// ExpiresAtUnixMs is the expiry instant in Unix milliseconds.
ExpiresAtUnixMs int64 `json:"expires_at_unix_ms"`
// Trace is the trace context of the originating request, if any.
Trace *tracev1.TraceContext `json:"trace,omitempty"`
}
// encodeTokenPayload serialises the payload to JSON and returns it
// base64-encoded with the standard alphabet.
//
// A marshalling failure is reported as an internal error whose message
// includes the underlying cause, so the caller's log line says why the
// payload could not be serialised.
func encodeTokenPayload(payload feeQuoteTokenPayload) (string, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return "", merrors.Internal("fees: failed to serialize token payload: " + err.Error())
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}
// decodeTokenPayload reverses encodeTokenPayload: it base64-decodes the token
// (standard alphabet) and unmarshals the JSON payload.
//
// Surrounding whitespace is ignored. Either failure is reported as an
// invalid-argument error whose message includes the underlying cause, and the
// returned payload is always the zero value on error.
func decodeTokenPayload(token string) (feeQuoteTokenPayload, error) {
	raw, err := base64.StdEncoding.DecodeString(strings.TrimSpace(token))
	if err != nil {
		return feeQuoteTokenPayload{}, merrors.InvalidArgument("fees: invalid token encoding: " + err.Error())
	}

	var payload feeQuoteTokenPayload
	if err := json.Unmarshal(raw, &payload); err != nil {
		// Unmarshal may have filled some fields before failing; do not hand
		// a half-decoded payload back to the caller.
		return feeQuoteTokenPayload{}, merrors.InvalidArgument("fees: invalid token payload: " + err.Error())
	}
	return payload, nil
}