-
-
Notifications
You must be signed in to change notification settings - Fork 34
05 KisFlow Flow
Example source code: https://github.com/aceld/kis-flow-usage/tree/main/5-flow
A Flow represents a sequence of streaming computations, consisting of multiple Functions. The Flow schedules and executes these Functions in the order they are listed.
# Flow configuration. The Functions listed under `flows` run from top to bottom.
kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2
- kistype: Identifies the file as a Flow configuration file (its value is `flow`).
- status: Whether this Flow configuration is active. If set to 0, the Flow is not started.
- flow_name: The name of the Flow, used as a unique identifier for KisFlow to retrieve the current Flow.
- flows: Contains all the Functions included in the current Flow, executed in the order from top to bottom.
- fname: The name of the Function.
- params: The default parameters of this Function within the current Flow. They use the same format as default_params in the Function configuration file. A key set here overrides the Function's value for the same key; a key that does not exist in the Function's default_params is added.
import "github.com/aceld/kis-flow/config"
Interface
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig
Usage
myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
Related Parameter Types
// Flow configuration types from the config package.
type (
	// FParam holds the fixed parameters a Flow assigns to one of its
	// Functions, as string key/value pairs.
	FParam map[string]string

	// KisFlowFunctionParam is one entry of a Flow configuration's function
	// list: the Function's name plus the Flow-specific parameters attached
	// to it.
	KisFlowFunctionParam struct {
		FuncName string `yaml:"fname"`  // Required: name of the Function.
		Params   FParam `yaml:"params"` // Optional: parameters this Flow gives the Function.
	}

	// KisFlowConfig is the configuration of a whole Flow; its fields mirror
	// the keys of a Flow YAML file.
	KisFlowConfig struct {
		KisType  string                 `yaml:"kistype"`   // "flow" for Flow configuration files.
		Status   int                    `yaml:"status"`    // 0 means the Flow is not started.
		FlowName string                 `yaml:"flow_name"` // Unique name used to look the Flow up.
		Flows    []KisFlowFunctionParam `yaml:"flows"`     // Functions, executed top to bottom.
	}
)
// Flow is a running instance of a Flow configuration. It schedules its linked
// Functions in order and exposes the data, configuration, parameters, cache,
// and metadata of the Function currently executing.
type Flow interface {
	// ---- Building and running ----

	// Link connects a Function to the Flow: fConf is the Function's
	// configuration and fParams are the default parameters it carries in
	// this Flow.
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// Run executes the Flow once, scheduling and executing its Functions in
	// sequence.
	Run(ctx context.Context) error
	// Next advances the Flow to the next Function layer, passing along the
	// given actions.
	Next(acts ...ActionFunc) error
	// Fork returns a copy of the Flow (deep copy).
	Fork(ctx context.Context) Flow

	// ---- Submitting and reading data ----

	// CommitRow submits one row of Flow data to the upcoming Function layer.
	CommitRow(row interface{}) error
	// CommitRowBatch submits several rows to the upcoming Function layer at
	// once; row must be a slice.
	CommitRowBatch(row interface{}) error
	// Input returns the input data of the Function currently executing.
	Input() common.KisRowArr

	// ---- Introspection ----

	// GetName returns the name of the Flow.
	GetName() string
	// GetConfig returns the configuration of this Flow.
	GetConfig() *config.KisFlowConfig
	// GetThisFunction returns the Function currently executing.
	GetThisFunction() Function
	// GetThisFuncConf returns the configuration of the Function currently
	// executing.
	GetThisFuncConf() *config.KisFuncConfig
	// GetFuncConfigByName returns the configuration of the Function named
	// funcName within this Flow.
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// GetConnector returns the Connector of the Function currently executing.
	GetConnector() (Connector, error)
	// GetConnConf returns the Connector configuration of the Function
	// currently executing.
	GetConnConf() (*config.KisConnConfig, error)

	// ---- Function parameters ----

	// GetFuncParam returns the value for key among the default parameters of
	// the Function currently executing.
	GetFuncParam(key string) string
	// GetFuncParamAll returns all default parameters (key/value pairs) of the
	// Function currently executing.
	GetFuncParamAll() config.FParam
	// GetFuncParamsAllFuncs returns the parameters of every Function in the
	// Flow.
	GetFuncParamsAllFuncs() map[string]config.FParam

	// ---- Cache and metadata ----

	// GetCacheData returns the data cached in this Flow under key.
	GetCacheData(key string) interface{}
	// SetCacheData caches value in this Flow under key; Exp is a
	// time.Duration (presumably the entry's expiration — confirm).
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData returns the temporary data stored in this Flow under key.
	GetMetaData(key string) interface{}
	// SetMetaData stores temporary data in this Flow under key.
	SetMetaData(key string, value interface{})
}
A Flow also lets developers link Functions manually. The example below builds a Flow with three Functions: data validation, data computation, and data printing.
Each Function is linked into the Flow by passing its configuration to Link():
// Link connects a Function to the Flow: fConf is the Function's configuration
// and fParams are the default parameters it carries within this Flow.
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
- fConf: the configuration of the Function to link (a *config.KisFuncConfig)
- fParams: the default parameters the linked Function carries within this Flow
main.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/flow"
"github.com/aceld/kis-flow/kis"
)
// main builds the CalStuAvgScore Flow in code (no YAML file) and links the
// VerifyStu, AvgStuScore, and PrintStuAvgScore Functions to it. It then
// submits two rows and runs the Flow once.
//
// Setup failures from Link or CommitRow abort the program with a panic.
// An error returned by Run is printed.
func main() {
	ctx := context.Background()

	// Create a new flow configuration.
	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

	// Create the Function configurations.
	verifyStuConfig := config.NewFuncConfig("VerifyStu", common.V, nil, nil)
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
	printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

	// Create a new flow.
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// Link the Functions in execution order. The FParam literal inside the
	// loop gives every Link call its own map, as separate literals did before.
	for _, fConf := range []*config.KisFuncConfig{verifyStuConfig, avgStuScoreConfig, printStuScoreConfig} {
		if err := flow1.Link(fConf, config.FParam{"school": "TsingHua", "country": "China"}); err != nil {
			panic(err)
		}
	}

	// Submit two rows. VerifyStu drops rows whose stu_id is outside
	// [0, 999], so the second row (stu_id 1001) is filtered out.
	for _, row := range []string{
		`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`,
		`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`,
	} {
		if err := flow1.CommitRow(row); err != nil {
			panic(err)
		}
	}

	// Run the flow once.
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
}
// init registers each Function implementation in the kis pool under the
// name that Flow configurations use to refer to it.
func init() {
	pool := kis.Pool()
	pool.FaaS("VerifyStu", VerifyStu)
	pool.FaaS("AvgStuScore", AvgStuScore)
	pool.FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Linking the Flow in code as above is equivalent to the configuration-file approach below, which links the same Functions and assigns each of them the same default parameters.
conf/flow-CalStuAvgScore.yml
# CalStuAvgScore Flow: VerifyStu -> AvgStuScore -> PrintStuAvgScore,
# each carrying the same default parameters.
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
  - fname: VerifyStu
    params:
      school: TsingHua
      country: China
  - fname: AvgStuScore
    params:
      school: TsingHua
      country: China
  - fname: PrintStuAvgScore
    params:
      school: TsingHua
      country: China
main.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
)
// main loads the Flow and Function configurations from conf/ and looks up
// the CalStuAvgScore Flow. It then submits two rows and runs the Flow once.
//
// Failing to load the configuration, find the Flow, or submit a row aborts
// the program with a panic. An error returned by Run is printed.
func main() {
	ctx := context.Background()

	// Load the YAML configuration files under conf/.
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow named by flow_name in conf/flow-CalStuAvgScore.yml.
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit two rows; a failed submission is a setup error.
	for _, row := range []string{
		`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`,
		`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`,
	} {
		if err := flow1.CommitRow(row); err != nil {
			panic(err)
		}
	}

	// Run the flow once.
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
}
proto.go
package main
// Row payload types shared by the example Functions.
type (
	// StuScore holds one student's three raw scores, as carried in the JSON
	// rows submitted with CommitRow.
	StuScore struct {
		StuId  int `json:"stu_id"`
		Score1 int `json:"score_1"`
		Score2 int `json:"score_2"`
		Score3 int `json:"score_3"`
	}

	// StuScoreAvg holds one student's average score, as produced by
	// AvgStuScore.
	StuScoreAvg struct {
		StuId    int     `json:"stu_id"`
		AvgScore float64 `json:"avg_score"`
	}
)
faas_stu_verify.go
package main
import (
"context"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
// VerifyStuIn is the input row type of VerifyStu: a StuScore plus an embedded
// serialize.DefaultSerialize (presumably what lets kis-flow decode submitted
// rows into this type — confirm against the serialize package).
type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}
// VerifyStu is the first Function of the CalStuAvgScore Flow. It forwards to
// the next Function layer only rows whose StuId lies in [0, 999] and drops
// the rest.
//
// If CommitRow fails, the error is returned to the Flow instead of being
// silently discarded, and the remaining rows are not forwarded.
func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	for _, stu := range rows {
		// Filter out invalid data: student IDs must lie in [0, 999].
		if stu.StuId < 0 || stu.StuId > 999 {
			continue
		}
		if err := flow.CommitRow(stu); err != nil {
			return err
		}
	}
	return nil
}
faas_stu_score_avg.go
package main
import (
"context"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
// Row types of AvgStuScore. Each embeds serialize.DefaultSerialize next to
// the student payload.
type (
	// AvgStuScoreIn is one student's raw scores, as received from VerifyStu.
	AvgStuScoreIn struct {
		serialize.DefaultSerialize
		StuScore
	}

	// AvgStuScoreOut is one student's average score, as committed by
	// AvgStuScore to the next Function.
	AvgStuScoreOut struct {
		serialize.DefaultSerialize
		StuScoreAvg
	}
)
// AvgStuScore computes each student's mean of Score1, Score2, and Score3 and
// commits the result as an AvgStuScoreOut to the next Function layer.
//
// If CommitRow fails, the error is returned to the Flow instead of being
// silently discarded, and the remaining rows are not processed.
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		out := AvgStuScoreOut{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
			},
		}
		// Submit the result data.
		if err := flow.CommitRow(out); err != nil {
			return err
		}
	}
	return nil
}
faas_stu_score_avg_print.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
// Row types of PrintStuAvgScore. Both embed serialize.DefaultSerialize.
type (
	// PrintStuAvgScoreIn is one student's average score, as received from
	// AvgStuScore.
	PrintStuAvgScoreIn struct {
		serialize.DefaultSerialize
		StuScoreAvg
	}

	// PrintStuAvgScoreOut carries nothing beyond serialize.DefaultSerialize.
	// PrintStuAvgScore commits no rows, so this example does not use it.
	PrintStuAvgScoreOut struct {
		serialize.DefaultSerialize
	}
)
// PrintStuAvgScore is the last Function of the Flow. It prints one line per
// received row with the student ID and average score, and always returns nil.
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	for i := range rows {
		r := rows[i]
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", r.StuId, r.AvgScore)
	}
	return nil
}
A single call to Run() executes the Flow once, invoking each Function in order. If a Function fails during execution, Run() returns an error.
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
Each call to Run() consumes the data previously submitted through CommitRow(). To call Run() multiple times, submit the data again with CommitRow() before each call.
// Submit a string (only once, before the loop)
_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow three times. Only the first Run() has data to consume; the
// second and third execute the Functions with no input.
times := 3
for times > 0 {
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
	times--
}
With this approach, only the first Run() has data; the second and third Run() calls execute the Functions with no data at all. To give every run input, submit the data before each call to Run():
// Run the flow three times, committing fresh data before every Run() so that
// each run has input to consume.
times := 3
for times > 0 {
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
	times--
}
To create another Flow with an identical configuration, call the Flow's Fork() method:
main.go
// main builds the CalStuAvgScore Flow in code, submits one row, and runs it.
// It then forks the Flow and runs the copy with a different row.
//
// Setup failures from Link or CommitRow abort the program with a panic.
// Errors returned by Run are printed.
func main() {
	ctx := context.Background()

	// Create a new flow configuration.
	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

	// Create the Function configurations.
	verifyStuConfig := config.NewFuncConfig("VerifyStu", common.V, nil, nil)
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
	printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

	// Create a new flow.
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// Link the Functions in execution order. Each Link call gets its own
	// FParam map.
	for _, fConf := range []*config.KisFuncConfig{verifyStuConfig, avgStuScoreConfig, printStuScoreConfig} {
		if err := flow1.Link(fConf, config.FParam{"school": "TsingHua", "country": "China"}); err != nil {
			panic(err)
		}
	}

	// Submit a row to the original flow and run it.
	if err := flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`); err != nil {
		panic(err)
	}
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	// Fork the flow. flowClone1 has flow1's configuration but is a separate
	// instance with its own data.
	flowClone1 := flow1.Fork(ctx)

	// Submit a row to the clone and run it.
	if err := flowClone1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`); err != nil {
		panic(err)
	}
	if err := flowClone1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
}
This way, flowClone1 has the same configuration as flow1, but the two are independent instances in memory.
A Flow also provides methods for reading Function parameters and for storing cached data and metadata:
// GetFuncParam returns the value stored under key in the default parameters
// of the Function currently executing in the Flow.
GetFuncParam(key string) string
// GetFuncParamAll returns all default parameters (key/value pairs) of the
// Function currently executing in the Flow.
GetFuncParamAll() config.FParam
// GetFuncParamsAllFuncs returns the parameters of every Function in the Flow.
GetFuncParamsAllFuncs() map[string]config.FParam
// GetCacheData returns the data cached in the current Flow under key.
GetCacheData(key string) interface{}
// SetCacheData caches value in the current Flow under key; Exp is a
// time.Duration (presumably the entry's expiration — confirm).
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData returns the temporary data stored in the current Flow under key.
GetMetaData(key string) interface{}
// SetMetaData stores temporary data in the current Flow under key.
SetMetaData(key string, value interface{})