Skip to content

Commit

Permalink
Init running remote plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Dec 9, 2024
1 parent 1dd3a77 commit e470345
Show file tree
Hide file tree
Showing 20 changed files with 310 additions and 123 deletions.
109 changes: 109 additions & 0 deletions private/buf/bufctl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"io/fs"
"log/slog"
"net/http"
"os"
"sort"

"buf.build/go/protoyaml"
"github.com/bufbuild/buf/private/buf/buffetch"
"github.com/bufbuild/buf/private/buf/bufwkt/bufwktstore"
"github.com/bufbuild/buf/private/buf/bufworkspace"
"github.com/bufbuild/buf/private/bufpkg/bufanalysis"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/bufpkg/bufconfig"
"github.com/bufbuild/buf/private/bufpkg/bufimage"
"github.com/bufbuild/buf/private/bufpkg/bufimage/bufimageutil"
Expand All @@ -44,12 +46,15 @@ import (
"github.com/bufbuild/buf/private/pkg/httpauth"
"github.com/bufbuild/buf/private/pkg/ioext"
"github.com/bufbuild/buf/private/pkg/normalpath"
"github.com/bufbuild/buf/private/pkg/pluginrpcutil"
"github.com/bufbuild/buf/private/pkg/protoencoding"
"github.com/bufbuild/buf/private/pkg/slicesext"
"github.com/bufbuild/buf/private/pkg/storage/storageos"
"github.com/bufbuild/buf/private/pkg/syserror"
"github.com/bufbuild/buf/private/pkg/wasm"
"github.com/bufbuild/protovalidate-go"
"google.golang.org/protobuf/proto"
"pluginrpc.com/pluginrpc"
)

// ImageWithConfig pairs an Image with lint and breaking configuration.
Expand Down Expand Up @@ -128,6 +133,11 @@ type Controller interface {
defaultMessageEncoding buffetch.MessageEncoding,
options ...FunctionOption,
) error
GetCheckRunnerProvider(
ctx context.Context,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (bufcheck.RunnerProvider, error)
}

func NewController(
Expand Down Expand Up @@ -706,6 +716,39 @@ func (c *controller) PutMessage(
return errors.Join(err, writeCloser.Close())
}

func (c *controller) GetCheckRunnerProvider(
ctx context.Context,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (bufcheck.RunnerProvider, error) {
if len(options) > 0 {
return nil, syserror.Newf("options are not supported for GetCheckRunnerProvider")
}
return bufcheck.RunnerProviderFunc(func(pluginConfig bufconfig.PluginConfig) (pluginrpc.Runner, error) {
plugin, err := c.getPluginForPluginConfig(ctx, pluginConfig)
if err != nil {
return nil, err
}
switch {
case plugin.IsWasm():
getData := plugin.Data
return pluginrpcutil.NewWasmRunner(
wasmRuntime,
getData,
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
case plugin.IsLocal():
return pluginrpcutil.NewLocalRunner(
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
default:
return nil, syserror.Newf("unknown plugin type: %s", plugin.OpaqueID())
}
}), nil
}

func (c *controller) getImage(
ctx context.Context,
input string,
Expand Down Expand Up @@ -1159,6 +1202,72 @@ Declare %q in the deps key in your buf.yaml.`,
return nil
}

// getPluginForPluginConfig resolves the plugin for the given PluginConfig.
//
// Remote plugins are resolved by fetching the plugin key for the given Ref, and then fetching
// the plugin data for the plugin key.
func (c *controller) getPluginForPluginConfig(
ctx context.Context,
pluginConfig bufconfig.PluginConfig,
options ...FunctionOption,
) (bufplugin.Plugin, error) {
if len(options) > 0 {
return nil, syserror.Newf("options are not supported")
}
switch pluginConfigType := pluginConfig.Type(); pluginConfigType {
case bufconfig.PluginConfigTypeLocal:
return bufplugin.NewLocalPlugin(
pluginConfig.Name(),
pluginConfig.Args(),
)
case bufconfig.PluginConfigTypeLocalWasm:
pluginName := pluginConfig.Name()
getData := func() ([]byte, error) {
moduleWasm, err := os.ReadFile(pluginName)
if err != nil {
return nil, fmt.Errorf("could not read plugin %q: %v", pluginName, err)
}
return moduleWasm, nil
}
return bufplugin.NewLocalWasmPlugin(
nil, // We don't have a FullName for a local Wasm plugin.
pluginName,
pluginConfig.Args(),
getData,
)
case bufconfig.PluginConfigTypeRemoteWasm:
pluginRef := pluginConfig.Ref()
if pluginRef == nil {
return nil, syserror.Newf("Ref is required for remote plugins")
}
pluginKeys, err := c.pluginKeyProvider.GetPluginKeysForPluginRefs(ctx, []bufparse.Ref{pluginRef}, bufplugin.DigestTypeP1)
if err != nil {
return nil, err
}
if len(pluginKeys) != 1 {
return nil, syserror.Newf("expected 1 plugin key, got %d", len(pluginKeys))
}
pluginKey := pluginKeys[0]
return bufplugin.NewRemoteWasmPlugin(
pluginKey.FullName(),
pluginConfig.Args(),
pluginKey.CommitID(),
func() ([]byte, error) {
pluginDatas, err := c.pluginDataProvider.GetPluginDatasForPluginKeys(ctx, []bufplugin.PluginKey{pluginKey})
if err != nil {
return nil, err
}
if len(pluginDatas) != 1 {
return nil, syserror.Newf("expected 1 plugin data, got %d", len(pluginDatas))
}
return pluginDatas[0].Data()
},
)
default:
return nil, syserror.Newf("unknown plugin config type: %v", pluginConfigType)
}
}

// handleFileAnnotationSetError will attempt to handle the error as a FileAnnotationSet, and if so, print
// the FileAnnotationSet to the writer with the given error format while returning ErrFileAnnotation.
//
Expand Down
2 changes: 1 addition & 1 deletion private/buf/bufmigrate/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func equivalentCheckConfigInV2(
) (bufconfig.CheckConfig, error) {
// No need for custom lint/breaking plugins since there's no plugins to migrate from <=v1.
// TODO: If we ever need v3, then we will have to deal with this.
client, err := bufcheck.NewClient(logger, bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime))
client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion private/buf/cmd/buf/buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ func TestCheckLsBreakingRulesFromConfigExceptDeprecated(t *testing.T) {
// Do not need any custom lint/breaking plugins here.
client, err := bufcheck.NewClient(
slogtestext.NewLogger(t),
bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime),
)
require.NoError(t, err)
allRules, err := client.AllRules(context.Background(), check.RuleTypeBreaking, version)
Expand Down
6 changes: 5 additions & 1 deletion private/buf/cmd/buf/command/beta/lsp/lsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ func run(
defer func() {
retErr = errors.Join(retErr, wasmRuntime.Close(ctx))
}()
checkRunnerProvider, err := controller.GetCheckRunnerProvider(ctx, wasmRuntime)
if err != nil {
return err
}
checkClient, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasmRuntime),
checkRunnerProvider,
bufcheck.ClientWithStderr(container.Stderr()),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion private/buf/cmd/buf/command/breaking/breaking.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,15 @@ func run(
defer func() {
retErr = errors.Join(retErr, wasmRuntime.Close(ctx))
}()
checkRunnerProvider, err := controller.GetCheckRunnerProvider(ctx, wasmRuntime)
if err != nil {
return err
}
var allFileAnnotations []bufanalysis.FileAnnotation
for i, imageWithConfig := range imageWithConfigs {
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasmRuntime),
checkRunnerProvider,
bufcheck.ClientWithStderr(container.Stderr()),
)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion private/buf/cmd/buf/command/config/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func lsRun(
return err
}
}
controller, err := bufcli.NewController(container)
if err != nil {
return err
}
wasmRuntimeCacheDir, err := bufcli.CreateWasmRuntimeCacheDir(container)
if err != nil {
return err
Expand All @@ -194,9 +198,13 @@ func lsRun(
defer func() {
retErr = errors.Join(retErr, wasmRuntime.Close(ctx))
}()
checkRunnerProvider, err := controller.GetCheckRunnerProvider(ctx, wasmRuntime)
if err != nil {
return err
}
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasmRuntime),
checkRunnerProvider,
bufcheck.ClientWithStderr(container.Stderr()),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion private/buf/cmd/buf/command/lint/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,15 @@ func run(
defer func() {
retErr = errors.Join(retErr, wasmRuntime.Close(ctx))
}()
checkRunnerProvider, err := controller.GetCheckRunnerProvider(ctx, wasmRuntime)
if err != nil {
return err
}
var allFileAnnotations []bufanalysis.FileAnnotation
for _, imageWithConfig := range imageWithConfigs {
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasmRuntime),
checkRunnerProvider,
bufcheck.ClientWithStderr(container.Stderr()),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion private/buf/cmd/buf/command/mod/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func lsRun(
// BufYAMLFiles <=v1 never had plugins.
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.ClientWithStderr(container.Stderr()),
)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions private/buf/cmd/buf/command/plugin/pluginpush/pluginpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ func upload(
var plugin bufplugin.Plugin
switch {
case flags.Binary != "":
// We create a local plugin reference to the Wasm binary.
pluginName := flags.Binary
var err error
plugin, err = bufplugin.NewLocalWasmPlugin(
pluginFullName,
pluginName,
nil, // args
func() ([]byte, error) {
wasmBinary, err := os.ReadFile(flags.Binary)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion private/buf/cmd/protoc-gen-buf-breaking/breaking.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func handle(
// The protoc plugins do not support custom lint/breaking change plugins for now.
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.ClientWithStderr(pluginEnv.Stderr),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion private/buf/cmd/protoc-gen-buf-lint/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func handle(
// The protoc plugins do not support custom lint/breaking change plugins for now.
client, err := bufcheck.NewClient(
container.Logger(),
bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime),
bufcheck.ClientWithStderr(pluginEnv.Stderr),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion private/bufpkg/bufcheck/breaking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ func testBreaking(
require.NoError(t, err)
breakingConfig := workspace.GetBreakingConfigForOpaqueID(opaqueID)
require.NotNil(t, breakingConfig)
client, err := bufcheck.NewClient(logger, bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime))
client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime))
require.NoError(t, err)
err = client.Breaking(
ctx,
Expand Down
6 changes: 2 additions & 4 deletions private/bufpkg/bufcheck/bufcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r RunnerProviderFunc) NewRunner(pluginConfig bufconfig.PluginConfig) (plug
return r(pluginConfig)
}

// NewRunnerProvider returns a new RunnerProvider for the wasm.Runtime.
// NewLocalRunnerProvider returns a new RunnerProvider for the wasm.Runtime.
//
// This implementation should only be used for local applications. It is safe to
// use concurrently.
Expand All @@ -180,9 +180,7 @@ func (r RunnerProviderFunc) NewRunner(pluginConfig bufconfig.PluginConfig) (plug
// - bufconfig.PluginConfigTypeLocalWasm
//
// If the PluginConfigType is not supported, an error is returned.
func NewRunnerProvider(
wasmRuntime wasm.Runtime,
) RunnerProvider {
func NewLocalRunnerProvider(wasmRuntime wasm.Runtime) RunnerProvider {
return newRunnerProvider(
wasmRuntime,
)
Expand Down
2 changes: 1 addition & 1 deletion private/bufpkg/bufcheck/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,7 @@ func testLintWithOptions(
})
client, err := bufcheck.NewClient(
logger,
bufcheck.NewRunnerProvider(wasmRuntime),
bufcheck.NewLocalRunnerProvider(wasmRuntime),
)
require.NoError(t, err)
err = client.Lint(
Expand Down
8 changes: 4 additions & 4 deletions private/bufpkg/bufcheck/multi_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ func TestMultiClientCannotHaveOverlappingRulesWithBuiltIn(t *testing.T) {

client, err := newClient(
slogtestext.NewLogger(t),
NewRunnerProvider(wasm.UnimplementedRuntime),
NewLocalRunnerProvider(wasm.UnimplementedRuntime),
)
require.NoError(t, err)
duplicateBuiltInRulePluginConfig, err := bufconfig.NewLocalPluginConfig(
"buf-plugin-duplicate-rule",
nil,
[]string{"buf-plugin-duplicate-rule"},
nil,
)
require.NoError(t, err)
emptyOptions, err := option.NewOptions(nil)
Expand Down Expand Up @@ -275,13 +275,13 @@ func TestMultiClientCannotHaveOverlappingCategoriesWithBuiltIn(t *testing.T) {

client, err := newClient(
slogtestext.NewLogger(t),
NewRunnerProvider(wasm.UnimplementedRuntime),
NewLocalRunnerProvider(wasm.UnimplementedRuntime),
)
require.NoError(t, err)
duplicateBuiltInRulePluginConfig, err := bufconfig.NewLocalPluginConfig(
"buf-plugin-duplicate-category",
nil,
[]string{"buf-plugin-duplicate-category"},
nil,
)
require.NoError(t, err)
emptyOptions, err := option.NewOptions(nil)
Expand Down
18 changes: 7 additions & 11 deletions private/bufpkg/bufcheck/runner_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,17 @@ func newRunnerProvider(
func (r *runnerProvider) NewRunner(pluginConfig bufconfig.PluginConfig) (pluginrpc.Runner, error) {
switch pluginConfig.Type() {
case bufconfig.PluginConfigTypeLocal:
path := pluginConfig.Path()
return pluginrpcutil.NewRunner(
// We know that Path is of at least length 1.
path[0],
path[1:]...,
return pluginrpcutil.NewLocalRunner(
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
case bufconfig.PluginConfigTypeLocalWasm:
path := pluginConfig.Path()
return pluginrpcutil.NewWasmRunner(
return pluginrpcutil.NewLocalWasmRunner(
r.wasmRuntime,
// We know that Path is of at least length 1.
path[0],
path[1:]...,
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
case bufconfig.PluginConfigTypeRemote:
case bufconfig.PluginConfigTypeRemoteWasm:
return nil, fmt.Errorf("remote plugins are not supported")
default:
return nil, syserror.Newf("unknown PluginConfigType: %v", pluginConfig.Type())
Expand Down
Loading

0 comments on commit e470345

Please sign in to comment.