Skip to content

Commit

Permalink
Init running remote plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Dec 10, 2024
1 parent 1dd3a77 commit 4afd677
Show file tree
Hide file tree
Showing 27 changed files with 635 additions and 143 deletions.
195 changes: 195 additions & 0 deletions private/buf/bufctl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"io/fs"
"log/slog"
"net/http"
"os"
"sort"

"buf.build/go/protoyaml"
"github.com/bufbuild/buf/private/buf/buffetch"
"github.com/bufbuild/buf/private/buf/bufwkt/bufwktstore"
"github.com/bufbuild/buf/private/buf/bufworkspace"
"github.com/bufbuild/buf/private/bufpkg/bufanalysis"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/bufpkg/bufconfig"
"github.com/bufbuild/buf/private/bufpkg/bufimage"
"github.com/bufbuild/buf/private/bufpkg/bufimage/bufimageutil"
Expand All @@ -44,12 +46,15 @@ import (
"github.com/bufbuild/buf/private/pkg/httpauth"
"github.com/bufbuild/buf/private/pkg/ioext"
"github.com/bufbuild/buf/private/pkg/normalpath"
"github.com/bufbuild/buf/private/pkg/pluginrpcutil"
"github.com/bufbuild/buf/private/pkg/protoencoding"
"github.com/bufbuild/buf/private/pkg/slicesext"
"github.com/bufbuild/buf/private/pkg/storage/storageos"
"github.com/bufbuild/buf/private/pkg/syserror"
"github.com/bufbuild/buf/private/pkg/wasm"
"github.com/bufbuild/protovalidate-go"
"google.golang.org/protobuf/proto"
"pluginrpc.com/pluginrpc"
)

// ImageWithConfig pairs an Image with lint and breaking configuration.
Expand Down Expand Up @@ -128,6 +133,12 @@ type Controller interface {
defaultMessageEncoding buffetch.MessageEncoding,
options ...FunctionOption,
) error
GetCheckRunnerProvider(
ctx context.Context,
input string,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (bufcheck.RunnerProvider, error)
}

func NewController(
Expand Down Expand Up @@ -243,6 +254,7 @@ func newController(
graphProvider,
moduleDataProvider,
commitProvider,
pluginKeyProvider,
)
controller.workspaceDepManagerProvider = bufworkspace.NewWorkspaceDepManagerProvider(
logger,
Expand Down Expand Up @@ -706,6 +718,49 @@ func (c *controller) PutMessage(
return errors.Join(err, writeCloser.Close())
}

func (c *controller) GetCheckRunnerProvider(
ctx context.Context,
input string,
wasmRuntime wasm.Runtime,
options ...FunctionOption,
) (bufcheck.RunnerProvider, error) {
functionOptions := newFunctionOptions(c)
for _, option := range options {
option(functionOptions)
}
ref, err := c.buffetchRefParser.GetRef(ctx, input)
if err != nil {
return nil, err
}
pluginKeyProvider, err := c.getPluginKeyProviderForRef(ctx, ref, functionOptions)
if err != nil {
return nil, err
}
return bufcheck.RunnerProviderFunc(func(pluginConfig bufconfig.PluginConfig) (pluginrpc.Runner, error) {
plugin, err := c.getPluginForPluginConfig(ctx, pluginKeyProvider, pluginConfig)
if err != nil {
return nil, err
}
switch {
case plugin.IsWasm():
getData := plugin.Data
return pluginrpcutil.NewWasmRunner(
wasmRuntime,
getData,
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
case plugin.IsLocal():
return pluginrpcutil.NewLocalRunner(
pluginConfig.Name(),
pluginConfig.Args()...,
), nil
default:
return nil, syserror.Newf("unknown plugin type: %s", plugin.OpaqueID())
}
}), nil
}

func (c *controller) getImage(
ctx context.Context,
input string,
Expand Down Expand Up @@ -1159,6 +1214,146 @@ Declare %q in the deps key in your buf.yaml.`,
return nil
}

// getPluginKeyProviderForRef create a new PluginKeyProvider for the Ref.
//
// Remote plugins refs are resolved to keys from the workspace buf.lock file.
// If the Ref is a MessageRef, we use the current directory buf.lock file.
func (c *controller) getPluginKeyProviderForRef(
ctx context.Context,
ref buffetch.Ref,
functionOptions *functionOptions,
) (_ bufplugin.PluginKeyProvider, retErr error) {
switch t := ref.(type) {
case buffetch.ProtoFileRef:
workspace, err := c.getWorkspaceForProtoFileRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.SourceRef:
workspace, err := c.getWorkspaceForSourceRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.ModuleRef:
workspace, err := c.getWorkspaceForModuleRef(ctx, t, functionOptions)
if err != nil {
return nil, err
}
return bufplugin.NewStaticPluginKeyProvider(workspace.RemotePluginKeys())
case buffetch.MessageRef:
bucket, err := c.storageosProvider.NewReadWriteBucket(
".",
storageos.ReadWriteBucketWithSymlinksIfSupported(),
)
if err != nil {
return nil, err
}
bufYAMLFile, err := bufconfig.GetBufYAMLFileForPrefixOrOverride(
ctx,
bucket,
".",
functionOptions.configOverride,
)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
// We did not find a buf.yaml in our current directory,
// and there was no config override.
return bufplugin.NopPluginKeyProvider, nil
}
var pluginKeys []bufplugin.PluginKey
if bufYAMLFile.FileVersion() == bufconfig.FileVersionV2 {
bufLockFile, err := bufconfig.GetBufLockFileForPrefix(
ctx,
bucket,
// buf.lock files live next to the buf.yaml
".",
)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
// We did not find a buf.lock in our current directory.
// Remote plugins are not available.
return bufplugin.NopPluginKeyProvider, nil
}
pluginKeys = bufLockFile.RemotePluginKeys()
}
return bufplugin.NewStaticPluginKeyProvider(pluginKeys)
default:
// This is a system error.
return nil, syserror.Newf("invalid Ref: %T", ref)
}
}

// getPluginForPluginConfig resolves the plugin for the given PluginConfig.
//
// Remote plugins are resolved by fetching the plugin key for the given Ref, and then fetching
// the plugin data for the plugin key.
func (c *controller) getPluginForPluginConfig(
ctx context.Context,
pluginKeyProvider bufplugin.PluginKeyProvider,
pluginConfig bufconfig.PluginConfig,
) (bufplugin.Plugin, error) {
switch pluginConfigType := pluginConfig.Type(); pluginConfigType {
case bufconfig.PluginConfigTypeLocal:
return bufplugin.NewLocalPlugin(
pluginConfig.Name(),
pluginConfig.Args(),
)
case bufconfig.PluginConfigTypeLocalWasm:
pluginName := pluginConfig.Name()
return bufplugin.NewLocalWasmPlugin(
nil, // We don't have a FullName for a local Wasm plugin.
pluginName,
pluginConfig.Args(),
func() ([]byte, error) {
moduleWasm, err := os.ReadFile(pluginName)
if err != nil {
return nil, fmt.Errorf("could not read plugin %q: %v", pluginName, err)
}
return moduleWasm, nil
},
)
case bufconfig.PluginConfigTypeRemoteWasm:
pluginRef := pluginConfig.Ref()
if pluginRef == nil {
return nil, syserror.Newf("Ref is required for remote plugins")
}
pluginKeys, err := pluginKeyProvider.GetPluginKeysForPluginRefs(ctx, []bufparse.Ref{pluginRef}, bufplugin.DigestTypeP1)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("plugin %q not found in workspace. ", pluginRef.FullName().String())
}
return nil, err
}
if len(pluginKeys) != 1 {
return nil, syserror.Newf("expected 1 plugin key, got %d", len(pluginKeys))
}
pluginKey := pluginKeys[0]
return bufplugin.NewRemoteWasmPlugin(
pluginKey.FullName(),
pluginConfig.Args(),
pluginKey.CommitID(),
func() ([]byte, error) {
pluginDatas, err := c.pluginDataProvider.GetPluginDatasForPluginKeys(ctx, []bufplugin.PluginKey{pluginKey})
if err != nil {
return nil, err
}
if len(pluginDatas) != 1 {
return nil, syserror.Newf("expected 1 plugin data, got %d", len(pluginDatas))
}
return pluginDatas[0].Data()
},
)
default:
return nil, syserror.Newf("unknown plugin config type: %v", pluginConfigType)
}
}

// handleFileAnnotationSetError will attempt to handle the error as a FileAnnotationSet, and if so, print
// the FileAnnotationSet to the writer with the given error format while returning ErrFileAnnotation.
//
Expand Down
8 changes: 4 additions & 4 deletions private/buf/buflsp/buflsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
"sync/atomic"

"github.com/bufbuild/buf/private/buf/bufctl"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/pkg/app/appext"
"github.com/bufbuild/buf/private/pkg/slogext"
"github.com/bufbuild/buf/private/pkg/storage"
"github.com/bufbuild/buf/private/pkg/storage/storageos"
"github.com/bufbuild/buf/private/pkg/wasm"
"go.lsp.dev/jsonrpc2"
"go.lsp.dev/protocol"
"go.uber.org/zap"
Expand All @@ -43,7 +43,7 @@ func Serve(
wktBucket storage.ReadBucket,
container appext.Container,
controller bufctl.Controller,
checkClient bufcheck.Client,
wasmRuntime wasm.Runtime,
stream jsonrpc2.Stream,
) (jsonrpc2.Conn, error) {
// The LSP protocol deals with absolute filesystem paths. This requires us to
Expand All @@ -68,7 +68,7 @@ func Serve(
container: container,
logger: container.Logger(),
controller: controller,
checkClient: checkClient,
wasmRuntime: wasmRuntime,
rootBucket: bucket,
wktBucket: wktBucket,
}
Expand Down Expand Up @@ -96,7 +96,7 @@ type lsp struct {

logger *slog.Logger
controller bufctl.Controller
checkClient bufcheck.Client
wasmRuntime wasm.Runtime
rootBucket storage.ReadBucket
fileManager *fileManager

Expand Down
39 changes: 34 additions & 5 deletions private/buf/buflsp/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type file struct {
version int32
hasText bool // Whether this file has ever had text read into it.

workspace bufworkspace.Workspace
module bufmodule.Module
workspace bufworkspace.Workspace
module bufmodule.Module
checkClient bufcheck.Client

againstStrategy againstStrategy
againstGitRef string
Expand Down Expand Up @@ -371,6 +372,24 @@ func (f *file) FindModule(ctx context.Context) {
return
}

// Get the check runner provider for this file. The client is scoped to
// the input Buf lock file, so we need to get the check runner provider
// for the workspace that contains this file.
checkRunnerProvider, err := f.lsp.controller.GetCheckRunnerProvider(ctx, f.uri.Filename(), f.lsp.wasmRuntime)
if err != nil {
f.lsp.logger.Warn("could not get check runner provider", slogext.ErrorAttr(err))
return
}
checkClient, err := bufcheck.NewClient(
f.lsp.logger,
checkRunnerProvider,
bufcheck.ClientWithStderr(f.lsp.container.Stderr()),
)
if err != nil {
f.lsp.logger.Warn("could not create check client", slogext.ErrorAttr(err))
return
}

// Figure out which module this file belongs to.
var module bufmodule.Module
for _, mod := range workspace.Modules() {
Expand Down Expand Up @@ -398,6 +417,7 @@ func (f *file) FindModule(ctx context.Context) {

f.workspace = workspace
f.module = module
f.checkClient = checkClient
}

// IndexImports finds URIs for all of the files imported by this file.
Expand Down Expand Up @@ -641,9 +661,14 @@ func (f *file) RunLints(ctx context.Context) bool {
f.lsp.logger.Warn(fmt.Sprintf("could not find image for %q", f.uri))
return false
}

if f.checkClient == nil {
f.lsp.logger.Warn(fmt.Sprintf("could not find check client for %q", f.uri))
return false
}
f.lsp.logger.Debug(fmt.Sprintf("running lint for %q in %v", f.uri, f.module.FullName()))
return f.appendLintErrors("buf lint", f.lsp.checkClient.Lint(

f.lsp.logger.Debug(fmt.Sprintf("running lint for %q in %v %d plugin configs", f.uri, f.module.FullName(), len(f.workspace.PluginConfigs())))
return f.appendLintErrors("buf lint", f.checkClient.Lint(
ctx,
f.workspace.GetLintConfigForOpaqueID(f.module.OpaqueID()),
f.image,
Expand All @@ -664,9 +689,13 @@ func (f *file) RunBreaking(ctx context.Context) bool {
f.lsp.logger.Warn(fmt.Sprintf("could not find --against image for %q", f.uri))
return false
}
if f.checkClient == nil {
f.lsp.logger.Warn(fmt.Sprintf("could not find check client for %q", f.uri))
return false
}

f.lsp.logger.Debug(fmt.Sprintf("running breaking for %q in %v", f.uri, f.module.FullName()))
return f.appendLintErrors("buf breaking", f.lsp.checkClient.Breaking(
return f.appendLintErrors("buf breaking", f.checkClient.Breaking(
ctx,
f.workspace.GetBreakingConfigForOpaqueID(f.module.OpaqueID()),
f.image,
Expand Down
2 changes: 1 addition & 1 deletion private/buf/bufmigrate/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func equivalentCheckConfigInV2(
) (bufconfig.CheckConfig, error) {
// No need for custom lint/breaking plugins since there's no plugins to migrate from <=v1.
// TODO: If we ever need v3, then we will have to deal with this.
client, err := bufcheck.NewClient(logger, bufcheck.NewRunnerProvider(wasm.UnimplementedRuntime))
client, err := bufcheck.NewClient(logger, bufcheck.NewLocalRunnerProvider(wasm.UnimplementedRuntime))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4afd677

Please sign in to comment.