-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from isd-sgcu/feature/s3client
Feature/s3client
- Loading branch information
Showing
13 changed files
with
395 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package bucket | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager" | ||
"github.com/aws/aws-sdk-go-v2/service/s3" | ||
"github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
"github.com/isd-sgcu/johnjud-file/cfgldr" | ||
"github.com/pkg/errors" | ||
"github.com/rs/zerolog/log" | ||
) | ||
|
||
type Client struct { | ||
conf cfgldr.S3 | ||
s3 *s3.Client | ||
} | ||
|
||
func NewClient(conf cfgldr.S3, awsClient *s3.Client) *Client { | ||
return &Client{conf: conf, s3: awsClient} | ||
} | ||
|
||
func (c *Client) Upload(file []byte, objectKey string) (string, string, error) { | ||
ctx := context.Background() | ||
ctx, cancel := context.WithTimeout(ctx, 50*time.Second) | ||
defer cancel() | ||
|
||
buffer := bytes.NewReader(file) | ||
var partMiBs int64 = 10 | ||
uploader := manager.NewUploader(c.s3, func(u *manager.Uploader) { | ||
u.PartSize = partMiBs * 1024 * 1024 | ||
}) | ||
|
||
uploadOutput, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ | ||
Bucket: aws.String(c.conf.BucketName), | ||
Key: aws.String(objectKey), | ||
Body: buffer, | ||
}) | ||
|
||
if err != nil { | ||
log.Error(). | ||
Err(err). | ||
Str("service", "file"). | ||
Str("module", "bucket client"). | ||
Msgf("Couldn't upload object to %v:%v.", c.conf.BucketName, objectKey) | ||
|
||
return "", "", errors.Wrap(err, "Error while uploading the object") | ||
} | ||
|
||
return fmt.Sprintf("https://%v.s3.%v.amazonaws.com/%v", c.conf.Region, c.conf.BucketName, uploadOutput.Key), *uploadOutput.Key, nil | ||
} | ||
|
||
func (c *Client) Delete(objectKey string) error { | ||
ctx := context.Background() | ||
ctx, cancel := context.WithTimeout(ctx, 50*time.Second) | ||
defer cancel() | ||
|
||
var objectIds []types.ObjectIdentifier | ||
objectIds = append(objectIds, types.ObjectIdentifier{Key: aws.String(objectKey)}) | ||
|
||
_, err := c.s3.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{ | ||
Bucket: aws.String(c.conf.BucketName), | ||
Delete: &types.Delete{Objects: objectIds}, | ||
}) | ||
|
||
if err != nil { | ||
log.Error(). | ||
Err(err). | ||
Str("service", "file"). | ||
Str("module", "bucket client"). | ||
Msgf("Couldn't delete object from bucket %v:%v.", c.conf.BucketName, objectKey) | ||
|
||
return errors.Wrap(err, "Error while deleting the object") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,168 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/config" | ||
"github.com/aws/aws-sdk-go-v2/service/s3" | ||
"github.com/isd-sgcu/johnjud-file/cfgldr" | ||
"github.com/isd-sgcu/johnjud-file/database" | ||
"github.com/isd-sgcu/johnjud-file/pkg/client/bucket" | ||
imageRepo "github.com/isd-sgcu/johnjud-file/pkg/repository/image" | ||
imageSvc "github.com/isd-sgcu/johnjud-file/pkg/service/image" | ||
imagePb "github.com/isd-sgcu/johnjud-go-proto/johnjud/file/image/v1" | ||
"github.com/rs/zerolog/log" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/health" | ||
"google.golang.org/grpc/health/grpc_health_v1" | ||
"google.golang.org/grpc/reflection" | ||
) | ||
|
||
type operation func(ctx context.Context) error | ||
|
||
func gracefulShutdown(ctx context.Context, timeout time.Duration, ops map[string]operation) <-chan struct{} { | ||
wait := make(chan struct{}) | ||
go func() { | ||
s := make(chan os.Signal, 1) | ||
|
||
signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) | ||
sig := <-s | ||
|
||
log.Info(). | ||
Str("service", "graceful shutdown"). | ||
Msgf("got signal \"%v\" shutting down service", sig) | ||
|
||
timeoutFunc := time.AfterFunc(timeout, func() { | ||
log.Error(). | ||
Str("service", "graceful shutdown"). | ||
Msgf("timeout %v ms has been elapsed, force exit", timeout.Milliseconds()) | ||
os.Exit(0) | ||
}) | ||
|
||
defer timeoutFunc.Stop() | ||
|
||
var wg sync.WaitGroup | ||
|
||
for key, op := range ops { | ||
wg.Add(1) | ||
innerOp := op | ||
innerKey := key | ||
go func() { | ||
defer wg.Done() | ||
|
||
log.Info(). | ||
Str("service", "graceful shutdown"). | ||
Msgf("cleaning up: %v", innerKey) | ||
if err := innerOp(ctx); err != nil { | ||
log.Error(). | ||
Str("service", "graceful shutdown"). | ||
Err(err). | ||
Msgf("%v: clean up failed: %v", innerKey, err.Error()) | ||
return | ||
} | ||
|
||
log.Info(). | ||
Str("service", "graceful shutdown"). | ||
Msgf("%v was shutdown gracefully", innerKey) | ||
}() | ||
} | ||
|
||
wg.Wait() | ||
close(wait) | ||
}() | ||
|
||
return wait | ||
} | ||
|
||
func main() { | ||
// Code | ||
conf, err := cfgldr.LoadConfig() | ||
if err != nil { | ||
log.Fatal(). | ||
Err(err). | ||
Str("service", "file"). | ||
Msg("Failed to load config") | ||
} | ||
|
||
db, err := database.InitPostgresDatabase(&conf.Database, conf.App.Debug) | ||
if err != nil { | ||
log.Fatal(). | ||
Err(err). | ||
Str("service", "file"). | ||
Msg("Failed to init postgres connection") | ||
} | ||
|
||
sdkConfig, err := config.LoadDefaultConfig(context.TODO()) | ||
if err != nil { | ||
log.Fatal(). | ||
Err(err). | ||
Str("service", "file"). | ||
Msg("Failed to load AWS SDK config") | ||
return | ||
} | ||
|
||
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", conf.App.Port)) | ||
if err != nil { | ||
log.Fatal(). | ||
Err(err). | ||
Str("service", "file"). | ||
Msg("Failed to start service") | ||
} | ||
|
||
grpcServer := grpc.NewServer() | ||
|
||
awsClient := s3.NewFromConfig(sdkConfig) | ||
bucketClient := bucket.NewClient(conf.S3, awsClient) | ||
|
||
imageRepository := imageRepo.NewRepository(db) | ||
|
||
imageService := imageSvc.NewService(bucketClient, imageRepository) | ||
|
||
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer()) | ||
imagePb.RegisterImageServiceServer(grpcServer, imageService) | ||
|
||
reflection.Register(grpcServer) | ||
go func() { | ||
log.Info(). | ||
Str("service", "file"). | ||
Msgf("JohnJud file starting at port %v", conf.App.Port) | ||
|
||
if err := grpcServer.Serve(lis); err != nil { | ||
log.Fatal(). | ||
Err(err). | ||
Str("service", "file"). | ||
Msg("Failed to start service") | ||
} | ||
}() | ||
|
||
wait := gracefulShutdown(context.Background(), 2*time.Second, map[string]operation{ | ||
"server": func(ctx context.Context) error { | ||
grpcServer.GracefulStop() | ||
return nil | ||
}, | ||
"database": func(ctx context.Context) error { | ||
sqlDB, err := db.DB() | ||
if err != nil { | ||
return nil | ||
} | ||
return sqlDB.Close() | ||
}, | ||
}) | ||
|
||
<-wait | ||
|
||
grpcServer.GracefulStop() | ||
log.Info(). | ||
Str("service", "file"). | ||
Msg("Closing the listener") | ||
lis.Close() | ||
log.Info(). | ||
Str("service", "file"). | ||
Msg("End the program") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,4 +10,5 @@ database: | |
password: root | ||
|
||
s3: | ||
bucket_name: <bucket name> | ||
bucket_name: <bucket name> | ||
region: <region> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.