70 lines
2.0 KiB
Go
70 lines
2.0 KiB
Go
package ws
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/server/interface/api/ws"
|
|
ac "github.com/tech/sendico/server/internal/api/config"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/net/websocket"
|
|
)
|
|
|
|
type DispatcherImpl struct {
|
|
logger mlogger.Logger
|
|
handlers map[string]ws.HandlerFunc
|
|
timeout int
|
|
}
|
|
|
|
func (d *DispatcherImpl) InstallHandler(messageType string, handler ws.HandlerFunc) {
|
|
d.handlers[messageType] = handler
|
|
d.logger.Info("Handler installed", zap.String("message_type", messageType))
|
|
}
|
|
|
|
func (d *DispatcherImpl) dispatchMessage(ctx context.Context, conn *websocket.Conn) {
|
|
var msg ws.Message
|
|
err := websocket.JSON.Receive(conn, &msg)
|
|
if err != nil {
|
|
d.logger.Warn("Failed to read websocket message", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if handler, exists := d.handlers[msg.MessageType]; exists {
|
|
responseHandler := handler(ctx, msg)
|
|
responseHandler(msg.MessageType, conn)
|
|
} else {
|
|
d.logger.Warn("Unknown websocket message type", zap.String("message_type", msg.MessageType), zap.Any("message", &msg))
|
|
}
|
|
}
|
|
|
|
func (d *DispatcherImpl) handle(w http.ResponseWriter, r *http.Request) {
|
|
//nolint:contextcheck // websocket.Handler callback signature does not carry request context.
|
|
websocket.Handler(func(conn *websocket.Conn) {
|
|
ctx, cancel := context.WithTimeout(r.Context(), time.Duration(d.timeout)*time.Second)
|
|
defer cancel()
|
|
d.dispatchMessage(ctx, conn)
|
|
}).ServeHTTP(w, r)
|
|
}
|
|
|
|
func NewDispatcher(logger mlogger.Logger, router chi.Router, config *ac.WebSocketConfig, apiEndpoint string) *DispatcherImpl {
|
|
d := &DispatcherImpl{
|
|
logger: logger.Named("websocket"),
|
|
handlers: make(map[string]ws.HandlerFunc),
|
|
timeout: config.Timeout,
|
|
}
|
|
|
|
d.logger.Debug("Installing websocket middleware...")
|
|
router.Group(func(r chi.Router) {
|
|
ep := fmt.Sprintf("%s%s", apiEndpoint, os.Getenv(config.EndpointEnv))
|
|
d.logger.Info("Installing websockets handler", zap.String("endpoint", ep))
|
|
r.Get(ep, d.handle)
|
|
})
|
|
|
|
return d
|
|
}
|