package gateway import ( "context" "strings" "time" "github.com/tech/sendico/pkg/api/routers/gsresponse" "github.com/tech/sendico/pkg/merrors" messaging "github.com/tech/sendico/pkg/messaging/envelope" "github.com/tech/sendico/pkg/model" nm "github.com/tech/sendico/pkg/model/notification" "github.com/tech/sendico/pkg/mservice" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" "go.uber.org/zap" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" ) func (s *Service) SubmitPayout(ctx context.Context, req *mntxv1.SubmitPayoutRequest) (*mntxv1.SubmitPayoutResponse, error) { return executeUnary(ctx, s, "SubmitPayout", s.handleSubmitPayout, req) } func (s *Service) handleSubmitPayout(_ context.Context, req *mntxv1.SubmitPayoutRequest) gsresponse.Responder[mntxv1.SubmitPayoutResponse] { payout, err := s.buildPayout(req) if err != nil { return gsresponse.Auto[mntxv1.SubmitPayoutResponse](s.logger, mservice.MntxGateway, err) } s.store.Save(payout) s.emitEvent(payout, nm.NAPending) go s.completePayout(payout, strings.TrimSpace(req.GetSimulatedFailureReason())) return gsresponse.Success(&mntxv1.SubmitPayoutResponse{Payout: payout}) } func (s *Service) buildPayout(req *mntxv1.SubmitPayoutRequest) (*mntxv1.Payout, error) { if req == nil { return nil, newPayoutError("invalid_request", merrors.InvalidArgument("request cannot be empty")) } idempotencyKey := strings.TrimSpace(req.IdempotencyKey) if idempotencyKey == "" { return nil, newPayoutError("missing_idempotency_key", merrors.InvalidArgument("idempotency_key is required", "idempotency_key")) } orgRef := strings.TrimSpace(req.OrganizationRef) if orgRef == "" { return nil, newPayoutError("missing_organization_ref", merrors.InvalidArgument("organization_ref is required", "organization_ref")) } if err := validateAmount(req.Amount); err != nil { return nil, err } if err := validateDestination(req.Destination); err != nil { return nil, err } if reason := strings.TrimSpace(req.SimulatedFailureReason); reason != "" { return nil, newPayoutError(normalizeReason(reason), merrors.InvalidArgument("simulated payout failure requested")) } now := timestamppb.New(s.clock.Now()) payout := &mntxv1.Payout{ PayoutRef: newPayoutRef(), IdempotencyKey: idempotencyKey, OrganizationRef: orgRef, Destination: req.Destination, Amount: req.Amount, Description: strings.TrimSpace(req.Description), Metadata: req.Metadata, Status: mntxv1.PayoutStatus_PAYOUT_STATUS_PENDING, CreatedAt: now, UpdatedAt: now, } return payout, nil } func (s *Service) completePayout(original *mntxv1.Payout, simulatedFailure string) { outcome := clonePayout(original) if outcome == nil { return } // Simulate async processing delay for realism. time.Sleep(150 * time.Millisecond) outcome.UpdatedAt = timestamppb.New(s.clock.Now()) if simulatedFailure != "" { outcome.Status = mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED outcome.FailureReason = simulatedFailure observePayoutError(simulatedFailure, outcome.Amount) s.store.Save(outcome) s.emitEvent(outcome, nm.NAUpdated) return } outcome.Status = mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED observePayoutSuccess(outcome.Amount) s.store.Save(outcome) s.emitEvent(outcome, nm.NAUpdated) } func (s *Service) emitEvent(payout *mntxv1.Payout, action nm.NotificationAction) { if payout == nil || s.producer == nil { return } payload, err := protojson.Marshal(&mntxv1.PayoutStatusChangedEvent{Payout: payout}) if err != nil { s.logger.Warn("failed to marshal payout event", zapError(err)) return } env := messaging.CreateEnvelope(string(mservice.MntxGateway), model.NewNotification(mservice.MntxGateway, action)) if _, err := env.Wrap(payload); err != nil { s.logger.Warn("failed to wrap payout event payload", zapError(err)) return } if err := s.producer.SendMessage(env); err != nil { s.logger.Warn("failed to publish payout event", zapError(err)) } } func zapError(err error) zap.Field { return zap.Error(err) }