package fees import ( "context" "encoding/base64" "encoding/json" "errors" "strings" "time" internalcalculator "github.com/tech/sendico/billing/fees/internal/service/fees/internal/calculator" "github.com/tech/sendico/billing/fees/internal/service/fees/internal/resolver" "github.com/tech/sendico/billing/fees/storage" "github.com/tech/sendico/billing/fees/storage/model" oracleclient "github.com/tech/sendico/fx/oracle/client" "github.com/tech/sendico/pkg/api/routers" clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" tracev1 "github.com/tech/sendico/pkg/proto/common/trace/v1" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) type Service struct { logger mlogger.Logger storage storage.Repository producer msg.Producer clock clockpkg.Clock calculator Calculator oracle oracleclient.Client resolver FeeResolver feesv1.UnimplementedFeeEngineServer } func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, opts ...Option) *Service { svc := &Service{ logger: logger.Named("fees"), storage: repo, producer: producer, clock: clockpkg.NewSystem(), } initMetrics() for _, opt := range opts { opt(svc) } if svc.clock == nil { svc.clock = clockpkg.NewSystem() } if svc.calculator == nil { svc.calculator = internalcalculator.New(svc.logger, svc.oracle) } if svc.resolver == nil { svc.resolver = resolver.New(repo.Plans(), svc.logger) } return svc } func (s *Service) Register(router routers.GRPC) error { return router.Register(func(reg grpc.ServiceRegistrar) { feesv1.RegisterFeeEngineServer(reg, s) }) } func (s *Service) QuoteFees(ctx context.Context, req *feesv1.QuoteFeesRequest) (resp *feesv1.QuoteFeesResponse, err error) { var ( meta *feesv1.RequestMeta intent *feesv1.Intent ) if req != nil { meta = req.GetMeta() intent = req.GetIntent() } logger := s.logger.With(requestLogFields(meta, intent)...) start := s.clock.Now() trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED if intent != nil { trigger = intent.GetTrigger() } var fxUsed bool defer func() { statusLabel := statusFromError(err) linesCount := 0 appliedCount := 0 if err == nil && resp != nil { fxUsed = resp.GetFxUsed() != nil linesCount = len(resp.GetLines()) appliedCount = len(resp.GetApplied()) } observeMetrics("quote", trigger, statusLabel, fxUsed, time.Since(start)) logFields := []zap.Field{ zap.String("status", statusLabel), zap.Duration("duration", time.Since(start)), zap.Bool("fx_used", fxUsed), zap.String("trigger", trigger.String()), zap.Int("lines", linesCount), zap.Int("applied_rules", appliedCount), } if err != nil { logger.Warn("QuoteFees finished", append(logFields, zap.Error(err))...) return } logger.Info("QuoteFees finished", logFields...) }() logger.Debug("QuoteFees request received") if err = s.validateQuoteRequest(req); err != nil { return nil, err } orgRef, parseErr := primitive.ObjectIDFromHex(req.GetMeta().GetOrganizationRef()) if parseErr != nil { logger.Warn("QuoteFees invalid organization_ref", zap.Error(parseErr)) err = status.Error(codes.InvalidArgument, "invalid organization_ref") return nil, err } lines, applied, fx, computeErr := s.computeQuote(ctx, orgRef, req.GetIntent(), req.GetPolicy(), req.GetMeta().GetTrace()) if computeErr != nil { err = computeErr return nil, err } resp = &feesv1.QuoteFeesResponse{ Meta: &feesv1.ResponseMeta{Trace: req.GetMeta().GetTrace()}, Lines: lines, Applied: applied, FxUsed: fx, } return resp, nil } func (s *Service) PrecomputeFees(ctx context.Context, req *feesv1.PrecomputeFeesRequest) (resp *feesv1.PrecomputeFeesResponse, err error) { var ( meta *feesv1.RequestMeta intent *feesv1.Intent ) if req != nil { meta = req.GetMeta() intent = req.GetIntent() } logger := s.logger.With(requestLogFields(meta, intent)...) start := s.clock.Now() trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED if intent != nil { trigger = intent.GetTrigger() } var ( fxUsed bool expiresAt time.Time ) defer func() { statusLabel := statusFromError(err) linesCount := 0 appliedCount := 0 if err == nil && resp != nil { fxUsed = resp.GetFxUsed() != nil linesCount = len(resp.GetLines()) appliedCount = len(resp.GetApplied()) if ts := resp.GetExpiresAt(); ts != nil { expiresAt = ts.AsTime() } } observeMetrics("precompute", trigger, statusLabel, fxUsed, time.Since(start)) logFields := []zap.Field{ zap.String("status", statusLabel), zap.Duration("duration", time.Since(start)), zap.Bool("fx_used", fxUsed), zap.String("trigger", trigger.String()), zap.Int("lines", linesCount), zap.Int("applied_rules", appliedCount), } if !expiresAt.IsZero() { logFields = append(logFields, zap.Time("expires_at", expiresAt)) } if err != nil { logger.Warn("PrecomputeFees finished", append(logFields, zap.Error(err))...) return } logger.Info("PrecomputeFees finished", logFields...) }() logger.Debug("PrecomputeFees request received") if err = s.validatePrecomputeRequest(req); err != nil { return nil, err } now := s.clock.Now() orgRef, parseErr := primitive.ObjectIDFromHex(req.GetMeta().GetOrganizationRef()) if parseErr != nil { logger.Warn("PrecomputeFees invalid organization_ref", zap.Error(parseErr)) err = status.Error(codes.InvalidArgument, "invalid organization_ref") return nil, err } lines, applied, fx, computeErr := s.computeQuoteWithTime(ctx, orgRef, req.GetIntent(), nil, req.GetMeta().GetTrace(), now) if computeErr != nil { err = computeErr return nil, err } ttl := req.GetTtlMs() if ttl <= 0 { ttl = 60000 } expiresAt = now.Add(time.Duration(ttl) * time.Millisecond) payload := feeQuoteTokenPayload{ OrganizationRef: req.GetMeta().GetOrganizationRef(), Intent: req.GetIntent(), ExpiresAtUnixMs: expiresAt.UnixMilli(), Trace: req.GetMeta().GetTrace(), } var token string if token, err = encodeTokenPayload(payload); err != nil { logger.Warn("failed to encode fee quote token", zap.Error(err)) err = status.Error(codes.Internal, "failed to encode fee quote token") return nil, err } resp = &feesv1.PrecomputeFeesResponse{ Meta: &feesv1.ResponseMeta{Trace: req.GetMeta().GetTrace()}, FeeQuoteToken: token, ExpiresAt: timestamppb.New(expiresAt), Lines: lines, Applied: applied, FxUsed: fx, } return resp, nil } func (s *Service) ValidateFeeToken(ctx context.Context, req *feesv1.ValidateFeeTokenRequest) (resp *feesv1.ValidateFeeTokenResponse, err error) { tokenLen := 0 if req != nil { tokenLen = len(strings.TrimSpace(req.GetFeeQuoteToken())) } logger := s.logger.With(zap.Int("token_length", tokenLen)) start := s.clock.Now() trigger := feesv1.Trigger_TRIGGER_UNSPECIFIED var ( fxUsed bool resultReason string ) defer func() { statusLabel := statusFromError(err) if err == nil && resp != nil { if !resp.GetValid() { statusLabel = "invalid" } fxUsed = resp.GetFxUsed() != nil if resp.GetIntent() != nil { trigger = resp.GetIntent().GetTrigger() } } observeMetrics("validate", trigger, statusLabel, fxUsed, time.Since(start)) logFields := []zap.Field{ zap.String("status", statusLabel), zap.Duration("duration", time.Since(start)), zap.Bool("fx_used", fxUsed), zap.String("trigger", trigger.String()), zap.Bool("valid", resp != nil && resp.GetValid()), } if resultReason != "" { logFields = append(logFields, zap.String("reason", resultReason)) } if err != nil { logger.Warn("ValidateFeeToken finished", append(logFields, zap.Error(err))...) return } logger.Info("ValidateFeeToken finished", logFields...) }() logger.Debug("ValidateFeeToken request received") if req == nil || strings.TrimSpace(req.GetFeeQuoteToken()) == "" { resultReason = "missing_token" err = status.Error(codes.InvalidArgument, "fee_quote_token is required") return nil, err } now := s.clock.Now() payload, decodeErr := decodeTokenPayload(req.GetFeeQuoteToken()) if decodeErr != nil { resultReason = "invalid_token" logger.Warn("failed to decode fee quote token", zap.Error(decodeErr)) resp = &feesv1.ValidateFeeTokenResponse{Meta: &feesv1.ResponseMeta{}, Valid: false, Reason: "invalid_token"} return resp, nil } logger = logger.With(logFieldsFromTokenPayload(&payload)...) if payload.Intent != nil { trigger = payload.Intent.GetTrigger() } if now.UnixMilli() > payload.ExpiresAtUnixMs { resultReason = "expired" logger.Info("fee quote token expired") resp = &feesv1.ValidateFeeTokenResponse{Meta: &feesv1.ResponseMeta{}, Valid: false, Reason: "expired"} return resp, nil } orgRef, parseErr := primitive.ObjectIDFromHex(payload.OrganizationRef) if parseErr != nil { resultReason = "invalid_token" logger.Warn("token contained invalid organization reference", zap.Error(parseErr)) resp = &feesv1.ValidateFeeTokenResponse{Meta: &feesv1.ResponseMeta{}, Valid: false, Reason: "invalid_token"} return resp, nil } lines, applied, fx, computeErr := s.computeQuoteWithTime(ctx, orgRef, payload.Intent, nil, payload.Trace, now) if computeErr != nil { err = computeErr return nil, err } resp = &feesv1.ValidateFeeTokenResponse{ Meta: &feesv1.ResponseMeta{Trace: payload.Trace}, Valid: true, Intent: payload.Intent, Lines: lines, Applied: applied, FxUsed: fx, } return resp, nil } func (s *Service) validateQuoteRequest(req *feesv1.QuoteFeesRequest) error { if req == nil { return status.Error(codes.InvalidArgument, "request is required") } if req.GetMeta() == nil || strings.TrimSpace(req.GetMeta().GetOrganizationRef()) == "" { return status.Error(codes.InvalidArgument, "meta.organization_ref is required") } if req.GetIntent() == nil { return status.Error(codes.InvalidArgument, "intent is required") } if req.GetIntent().GetTrigger() == feesv1.Trigger_TRIGGER_UNSPECIFIED { return status.Error(codes.InvalidArgument, "intent.trigger is required") } if req.GetIntent().GetBaseAmount() == nil { return status.Error(codes.InvalidArgument, "intent.base_amount is required") } if strings.TrimSpace(req.GetIntent().GetBaseAmount().GetAmount()) == "" { return status.Error(codes.InvalidArgument, "intent.base_amount.amount is required") } if strings.TrimSpace(req.GetIntent().GetBaseAmount().GetCurrency()) == "" { return status.Error(codes.InvalidArgument, "intent.base_amount.currency is required") } return nil } func (s *Service) validatePrecomputeRequest(req *feesv1.PrecomputeFeesRequest) error { if req == nil { return status.Error(codes.InvalidArgument, "request is required") } return s.validateQuoteRequest(&feesv1.QuoteFeesRequest{Meta: req.GetMeta(), Intent: req.GetIntent()}) } func (s *Service) computeQuote(ctx context.Context, orgRef primitive.ObjectID, intent *feesv1.Intent, overrides *feesv1.PolicyOverrides, trace *tracev1.TraceContext) ([]*feesv1.DerivedPostingLine, []*feesv1.AppliedRule, *feesv1.FXUsed, error) { return s.computeQuoteWithTime(ctx, orgRef, intent, overrides, trace, s.clock.Now()) } func (s *Service) computeQuoteWithTime(ctx context.Context, orgRef primitive.ObjectID, intent *feesv1.Intent, overrides *feesv1.PolicyOverrides, trace *tracev1.TraceContext, now time.Time) ([]*feesv1.DerivedPostingLine, []*feesv1.AppliedRule, *feesv1.FXUsed, error) { bookedAt := now if intent.GetBookedAt() != nil && intent.GetBookedAt().IsValid() { bookedAt = intent.GetBookedAt().AsTime() } logFields := []zap.Field{ zap.Time("booked_at_used", bookedAt), } if !orgRef.IsZero() { logFields = append(logFields, zap.String("organization_ref", orgRef.Hex())) } logFields = append(logFields, logFieldsFromIntent(intent)...) logFields = append(logFields, logFieldsFromTrace(trace)...) logger := s.logger.With(logFields...) var orgPtr *primitive.ObjectID if !orgRef.IsZero() { orgPtr = &orgRef } plan, rule, err := s.resolver.ResolveFeeRule(ctx, orgPtr, convertTrigger(intent.GetTrigger()), bookedAt, intent.GetAttributes()) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): return nil, nil, nil, status.Error(codes.NotFound, "fee rule not found") case errors.Is(err, merrors.ErrDataConflict): return nil, nil, nil, status.Error(codes.FailedPrecondition, "conflicting fee rules") case errors.Is(err, storage.ErrConflictingFeePlans): return nil, nil, nil, status.Error(codes.FailedPrecondition, "conflicting fee plans") case errors.Is(err, storage.ErrFeePlanNotFound): return nil, nil, nil, status.Error(codes.NotFound, "fee plan not found") default: logger.Warn("failed to resolve fee rule", zap.Error(err)) return nil, nil, nil, status.Error(codes.Internal, "failed to resolve fee rule") } } originalRules := plan.Rules plan.Rules = []model.FeeRule{*rule} defer func() { plan.Rules = originalRules }() result, calcErr := s.calculator.Compute(ctx, plan, intent, bookedAt, trace) if calcErr != nil { if errors.Is(calcErr, merrors.ErrInvalidArg) { return nil, nil, nil, status.Error(codes.InvalidArgument, calcErr.Error()) } logger.Warn("failed to compute fee quote", zap.Error(calcErr)) return nil, nil, nil, status.Error(codes.Internal, "failed to compute fee quote") } return result.Lines, result.Applied, result.FxUsed, nil } type feeQuoteTokenPayload struct { OrganizationRef string `json:"organization_ref"` Intent *feesv1.Intent `json:"intent"` ExpiresAtUnixMs int64 `json:"expires_at_unix_ms"` Trace *tracev1.TraceContext `json:"trace,omitempty"` } func encodeTokenPayload(payload feeQuoteTokenPayload) (string, error) { data, err := json.Marshal(payload) if err != nil { return "", merrors.Internal("fees: failed to serialize token payload") } return base64.StdEncoding.EncodeToString(data), nil } func decodeTokenPayload(token string) (feeQuoteTokenPayload, error) { var payload feeQuoteTokenPayload data, err := base64.StdEncoding.DecodeString(token) if err != nil { return payload, merrors.InvalidArgument("fees: invalid token encoding") } if err := json.Unmarshal(data, &payload); err != nil { return payload, merrors.InvalidArgument("fees: invalid token payload") } return payload, nil }