Skip to content

Commit

Permalink
support hybrid mode
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Feb 27, 2025
1 parent 7ac6fe6 commit 058acbc
Show file tree
Hide file tree
Showing 36 changed files with 927 additions and 83 deletions.
2 changes: 1 addition & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
GOOGLE_STORAGE_PUBLIC_HOST: $GOOGLE_STORAGE_PUBLIC_HOST
GOOGLE_PROJECT_ID: $GOOGLE_PROJECT_ID
GOOGLE_STORAGE_CREDENTIALS_FILE: /service_account.json
HASURA_LOG_LEVEL: debug
# HASURA_LOG_LEVEL: debug
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
OTEL_METRICS_EXPORTER: prometheus

Expand Down
8 changes: 6 additions & 2 deletions connector/collection/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (coe *CollectionBucketExecutor) Execute(ctx context.Context) (*schema.RowSe
}, nil
}

request, err := EvalBucketPredicate(nil, "", coe.Request.Query.Predicate, coe.Variables)
request, err := EvalBucketPredicate(common.StorageClientCredentialArguments{}, "", coe.Request.Query.Predicate, coe.Variables)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}
Expand All @@ -45,6 +45,10 @@ func (coe *CollectionBucketExecutor) Execute(ctx context.Context) (*schema.RowSe
}, nil
}

if err := request.EvalArguments(coe.Arguments); err != nil {
return nil, err
}

request.evalQuerySelectionFields(coe.Request.Query.Fields)

predicate := request.BucketPredicate.CheckPostPredicate
Expand Down Expand Up @@ -82,7 +86,7 @@ func (coe *CollectionBucketExecutor) Execute(ctx context.Context) (*schema.RowSe
options.MaxResults = &maxResults
}

response, err := coe.Storage.ListBuckets(ctx, request.ClientID, options, predicate)
response, err := coe.Storage.ListBuckets(ctx, request.GetBucketArguments().StorageClientCredentialArguments, options, predicate)
if err != nil {
return nil, err
}
Expand Down
24 changes: 2 additions & 22 deletions connector/collection/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,8 @@ func (coe *CollectionObjectExecutor) Execute(ctx context.Context) (*schema.RowSe
NumThreads: coe.Concurrency,
}

if clientType, err := utils.GetNullableString(coe.Arguments, ArgumentClientType); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if clientType != nil {
request.ClientType = (*common.StorageProviderType)(clientType)
}

if endpoint, err := utils.GetNullableString(coe.Arguments, ArgumentEndpoint); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if endpoint != nil {
request.Endpoint = *endpoint
}

if accessKey, err := utils.GetNullableString(coe.Arguments, ArgumentAccessKeyID); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if accessKey != nil {
request.AccessKeyID = *accessKey
}

if secretAccessKey, err := utils.GetNullableString(coe.Arguments, ArgumentSecretAccessKey); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if secretAccessKey != nil {
request.SecretAccessKey = *secretAccessKey
if err := request.EvalArguments(coe.Arguments); err != nil {
return nil, err
}

if hierarchy, err := utils.GetNullableBoolean(coe.Arguments, argumentHierarchy); err != nil {
Expand Down
62 changes: 46 additions & 16 deletions connector/collection/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"strings"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/hasura/ndc-storage/connector/storage/common"
)

// PredicateEvaluator the structured predicate result which is evaluated from the raw expression.
type PredicateEvaluator struct {
common.StorageBucketArguments
common.StorageClientCredentialArguments

IsValid bool
Include common.StorageObjectIncludeOptions
Expand All @@ -23,13 +24,11 @@ type PredicateEvaluator struct {
}

// EvalBucketPredicate evaluates the predicate bucket condition of the query request.
func EvalBucketPredicate(clientID *common.StorageClientID, prefix string, predicate schema.Expression, variables map[string]any) (*PredicateEvaluator, error) {
func EvalBucketPredicate(bucketArguments common.StorageClientCredentialArguments, prefix string, predicate schema.Expression, variables map[string]any) (*PredicateEvaluator, error) {
result := &PredicateEvaluator{
StorageBucketArguments: common.StorageBucketArguments{
ClientID: clientID,
},
Include: common.StorageObjectIncludeOptions{},
variables: variables,
StorageClientCredentialArguments: bucketArguments,
Include: common.StorageObjectIncludeOptions{},
variables: variables,
}

if prefix != "" {
Expand Down Expand Up @@ -58,9 +57,9 @@ func EvalBucketPredicate(clientID *common.StorageClientID, prefix string, predic
// EvalObjectPredicate evaluates the predicate object condition of the query request.
func EvalObjectPredicate(bucketInfo common.StorageBucketArguments, preOperator *StringComparisonOperator, predicate schema.Expression, variables map[string]any) (*PredicateEvaluator, error) {
result := &PredicateEvaluator{
StorageBucketArguments: bucketInfo,
Include: common.StorageObjectIncludeOptions{},
variables: variables,
StorageClientCredentialArguments: bucketInfo.StorageClientCredentialArguments,
Include: common.StorageObjectIncludeOptions{},
variables: variables,
}

if bucketInfo.Bucket != "" {
Expand Down Expand Up @@ -96,17 +95,48 @@ func EvalObjectPredicate(bucketInfo common.StorageBucketArguments, preOperator *
// GetBucketArguments get bucket arguments information
func (pe PredicateEvaluator) GetBucketArguments() common.StorageBucketArguments {
result := common.StorageBucketArguments{
ClientID: pe.ClientID,
ClientType: pe.ClientType,
Endpoint: pe.Endpoint,
AccessKeyID: pe.AccessKeyID,
SecretAccessKey: pe.SecretAccessKey,
Bucket: pe.BucketPredicate.GetPrefix(),
StorageClientCredentialArguments: common.StorageClientCredentialArguments{
ClientID: pe.ClientID,
ClientType: pe.ClientType,
Endpoint: pe.Endpoint,
AccessKeyID: pe.AccessKeyID,
SecretAccessKey: pe.SecretAccessKey,
},
Bucket: pe.BucketPredicate.GetPrefix(),
}

return result
}

// EvalArguments evaluate other request arguments
func (pe *PredicateEvaluator) EvalArguments(arguments map[string]any) error {
if clientType, err := utils.GetNullableString(arguments, ArgumentClientType); err != nil {
return schema.UnprocessableContentError(err.Error(), nil)
} else if clientType != nil {
pe.ClientType = (*common.StorageProviderType)(clientType)
}

if endpoint, err := utils.GetNullableString(arguments, ArgumentEndpoint); err != nil {
return schema.UnprocessableContentError(err.Error(), nil)
} else if endpoint != nil {
pe.Endpoint = *endpoint
}

if accessKey, err := utils.GetNullableString(arguments, ArgumentAccessKeyID); err != nil {
return schema.UnprocessableContentError(err.Error(), nil)
} else if accessKey != nil {
pe.AccessKeyID = *accessKey
}

if secretAccessKey, err := utils.GetNullableString(arguments, ArgumentSecretAccessKey); err != nil {
return schema.UnprocessableContentError(err.Error(), nil)
} else if secretAccessKey != nil {
pe.SecretAccessKey = *secretAccessKey
}

return nil
}

func (pe *PredicateEvaluator) EvalSelection(selection schema.NestedField) error {
if len(selection) == 0 {
return nil
Expand Down
10 changes: 10 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,14 @@ func (c *Connector) evalSchema(connectorSchema *schema.SchemaResponse) {
bytesScalar.Representation = schema.NewTypeRepresentationString().Encode()
connectorSchema.ScalarTypes["Bytes"] = *bytesScalar
}

if !c.config.Generator.DynamicCredentials {
for name, object := range connectorSchema.ObjectTypes {
for _, key := range dynamicCredentialArguments {
delete(object.Fields, key)
}

connectorSchema.ObjectTypes[name] = object
}
}
}
6 changes: 4 additions & 2 deletions connector/functions/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func FunctionStorageBucketConnections(ctx context.Context, state *types.State, a
return StorageConnection[common.StorageBucket]{}, schema.UnprocessableContentError("$first argument must be larger than 0", nil)
}

request, err := collection.EvalBucketPredicate(args.ClientID, args.Prefix, args.Where, types.QueryVariablesFromContext(ctx))
request, err := collection.EvalBucketPredicate(args.StorageClientCredentialArguments, args.Prefix, args.Where, types.QueryVariablesFromContext(ctx))
if err != nil {
return StorageConnection[common.StorageBucket]{}, err
}
Expand All @@ -45,7 +45,9 @@ func FunctionStorageBucketConnections(ctx context.Context, state *types.State, a
predicate = nil
}

buckets, err := state.Storage.ListBuckets(ctx, request.ClientID, &common.ListStorageBucketsOptions{
bucketArguments := request.GetBucketArguments()

buckets, err := state.Storage.ListBuckets(ctx, bucketArguments.StorageClientCredentialArguments, &common.ListStorageBucketsOptions{
Prefix: request.BucketPredicate.GetPrefix(),
MaxResults: args.First,
StartAfter: args.After,
Expand Down
12 changes: 12 additions & 0 deletions connector/schema.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions connector/storage/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
// MakeBucket creates a new bucket.
func (m *Manager) MakeBucket(ctx context.Context, clientID *common.StorageClientID, args *common.MakeStorageBucketOptions) error {
client, bucketName, err := m.GetClientAndBucket(ctx, common.StorageBucketArguments{
ClientID: clientID,
Bucket: args.Name,
StorageClientCredentialArguments: common.StorageClientCredentialArguments{
ClientID: clientID,
},
Bucket: args.Name,
})
if err != nil {
return err
Expand All @@ -36,9 +38,13 @@ func (m *Manager) UpdateBucket(ctx context.Context, args *common.UpdateBucketArg
}

// ListBuckets list all buckets.
func (m *Manager) ListBuckets(ctx context.Context, clientID *common.StorageClientID, options *common.ListStorageBucketsOptions, predicate func(string) bool) (*common.StorageBucketListResults, error) {
client, ok := m.GetClient(clientID)
if !ok {
func (m *Manager) ListBuckets(ctx context.Context, clientInfo common.StorageClientCredentialArguments, options *common.ListStorageBucketsOptions, predicate func(string) bool) (*common.StorageBucketListResults, error) {
client, err := m.GetOrCreateClient(ctx, clientInfo)
if err != nil {
return nil, err
}

if client == nil {
return &common.StorageBucketListResults{
Buckets: []common.StorageBucket{},
}, nil
Expand Down
42 changes: 25 additions & 17 deletions connector/storage/common/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type StorageKeyValue struct {

// ListStorageBucketArguments represent the input arguments for the ListBuckets methods.
type ListStorageBucketArguments struct {
// The storage client ID.
ClientID *StorageClientID `json:"clientId,omitempty"`
StorageClientCredentialArguments

// Returns list of bucket with the prefix.
Prefix string `json:"prefix,omitempty"`
// The maximum number of objects requested per batch.
Expand All @@ -35,15 +35,30 @@ type GetStorageBucketArguments struct {

// StorageBucketArguments represent the common input arguments for bucket-related methods.
type StorageBucketArguments struct {
// The storage client ID.
ClientID *StorageClientID `json:"clientId,omitempty"`
ClientType *StorageProviderType `json:"clientType,omitempty"`
Endpoint string `json:"endpoint,omitempty"`

// The bucket name.
Bucket string `json:"bucket,omitempty"`
AccessKeyID string `json:"accessKeyId,omitempty"`
SecretAccessKey string `json:"secretAccessKey,omitempty"`
Bucket string `json:"bucket,omitempty"`

StorageClientCredentialArguments
}

// StorageClientCredentials hold common storage client credential arguments.
type StorageClientCredentialArguments struct {
ClientID *StorageClientID `json:"clientId,omitempty"`
ClientType *StorageProviderType `json:"clientType,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
AccessKeyID string `json:"accessKeyId,omitempty"`
SecretAccessKey string `json:"secretAccessKey,omitempty"`
}

// IsEmpty checks if all properties are empty.
func (ca StorageClientCredentialArguments) IsEmpty() bool {
return ca.ClientType == nil || !ca.ClientType.IsValid() || (ca.AccessKeyID == "" && ca.SecretAccessKey == "" && ca.Endpoint == "")
}

// MakeStorageBucketArguments holds all arguments to tweak bucket creation.
type MakeStorageBucketArguments struct {
StorageClientCredentialArguments
MakeStorageBucketOptions
}

// CopyStorageObjectArguments represent input arguments of the CopyObject method.
Expand All @@ -62,13 +77,6 @@ type ComposeStorageObjectArguments struct {
Sources []StorageCopySrcOptions `json:"sources"`
}

// MakeStorageBucketArguments holds all arguments to tweak bucket creation.
type MakeStorageBucketArguments struct {
ClientID *StorageClientID `json:"clientId,omitempty"`

MakeStorageBucketOptions
}

// MakeStorageBucketOptions holds all options to tweak bucket creation.
type MakeStorageBucketOptions struct {
// Bucket name
Expand Down
Loading

0 comments on commit 058acbc

Please sign in to comment.