Skip to content

05 KisFlow Flow

刘丹冰 edited this page Apr 17, 2024 · 1 revision

Case Source Code https://github.com/aceld/kis-flow-usage/tree/main/5-flow

A Flow represents a sequence of streaming computations, consisting of multiple Functions. The Flow schedules and executes these Functions in the order they are listed.

1. Flow Config

1.1 YAML File Config

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2
  • kistype: Indicates that the current configuration file is a Flow configuration file. status: Specifies whether the current YAML configuration file is effective. If set to 0, the Flow will not be started.
  • flow_name: The name of the Flow, used as a unique identifier for KisFlow to retrieve the current Flow.
  • flows: Contains all the Functions included in the current Flow, executed in the order from top to bottom.
  • fname: The name of the Function.
  • params: The default configuration parameters for the corresponding Function in the current Flow. This parameter is identical to default_params in the Function configuration file. If configured in the Flow, it will override the value in the Function for the same key. If the key does not exist, it will be appended.

1.2 New Flow Config

import "github.com/aceld/kis-flow/config"

Interface

func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig 

Usage

	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

Related Parameter Types

// FParam represents the fixed configuration parameters customized for Functions within the current Flow.
type FParam map[string]string

// KisFlowFunctionParam contains the Id and fixed configuration parameters carried by a Function in a Flow configuration.
type KisFlowFunctionParam struct {
	FuncName string `yaml:"fname"`  // Required
	Params   FParam `yaml:"params"` // Optional, represents the fixed configuration parameters customized for Functions within the current Flow
}

// KisFlowConfig is the object that spans the entire context environment of the streaming computation.
type KisFlowConfig struct {
	KisType  string                 `yaml:"kistype"`
	Status   int                    `yaml:"status"`
	FlowName string                 `yaml:"flow_name"`
	Flows    []KisFlowFunctionParam `yaml:"flows"`
}

2. Flow Interface

type Flow interface {
	// Run schedules the Flow, sequentially scheduling and executing Functions within the Flow
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration in the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the upcoming Function layer
	CommitRow(row interface{}) error
	// CommitRowBatch submits Flow data to the upcoming Function layer (batch submission)
	// row: Must be a slice
	CommitRowBatch(row interface{}) error
	// Input gets the input source data of the currently executing Function in the flow
	Input() common.KisRowArr
	// GetName gets the name of the Flow
	GetName() string
	// GetThisFunction gets the currently executing Function
	GetThisFunction() Function
	// GetThisFuncConf gets the configuration of the currently executing Function
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector gets the Connector of the currently executing Function
	GetConnector() (Connector, error)
	// GetConnConf gets the configuration of the Connector of the currently executing Function
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig gets the configuration of the current Flow
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName gets the configuration of the current Flow by Function name
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next moves to the next layer of Function that the current Flow has executed, carrying the Action actions
	Next(acts ...ActionFunc) error
	// GetCacheData gets the cached data of the current Flow
	GetCacheData(key string) interface{}
	// SetCacheData sets the cached data of the current Flow
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData gets the temporary data of the current Flow
	GetMetaData(key string) interface{}
	// SetMetaData sets the temporary data of the current Flow
	SetMetaData(key string, value interface{})
	// GetFuncParam gets the default parameters of the currently executing Function in the Flow, retrieves a key-value pair
	GetFuncParam(key string) string
	// GetFuncParamAll gets the default parameters of the currently executing Function in the Flow, retrieves all Key-Value pairs
	GetFuncParamAll() config.FParam
	// GetFuncParamsAllFuncs gets all FuncParams of all Functions in the Flow, retrieves all Key-Value pairs
	GetFuncParamsAllFuncs() map[string]config.FParam
	// Fork gets a copy of the Flow (deep copy)
	Fork(ctx context.Context) Flow
}

3. Flow Link

The Flow provides developers with the ability to manually link a Function. Below is an example of a Flow with three Functions: data validation, data computation, and data printing.

3.1 Link

The Flow can link each Function by describing the Function's configuration through Link().

	// Link connects the Functions in the Flow according to the configuration in the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error

fConf: Function configuration file fParams: Default parameters carried by the linked Function in the Flow

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/common"
	"github.com/aceld/kis-flow/config"
	"github.com/aceld/kis-flow/flow"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Create a new flow configuration
	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

	// Create new function configuration
	verifyStuConfig := config.NewFuncConfig("VerifyStu", common.V, nil, nil)
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
	printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

	// Create a new flow
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// Link functions to the flow
	_ = flow1.Link(verifyStuConfig, config.FParam{"school": "TsingHua", "country": "China"})
	_ = flow1.Link(avgStuScoreConfig, config.FParam{"school": "TsingHua", "country": "China"})
	_ = flow1.Link(printStuScoreConfig, config.FParam{"school": "TsingHua", "country": "China"})

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

3.2 Link With Config

The linking method of the above Flow is equivalent to the one below, which links through the configuration file and assigns some default parameters to each Function.

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
    - fname: VerifyStu
      params:
          school: TsingHua
          country: China
    - fname: AvgStuScore
      params:
          school: TsingHua
          country: China
    - fname: PrintStuAvgScore
      params:
          school: TsingHua
          country: China

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

3.3 Other Source

proto.go

package main

type StuScore struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

faas_stu_verify.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			continue
		}

		_ = flow.CommitRow(stu)
	}

	return nil
}

faas_stu_score_avg.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuScore
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

	for _, row := range rows {

		out := AvgStuScoreOut{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
			},
		}

		// Submit the result data
		_ = flow.CommitRow(out)
	}

	return nil
}

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

4.Flow Run

A single Run() call will execute the Flow once, sequentially invoking each Function. If a Function fails during execution, an error will be returned.

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

Each time Run() is executed, it consumes the data already submitted through CommitRow(). If you want to execute Run() multiple times, you need to re-submit the data using CommitRow() each time.

4.1 Incorrect Way

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	times := 3
	for times > 0 {
		if err := flow1.Run(ctx); err != nil {
			fmt.Println("err: ", err)
		}
		times--
	}

In this approach, the first Run() has data, but the second and third Run() will execute the Functions without any data, essentially submitting empty data.

4.2 Correct Way

	// Run the flow
	times := 3
	for times > 0 {
		// Submit a string
		_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)

		if err := flow1.Run(ctx); err != nil {
			fmt.Println("err: ", err)
		}
		times--
	}

5.Flow Fork

If you want to clone a Flow with identical configurations, you can use the flow.Fork() function.

main.go

func main() {
	ctx := context.Background()

	// Create a new flow configuration
	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

	// Create new function configuration
	verifyStuConfig := config.NewFuncConfig("VerifyStu", common.V, nil, nil)
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
	printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

	// Create a new flow
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// Link functions to the flow
	_ = flow1.Link(verifyStuConfig, config.FParam{"school": "TsingHua", "country": "China"})
	_ = flow1.Link(avgStuScoreConfig, config.FParam{"school": "TsingHua", "country": "China"})
	_ = flow1.Link(printStuScoreConfig, config.FParam{"school": "TsingHua", "country": "China"})

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	// Fork the flow
	flowClone1 := flow1.Fork(ctx)

	// Run the flow
	_ = flowClone1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	if err := flowClone1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

In this way, flowClone1 has the same configuration as flow1, and they are two independent instances in memory.

6.Flow & FaaS

6.1 Flow Get Function's Param

	// GetFuncParam retrieves the default configuration parameters of the currently executing Function in the Flow, retrieving a key-value pair
	GetFuncParam(key string) string
	// GetFuncParamAll retrieves the default configuration parameters of the currently executing Function in the Flow, retrieving all Key-Value pairs
	GetFuncParamAll() config.FParam
	// GetFuncParamsAllFuncs retrieves all FuncParams of all Functions in the Flow, retrieving all Key-Value pairs
	GetFuncParamsAllFuncs() map[string]config.FParam

6.2 FaaS Get/Set Flow Cache

	// GetCacheData retrieves the cached data of the current Flow
	GetCacheData(key string) interface{}
	// SetCacheData sets the cached data of the current Flow
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData retrieves the temporary data of the current Flow
	GetMetaData(key string) interface{}
	// SetMetaData sets the temporary data of the current Flow
	SetMetaData(key string, value interface{})