Skip to content

Commit

Permalink
feat(audit): enable audit configuration on k8s (#14)
Browse files Browse the repository at this point in the history
* feat(audit): enable audit configuration on k8s

-also refactor command iostream

* fix test
  • Loading branch information
hsinhoyeh authored Jul 11, 2022
1 parent 417bc24 commit 87cea8e
Show file tree
Hide file tree
Showing 19 changed files with 293 additions and 53 deletions.
2 changes: 1 addition & 1 deletion assets/bootstrap/provision-cluster.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

/usr/local/bin/kind create cluster --config /tmp/kind-config.yaml
/usr/local/bin/kind create cluster --config kind-config.yaml

sudo cp -r /root/.kube /home/vagrant/.kube
sudo chown -R vagrant:vagrant /home/vagrant/.kube
3 changes: 3 additions & 0 deletions cmd/multikf/cmd_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func NewAddCommand(logger log.Logger, ioStreams genericclioptions.IOStreams) *co
withKubeflow bool // install with kubeflow components
withKubeflowDefaultPassword string // with kubeflow defaultpassword
withIP string // with specific IP
withAudit bool // with audit enabled
exportPorts string // export ports on hostmachine
forceOverwrite bool // force overwrite existing config
)
Expand Down Expand Up @@ -51,6 +52,7 @@ func NewAddCommand(logger log.Logger, ioStreams genericclioptions.IOStreams) *co
kubeAPIIP: withIP,
exportPorts: exportPorts,
forceOverwrite: forceOverwrite,
auditEnabled: withAudit,
})
if err != nil {
return err
Expand Down Expand Up @@ -80,6 +82,7 @@ func NewAddCommand(logger log.Logger, ioStreams genericclioptions.IOStreams) *co
cmd.Flags().IntVar(&memoryInG, "memoryg", 1, "number of memory in gigabytes allocated to the guest machine")
cmd.Flags().BoolVar(&forceOverwrite, "f", false, "force to overwrite existing config. (default: false)")
cmd.Flags().BoolVar(&withKubeflow, "with_kubeflow", true, "install kubeflow modules (default: true)")
cmd.Flags().BoolVar(&withAudit, "with_audit", true, "enable k8s auditing (default: true)")
cmd.Flags().StringVar(&withKubeflowDefaultPassword, "with_password", "12341234", "with a specific password for default user (default: 12341234)")
cmd.Flags().IntVar(&useGPUs, "use_gpus", 0, "use gpu resources (default: 0), possible value (0 or 1)")
cmd.Flags().StringVar(&withIP, "with_ip", "0.0.0.0", "with a specific ip address for kubeapi (default: 0.0.0.0)")
Expand Down
5 changes: 5 additions & 0 deletions cmd/multikf/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type machineConfig struct {
exportPorts string
defaultPassword string
forceOverwrite bool
auditEnabled bool
}

func (m machineConfig) GetCPUs() int {
Expand All @@ -86,6 +87,10 @@ func (m machineConfig) GetKubeAPIIP() string {
return m.kubeAPIIP
}

func (m machineConfig) AuditEnabled() bool {
return m.auditEnabled
}

func (m machineConfig) GetExportPorts() []machine.ExportPortPair {
if len(m.exportPorts) == 0 {
m.logger.V(1).Infof("getexportport: export nothing\n")
Expand Down
24 changes: 17 additions & 7 deletions pkg/machine/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,29 @@ type Cmd struct {
logger log.Logger
}

func (c *Cmd) Run(cmdAndArgs ...string) (ioutil.StreamReader, <-chan cmd.Status, error) {
func (c *Cmd) Run(cmdAndArgs ...string) (*ioutil.CmdOutputStream, <-chan cmd.Status, error) {
c.logger.V(1).Infof("cmd->%s\n", cmdAndArgs)
cmdOptions := cmd.Options{
Buffered: false,
Streaming: true,
}
runcmd := cmd.NewCmdOptions(cmdOptions, cmdAndArgs[0], cmdAndArgs[1:]...)
status := runcmd.Start()
// run and output stderr
for stderrline := range runcmd.Stderr {
c.logger.V(1).Infof("%s\n", stderrline)
statusChan1 := make(chan cmd.Status, 1)
statusChan2 := make(chan cmd.Status, 1)
go newChanForwarder(runcmd.Start(), statusChan1, statusChan2)

cinfo := &ioutil.CommandInfo{
CommandStatus: statusChan1,
Command: runcmd,
}
//stat := <-runStatus

return ioutil.NewOutputStream(c.logger, runcmd.Stdout), status, nil
return ioutil.NewCmdOutputStream(c.logger, cinfo), statusChan2, nil
}

func newChanForwarder(src <-chan cmd.Status, dests ...chan<- cmd.Status) {
status := <-src
for _, dest := range dests {
dest <- status
}
return
}
2 changes: 1 addition & 1 deletion pkg/machine/docker/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type HostFolder struct {

func (h *HostFolder) GenerateFiles(tmplConfig *hosttemplates.DockerHostmachineTemplateConfig) error {
memoryFileFs := templatefs.NewMemoryFilesFs()
if err := memoryFileFs.Generate(tmplConfig, pkgtemplate.NewKindTemplate()); err != nil {
if err := memoryFileFs.Generate(tmplConfig, pkgtemplate.NewKindTemplate(), pkgtemplate.NewAuditPolicyTemplate()); err != nil {
return err
}
/*
Expand Down
2 changes: 2 additions & 0 deletions pkg/machine/docker/hostmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func (h *HostMachine) prepareFiles() error {
h.options.GetKubeAPIIP(),
h.options.GetGPUs(),
h.options.GetExportPorts(),
h.options.AuditEnabled(),
filepath.Join(h.hostMachineDir, "audit-policy.yaml"),
)

vfolder := NewHostFolder(h.hostMachineDir)
Expand Down
4 changes: 3 additions & 1 deletion pkg/machine/docker/template/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type DockerHostmachineTemplateConfig struct {
*pkgtemplateconfig.DefaultTemplateConfig
}

func NewDockerHostmachineTemplateConfig(name string, cpus int, memory int, sshport int, kubeApiPort int, kubeApiIP string, gpus int, exportPorts []machine.ExportPortPair) *DockerHostmachineTemplateConfig {
func NewDockerHostmachineTemplateConfig(name string, cpus int, memory int, sshport int, kubeApiPort int, kubeApiIP string, gpus int, exportPorts []machine.ExportPortPair, auditEnabled bool, auditFileAbsolutePath string) *DockerHostmachineTemplateConfig {
return &DockerHostmachineTemplateConfig{
DefaultTemplateConfig: pkgtemplateconfig.NewDefaultTemplateConfig(
name,
Expand All @@ -20,6 +20,8 @@ func NewDockerHostmachineTemplateConfig(name string, cpus int, memory int, sshpo
kubeApiIP,
gpus,
exportPorts,
auditEnabled,
auditFileAbsolutePath,
),
}
}
Expand Down
36 changes: 26 additions & 10 deletions pkg/machine/ioutil/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@ import (
"fmt"
"io"

"github.com/go-cmd/cmd"
"sigs.k8s.io/kind/pkg/log"
)

type StreamReader interface {
Stdout() error // stream outputs to stdout
Read(b []byte) (int, error) // read stream to buffer with Reader
}

// outputStream implements io.Reader by wrapping the line channel
type outputStream struct {
// CmdOutputStream implements io.Reader by wrapping the line channel
type CmdOutputStream struct {
logger log.Logger
ch chan string
cInfo *CommandInfo
}

func NewOutputStream(logger log.Logger, ch chan string) *outputStream {
return &outputStream{ch: ch, logger: logger}
type CommandInfo struct {
CommandStatus <-chan cmd.Status
Command *cmd.Cmd
}

func (o *outputStream) Read(b []byte) (int, error) {
out, more := <-o.ch
func NewCmdOutputStream(logger log.Logger, cInfo *CommandInfo) *CmdOutputStream {
return &CmdOutputStream{cInfo: cInfo, logger: logger}
}

func (o *CmdOutputStream) Read(b []byte) (int, error) {
out, more := <-o.cInfo.Command.Stdout
if !more {
return 0, io.EOF
}
Expand All @@ -36,8 +41,19 @@ func (o *outputStream) Read(b []byte) (int, error) {
return n + 1, nil
}

func (o *outputStream) Stdout() error {
for lineLog := range o.ch {
func StderrOnError(o *CmdOutputStream) error {

go func() {
status := <-o.cInfo.CommandStatus
if status.Exit != 0 {
// process exit abnormally, display stderr
for lineLog := range o.cInfo.Command.Stderr {
o.logger.V(0).Infof("%s\n", lineLog)
}
}
}()

for lineLog := range o.cInfo.Command.Stdout {
o.logger.V(1).Infof("%s\n", lineLog)
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions pkg/machine/kubectl/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (cli *CLI) ProvisonCluster(kindConfigfile string) error {
if err != nil {
return err
}
return sr.Stdout()
return ioutil.StderrOnError(sr)
}

//func (cli *CLI) InstallRequiredPkgs(containername ContainerName) error {
Expand Down Expand Up @@ -169,7 +169,7 @@ func (cli *CLI) InstallKubeflow(kubeConfigFile string, kfmanifestFile string) er
if err != nil {
return err
}
sr.Stdout()
ioutil.StderrOnError(sr)

ps := <-status
cli.logger.V(1).Infof("kf installation, ps code:%+v\n", ps.Exit)
Expand Down Expand Up @@ -211,7 +211,7 @@ func (cli *CLI) PatchKubeflow(kubeConfigFile string) error {
if err != nil {
return err
}
sr.Stdout()
ioutil.StderrOnError(sr)
time.Sleep(3 * time.Second)

}
Expand All @@ -234,7 +234,7 @@ func (cli *CLI) Portforward(kubeConfigFile, svc, namespace string, fromPort, toP
if err != nil {
return err
}
return sr.Stdout()
return ioutil.StderrOnError(sr)
}

func (cli *CLI) GetPods(kindConfigfile string, namespace string) error {
Expand All @@ -254,7 +254,7 @@ func (cli *CLI) GetPods(kindConfigfile string, namespace string) error {
if err != nil {
return err
}
return sr.Stdout()
return ioutil.StderrOnError(sr)
}

func (cli *CLI) RemoveCluster(clustername string) error {
Expand All @@ -269,7 +269,7 @@ func (cli *CLI) RemoveCluster(clustername string) error {
if err != nil {
return err
}
return sr.Stdout()
return ioutil.StderrOnError(sr)

}

Expand All @@ -292,6 +292,6 @@ func (cli *CLI) GetKubeConfig(clustername string, exportLocalFilePath string) er
return pkgioutil.WriteFile(exportLocalFilePath, contentBlob, 0600)
}

func (cli *CLI) runCmd(cmdAndArgs []string) (ioutil.StreamReader, <-chan gocmd.Status, error) {
func (cli *CLI) runCmd(cmdAndArgs []string) (*ioutil.CmdOutputStream, <-chan gocmd.Status, error) {
return cmd.NewCmd(cli.logger).Run(cmdAndArgs...)
}
1 change: 1 addition & 0 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type MachineConfiger interface {
GetKubeAPIIP() string
GetExportPorts() []ExportPortPair
GetForceOverwriteConfig() bool
AuditEnabled() bool
}

type ExportPortPair struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/machine/vagrant/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (v *VagrantFolder) GenerateVagrantFiles(tmplConfig *vagranttemplates.Vagran
if err := memoryFileFs.Generate(tmplConfig, vagranttemplates.NewDefaultVagrantTemplate()); err != nil {
return err
}
if err := memoryFileFs.Generate(tmplConfig, pkgtemplate.NewKindTemplate()); err != nil {
if err := memoryFileFs.Generate(tmplConfig, pkgtemplate.NewKindTemplate(), pkgtemplate.NewAuditPolicyTemplate()); err != nil {
return err
}
if err := v.folder.DumpFiles(true, memoryFileFs.FS(), assets.BootstrapFs); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/machine/vagrant/template/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type VagrantTemplateConfig struct {
*pkgtemplateconfig.DefaultTemplateConfig
}

func NewVagrantTemplateConfig(name string, cpus int, memory int, sshport int, kubeApiPort int, kubeApiIP string, gpus int, exportPorts []machine.ExportPortPair) *VagrantTemplateConfig {
func NewVagrantTemplateConfig(name string, cpus int, memory int, sshport int, kubeApiPort int, kubeApiIP string, gpus int, exportPorts []machine.ExportPortPair, auditEnabled bool, auditFileAbsolutePath string) *VagrantTemplateConfig {
return &VagrantTemplateConfig{
DefaultTemplateConfig: pkgtemplateconfig.NewDefaultTemplateConfig(
name,
Expand All @@ -20,6 +20,8 @@ func NewVagrantTemplateConfig(name string, cpus int, memory int, sshport int, ku
kubeApiIP,
gpus,
exportPorts,
auditEnabled,
auditFileAbsolutePath,
),
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/machine/vagrant/template/vagrantfile_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type DefaultVagrantFileTemplate struct {
var vagrantFileDefaultTemplate string = `
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/focal64"
config.vm.provision "file", source: "kind-config.yaml", destination: "/tmp/kind-config.yaml"
config.vm.provision "file", source: "kind-config.yaml", destination: "kind-config.yaml"
config.vm.provision "file", source: "audit-policy.yaml", destination: "/tmp/audit-policy.yaml"
config.vm.provision :shell, path: "bootstrap/bootstrap.sh"
config.vm.provision :shell, path: "bootstrap/provision-cluster.sh"
config.vm.network :forwarded_port, guest: {{.KubeAPIPort}}, guest_ip: "0.0.0.0", host: {{.KubeAPIPort}}
Expand Down
2 changes: 2 additions & 0 deletions pkg/machine/vagrant/vagrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func (v *VagrantMachine) prepareFiles() error {
v.options.GetKubeAPIIP(),
v.options.GetGPUs(),
v.options.GetExportPorts(),
v.options.AuditEnabled(),
"/tmp/audit-policy.yaml", /*for vagrant, we will copy the file under /tmp and run local installation*/
)

vfolder := NewVagrantFolder(v.vagrantMachineDir)
Expand Down
42 changes: 42 additions & 0 deletions pkg/template/audit_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package template

import (
"html/template"
"io"
)

func NewAuditPolicyTemplate() *AuditPolicyFileTemplate {
return &AuditPolicyFileTemplate{
auditPolicyFileTemplate: auditPolicyDefaultFileTemplate,
}
}

func (k *AuditPolicyFileTemplate) Filename() string {
return "audit-policy.yaml"
}

func (k *AuditPolicyFileTemplate) Execute(w io.Writer) error {
tmpl, err := template.New("auditpolicy").Parse(k.auditPolicyFileTemplate)
if err != nil {
return err
}
if err := tmpl.Execute(w, k); err != nil {
return err
}
return nil
}

func (k *AuditPolicyFileTemplate) Populate(v interface{}) error {
return nil
}

type AuditPolicyFileTemplate struct {
auditPolicyFileTemplate string
}

var auditPolicyDefaultFileTemplate string = `
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: Metadata
`
Loading

0 comments on commit 87cea8e

Please sign in to comment.