All checks were successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx/1 Pipeline was successful
ci/woodpecker/push/fx/2 Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
137 lines
4.3 KiB
Go
137 lines
4.3 KiB
Go
package storage
|
|
|
|
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/tech/sendico/pkg/api/http/response"
	"github.com/tech/sendico/pkg/mlogger"
	"github.com/tech/sendico/pkg/mservice"
	storageconfig "github.com/tech/sendico/server/internal/server/fileserviceimp/storage/config"
	"go.uber.org/zap"
)
|
|
|
|
type AWSS3Storage struct {
|
|
logger mlogger.Logger
|
|
s3Client *s3.Client
|
|
bucketName string
|
|
directory string
|
|
service mservice.Type
|
|
}
|
|
|
|
func (storage *AWSS3Storage) Delete(ctx context.Context, objID string) error {
|
|
fullPath := filepath.Join(storage.directory, objID)
|
|
_, err := storage.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(storage.bucketName),
|
|
Key: aws.String(fullPath),
|
|
})
|
|
if err != nil {
|
|
storage.logger.Warn("Failed to delete file from AWS S3", zap.Error(err), zap.String("obj_ref", objID))
|
|
return err
|
|
}
|
|
|
|
// Wait for object to be deleted
|
|
waiter := s3.NewObjectNotExistsWaiter(storage.s3Client)
|
|
err = waiter.Wait(ctx, &s3.HeadObjectInput{
|
|
Bucket: aws.String(storage.bucketName),
|
|
Key: aws.String(fullPath),
|
|
}, 30) // 30 second timeout
|
|
if err != nil {
|
|
storage.logger.Warn("Error occurred while waiting for S3 file deletion", zap.Error(err), zap.String("obj_ref", objID))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (storage *AWSS3Storage) s3URL(fullPath string) string {
|
|
return fmt.Sprintf("https://%s.s3.amazonaws.com/%s", storage.bucketName, fullPath)
|
|
}
|
|
|
|
func (storage *AWSS3Storage) Save(ctx context.Context, file io.Reader, objID string) (string, error) {
|
|
fullPath := filepath.Join(storage.directory, objID)
|
|
buf := new(bytes.Buffer)
|
|
_, err := io.Copy(buf, file)
|
|
if err != nil {
|
|
storage.logger.Warn("Failed to read file content", zap.Error(err), zap.String("obj_ref", objID))
|
|
return "", err
|
|
}
|
|
|
|
_, err = storage.s3Client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(storage.bucketName),
|
|
Key: aws.String(fullPath),
|
|
Body: bytes.NewReader(buf.Bytes()),
|
|
})
|
|
if err != nil {
|
|
storage.logger.Warn("Failed to upload file to S3", zap.Error(err), zap.String("obj_ref", objID))
|
|
return "", err
|
|
}
|
|
|
|
s3URL := storage.s3URL(fullPath)
|
|
storage.logger.Info("File upload complete", zap.String("obj_ref", objID), zap.String("s3_url", s3URL))
|
|
return s3URL, nil
|
|
}
|
|
|
|
func (storage *AWSS3Storage) Get(ctx context.Context, objID string) http.HandlerFunc {
|
|
storage.logger.Warn("Indirect access to the object should be avoided", zap.String("obj_ref", objID))
|
|
fullPath := filepath.Join(storage.directory, objID)
|
|
_, err := storage.s3Client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(storage.bucketName),
|
|
Key: aws.String(fullPath),
|
|
})
|
|
if err != nil {
|
|
storage.logger.Warn("Failed to get file from S3", zap.Error(err), zap.String("obj_ref", objID))
|
|
return response.NotFound(storage.logger, storage.service, err.Error())
|
|
}
|
|
|
|
res := func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, storage.s3URL(fullPath), http.StatusFound)
|
|
}
|
|
return res
|
|
}
|
|
|
|
func CreateAWSS3Storage(logger mlogger.Logger, service mservice.Type, directory string, cfg storageconfig.AWSS3SConfig) (*AWSS3Storage, error) {
|
|
region := os.Getenv(cfg.RegionEnv)
|
|
accessKeyID := os.Getenv(cfg.AccessKeyIDEnv)
|
|
secretAccessKey := os.Getenv(cfg.SecretAccessKeyEnv)
|
|
bucketName := os.Getenv(cfg.BucketNameEnv)
|
|
|
|
// Create AWS config with static credentials
|
|
awsConfig, err := config.LoadDefaultConfig(context.Background(),
|
|
config.WithRegion(region),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
accessKeyID,
|
|
secretAccessKey,
|
|
"",
|
|
)),
|
|
)
|
|
if err != nil {
|
|
logger.Warn("Failed to create AWS config", zap.Error(err), zap.String("bucket", bucketName),
|
|
zap.String("access_key_id", accessKeyID), zap.String("region", region))
|
|
return nil, err
|
|
}
|
|
|
|
// Create S3 client
|
|
s3Client := s3.NewFromConfig(awsConfig)
|
|
|
|
res := &AWSS3Storage{
|
|
logger: logger.Named("aws_s3").Named(directory),
|
|
s3Client: s3Client,
|
|
bucketName: bucketName,
|
|
directory: directory,
|
|
service: service,
|
|
}
|
|
res.logger.Info("Storage installed", zap.String("bucket", bucketName), zap.String("region", region),
|
|
zap.String("access_key_id", accessKeyID))
|
|
return res, nil
|
|
}
|