04 KisFlow Function
Example source code: https://github.com/aceld/kis-flow-usage/tree/main/4-function
A Function describes an independent logical computing unit. Its basic attributes are:
type BaseFunction struct {
// Id, the instance ID of KisFunction, used to distinguish different instance objects within KisFlow
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow // Context environment KisFlow
// connector
connector kis.Connector
// Custom temporary data for the Function
metaData map[string]interface{}
// Lock for managing metaData read and write
mLock sync.RWMutex
// link
N kis.Function // Next streaming Function
P kis.Function // Previous streaming Function
}
- Id: The instance ID of the Function, used to distinguish different Function instances within a KisFlow.
- Config: The configuration associated with the Function.
- flow: The Flow (the Function's context environment) to which the Function belongs.
- connector: The Connector resource the Function depends on.
- metaData: Custom temporary data attached to the Function instance.
- mLock: A read-write lock that guards reads and writes of metaData.
- N: The next Function in the stream.
- P: The previous Function in the stream.
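The N and P fields turn the Functions of a Flow into a doubly linked list. The sketch below models only that linking with a hypothetical node type. node, link, forwardIds and backwardIds are illustrative names and are not KisFlow APIs. It shows how such a chain can be walked in either direction.

```go
package main

import "fmt"

// node is a hypothetical stand-in for BaseFunction that keeps only the
// Id and the N (next) / P (previous) links described above.
type node struct {
	Id string
	N  *node // next Function in the stream
	P  *node // previous Function in the stream
}

// link chains the nodes in the given order, setting both N and P.
func link(nodes ...*node) {
	for i := 1; i < len(nodes); i++ {
		nodes[i-1].N = nodes[i]
		nodes[i].P = nodes[i-1]
	}
}

// forwardIds starts at head, follows N until nil, and collects the Ids.
func forwardIds(head *node) []string {
	var ids []string
	for f := head; f != nil; f = f.N {
		ids = append(ids, f.Id)
	}
	return ids
}

// backwardIds starts at tail, follows P until nil, and collects the Ids.
func backwardIds(tail *node) []string {
	var ids []string
	for f := tail; f != nil; f = f.P {
		ids = append(ids, f.Id)
	}
	return ids
}

func main() {
	v, c, e := &node{Id: "verify"}, &node{Id: "calc"}, &node{Id: "expand"}
	link(v, c, e)
	fmt.Println(forwardIds(v))  // [verify calc expand]
	fmt.Println(backwardIds(e)) // [expand calc verify]
}
```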
Function Mode | Meaning |
---|---|
"Verify" | Verification: performs preliminary data processing such as filtering, validation, field arrangement, and idempotency checks. |
"Save" | Storage: stores data through a KisConnector. Save Functions that use the same Connector can be logically merged. |
"Load" | Loading: reads data through a KisConnector. A Load Function can be logically merged with the Save Function that uses the same Connector. |
"Calculate" | Calculation: generates new fields, computes new values, aggregates and analyzes data, etc. |
"Expand" | Expansion: a custom extension Function in the stream. It is also the last Function in the current KisFlow, similar to a Sink. |
Each Function is assigned a Mode. Currently the Mode has no effect on how KisFlow executes a Function. It only describes the Function's responsibility within the Flow.
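Because the Mode is descriptive only, nothing in the text above stops a typo in fmode. A loader could still check it, and the following sketch shows one way. KisMode here is a hypothetical local type. The document confirms that the library uses common.C for Calculate, but V, S, L and E are assumed by analogy. Valid() is an illustrative helper and is not a KisFlow API.

```go
package main

import "fmt"

// KisMode is a local sketch of the mode strings in the table above.
// Only C (Calculate) is confirmed by the document. V, S, L and E are
// assumed names.
type KisMode string

const (
	V KisMode = "Verify"
	S KisMode = "Save"
	L KisMode = "Load"
	C KisMode = "Calculate"
	E KisMode = "Expand"
)

// Valid reports whether m is one of the five documented modes.
// The comparison is case-sensitive.
func (m KisMode) Valid() bool {
	switch m {
	case V, S, L, C, E:
		return true
	}
	return false
}

func main() {
	fmt.Println(KisMode("Calculate").Valid()) // true
	fmt.Println(KisMode("Transform").Valid()) // false
}
```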
A Function is described by a YAML configuration file, for example:
kistype: func
fname: funcName3
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName3_param1
    default2: funcName3_param2
- kistype: func indicates that this is a Function configuration file.
- fname: The name of the Function, which is also the key under which its FaaS callback is registered.
- fmode: The Function Mode (Verify, Save, Load, Calculate, or Expand).
- source: Describes where the Function's data comes from (name) and which fields the source must provide (must). It is purely descriptive and has no effect on execution.
- option: Optional parameters.
- default_params: Default parameters carried by the Function, which developers can read while the Function is being scheduled.
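The following sketch writes the same configuration as a Go struct literal, which makes the YAML-to-code mapping concrete. funcConfig, kisSource, funcOption and the param helper are hypothetical local mirrors of the YAML keys. The library's own types are KisFuncConfig, KisSource and KisFuncOption. param only illustrates reading a default parameter with a fallback value.

```go
package main

import "fmt"

// kisSource mirrors the "source" section (hypothetical local type).
type kisSource struct {
	Name string   // source.name
	Must []string // source.must
}

// funcOption mirrors the "option" section (hypothetical local type).
type funcOption struct {
	Params map[string]string // option.default_params
}

// funcConfig mirrors the whole Function configuration file.
type funcConfig struct {
	KisType string // kistype
	FName   string // fname
	FMode   string // fmode
	Source  kisSource
	Option  funcOption
}

// param returns the default parameter for key, or fallback if it is
// not configured. Lookups on a nil map are safe in Go.
func (c *funcConfig) param(key, fallback string) string {
	if v, ok := c.Option.Params[key]; ok {
		return v
	}
	return fallback
}

func main() {
	cfg := funcConfig{
		KisType: "func",
		FName:   "funcName3",
		FMode:   "Calculate",
		Source: kisSource{
			Name: "User Order Error Rate",
			Must: []string{"order_id", "user_id"},
		},
		Option: funcOption{Params: map[string]string{
			"default1": "funcName3_param1",
			"default2": "funcName3_param2",
		}},
	}
	fmt.Println(cfg.param("default1", "none")) // funcName3_param1
	fmt.Println(cfg.param("default3", "none")) // none
}
```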
import (
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
)
Interface
// NewFuncConfig creates a Function strategy configuration object describing a KisFunction
func NewFuncConfig(funcName string, mode common.KisMode,
source *KisSource, option *KisFuncOption) *KisFuncConfig
Invocation
// Create new function configuration
avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
Related parameter types
// FParam is the type of fixed configuration parameters for the Function in the current Flow
type FParam map[string]string
// KisSource represents the business source of the current Function
type KisSource struct {
Name string `yaml:"name"` // Description of the data source for this layer of Function
Must []string `yaml:"must"` // Required fields for the source
}
// KisFuncOption optional configuration
type KisFuncOption struct {
CName string `yaml:"cname"` // Connector name
RetryTimes int `yaml:"retry_times"` // Optional, maximum number of retries for Function scheduling (excluding normal scheduling)
RetryDuration int `yaml:"return_duration"` // Optional, maximum time interval for each retry of Function scheduling (unit: ms)
Params FParam `yaml:"default_params"` // Optional, fixed configuration parameters for the Function in the current Flow
}
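RetryTimes and RetryDuration suggest a retry loop around Function scheduling. The sketch below shows one plausible reading. It assumes a failed call is retried up to RetryTimes times, with a fixed wait of RetryDuration milliseconds before each retry. KisFlow's actual scheduler may behave differently. retryOption, callWithRetry and errTemporary are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failing call.
var errTemporary = errors.New("temporary failure")

// retryOption mirrors the two retry fields of KisFuncOption
// (hypothetical local type). RetryDuration is in milliseconds.
type retryOption struct {
	RetryTimes    int
	RetryDuration int
}

// callWithRetry runs fn once, then retries it up to opt.RetryTimes more
// times. Before each retry it waits opt.RetryDuration ms. It returns the
// number of attempts made and the last error (nil on success).
func callWithRetry(opt retryOption, fn func() error) (int, error) {
	attempts := 0
	var err error
	for i := 0; i <= opt.RetryTimes; i++ {
		if i > 0 {
			time.Sleep(time.Duration(opt.RetryDuration) * time.Millisecond)
		}
		attempts++
		if err = fn(); err == nil {
			return attempts, nil
		}
	}
	return attempts, err
}

func main() {
	failuresLeft := 2
	flaky := func() error {
		if failuresLeft > 0 {
			failuresLeft--
			return errTemporary
		}
		return nil
	}
	n, err := callWithRetry(retryOption{RetryTimes: 3, RetryDuration: 10}, flaky)
	fmt.Println(n, err) // 3 <nil>
}
```

Note that normal scheduling is the first attempt, so RetryTimes counts only the extra attempts. This matches the field comment "excluding normal scheduling".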
// Function is the basic logical computing unit of streaming computation;
// any number of KisFunctions can be combined into a KisFlow
type Function interface {
// Call executes the streaming computation logic
Call(ctx context.Context, flow Flow) error
// SetConfig configures the strategy for the current Function instance
SetConfig(s *config.KisFuncConfig) error
// GetConfig retrieves the configuration strategy for the current Function instance
GetConfig() *config.KisFuncConfig
// SetFlow sets the Flow instance that the current Function instance depends on
SetFlow(f Flow) error
// GetFlow retrieves the Flow that the current Function instance depends on
GetFlow() Flow
// AddConnector adds a Connector to the current Function instance
AddConnector(conn Connector) error
// GetConnector retrieves the Connector associated with the current Function instance
GetConnector() Connector
// CreateId generates a random instance KisID for the current Function instance
CreateId()
// GetId retrieves the FID of the current Function
GetId() string
// GetPrevId retrieves the FID of the previous Function node
GetPrevId() string
// GetNextId retrieves the FID of the next Function node
GetNextId() string
// Next returns the next layer of computational streaming Function, returns nil if the current layer is the last layer
Next() Function
// Prev returns the previous layer of computational streaming Function, returns nil if the current layer is the first layer
Prev() Function
// SetN sets the next Function instance
SetN(f Function)
// SetP sets the previous Function instance
SetP(f Function)
// GetMetaData gets the temporary data of the current Function
GetMetaData(key string) interface{}
// SetMetaData sets the temporary data of the current Function
SetMetaData(key string, value interface{})
}
FaaS is the computation callback bound to a Function. Its type is:
type FaaS func(context.Context, Flow, ...interface{}) error
- context.Context: Native Golang context.
- Flow: Information about the Flow to which the current Function is attached, including all the data, parameters, hierarchical relationships, etc. on the Flow.
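- ...interface{}: Developer-defined parameters, which must be a slice type. In the examples below this is rows []*AvgStuScoreIn, which receives the Flow's input rows already deserialized into the developer's struct.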
An example of a FaaS definition:
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
"reflect"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
// Get the original data
for _, data := range flow.Input() {
fmt.Printf("data value= %+v, data type = %+v\n", data, reflect.TypeOf(data))
}
// Get the deserialized data
for _, row := range rows {
out := AvgStuScoreOut{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// Submit the result data
if err := flow.CommitRow(out); err != nil {
return err
}
}
return nil
}
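Once the rows are decoded, the FaaS body above is plain Go. The stand-alone sketch below reproduces its arithmetic using local copies of the two structs, without serialize.DefaultSerialize. It assumes the rows arrive as JSON, matching the json tags. decodeRows and avgScores are hypothetical helpers, and KisFlow's real deserialization path may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the example types, without serialize.DefaultSerialize,
// so the sketch runs with the standard library only.
type AvgStuScoreIn struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// decodeRows turns raw JSON rows into typed inputs. This is an assumed,
// simplified version of the deserialization that happens before the FaaS
// receives its rows.
func decodeRows(raw []string) ([]*AvgStuScoreIn, error) {
	rows := make([]*AvgStuScoreIn, 0, len(raw))
	for _, r := range raw {
		in := &AvgStuScoreIn{}
		if err := json.Unmarshal([]byte(r), in); err != nil {
			return nil, err
		}
		rows = append(rows, in)
	}
	return rows, nil
}

// avgScores applies the same arithmetic as the AvgStuScore FaaS.
func avgScores(rows []*AvgStuScoreIn) []AvgStuScoreOut {
	out := make([]AvgStuScoreOut, 0, len(rows))
	for _, row := range rows {
		out = append(out, AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		})
	}
	return out
}

func main() {
	rows, err := decodeRows([]string{
		`{"stu_id":101,"score_1":100,"score_2":90,"score_3":80}`,
		`{"stu_id":102,"score_1":100,"score_2":70,"score_3":85}`,
	})
	if err != nil {
		panic(err)
	}
	for _, o := range avgScores(rows) {
		fmt.Printf("%+v\n", o)
	}
}
```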
A FaaS must be registered only once per process:
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
The first argument is the Function name, which must match fname in the configuration file. The second argument is the FaaS callback function itself.
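The document does not say what happens on a second registration. The sketch below models a name-to-callback registry that rejects duplicates with an error. faasPool, newFaaSPool, Register and Lookup are hypothetical names. The sketch only illustrates the once-per-name rule and is not how kis.Pool() is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// faasPool is a hypothetical registry mapping a Function name (fname)
// to its callback. A second registration under the same name is refused.
type faasPool struct {
	mu    sync.RWMutex
	funcs map[string]interface{}
}

func newFaaSPool() *faasPool {
	return &faasPool{funcs: make(map[string]interface{})}
}

// Register stores fn under name, or returns an error if name is taken.
func (p *faasPool) Register(name string, fn interface{}) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, exists := p.funcs[name]; exists {
		return fmt.Errorf("FaaS %q already registered", name)
	}
	p.funcs[name] = fn
	return nil
}

// Lookup returns the callback registered for name, if any.
func (p *faasPool) Lookup(name string) (interface{}, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	fn, ok := p.funcs[name]
	return fn, ok
}

func main() {
	pool := newFaaSPool()
	noop := func() error { return nil }
	fmt.Println(pool.Register("AvgStuScore", noop)) // <nil>
	fmt.Println(pool.Register("AvgStuScore", noop)) // FaaS "AvgStuScore" already registered
	_, ok := pool.Lookup("AvgStuScore")
	fmt.Println(ok) // true
}
```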
The following example shows how a FaaS reads its Function's configuration and default parameters, using this Function configuration:
conf/func-AvgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
  name: Student Credits
  must:
    - stu_id
option:
  default_params:
    school: TsingHua
    country: China
faas_stu_score_avg.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
// Retrieve Function configuration information
funcConfig := flow.GetThisFunction().GetConfig()
fmt.Printf("function config: %+v\n", funcConfig)
fmt.Printf("function Params: school = %+v\n", flow.GetFuncParam("school"))
fmt.Printf("function Params: country = %+v\n", flow.GetFuncParam("country"))
return nil
}
Get Function Instance
Inside a FaaS, flow.GetThisFunction() returns the Function instance that is currently executing:
faas_stu_score_avg.go
// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
// Get Function instance
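function := flow.GetThisFunction()
// Use the instance (an unused variable would not compile)
fmt.Printf("function id = %s\n", function.GetId())
return nil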
}
Read and Write Temporary Data of Function Instance
The Function instance provides the SetMetaData() and GetMetaData() methods, which attach temporary in-memory variables to the Function instance. These variables live as long as the Function instance does, so their values persist across repeated runs of the same Flow.
faas_stu_score_avg.go
// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
// Set temporary data
myTempNum := 1
// Get Function instance
function := flow.GetThisFunction()
if function.GetMetaData("num") == nil {
function.SetMetaData("num", myTempNum)
} else {
myTempNum = function.GetMetaData("num").(int)
myTempNum++
function.SetMetaData("num", myTempNum)
}
fmt.Printf("myTempNum = %+v\n", myTempNum)
return nil
}
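The example's assertion GetMetaData("num").(int) panics if the stored value exists but is not an int. The sketch below shows a hypothetical alternative. It uses a local metaStore that mirrors the metaData and mLock fields, together with a checked (comma-ok) assertion, so a missing or mistyped value counts as 0. metaStore and incr are illustrative names and are not KisFlow APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// metaStore is a hypothetical stand-in for BaseFunction's metaData map
// and its mLock read-write mutex. The zero value is ready to use.
type metaStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

// Get returns the value for key, or nil if it is not set.
func (m *metaStore) Get(key string) interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.data[key]
}

// Set stores value under key, creating the map on first use.
func (m *metaStore) Set(key string, value interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data == nil {
		m.data = make(map[string]interface{})
	}
	m.data[key] = value
}

// incr reads an int counter with a checked type assertion (missing or
// non-int values count as 0), adds 1, and stores it back. Get and Set are
// each locked, but incr as a whole is not atomic. This is fine when only
// one goroutine runs the Function at a time.
func incr(m *metaStore, key string) int {
	n, _ := m.Get(key).(int)
	n++
	m.Set(key, n)
	return n
}

func main() {
	var meta metaStore
	for i := 0; i < 3; i++ {
		fmt.Printf("myTempNum = %d\n", incr(&meta, "num"))
	}
}
```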
Next, modify main.go to run the Flow three times so that the counter accumulates across runs:
main.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
)
func main() {
ctx := context.Background()
// Load configuration from the conf/ directory
if err := file.ConfigImportYaml("conf/"); err != nil {
panic(err)
}
// Get the flow
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
if flow1 == nil {
panic("flow1 is nil")
}
// Run the flow three times; the Function's metadata persists between runs
for i := 0; i < 3; i++ {
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}
}
Output
./main
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
myTempNum = 1
myTempNum = 2
myTempNum = 3