From b10ec79fe02a28162ee10ddb8c9c4c62ae95e9cc Mon Sep 17 00:00:00 2001 From: Stephan D Date: Tue, 3 Mar 2026 00:26:51 +0100 Subject: [PATCH] improved logging in callbacks --- .../callbacks/internal/delivery/service.go | 70 ++++++++++++++++++- .../callbacks/internal/security/service.go | 33 ++++++++- .../internal/server/internal/serverimp.go | 7 +- .../internal/subscriptions/module.go | 2 + .../internal/subscriptions/service.go | 25 ++++++- 5 files changed, 129 insertions(+), 8 deletions(-) diff --git a/api/edge/callbacks/internal/delivery/service.go b/api/edge/callbacks/internal/delivery/service.go index e6cf2a98..6aaee3c7 100644 --- a/api/edge/callbacks/internal/delivery/service.go +++ b/api/edge/callbacks/internal/delivery/service.go @@ -169,7 +169,19 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T if err := s.security.ValidateURL(ctx, task.EndpointURL); err != nil { result = "blocked" - _ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()) + s.logger.Warn("Blocked task delivery due to URL validation failure", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.Error(err), + ) + if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil { + s.logger.Warn("Failed to mark blocked task as failed", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.Error(markErr), + ) + } return } @@ -181,7 +193,20 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T signed, err := s.signer.Sign(ctx, task.SigningMode, task.SecretRef, task.Payload, time.Now().UTC()) if err != nil { result = "sign_error" - _ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()) + s.logger.Warn("Failed to sign task payload", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.String("signing_mode", task.SigningMode), + zap.Error(err), + ) + if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil { + s.logger.Warn("Failed to mark signing-error task as failed", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.Error(markErr), + ) + } return } @@ -191,7 +216,20 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, task.EndpointURL, bytes.NewReader(signed.Body)) if err != nil { result = "request_error" - _ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()) + s.logger.Warn("Failed to build callback request", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.String("endpoint_url", task.EndpointURL), + zap.Error(err), + ) + if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil { + s.logger.Warn("Failed to mark request-error task as failed", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.Error(markErr), + ) + } return } req.Header.Set("Content-Type", "application/json") @@ -225,6 +263,15 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T if reqErr == nil && statusCode > 0 { lastErr = "upstream returned retryable status" } + s.logger.Warn("Task delivery retry scheduled", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.Int("attempt", attempt), + zap.Int("status_code", statusCode), + zap.String("reason", lastErr), + zap.Time("next_attempt_at", next), + ) if err := s.tasks.MarkRetry(ctx, task.ID, attempt, next, lastErr, statusCode, now); err != nil { s.logger.Warn("Failed to mark task retry", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err)) } @@ -234,6 +281,15 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T if reqErr == nil && statusCode > 0 { lastErr = "upstream returned retryable status but max attempts reached" } + s.logger.Warn("Task delivery failed after reaching max attempts", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.Int("attempt", attempt), + zap.Int("max_attempts", task.MaxAttempts), + zap.Int("status_code", statusCode), + zap.String("reason", lastErr), + ) if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil { s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err)) } @@ -244,6 +300,14 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T if reqErr == nil && statusCode > 0 { lastErr = "upstream returned non-retryable status" } + s.logger.Warn("Task delivery failed", + zap.String("worker_id", workerID), + zap.String("task_id", task.ID.Hex()), + zap.String("event_id", task.EventID), + zap.Int("attempt", attempt), + zap.Int("status_code", statusCode), + zap.String("reason", lastErr), + ) if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil { s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err)) } diff --git a/api/edge/callbacks/internal/security/service.go b/api/edge/callbacks/internal/security/service.go index fd33b95c..2aba30dd 100644 --- a/api/edge/callbacks/internal/security/service.go +++ b/api/edge/callbacks/internal/security/service.go @@ -10,9 +10,12 @@ import ( "time" "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "go.uber.org/zap" ) type service struct { + logger mlogger.Logger requireHTTPS bool allowedHosts map[string]struct{} allowedPorts map[int]struct{} @@ -21,7 +24,11 @@ type service struct { } // New creates URL validator. -func New(cfg Config) Validator { +func New(logger mlogger.Logger, cfg Config) Validator { + if logger == nil { + logger = zap.NewNop() + } + hosts := make(map[string]struct{}, len(cfg.AllowedHosts)) for _, host := range cfg.AllowedHosts { h := strings.ToLower(strings.TrimSpace(host)) @@ -43,6 +50,7 @@ func New(cfg Config) Validator { } return &service{ + logger: logger.Named("security"), requireHTTPS: cfg.RequireHTTPS, allowedHosts: hosts, allowedPorts: ports, @@ -54,24 +62,33 @@ func New(cfg Config) Validator { func (s *service) ValidateURL(ctx context.Context, target string) error { parsed, err := url.Parse(strings.TrimSpace(target)) if err != nil { + s.logger.Warn("Failed to parse callback URL", zap.Error(err)) return merrors.InvalidArgumentWrap(err, "invalid callback URL", "url") } if parsed == nil || parsed.Host == "" { + s.logger.Warn("Callback URL host is required") return merrors.InvalidArgument("callback URL host is required", "url") } if parsed.User != nil { + s.logger.Warn("Rejected callback URL with credentials", zap.String("host", parsed.Hostname())) return merrors.InvalidArgument("callback URL credentials are not allowed", "url") } if s.requireHTTPS && !strings.EqualFold(parsed.Scheme, "https") { + s.logger.Warn("Rejected callback URL due to non-https scheme", + zap.String("scheme", parsed.Scheme), + zap.String("host", parsed.Hostname()), + ) return merrors.InvalidArgument("callback URL must use HTTPS", "url") } host := strings.ToLower(strings.TrimSpace(parsed.Hostname())) if host == "" { + s.logger.Warn("Callback URL host is empty") return merrors.InvalidArgument("callback URL host is empty", "url") } if len(s.allowedHosts) > 0 { if _, ok := s.allowedHosts[host]; !ok { + s.logger.Warn("Rejected callback host not present in allowlist", zap.String("host", host)) return merrors.InvalidArgument("callback host is not in allowlist", "url.host") } } @@ -82,12 +99,20 @@ func (s *service) ValidateURL(ctx context.Context, target string) error { } if len(s.allowedPorts) > 0 { if _, ok := s.allowedPorts[port]; !ok { + s.logger.Warn("Rejected callback URL port not present in allowlist", + zap.String("host", host), + zap.Int("port", port), + ) return merrors.InvalidArgument("callback URL port is not allowed", "url.port") } } if addr, addrErr := netip.ParseAddr(host); addrErr == nil { if isBlocked(addr) { + s.logger.Warn("Rejected callback URL with blocked IP address", + zap.String("host", host), + zap.String("ip", addr.String()), + ) return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url") } return nil @@ -102,9 +127,11 @@ func (s *service) ValidateURL(ctx context.Context, target string) error { ips, err := s.resolver.LookupIPAddr(lookupCtx, host) if err != nil { + s.logger.Warn("Failed to resolve callback host", zap.String("host", host), zap.Error(err)) return merrors.InternalWrap(err, "failed to resolve callback host") } if len(ips) == 0 { + s.logger.Warn("Callback host did not resolve", zap.String("host", host)) return merrors.InvalidArgument("callback host did not resolve", "url.host") } for _, ip := range ips { @@ -113,6 +140,10 @@ func (s *service) ValidateURL(ctx context.Context, target string) error { } addr, ok := netip.AddrFromSlice(ip.IP) if ok && isBlocked(addr) { + s.logger.Warn("Rejected callback URL resolving to blocked IP address", + zap.String("host", host), + zap.String("ip", addr.String()), + ) return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url.host") } } diff --git a/api/edge/callbacks/internal/server/internal/serverimp.go b/api/edge/callbacks/internal/server/internal/serverimp.go index 75f8fff7..83e00ea7 100644 --- a/api/edge/callbacks/internal/server/internal/serverimp.go +++ b/api/edge/callbacks/internal/server/internal/serverimp.go @@ -65,13 +65,16 @@ func (i *Imp) Start() error { return err } - resolver, err := subscriptions.New(subscriptions.Dependencies{EndpointRepo: repo.Endpoints()}) + resolver, err := subscriptions.New(subscriptions.Dependencies{ + EndpointRepo: repo.Endpoints(), + Logger: i.logger, + }) if err != nil { i.shutdownRuntime(context.Background()) return err } - securityValidator := security.New(security.Config{ + securityValidator := security.New(i.logger, security.Config{ RequireHTTPS: cfg.Security.RequireHTTPS, AllowedHosts: cfg.Security.AllowedHosts, AllowedPorts: cfg.Security.AllowedPorts, diff --git a/api/edge/callbacks/internal/subscriptions/module.go b/api/edge/callbacks/internal/subscriptions/module.go index f3cab9a7..6e56b8a2 100644 --- a/api/edge/callbacks/internal/subscriptions/module.go +++ b/api/edge/callbacks/internal/subscriptions/module.go @@ -5,6 +5,7 @@ import ( "github.com/tech/sendico/edge/callbacks/internal/model" "github.com/tech/sendico/edge/callbacks/internal/storage" + "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" ) @@ -16,4 +17,5 @@ type Resolver interface { // Dependencies defines subscriptions resolver dependencies. type Dependencies struct { EndpointRepo storage.EndpointRepo + Logger mlogger.Logger } diff --git a/api/edge/callbacks/internal/subscriptions/service.go b/api/edge/callbacks/internal/subscriptions/service.go index e5a21637..6d56ebb4 100644 --- a/api/edge/callbacks/internal/subscriptions/service.go +++ b/api/edge/callbacks/internal/subscriptions/service.go @@ -7,11 +7,14 @@ import ( "github.com/tech/sendico/edge/callbacks/internal/model" "github.com/tech/sendico/edge/callbacks/internal/storage" "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" + "go.uber.org/zap" ) type service struct { - repo storage.EndpointRepo + logger mlogger.Logger + repo storage.EndpointRepo } // New creates endpoint resolver service. @@ -19,8 +22,15 @@ func New(deps Dependencies) (Resolver, error) { if deps.EndpointRepo == nil { return nil, merrors.InvalidArgument("subscriptions: endpoint repo is required", "endpointRepo") } + logger := deps.Logger + if logger == nil { + logger = zap.NewNop() + } - return &service{repo: deps.EndpointRepo}, nil + return &service{ + logger: logger.Named("subscriptions"), + repo: deps.EndpointRepo, + }, nil } func (s *service) Resolve(ctx context.Context, eventType string, organizationRef bson.ObjectID) ([]model.Endpoint, error) { @@ -33,8 +43,19 @@ func (s *service) Resolve(ctx context.Context, eventType string, organizationRef endpoints, err := s.repo.FindActive(ctx, eventType, organizationRef) if err != nil { + s.logger.Warn("Failed to resolve active endpoints", + zap.String("event_type", eventType), + zap.String("organization_ref", organizationRef.Hex()), + zap.Error(err), + ) return nil, err } + s.logger.Debug("Resolved active endpoints", + zap.String("event_type", eventType), + zap.String("organization_ref", organizationRef.Hex()), + zap.Int("endpoints", len(endpoints)), + ) + return endpoints, nil }