Skip to content

Commit

Permalink
Multi cluster support (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
bivas authored Aug 29, 2018
1 parent e4f7b6e commit b018ca6
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 32 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
RPM_PATH = "iguazio_yum"
DEB_PATH = "iguazio_deb"
BINARY_NAME = "igz-fuse"
RELEASE_VERSION = "0.4.0"

.PHONY: build
build:
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ spec:
- name: nginx
image: nginx
volumeMounts:
- name: test
mountPath: /data
- name: v3io
mountPath: /v3io
ports:
- containerPort: 80
volumes:
- name: test
- name: v3io
flexVolume:
driver: "v3io/fuse"
secretRef:
name: v3io-fuse-user
options:
container: bigdata # data container name
cluster: default # which cluster to connect to (optional, default to "default")
---
apiVersion: v1
kind: Secret
Expand Down
21 changes: 13 additions & 8 deletions pkg/flex/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ type Mounter struct {
}

func (m *Mounter) doMount(targetPath string) *Response {
session, err := m.Config.DataSession(m.Spec.GetFullUsername(), m.Spec.GetPassword())
session, err := m.Config.DataSession(m.Spec)
if err != nil {
return Fail("Could not create session", err)
}

dataUrls, err := m.Config.DataURLs(m.Spec.GetClusterName())
if err != nil {
return Fail("could not get cluster data urls", err)
}

args := []string{"-o", "allow_root",
"--connection_strings", m.Config.DataURLs(),
"--connection_strings", dataUrls,
"--mountpoint", targetPath,
"--session_key", session}
if m.Spec.Container != "" {
Expand Down Expand Up @@ -62,10 +67,10 @@ func (m *Mounter) osMount() *Response {
}

func (m *Mounter) Mount() *Response {
if m.Config.Type == "os" {
return m.osMount()
if m.Config.Type == "link" {
return m.mountAsLink()
}
return m.mountAsLink()
return m.osMount()
}

func (m *Mounter) mountAsLink() *Response {
Expand Down Expand Up @@ -115,10 +120,10 @@ func (m *Mounter) osUmount() *Response {
}

func (m *Mounter) Unmount() *Response {
if m.Config.Type == "os" {
return m.osUmount()
if m.Config.Type == "link" {
return m.unmountAsLink()
}
return m.unmountAsLink()
return m.osUmount()
}

func NewMounter(target, options string) (*Mounter, error) {
Expand Down
69 changes: 48 additions & 21 deletions pkg/flex/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ func ReadConfig() (*Config, error) {
return &config, nil
}

type ClusterConfig struct {
Name string `json:"name"`
DataUrls []string `json:"data_urls"`
ApiUrl string `json:"api_url"`
}

type Config struct {
RootPath string `json:"root_path"`
FusePath string `json:"fuse_path"`
Debug bool `json:"debug"`
Type string `json:"type"`
Clusters []struct {
Name string `json:"name"`
DataUrl string `json:"data_url"`
ApiUrl string `json:"api_url"`
} `json:"clusters"`
RootPath string `json:"root_path"`
FusePath string `json:"fuse_path"`
Debug bool `json:"debug"`
Type string `json:"type"`
Clusters []ClusterConfig `json:"clusters"`
}

type sessionResponse struct {
Expand All @@ -55,27 +57,44 @@ type sessionResponse struct {
} `json:"data"`
}

func (c *Config) DataURLs() string {
result := make([]string, len(c.Clusters), len(c.Clusters))
for index, item := range c.Clusters {
result[index] = item.DataUrl
func (c *Config) findCluster(cluster string) (*ClusterConfig, error) {
for _, clusterConfig := range c.Clusters {
if clusterConfig.Name == cluster {
return &clusterConfig, nil
}
}
return nil, fmt.Errorf("no such cluster %s", cluster)
}

func (c *Config) DataURLs(cluster string) (string, error) {
clusterConfig, err := c.findCluster(cluster)
if err != nil {
return "", err
}
return strings.Join(result, ",")
result := make([]string, len(clusterConfig.DataUrls), len(clusterConfig.DataUrls))
for index, item := range clusterConfig.DataUrls {
result[index] = item
}
return strings.Join(result, ","), nil
}

func (c *Config) ControlSession(username, password string) (string, error) {
return c.Session(username, password, "control")
func (c *Config) ControlSession(spec *VolumeSpec) (string, error) {
return c.Session(spec.GetClusterName(), spec.GetFullUsername(), spec.GetPassword(), "control")
}

func (c *Config) DataSession(username, password string) (string, error) {
return c.Session(username, password, "data")
func (c *Config) DataSession(spec *VolumeSpec) (string, error) {
return c.Session(spec.GetClusterName(), spec.GetFullUsername(), spec.GetPassword(), "data")
}

func (c *Config) Session(username, password, plane string) (string, error) {
func (c *Config) Session(cluster, username, password, plane string) (string, error) {
clusterConfig, err := c.findCluster(cluster)
if err != nil {
return "", err
}
payload := strings.NewReader(fmt.Sprintf(v3ioSessionPayloadTemplate, plane, username, password))
journal.Debug("creating session", "plane", plane, "url", fmt.Sprintf("%s/api/sessions", c.Clusters[0].ApiUrl))
journal.Debug("creating session", "plane", plane, "url", fmt.Sprintf("%s/api/sessions", clusterConfig.ApiUrl))
response, err := http.Post(
fmt.Sprintf("%s/api/sessions", c.Clusters[0].ApiUrl),
fmt.Sprintf("%s/api/sessions", clusterConfig.ApiUrl),
"application/json",
payload)
if err != nil {
Expand Down Expand Up @@ -123,6 +142,7 @@ func (r *Response) ToJson() {
type VolumeSpec struct {
SubPath string `json:"subPath"`
Container string `json:"container"`
Cluster string `json:"cluster"`
Username string `json:"kubernetes.io/secret/username"`
Password string `json:"kubernetes.io/secret/password"`
Tenant string `json:"kubernetes.io/secret/tenant"`
Expand Down Expand Up @@ -157,3 +177,10 @@ func (vs *VolumeSpec) GetFullUsername() string {
}
return vs.GetUsername()
}

func (vs *VolumeSpec) GetClusterName() string {
if vs.Cluster == "" {
return "default"
}
return vs.Cluster
}

0 comments on commit b018ca6

Please sign in to comment.