133 lines
3.0 KiB
Go
133 lines
3.0 KiB
Go
package docstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type S3Store struct {
|
|
logger mlogger.Logger
|
|
client *s3.Client
|
|
bucket string
|
|
}
|
|
|
|
func NewS3Store(logger mlogger.Logger, cfg S3Config) (*S3Store, error) {
|
|
bucket := strings.TrimSpace(cfg.Bucket)
|
|
if bucket == "" {
|
|
return nil, merrors.InvalidArgument("docstore: bucket is required")
|
|
}
|
|
|
|
accessKey := strings.TrimSpace(cfg.AccessKey)
|
|
if accessKey == "" && cfg.AccessKeyEnv != "" {
|
|
accessKey = strings.TrimSpace(os.Getenv(cfg.AccessKeyEnv))
|
|
}
|
|
|
|
secretKey := strings.TrimSpace(cfg.SecretAccessKey)
|
|
if secretKey == "" && cfg.SecretKeyEnv != "" {
|
|
secretKey = strings.TrimSpace(os.Getenv(cfg.SecretKeyEnv))
|
|
}
|
|
|
|
region := strings.TrimSpace(cfg.Region)
|
|
if region == "" {
|
|
region = "us-east-1"
|
|
}
|
|
|
|
loadOpts := []func(*config.LoadOptions) error{
|
|
config.WithRegion(region),
|
|
}
|
|
if accessKey != "" || secretKey != "" {
|
|
loadOpts = append(loadOpts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
accessKey,
|
|
secretKey,
|
|
"",
|
|
)))
|
|
}
|
|
|
|
endpoint := strings.TrimSpace(cfg.Endpoint)
|
|
if endpoint != "" {
|
|
if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") {
|
|
if cfg.UseSSL {
|
|
endpoint = "https://" + endpoint
|
|
} else {
|
|
endpoint = "http://" + endpoint
|
|
}
|
|
}
|
|
}
|
|
|
|
awsCfg, err := config.LoadDefaultConfig(context.Background(), loadOpts...)
|
|
if err != nil {
|
|
logger.Warn("Failed to create AWS config", zap.Error(err), zap.String("bucket", bucket))
|
|
|
|
return nil, err
|
|
}
|
|
|
|
client := s3.NewFromConfig(awsCfg, func(opts *s3.Options) {
|
|
opts.UsePathStyle = cfg.ForcePathStyle
|
|
|
|
if endpoint != "" {
|
|
opts.BaseEndpoint = aws.String(endpoint)
|
|
}
|
|
})
|
|
|
|
store := &S3Store{
|
|
logger: logger.Named("docstore").Named("s3"),
|
|
client: client,
|
|
bucket: bucket,
|
|
}
|
|
store.logger.Info("Document storage initialised", zap.String("bucket", bucket), zap.String("endpoint", endpoint))
|
|
|
|
return store, nil
|
|
}
|
|
|
|
func (s *S3Store) Save(ctx context.Context, key string, data []byte) error {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
Body: bytes.NewReader(data),
|
|
})
|
|
if err != nil {
|
|
s.logger.Warn("Failed to upload document", zap.Error(err), zap.String("key", key))
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *S3Store) Load(ctx context.Context, key string) ([]byte, error) {
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
obj, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
s.logger.Warn("Failed to fetch document", zap.Error(err), zap.String("key", key))
|
|
|
|
return nil, err
|
|
}
|
|
|
|
defer obj.Body.Close()
|
|
|
|
return io.ReadAll(obj.Body)
|
|
}
|
|
|
|
var _ Store = (*S3Store)(nil)
|