diff --git a/private/buf/bufctl/controller.go b/private/buf/bufctl/controller.go index 9e5dc73950..d4543c502e 100644 --- a/private/buf/bufctl/controller.go +++ b/private/buf/bufctl/controller.go @@ -22,7 +22,6 @@ import ( "io/fs" "log/slog" "net/http" - "os" "sort" "buf.build/go/protoyaml" @@ -46,7 +45,6 @@ import ( "github.com/bufbuild/buf/private/pkg/httpauth" "github.com/bufbuild/buf/private/pkg/ioext" "github.com/bufbuild/buf/private/pkg/normalpath" - "github.com/bufbuild/buf/private/pkg/pluginrpcutil" "github.com/bufbuild/buf/private/pkg/protoencoding" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage/storageos" @@ -54,7 +52,6 @@ import ( "github.com/bufbuild/buf/private/pkg/wasm" "github.com/bufbuild/protovalidate-go" "google.golang.org/protobuf/proto" - "pluginrpc.com/pluginrpc" ) // ImageWithConfig pairs an Image with lint and breaking configuration. @@ -133,6 +130,11 @@ type Controller interface { defaultMessageEncoding buffetch.MessageEncoding, options ...FunctionOption, ) error + // GetCheckRunnerProvider gets a CheckRunnerProvider for the given input. + // + // The returned RunnerProvider will be able to run lint and breaking checks + // using the PluginConfigs from the input. The input provided will resolve + // the PluginKeys from the related buf.lock file. GetCheckRunnerProvider( ctx context.Context, input string, @@ -723,7 +725,8 @@ func (c *controller) GetCheckRunnerProvider( input string, wasmRuntime wasm.Runtime, options ...FunctionOption, -) (bufcheck.RunnerProvider, error) { +) (_ bufcheck.RunnerProvider, retErr error) { + defer c.handleFileAnnotationSetRetError(&retErr) functionOptions := newFunctionOptions(c) for _, option := range options { option(functionOptions) @@ -736,29 +739,11 @@ func (c *controller) GetCheckRunnerProvider( if err != nil { return nil, err } - return bufcheck.RunnerProviderFunc(func(pluginConfig bufconfig.PluginConfig) (pluginrpc.Runner, error) { - plugin, err := c.getPluginForPluginConfig(ctx, pluginKeyProvider, pluginConfig) - if err != nil { - return nil, err - } - switch { - case plugin.IsWasm(): - getData := plugin.Data - return pluginrpcutil.NewWasmRunner( - wasmRuntime, - getData, - pluginConfig.Name(), - pluginConfig.Args()..., - ), nil - case plugin.IsLocal(): - return pluginrpcutil.NewLocalRunner( - pluginConfig.Name(), - pluginConfig.Args()..., - ), nil - default: - return nil, syserror.Newf("unknown plugin type: %s", plugin.OpaqueID()) - } - }), nil + return bufcheck.NewLocalRunnerProvider( + wasmRuntime, + pluginKeyProvider, + c.pluginDataProvider, + ), nil } func (c *controller) getImage( @@ -1216,7 +1201,7 @@ Declare %q in the deps key in your buf.yaml.`, // getPluginKeyProviderForRef create a new PluginKeyProvider for the Ref. // -// Remote plugins refs are resolved to keys from the workspace buf.lock file. +// Remote plugins Refs are resolved to PluginKeys from the workspace buf.lock file. // If the Ref is a MessageRef, we use the current directory buf.lock file. func (c *controller) getPluginKeyProviderForRef( ctx context.Context, @@ -1289,71 +1274,6 @@ func (c *controller) getPluginKeyProviderForRef( } } -// getPluginForPluginConfig resolves the plugin for the given PluginConfig. -// -// Remote plugins are resolved by fetching the plugin key for the given Ref, and then fetching -// the plugin data for the plugin key. -func (c *controller) getPluginForPluginConfig( - ctx context.Context, - pluginKeyProvider bufplugin.PluginKeyProvider, - pluginConfig bufconfig.PluginConfig, -) (bufplugin.Plugin, error) { - switch pluginConfigType := pluginConfig.Type(); pluginConfigType { - case bufconfig.PluginConfigTypeLocal: - return bufplugin.NewLocalPlugin( - pluginConfig.Name(), - pluginConfig.Args(), - ) - case bufconfig.PluginConfigTypeLocalWasm: - pluginName := pluginConfig.Name() - return bufplugin.NewLocalWasmPlugin( - nil, // We don't have a FullName for a local Wasm plugin. - pluginName, - pluginConfig.Args(), - func() ([]byte, error) { - moduleWasm, err := os.ReadFile(pluginName) - if err != nil { - return nil, fmt.Errorf("could not read plugin %q: %v", pluginName, err) - } - return moduleWasm, nil - }, - ) - case bufconfig.PluginConfigTypeRemoteWasm: - pluginRef := pluginConfig.Ref() - if pluginRef == nil { - return nil, syserror.Newf("Ref is required for remote plugins") - } - pluginKeys, err := pluginKeyProvider.GetPluginKeysForPluginRefs(ctx, []bufparse.Ref{pluginRef}, bufplugin.DigestTypeP1) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - return nil, fmt.Errorf("plugin %q not found in workspace. ", pluginRef.FullName().String()) - } - return nil, err - } - if len(pluginKeys) != 1 { - return nil, syserror.Newf("expected 1 plugin key, got %d", len(pluginKeys)) - } - pluginKey := pluginKeys[0] - return bufplugin.NewRemoteWasmPlugin( - pluginKey.FullName(), - pluginConfig.Args(), - pluginKey.CommitID(), - func() ([]byte, error) { - pluginDatas, err := c.pluginDataProvider.GetPluginDatasForPluginKeys(ctx, []bufplugin.PluginKey{pluginKey}) - if err != nil { - return nil, err - } - if len(pluginDatas) != 1 { - return nil, syserror.Newf("expected 1 plugin data, got %d", len(pluginDatas)) - } - return pluginDatas[0].Data() - }, - ) - default: - return nil, syserror.Newf("unknown plugin config type: %v", pluginConfigType) - } -} - // handleFileAnnotationSetError will attempt to handle the error as a FileAnnotationSet, and if so, print // the FileAnnotationSet to the writer with the given error format while returning ErrFileAnnotation. // diff --git a/private/buf/bufmigrate/migrator.go b/private/buf/bufmigrate/migrator.go index bcecb54923..cf69bb2960 100644 --- a/private/buf/bufmigrate/migrator.go +++ b/private/buf/bufmigrate/migrator.go @@ -29,6 +29,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufconfig" "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/bufpkg/bufparse" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage" @@ -695,7 +696,11 @@ func equivalentCheckConfigInV2( ) (bufconfig.CheckConfig, error) { // No need for custom lint/breaking plugins since there's no plugins to migrate from <=v1. // TODO: If we ever need v3, then we will have to deal with this. - client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime)) + client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + )) if err != nil { return nil, err } diff --git a/private/buf/bufworkspace/workspace_provider.go b/private/buf/bufworkspace/workspace_provider.go index 388e28c276..58152f94b3 100644 --- a/private/buf/bufworkspace/workspace_provider.go +++ b/private/buf/bufworkspace/workspace_provider.go @@ -418,6 +418,7 @@ func (w *workspaceProvider) getWorkspaceForBucketBufYAMLV2( v2Targeting *v2Targeting, ) (*workspace, error) { moduleSetBuilder := bufmodule.NewModuleSetBuilder(ctx, w.logger, w.moduleDataProvider, w.commitProvider) + var remotePluginKeys []bufplugin.PluginKey bufLockFile, err := bufconfig.GetBufLockFileForPrefix( ctx, bucket, @@ -446,6 +447,7 @@ func (w *workspaceProvider) getWorkspaceForBucketBufYAMLV2( false, ) } + remotePluginKeys = bufLockFile.RemotePluginKeys() } // Only check for duplicate module description in v2, which would be an user error, i.e. // This is not a system error: @@ -503,7 +505,7 @@ func (w *workspaceProvider) getWorkspaceForBucketBufYAMLV2( moduleSet, v2Targeting.bucketIDToModuleConfig, v2Targeting.bufYAMLFile.PluginConfigs(), - bufLockFile.RemotePluginKeys(), + remotePluginKeys, v2Targeting.bufYAMLFile.ConfiguredDepModuleRefs(), true, ) diff --git a/private/buf/cmd/buf/buf_test.go b/private/buf/cmd/buf/buf_test.go index 4206cc3efb..f5ec4472ce 100644 --- a/private/buf/cmd/buf/buf_test.go +++ b/private/buf/cmd/buf/buf_test.go @@ -35,6 +35,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufcheck" "github.com/bufbuild/buf/private/bufpkg/bufconfig" "github.com/bufbuild/buf/private/bufpkg/bufimage" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" imagev1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/image/v1" "github.com/bufbuild/buf/private/pkg/app/appcmd" "github.com/bufbuild/buf/private/pkg/app/appcmd/appcmdtesting" @@ -1350,7 +1351,11 @@ func TestCheckLsBreakingRulesFromConfigExceptDeprecated(t *testing.T) { // Do not need any custom lint/breaking plugins here. client, err := bufcheck.NewClient( slogtestext.NewLogger(t), - bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime), + bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), ) require.NoError(t, err) allRules, err := client.AllRules(context.Background(), check.RuleTypeBreaking, version) diff --git a/private/buf/cmd/buf/command/breaking/breaking.go b/private/buf/cmd/buf/command/breaking/breaking.go index 1109c2d951..869cb3a84e 100644 --- a/private/buf/cmd/buf/command/breaking/breaking.go +++ b/private/buf/cmd/buf/command/breaking/breaking.go @@ -163,11 +163,14 @@ func run( } // Do not exclude imports here. bufcheck's Client requires all imports. // Use bufcheck's BreakingWithExcludeImports. + inputControllerOptions := []bufctl.FunctionOption{ + bufctl.WithTargetPaths(flags.Paths, flags.ExcludePaths), + bufctl.WithConfigOverride(flags.Config), + } imageWithConfigs, err := controller.GetTargetImageWithConfigs( ctx, input, - bufctl.WithTargetPaths(flags.Paths, flags.ExcludePaths), - bufctl.WithConfigOverride(flags.Config), + inputControllerOptions..., ) if err != nil { return err @@ -183,14 +186,11 @@ func run( } // Do not exclude imports here. bufcheck's Client requires all imports. // Use bufcheck's BreakingWithExcludeImports. - controllerOptions := []bufctl.FunctionOption{ - bufctl.WithTargetPaths(externalPaths, flags.ExcludePaths), - bufctl.WithConfigOverride(flags.AgainstConfig), - } againstImageWithConfigs, err := controller.GetTargetImageWithConfigs( ctx, flags.Against, - controllerOptions..., + bufctl.WithTargetPaths(externalPaths, flags.ExcludePaths), + bufctl.WithConfigOverride(flags.AgainstConfig), ) if err != nil { return err @@ -223,7 +223,7 @@ func run( ctx, input, wasmRuntime, - controllerOptions..., + inputControllerOptions..., ) if err != nil { return err diff --git a/private/buf/cmd/buf/command/mod/internal/internal.go b/private/buf/cmd/buf/command/mod/internal/internal.go index 2feb7a340b..5aa14892c4 100644 --- a/private/buf/cmd/buf/command/mod/internal/internal.go +++ b/private/buf/cmd/buf/command/mod/internal/internal.go @@ -24,6 +24,7 @@ import ( "github.com/bufbuild/buf/private/buf/bufcli" "github.com/bufbuild/buf/private/bufpkg/bufcheck" "github.com/bufbuild/buf/private/bufpkg/bufconfig" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/app/appcmd" "github.com/bufbuild/buf/private/pkg/app/appext" "github.com/bufbuild/buf/private/pkg/slicesext" @@ -175,7 +176,11 @@ func lsRun( // BufYAMLFiles <=v1 never had plugins. client, err := bufcheck.NewClient( container.Logger(), - bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime), + bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), bufcheck.ClientWithStderr(container.Stderr()), ) if err != nil { diff --git a/private/buf/cmd/protoc-gen-buf-breaking/breaking.go b/private/buf/cmd/protoc-gen-buf-breaking/breaking.go index 85db406ac2..82d8b856c2 100644 --- a/private/buf/cmd/protoc-gen-buf-breaking/breaking.go +++ b/private/buf/cmd/protoc-gen-buf-breaking/breaking.go @@ -28,6 +28,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufanalysis" "github.com/bufbuild/buf/private/bufpkg/bufcheck" "github.com/bufbuild/buf/private/bufpkg/bufimage" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/encoding" "github.com/bufbuild/buf/private/pkg/protodescriptor" "github.com/bufbuild/buf/private/pkg/protoencoding" @@ -125,7 +126,11 @@ func handle( // The protoc plugins do not support custom lint/breaking change plugins for now. client, err := bufcheck.NewClient( container.Logger(), - bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime), + bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), bufcheck.ClientWithStderr(pluginEnv.Stderr), ) if err != nil { diff --git a/private/buf/cmd/protoc-gen-buf-lint/lint.go b/private/buf/cmd/protoc-gen-buf-lint/lint.go index 580f6bdcf8..f29fef7659 100644 --- a/private/buf/cmd/protoc-gen-buf-lint/lint.go +++ b/private/buf/cmd/protoc-gen-buf-lint/lint.go @@ -27,6 +27,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufanalysis" "github.com/bufbuild/buf/private/bufpkg/bufcheck" "github.com/bufbuild/buf/private/bufpkg/bufimage" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/encoding" "github.com/bufbuild/buf/private/pkg/protodescriptor" "github.com/bufbuild/buf/private/pkg/protoencoding" @@ -100,7 +101,11 @@ func handle( // The protoc plugins do not support custom lint/breaking change plugins for now. client, err := bufcheck.NewClient( container.Logger(), - bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime), + bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), bufcheck.ClientWithStderr(pluginEnv.Stderr), ) if err != nil { diff --git a/private/bufpkg/bufcheck/breaking_test.go b/private/bufpkg/bufcheck/breaking_test.go index 4d7eb601f3..32b4716baa 100644 --- a/private/bufpkg/bufcheck/breaking_test.go +++ b/private/bufpkg/bufcheck/breaking_test.go @@ -1346,7 +1346,11 @@ func testBreaking( require.NoError(t, err) breakingConfig := workspace.GetBreakingConfigForOpaqueID(opaqueID) require.NotNil(t, breakingConfig) - client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime)) + client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + )) require.NoError(t, err) err = client.Breaking( ctx, diff --git a/private/bufpkg/bufcheck/bufcheck.go b/private/bufpkg/bufcheck/bufcheck.go index 357f1b7189..d5cee8cfc9 100644 --- a/private/bufpkg/bufcheck/bufcheck.go +++ b/private/bufpkg/bufcheck/bufcheck.go @@ -22,6 +22,7 @@ import ( "buf.build/go/bufplugin/check" "github.com/bufbuild/buf/private/bufpkg/bufconfig" "github.com/bufbuild/buf/private/bufpkg/bufimage" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/syserror" "github.com/bufbuild/buf/private/pkg/wasm" @@ -169,7 +170,8 @@ func (r RunnerProviderFunc) NewRunner(pluginConfig bufconfig.PluginConfig) (plug return r(pluginConfig) } -// NewLocalRunnerProvider returns a new RunnerProvider for the wasm.Runtime. +// NewLocalRunnerProvider returns a new RunnerProvider for the wasm.Runtime and +// the given plugin providers. // // This implementation should only be used for local applications. It is safe to // use concurrently. @@ -178,11 +180,22 @@ func (r RunnerProviderFunc) NewRunner(pluginConfig bufconfig.PluginConfig) (plug // The supported types are: // - bufconfig.PluginConfigTypeLocal // - bufconfig.PluginConfigTypeLocalWasm +// - bufconfig.PluginConfigTypeRemoteWasm // // If the PluginConfigType is not supported, an error is returned. -func NewLocalRunnerProvider(wasmRuntime wasm.Runtime) RunnerProvider { +// To disable support for Wasm plugins, set wasmRuntime to wasm.UnimplementedRuntime. +// To disable support for bufconfig.PluginConfigTypeRemoteWasm Plugins, set +// pluginKeyProvider and pluginDataProvider to bufplugin.NopPluginKeyProvider +// and bufplugin.NopPluginDataProvider. +func NewLocalRunnerProvider( + wasmRuntime wasm.Runtime, + pluginKeyProvider bufplugin.PluginKeyProvider, + pluginDataProvider bufplugin.PluginDataProvider, +) RunnerProvider { return newRunnerProvider( wasmRuntime, + pluginKeyProvider, + pluginDataProvider, ) } diff --git a/private/bufpkg/bufcheck/lint_test.go b/private/bufpkg/bufcheck/lint_test.go index 2682a2004c..cbc8ce64cd 100644 --- a/private/bufpkg/bufcheck/lint_test.go +++ b/private/bufpkg/bufcheck/lint_test.go @@ -1357,7 +1357,11 @@ func testLintWithOptions( }) client, err := bufcheck.NewClient( logger, - bufcheck.NewLocalRunnerProvider(wasmRuntime), + bufcheck.NewLocalRunnerProvider( + wasmRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), ) require.NoError(t, err) err = client.Lint( diff --git a/private/bufpkg/bufcheck/multi_client_test.go b/private/bufpkg/bufcheck/multi_client_test.go index a723c94ff2..895ffe85dd 100644 --- a/private/bufpkg/bufcheck/multi_client_test.go +++ b/private/bufpkg/bufcheck/multi_client_test.go @@ -24,6 +24,7 @@ import ( "buf.build/go/bufplugin/check/checkutil" "buf.build/go/bufplugin/option" "github.com/bufbuild/buf/private/bufpkg/bufconfig" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/slogtestext" "github.com/bufbuild/buf/private/pkg/stringutil" @@ -182,7 +183,11 @@ func TestMultiClientCannotHaveOverlappingRulesWithBuiltIn(t *testing.T) { client, err := newClient( slogtestext.NewLogger(t), - NewLocalRunnerProvider(wasm.UnimplementedRuntime), + NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), ) require.NoError(t, err) duplicateBuiltInRulePluginConfig, err := bufconfig.NewLocalPluginConfig( @@ -275,7 +280,11 @@ func TestMultiClientCannotHaveOverlappingCategoriesWithBuiltIn(t *testing.T) { client, err := newClient( slogtestext.NewLogger(t), - NewLocalRunnerProvider(wasm.UnimplementedRuntime), + NewLocalRunnerProvider( + wasm.UnimplementedRuntime, + bufplugin.NopPluginKeyProvider, + bufplugin.NopPluginDataProvider, + ), ) require.NoError(t, err) duplicateBuiltInRulePluginConfig, err := bufconfig.NewLocalPluginConfig( diff --git a/private/bufpkg/bufcheck/runner_provider.go b/private/bufpkg/bufcheck/runner_provider.go index 15b7a030d0..3e7d73c3dd 100644 --- a/private/bufpkg/bufcheck/runner_provider.go +++ b/private/bufpkg/bufcheck/runner_provider.go @@ -15,9 +15,12 @@ package bufcheck import ( - "fmt" + "context" + "sync" "github.com/bufbuild/buf/private/bufpkg/bufconfig" + "github.com/bufbuild/buf/private/bufpkg/bufparse" + "github.com/bufbuild/buf/private/bufpkg/bufplugin" "github.com/bufbuild/buf/private/pkg/pluginrpcutil" "github.com/bufbuild/buf/private/pkg/syserror" "github.com/bufbuild/buf/private/pkg/wasm" @@ -25,14 +28,20 @@ import ( ) type runnerProvider struct { - wasmRuntime wasm.Runtime + wasmRuntime wasm.Runtime + pluginKeyProvider bufplugin.PluginKeyProvider + pluginDataProvider bufplugin.PluginDataProvider } func newRunnerProvider( wasmRuntime wasm.Runtime, + pluginKeyProvider bufplugin.PluginKeyProvider, + pluginDataProvider bufplugin.PluginDataProvider, ) *runnerProvider { return &runnerProvider{ - wasmRuntime: wasmRuntime, + wasmRuntime: wasmRuntime, + pluginKeyProvider: pluginKeyProvider, + pluginDataProvider: pluginDataProvider, } } @@ -50,8 +59,104 @@ func (r *runnerProvider) NewRunner(pluginConfig bufconfig.PluginConfig) (pluginr pluginConfig.Args()..., ), nil case bufconfig.PluginConfigTypeRemoteWasm: - return nil, fmt.Errorf("remote plugins are not supported") + return newRemoteWasmPluginRunner( + r.wasmRuntime, + r.pluginKeyProvider, + r.pluginDataProvider, + pluginConfig, + ) default: return nil, syserror.Newf("unknown PluginConfigType: %v", pluginConfig.Type()) } } + +// *** PRIVATE *** + +// remoteWasmPluginRunner is a Runner that runs a remote Wasm plugin. +// +// This is a wrapper around a pluginrpc.Runner that first resolves the Ref to +// a PluginKey using the PluginKeyProvider. It then loads the PluginData for +// the PluginKey using the PluginDataProvider. The PluginData is then used to +// create the pluginrpc.Runner. The Runner is only loaded once and is cached +// for future calls. However, if the Runner fails to load it will try to +// reload on the next call. +type remoteWasmPluginRunner struct { + wasmRuntime wasm.Runtime + pluginKeyProvider bufplugin.PluginKeyProvider + pluginDataProvider bufplugin.PluginDataProvider + pluginRef bufparse.Ref + pluginName string + pluginArgs []string + // lock protects runner. + lock sync.RWMutex + runner pluginrpc.Runner +} + +func newRemoteWasmPluginRunner( + wasmRuntime wasm.Runtime, + pluginKeyProvider bufplugin.PluginKeyProvider, + pluginDataProvider bufplugin.PluginDataProvider, + pluginConfig bufconfig.PluginConfig, +) (*remoteWasmPluginRunner, error) { + pluginRef := pluginConfig.Ref() + if pluginRef == nil { + return nil, syserror.Newf("Ref nil on PluginConfig of type %v", bufconfig.PluginConfigTypeRemoteWasm) + } + return &remoteWasmPluginRunner{ + wasmRuntime: wasmRuntime, + pluginKeyProvider: pluginKeyProvider, + pluginDataProvider: pluginDataProvider, + pluginRef: pluginRef, + pluginName: pluginConfig.Name(), + pluginArgs: pluginConfig.Args(), + }, nil +} + +func (r *remoteWasmPluginRunner) Run(ctx context.Context, env pluginrpc.Env) (retErr error) { + delegate, err := r.loadRunnerOnce(ctx) + if err != nil { + return err + } + return delegate.Run(ctx, env) +} + +func (r *remoteWasmPluginRunner) loadRunnerOnce(ctx context.Context) (pluginrpc.Runner, error) { + r.lock.RLock() + if r.runner != nil { + r.lock.RUnlock() + return r.runner, nil + } + r.lock.RUnlock() + r.lock.Lock() + defer r.lock.Unlock() + if r.runner == nil { + runner, err := r.loadRunner(ctx) + if err != nil { + // The error isn't stored to avoid ctx cancellation issues. On the next call, + // the runner will be reloaded instead of returning the erorr. + return nil, err + } + r.runner = runner + } + return r.runner, nil +} + +func (r *remoteWasmPluginRunner) loadRunner(ctx context.Context) (pluginrpc.Runner, error) { + pluginKeys, err := r.pluginKeyProvider.GetPluginKeysForPluginRefs(ctx, []bufparse.Ref{r.pluginRef}, bufplugin.DigestTypeP1) + if err != nil { + return nil, err + } + if len(pluginKeys) != 1 { + return nil, syserror.Newf("expected 1 PluginKey, got %d", len(pluginKeys)) + } + // Load the data for the plugin now to ensure the context is valid for the entire operation. + pluginDatas, err := r.pluginDataProvider.GetPluginDatasForPluginKeys(ctx, pluginKeys) + if err != nil { + return nil, err + } + if len(pluginDatas) != 1 { + return nil, syserror.Newf("expected 1 PluginData, got %d", len(pluginDatas)) + } + data := pluginDatas[0] + return pluginrpcutil.NewWasmRunner(r.wasmRuntime, data.Data, r.pluginName, r.pluginArgs...), nil +} diff --git a/private/bufpkg/bufplugin/plugin_key_provider.go b/private/bufpkg/bufplugin/plugin_key_provider.go index 8bbbfd8c7c..cef430f5db 100644 --- a/private/bufpkg/bufplugin/plugin_key_provider.go +++ b/private/bufpkg/bufplugin/plugin_key_provider.go @@ -43,10 +43,12 @@ type PluginKeyProvider interface { GetPluginKeysForPluginRefs(context.Context, []bufparse.Ref, DigestType) ([]PluginKey, error) } -// NewStaticPluginKeyProvider returns a new PluginKeyProvider for the given PluginKeys. +// NewStaticPluginKeyProvider returns a new PluginKeyProvider for a set of PluginKeys. // -// If the Ref is not found in the list of provided keys, fs.ErrNotExist will be -// returned. +// The set of PluginKeys must be unique by FullName. If there are duplicates, +// an error will be returned. The Ref will be matched to the PluginKey by FullName. +// If the Ref is not found in the set of provided keys, an error with +// fs.ErrNotExist will be returned. func NewStaticPluginKeyProvider(pluginKeys []PluginKey) (PluginKeyProvider, error) { if len(pluginKeys) == 0 { return NopPluginKeyProvider, nil @@ -57,8 +59,13 @@ func NewStaticPluginKeyProvider(pluginKeys []PluginKey) (PluginKeyProvider, erro if err != nil { return nil, err } + digetType, err := UniqueDigestTypeForPluginKeys(pluginKeys) + if err != nil { + return nil, err + } return staticPluginKeyProvider{ pluginKeysByFullName: pluginKeysByFullName, + digestType: digetType, }, nil } @@ -76,23 +83,27 @@ func (nopPluginKeyProvider) GetPluginKeysForPluginRefs( type staticPluginKeyProvider struct { pluginKeysByFullName map[string]PluginKey + digestType DigestType } func (s staticPluginKeyProvider) GetPluginKeysForPluginRefs( - ctx context.Context, + _ context.Context, refs []bufparse.Ref, digestType DigestType, ) ([]PluginKey, error) { - fmt.Println("staticPluginKeyProvider.GetPluginKeysForPluginRefs") + if digestType != s.digestType { + return nil, fmt.Errorf("expected DigestType %v, got %v", s.digestType, digestType) + } pluginKeys := make([]PluginKey, len(refs)) for i, ref := range refs { + // Only the FullName is used to match the PluginKey. The Ref is not + // validated to match the PluginKey as there is not enough information + // to do so. pluginKey, ok := s.pluginKeysByFullName[ref.FullName().String()] if !ok { return nil, fs.ErrNotExist } - // TODO: validate the Ref matches the plugin key. pluginKeys[i] = pluginKey } - fmt.Println("got plugin keys", pluginKeys) return pluginKeys, nil } diff --git a/private/pkg/pluginrpcutil/pluginrpcutil.go b/private/pkg/pluginrpcutil/pluginrpcutil.go index a3f121237c..326bc87ce8 100644 --- a/private/pkg/pluginrpcutil/pluginrpcutil.go +++ b/private/pkg/pluginrpcutil/pluginrpcutil.go @@ -22,19 +22,27 @@ import ( "pluginrpc.com/pluginrpc" ) -// NewLocalRunner returns a new pluginrpc.Runner for the program name. +// NewLocalRunner returns a new pluginrpc.Runner for the local program. +// +// The programName is the path or name of the program. Any program args are passed to +// the program when it is run. The programArgs may be nil. func NewLocalRunner(programName string, programArgs ...string) pluginrpc.Runner { return newRunner(programName, programArgs...) } // NewWasmRunner returns a new pluginrpc.Runner for the wasm.Runtime. +// +// The getData function should return the Wasm module bytes for the program. +// The program name is the name of the program. Any program args are passed to +// the program when it is run. The programArgs may be nil. func NewWasmRunner(delegate wasm.Runtime, getData func() ([]byte, error), programName string, programArgs ...string) pluginrpc.Runner { return newWasmRunner(delegate, getData, programName, programArgs...) } -// NewLocalWasmRunner returns a new pluginrpc.Runner for the wasm.Runtime and program name. +// NewLocalWasmRunner returns a new pluginrpc.Runner for the wasm.Runtime. // -// This runner is used for local Wasm plugins. The program name is the path to the Wasm file. +// The program name is the path to the Wasm file. Any program args are passed to +// the program when it is run. The programArgs may be nil. func NewLocalWasmRunner(delegate wasm.Runtime, programName string, programArgs ...string) pluginrpc.Runner { getData := func() ([]byte, error) { // Find the plugin filePath. We use the same logic as exec.LookPath, but we do