Merge pull request 'improved logging in callbacks' (#605) from callbacks-604 into main
Some checks failed
ci/woodpecker/push/callbacks Pipeline failed
Some checks failed
ci/woodpecker/push/callbacks Pipeline failed
Reviewed-on: #605
This commit was merged in pull request #605.
This commit is contained in:
@@ -169,7 +169,19 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
|
|
||||||
if err := s.security.ValidateURL(ctx, task.EndpointURL); err != nil {
|
if err := s.security.ValidateURL(ctx, task.EndpointURL); err != nil {
|
||||||
result = "blocked"
|
result = "blocked"
|
||||||
_ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC())
|
s.logger.Warn("Blocked task delivery due to URL validation failure",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil {
|
||||||
|
s.logger.Warn("Failed to mark blocked task as failed",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.Error(markErr),
|
||||||
|
)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,7 +193,20 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
signed, err := s.signer.Sign(ctx, task.SigningMode, task.SecretRef, task.Payload, time.Now().UTC())
|
signed, err := s.signer.Sign(ctx, task.SigningMode, task.SecretRef, task.Payload, time.Now().UTC())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = "sign_error"
|
result = "sign_error"
|
||||||
_ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC())
|
s.logger.Warn("Failed to sign task payload",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.String("signing_mode", task.SigningMode),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil {
|
||||||
|
s.logger.Warn("Failed to mark signing-error task as failed",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.Error(markErr),
|
||||||
|
)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,7 +216,20 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, task.EndpointURL, bytes.NewReader(signed.Body))
|
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, task.EndpointURL, bytes.NewReader(signed.Body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = "request_error"
|
result = "request_error"
|
||||||
_ = s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC())
|
s.logger.Warn("Failed to build callback request",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.String("endpoint_url", task.EndpointURL),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
if markErr := s.tasks.MarkFailed(ctx, task.ID, attempt, err.Error(), statusCode, time.Now().UTC()); markErr != nil {
|
||||||
|
s.logger.Warn("Failed to mark request-error task as failed",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.Error(markErr),
|
||||||
|
)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
@@ -225,6 +263,15 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
if reqErr == nil && statusCode > 0 {
|
if reqErr == nil && statusCode > 0 {
|
||||||
lastErr = "upstream returned retryable status"
|
lastErr = "upstream returned retryable status"
|
||||||
}
|
}
|
||||||
|
s.logger.Warn("Task delivery retry scheduled",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.Int("attempt", attempt),
|
||||||
|
zap.Int("status_code", statusCode),
|
||||||
|
zap.String("reason", lastErr),
|
||||||
|
zap.Time("next_attempt_at", next),
|
||||||
|
)
|
||||||
if err := s.tasks.MarkRetry(ctx, task.ID, attempt, next, lastErr, statusCode, now); err != nil {
|
if err := s.tasks.MarkRetry(ctx, task.ID, attempt, next, lastErr, statusCode, now); err != nil {
|
||||||
s.logger.Warn("Failed to mark task retry", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
s.logger.Warn("Failed to mark task retry", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
||||||
}
|
}
|
||||||
@@ -234,6 +281,15 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
if reqErr == nil && statusCode > 0 {
|
if reqErr == nil && statusCode > 0 {
|
||||||
lastErr = "upstream returned retryable status but max attempts reached"
|
lastErr = "upstream returned retryable status but max attempts reached"
|
||||||
}
|
}
|
||||||
|
s.logger.Warn("Task delivery failed after reaching max attempts",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.Int("attempt", attempt),
|
||||||
|
zap.Int("max_attempts", task.MaxAttempts),
|
||||||
|
zap.Int("status_code", statusCode),
|
||||||
|
zap.String("reason", lastErr),
|
||||||
|
)
|
||||||
if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil {
|
if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil {
|
||||||
s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
||||||
}
|
}
|
||||||
@@ -244,6 +300,14 @@ func (s *service) handleTask(ctx context.Context, workerID string, task *model.T
|
|||||||
if reqErr == nil && statusCode > 0 {
|
if reqErr == nil && statusCode > 0 {
|
||||||
lastErr = "upstream returned non-retryable status"
|
lastErr = "upstream returned non-retryable status"
|
||||||
}
|
}
|
||||||
|
s.logger.Warn("Task delivery failed",
|
||||||
|
zap.String("worker_id", workerID),
|
||||||
|
zap.String("task_id", task.ID.Hex()),
|
||||||
|
zap.String("event_id", task.EventID),
|
||||||
|
zap.Int("attempt", attempt),
|
||||||
|
zap.Int("status_code", statusCode),
|
||||||
|
zap.String("reason", lastErr),
|
||||||
|
)
|
||||||
if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil {
|
if err := s.tasks.MarkFailed(ctx, task.ID, attempt, lastErr, statusCode, now); err != nil {
|
||||||
s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
s.logger.Warn("Failed to mark task failed", zap.String("worker_id", workerID), zap.String("task_id", task.ID.Hex()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,9 +10,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
logger mlogger.Logger
|
||||||
requireHTTPS bool
|
requireHTTPS bool
|
||||||
allowedHosts map[string]struct{}
|
allowedHosts map[string]struct{}
|
||||||
allowedPorts map[int]struct{}
|
allowedPorts map[int]struct{}
|
||||||
@@ -21,7 +24,11 @@ type service struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New creates URL validator.
|
// New creates URL validator.
|
||||||
func New(cfg Config) Validator {
|
func New(logger mlogger.Logger, cfg Config) Validator {
|
||||||
|
if logger == nil {
|
||||||
|
logger = zap.NewNop()
|
||||||
|
}
|
||||||
|
|
||||||
hosts := make(map[string]struct{}, len(cfg.AllowedHosts))
|
hosts := make(map[string]struct{}, len(cfg.AllowedHosts))
|
||||||
for _, host := range cfg.AllowedHosts {
|
for _, host := range cfg.AllowedHosts {
|
||||||
h := strings.ToLower(strings.TrimSpace(host))
|
h := strings.ToLower(strings.TrimSpace(host))
|
||||||
@@ -43,6 +50,7 @@ func New(cfg Config) Validator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &service{
|
return &service{
|
||||||
|
logger: logger.Named("security"),
|
||||||
requireHTTPS: cfg.RequireHTTPS,
|
requireHTTPS: cfg.RequireHTTPS,
|
||||||
allowedHosts: hosts,
|
allowedHosts: hosts,
|
||||||
allowedPorts: ports,
|
allowedPorts: ports,
|
||||||
@@ -54,24 +62,33 @@ func New(cfg Config) Validator {
|
|||||||
func (s *service) ValidateURL(ctx context.Context, target string) error {
|
func (s *service) ValidateURL(ctx context.Context, target string) error {
|
||||||
parsed, err := url.Parse(strings.TrimSpace(target))
|
parsed, err := url.Parse(strings.TrimSpace(target))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Warn("Failed to parse callback URL", zap.Error(err))
|
||||||
return merrors.InvalidArgumentWrap(err, "invalid callback URL", "url")
|
return merrors.InvalidArgumentWrap(err, "invalid callback URL", "url")
|
||||||
}
|
}
|
||||||
if parsed == nil || parsed.Host == "" {
|
if parsed == nil || parsed.Host == "" {
|
||||||
|
s.logger.Warn("Callback URL host is required")
|
||||||
return merrors.InvalidArgument("callback URL host is required", "url")
|
return merrors.InvalidArgument("callback URL host is required", "url")
|
||||||
}
|
}
|
||||||
if parsed.User != nil {
|
if parsed.User != nil {
|
||||||
|
s.logger.Warn("Rejected callback URL with credentials", zap.String("host", parsed.Hostname()))
|
||||||
return merrors.InvalidArgument("callback URL credentials are not allowed", "url")
|
return merrors.InvalidArgument("callback URL credentials are not allowed", "url")
|
||||||
}
|
}
|
||||||
if s.requireHTTPS && !strings.EqualFold(parsed.Scheme, "https") {
|
if s.requireHTTPS && !strings.EqualFold(parsed.Scheme, "https") {
|
||||||
|
s.logger.Warn("Rejected callback URL due to non-https scheme",
|
||||||
|
zap.String("scheme", parsed.Scheme),
|
||||||
|
zap.String("host", parsed.Hostname()),
|
||||||
|
)
|
||||||
return merrors.InvalidArgument("callback URL must use HTTPS", "url")
|
return merrors.InvalidArgument("callback URL must use HTTPS", "url")
|
||||||
}
|
}
|
||||||
|
|
||||||
host := strings.ToLower(strings.TrimSpace(parsed.Hostname()))
|
host := strings.ToLower(strings.TrimSpace(parsed.Hostname()))
|
||||||
if host == "" {
|
if host == "" {
|
||||||
|
s.logger.Warn("Callback URL host is empty")
|
||||||
return merrors.InvalidArgument("callback URL host is empty", "url")
|
return merrors.InvalidArgument("callback URL host is empty", "url")
|
||||||
}
|
}
|
||||||
if len(s.allowedHosts) > 0 {
|
if len(s.allowedHosts) > 0 {
|
||||||
if _, ok := s.allowedHosts[host]; !ok {
|
if _, ok := s.allowedHosts[host]; !ok {
|
||||||
|
s.logger.Warn("Rejected callback host not present in allowlist", zap.String("host", host))
|
||||||
return merrors.InvalidArgument("callback host is not in allowlist", "url.host")
|
return merrors.InvalidArgument("callback host is not in allowlist", "url.host")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -82,12 +99,20 @@ func (s *service) ValidateURL(ctx context.Context, target string) error {
|
|||||||
}
|
}
|
||||||
if len(s.allowedPorts) > 0 {
|
if len(s.allowedPorts) > 0 {
|
||||||
if _, ok := s.allowedPorts[port]; !ok {
|
if _, ok := s.allowedPorts[port]; !ok {
|
||||||
|
s.logger.Warn("Rejected callback URL port not present in allowlist",
|
||||||
|
zap.String("host", host),
|
||||||
|
zap.Int("port", port),
|
||||||
|
)
|
||||||
return merrors.InvalidArgument("callback URL port is not allowed", "url.port")
|
return merrors.InvalidArgument("callback URL port is not allowed", "url.port")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if addr, addrErr := netip.ParseAddr(host); addrErr == nil {
|
if addr, addrErr := netip.ParseAddr(host); addrErr == nil {
|
||||||
if isBlocked(addr) {
|
if isBlocked(addr) {
|
||||||
|
s.logger.Warn("Rejected callback URL with blocked IP address",
|
||||||
|
zap.String("host", host),
|
||||||
|
zap.String("ip", addr.String()),
|
||||||
|
)
|
||||||
return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url")
|
return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -102,9 +127,11 @@ func (s *service) ValidateURL(ctx context.Context, target string) error {
|
|||||||
|
|
||||||
ips, err := s.resolver.LookupIPAddr(lookupCtx, host)
|
ips, err := s.resolver.LookupIPAddr(lookupCtx, host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Warn("Failed to resolve callback host", zap.String("host", host), zap.Error(err))
|
||||||
return merrors.InternalWrap(err, "failed to resolve callback host")
|
return merrors.InternalWrap(err, "failed to resolve callback host")
|
||||||
}
|
}
|
||||||
if len(ips) == 0 {
|
if len(ips) == 0 {
|
||||||
|
s.logger.Warn("Callback host did not resolve", zap.String("host", host))
|
||||||
return merrors.InvalidArgument("callback host did not resolve", "url.host")
|
return merrors.InvalidArgument("callback host did not resolve", "url.host")
|
||||||
}
|
}
|
||||||
for _, ip := range ips {
|
for _, ip := range ips {
|
||||||
@@ -113,6 +140,10 @@ func (s *service) ValidateURL(ctx context.Context, target string) error {
|
|||||||
}
|
}
|
||||||
addr, ok := netip.AddrFromSlice(ip.IP)
|
addr, ok := netip.AddrFromSlice(ip.IP)
|
||||||
if ok && isBlocked(addr) {
|
if ok && isBlocked(addr) {
|
||||||
|
s.logger.Warn("Rejected callback URL resolving to blocked IP address",
|
||||||
|
zap.String("host", host),
|
||||||
|
zap.String("ip", addr.String()),
|
||||||
|
)
|
||||||
return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url.host")
|
return merrors.InvalidArgument("callback URL resolves to blocked IP range", "url.host")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,13 +65,16 @@ func (i *Imp) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
resolver, err := subscriptions.New(subscriptions.Dependencies{EndpointRepo: repo.Endpoints()})
|
resolver, err := subscriptions.New(subscriptions.Dependencies{
|
||||||
|
EndpointRepo: repo.Endpoints(),
|
||||||
|
Logger: i.logger,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
i.shutdownRuntime(context.Background())
|
i.shutdownRuntime(context.Background())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
securityValidator := security.New(security.Config{
|
securityValidator := security.New(i.logger, security.Config{
|
||||||
RequireHTTPS: cfg.Security.RequireHTTPS,
|
RequireHTTPS: cfg.Security.RequireHTTPS,
|
||||||
AllowedHosts: cfg.Security.AllowedHosts,
|
AllowedHosts: cfg.Security.AllowedHosts,
|
||||||
AllowedPorts: cfg.Security.AllowedPorts,
|
AllowedPorts: cfg.Security.AllowedPorts,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
|
|
||||||
"github.com/tech/sendico/edge/callbacks/internal/model"
|
"github.com/tech/sendico/edge/callbacks/internal/model"
|
||||||
"github.com/tech/sendico/edge/callbacks/internal/storage"
|
"github.com/tech/sendico/edge/callbacks/internal/storage"
|
||||||
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -16,4 +17,5 @@ type Resolver interface {
|
|||||||
// Dependencies defines subscriptions resolver dependencies.
|
// Dependencies defines subscriptions resolver dependencies.
|
||||||
type Dependencies struct {
|
type Dependencies struct {
|
||||||
EndpointRepo storage.EndpointRepo
|
EndpointRepo storage.EndpointRepo
|
||||||
|
Logger mlogger.Logger
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,10 +7,13 @@ import (
|
|||||||
"github.com/tech/sendico/edge/callbacks/internal/model"
|
"github.com/tech/sendico/edge/callbacks/internal/model"
|
||||||
"github.com/tech/sendico/edge/callbacks/internal/storage"
|
"github.com/tech/sendico/edge/callbacks/internal/storage"
|
||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
logger mlogger.Logger
|
||||||
repo storage.EndpointRepo
|
repo storage.EndpointRepo
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -19,8 +22,15 @@ func New(deps Dependencies) (Resolver, error) {
|
|||||||
if deps.EndpointRepo == nil {
|
if deps.EndpointRepo == nil {
|
||||||
return nil, merrors.InvalidArgument("subscriptions: endpoint repo is required", "endpointRepo")
|
return nil, merrors.InvalidArgument("subscriptions: endpoint repo is required", "endpointRepo")
|
||||||
}
|
}
|
||||||
|
logger := deps.Logger
|
||||||
|
if logger == nil {
|
||||||
|
logger = zap.NewNop()
|
||||||
|
}
|
||||||
|
|
||||||
return &service{repo: deps.EndpointRepo}, nil
|
return &service{
|
||||||
|
logger: logger.Named("subscriptions"),
|
||||||
|
repo: deps.EndpointRepo,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Resolve(ctx context.Context, eventType string, organizationRef bson.ObjectID) ([]model.Endpoint, error) {
|
func (s *service) Resolve(ctx context.Context, eventType string, organizationRef bson.ObjectID) ([]model.Endpoint, error) {
|
||||||
@@ -33,8 +43,19 @@ func (s *service) Resolve(ctx context.Context, eventType string, organizationRef
|
|||||||
|
|
||||||
endpoints, err := s.repo.FindActive(ctx, eventType, organizationRef)
|
endpoints, err := s.repo.FindActive(ctx, eventType, organizationRef)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Warn("Failed to resolve active endpoints",
|
||||||
|
zap.String("event_type", eventType),
|
||||||
|
zap.String("organization_ref", organizationRef.Hex()),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Resolved active endpoints",
|
||||||
|
zap.String("event_type", eventType),
|
||||||
|
zap.String("organization_ref", organizationRef.Hex()),
|
||||||
|
zap.Int("endpoints", len(endpoints)),
|
||||||
|
)
|
||||||
|
|
||||||
return endpoints, nil
|
return endpoints, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user