148 lines
4.1 KiB
Go
148 lines
4.1 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"github.com/tech/sendico/pkg/api/http/response"
|
|
"github.com/tech/sendico/pkg/domainprovider"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
"github.com/tech/sendico/pkg/mutil/fr"
|
|
"github.com/tech/sendico/server/internal/server/fileserviceimp/storage/config"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type LocalStorage struct {
|
|
logger mlogger.Logger
|
|
storageDir string
|
|
subDir string
|
|
directory string
|
|
dp domainprovider.DomainProvider
|
|
service mservice.Type
|
|
}
|
|
|
|
func (storage *LocalStorage) Delete(ctx context.Context, objID string) error {
|
|
// Check if context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
filePath := filepath.Join(storage.storageDir, objID)
|
|
if err := os.Remove(filePath); err != nil {
|
|
if os.IsNotExist(err) {
|
|
storage.logger.Debug("File not found", zap.String("obj_ref", objID))
|
|
return merrors.NoData("file_not_found")
|
|
}
|
|
storage.logger.Warn("Error occurred while accesing file", zap.Error(err), zap.String("storage", storage.storageDir), zap.String("obj_ref", objID))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (storage *LocalStorage) Save(ctx context.Context, file io.Reader, objID string) (string, error) {
|
|
// Check if context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
return "", ctx.Err()
|
|
default:
|
|
}
|
|
|
|
filePath := filepath.Join(storage.storageDir, objID)
|
|
dst, err := os.Create(filePath)
|
|
if err != nil {
|
|
storage.logger.Warn("Error occurred while creating file", zap.Error(err), zap.String("storage", storage.storageDir), zap.String("obj_ref", objID))
|
|
return "", err
|
|
}
|
|
defer fr.CloseFile(storage.logger, dst)
|
|
|
|
// Use a goroutine to copy the file and monitor context cancellation
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
_, err := io.Copy(dst, file)
|
|
errCh <- err
|
|
}()
|
|
|
|
// Wait for either completion or context cancellation
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
storage.logger.Warn("Error occurred while saving file", zap.Error(err), zap.String("obj_ref", objID))
|
|
return "", err
|
|
}
|
|
case <-ctx.Done():
|
|
// Context was cancelled, clean up the partial file
|
|
os.Remove(filePath)
|
|
return "", ctx.Err()
|
|
}
|
|
|
|
return storage.dp.GetAPILink(storage.directory, storage.subDir, objID)
|
|
}
|
|
|
|
func (storage *LocalStorage) Get(ctx context.Context, objRef string) http.HandlerFunc {
|
|
// Check if context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
return response.Internal(storage.logger, storage.service, ctx.Err())
|
|
default:
|
|
}
|
|
|
|
filePath := filepath.Join(storage.storageDir, objRef)
|
|
if _, err := os.Stat(filePath); err != nil {
|
|
storage.logger.Warn("Failed to access file", zap.Error(err), zap.String("storage", storage.storageDir), zap.String("obj_ref", objRef))
|
|
return response.Internal(storage.logger, storage.service, err)
|
|
}
|
|
|
|
res := func(w http.ResponseWriter, r *http.Request) {
|
|
// Check if the request context is cancelled
|
|
select {
|
|
case <-r.Context().Done():
|
|
storage.logger.Warn("Request canceleed", zap.Error(r.Context().Err()), zap.String("obj_ref", objRef))
|
|
http.Error(w, "Request cancelled", http.StatusRequestTimeout)
|
|
return
|
|
default:
|
|
}
|
|
http.ServeFile(w, r, filePath)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func ensureDir(dirName string) error {
|
|
info, err := os.Stat(dirName)
|
|
if os.IsNotExist(err) {
|
|
return os.MkdirAll(dirName, 0o755)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !info.IsDir() {
|
|
return &os.PathError{Op: "mkdir", Path: dirName, Err: os.ErrExist}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func CreateLocalFileStorage(logger mlogger.Logger, service mservice.Type, directory, subDir string, dp domainprovider.DomainProvider, cfg config.LocalFSSConfig) (*LocalStorage, error) {
|
|
dir := filepath.Join(cfg.RootPath, directory)
|
|
if err := ensureDir(dir); err != nil {
|
|
logger.Warn("Failed to check directory availability", zap.Error(err), zap.String("dir", dir))
|
|
return nil, err
|
|
}
|
|
res := &LocalStorage{
|
|
logger: logger.Named("lfs").Named(directory),
|
|
storageDir: dir,
|
|
directory: directory,
|
|
subDir: subDir,
|
|
dp: dp,
|
|
service: service,
|
|
}
|
|
res.logger.Info("Storage installed", zap.String("root_path", cfg.RootPath), zap.String("directory", directory))
|
|
return res, nil
|
|
}
|