132 lines
4.0 KiB
Go
132 lines
4.0 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
messaging "github.com/tech/sendico/pkg/messaging/envelope"
|
|
"github.com/tech/sendico/pkg/model"
|
|
nm "github.com/tech/sendico/pkg/model/notification"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/encoding/protojson"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
func (s *Service) SubmitPayout(ctx context.Context, req *mntxv1.SubmitPayoutRequest) (*mntxv1.SubmitPayoutResponse, error) {
|
|
return executeUnary(ctx, s, "SubmitPayout", s.handleSubmitPayout, req)
|
|
}
|
|
|
|
func (s *Service) handleSubmitPayout(_ context.Context, req *mntxv1.SubmitPayoutRequest) gsresponse.Responder[mntxv1.SubmitPayoutResponse] {
|
|
payout, err := s.buildPayout(req)
|
|
if err != nil {
|
|
return gsresponse.Auto[mntxv1.SubmitPayoutResponse](s.logger, mservice.MntxGateway, err)
|
|
}
|
|
|
|
s.store.Save(payout)
|
|
s.emitEvent(payout, nm.NAPending)
|
|
go s.completePayout(payout, strings.TrimSpace(req.GetSimulatedFailureReason()))
|
|
|
|
return gsresponse.Success(&mntxv1.SubmitPayoutResponse{Payout: payout})
|
|
}
|
|
|
|
func (s *Service) buildPayout(req *mntxv1.SubmitPayoutRequest) (*mntxv1.Payout, error) {
|
|
if req == nil {
|
|
return nil, newPayoutError("invalid_request", merrors.InvalidArgument("request cannot be empty"))
|
|
}
|
|
|
|
idempotencyKey := strings.TrimSpace(req.IdempotencyKey)
|
|
if idempotencyKey == "" {
|
|
return nil, newPayoutError("missing_idempotency_key", merrors.InvalidArgument("idempotency_key is required", "idempotency_key"))
|
|
}
|
|
|
|
orgRef := strings.TrimSpace(req.OrganizationRef)
|
|
if orgRef == "" {
|
|
return nil, newPayoutError("missing_organization_ref", merrors.InvalidArgument("organization_ref is required", "organization_ref"))
|
|
}
|
|
|
|
if err := validateAmount(req.Amount); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := validateDestination(req.Destination); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if reason := strings.TrimSpace(req.SimulatedFailureReason); reason != "" {
|
|
return nil, newPayoutError(normalizeReason(reason), merrors.InvalidArgument("simulated payout failure requested"))
|
|
}
|
|
|
|
now := timestamppb.New(s.clock.Now())
|
|
payout := &mntxv1.Payout{
|
|
PayoutRef: newPayoutRef(),
|
|
IdempotencyKey: idempotencyKey,
|
|
OrganizationRef: orgRef,
|
|
Destination: req.Destination,
|
|
Amount: req.Amount,
|
|
Description: strings.TrimSpace(req.Description),
|
|
Metadata: req.Metadata,
|
|
Status: mntxv1.PayoutStatus_PAYOUT_STATUS_PENDING,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
|
|
return payout, nil
|
|
}
|
|
|
|
func (s *Service) completePayout(original *mntxv1.Payout, simulatedFailure string) {
|
|
outcome := clonePayout(original)
|
|
if outcome == nil {
|
|
return
|
|
}
|
|
|
|
// Simulate async processing delay for realism.
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
outcome.UpdatedAt = timestamppb.New(s.clock.Now())
|
|
|
|
if simulatedFailure != "" {
|
|
outcome.Status = mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED
|
|
outcome.FailureReason = simulatedFailure
|
|
observePayoutError(simulatedFailure, outcome.Amount)
|
|
s.store.Save(outcome)
|
|
s.emitEvent(outcome, nm.NAUpdated)
|
|
return
|
|
}
|
|
|
|
outcome.Status = mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED
|
|
observePayoutSuccess(outcome.Amount)
|
|
s.store.Save(outcome)
|
|
s.emitEvent(outcome, nm.NAUpdated)
|
|
}
|
|
|
|
func (s *Service) emitEvent(payout *mntxv1.Payout, action nm.NotificationAction) {
|
|
if payout == nil || s.producer == nil {
|
|
return
|
|
}
|
|
|
|
payload, err := protojson.Marshal(&mntxv1.PayoutStatusChangedEvent{Payout: payout})
|
|
if err != nil {
|
|
s.logger.Warn("failed to marshal payout event", zapError(err))
|
|
return
|
|
}
|
|
|
|
env := messaging.CreateEnvelope(string(mservice.MntxGateway), model.NewNotification(mservice.MntxGateway, action))
|
|
if _, err := env.Wrap(payload); err != nil {
|
|
s.logger.Warn("failed to wrap payout event payload", zapError(err))
|
|
return
|
|
}
|
|
|
|
if err := s.producer.SendMessage(env); err != nil {
|
|
s.logger.Warn("failed to publish payout event", zapError(err))
|
|
}
|
|
}
|
|
|
|
func zapError(err error) zap.Field {
|
|
return zap.Error(err)
|
|
}
|