04 KisFlow Function

刘丹冰 edited this page Apr 17, 2024 · 4 revisions

Case source code: https://github.com/aceld/kis-flow-usage/tree/main/4-function

Function is a description of an independent logical computing unit. A Function includes some basic attributes:

type BaseFunction struct {
	// Id, the instance ID of KisFunction, used to distinguish different instance objects within KisFlow
	Id     string
	Config *config.KisFuncConfig

	// flow
	flow kis.Flow // Context environment KisFlow

	// connector
	connector kis.Connector

	// Custom temporary data for the Function
	metaData map[string]interface{}
	// Lock for managing metaData read and write
	mLock sync.RWMutex

	// link
	N kis.Function // Next streaming Function
	P kis.Function // Previous streaming Function
}
  • Id: The distributed instance ID of a Function, used to distinguish different instance objects within KisFlow.
  • Config: The configuration information associated with a Function.
  • flow: Represents the Flow object associated with the Function.
  • connector: Represents the Connector resource on which the Function depends.
  • metaData: Custom temporary data attached to the Function instance; reads and writes are guarded by mLock.
  • N: The next Function in the stream.
  • P: The previous Function in the stream.
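
The N and P fields make each Function a node in a doubly linked chain. A minimal sketch of that linking, using a hypothetical `node` type rather than KisFlow's own `BaseFunction` (the names `node`, `link`, and `walk` are assumptions made for illustration):

```go
package main

import "fmt"

// node is a hypothetical stand-in for a Function in a Flow.
// It is not KisFlow's BaseFunction.
type node struct {
	id string
	n  *node // next node in the stream
	p  *node // previous node in the stream
}

// link places next directly after cur and wires both directions.
func link(cur, next *node) {
	cur.n = next
	next.p = cur
}

// walk collects IDs from head to tail by following the n pointers.
func walk(head *node) []string {
	var ids []string
	for cur := head; cur != nil; cur = cur.n {
		ids = append(ids, cur.id)
	}
	return ids
}

func main() {
	a, b, c := &node{id: "func-a"}, &node{id: "func-b"}, &node{id: "func-c"}
	link(a, b)
	link(b, c)
	fmt.Println(walk(a))            // [func-a func-b func-c]
	fmt.Println(c.p.id, a.p == nil) // func-b true
}
```

The first node has no previous node and the last node has no next node, which is why the traversal stops on nil.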

1. Function Mode

Function Mode and meaning:

  • "Verify": A verification Function. It mainly performs preliminary data processing such as filtering, validation, field arrangement, and idempotency checks.
  • "Save": A storage Function (S Function). It stores data through a KisConnector. Functions that share the same Connector can be logically merged.
  • "Load": A loading Function (L Function). It reads data through a KisConnector. It can be logically merged with the S Function that shares the same Connector.
  • "Calculate": A calculation Function. It can generate new fields, compute new values, aggregate data, analyze data, and so on.
  • "Expand": An extension Function. It serves as a custom Function in streaming computation and is also the last Function in the current KisFlow stream, similar to a Sink.

A Function will be assigned a Mode, which currently does not have a practical role in the execution process of KisFlow. At present, the mode only serves to describe the responsibilities of the various Functions in the Flow.
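
Because a Mode is only a descriptive label, it can be modelled as a string-backed type. The sketch below is not the library's definition: `Mode` and `IsKnown` are hypothetical names, and the single-letter constants are an assumption modelled on the `common.C` value used later on this page.

```go
package main

import "fmt"

// Mode is a local stand-in for the page's common.KisMode type.
type Mode string

// Mode values copied from the list above. The constant names are assumed.
const (
	V Mode = "Verify"
	S Mode = "Save"
	L Mode = "Load"
	C Mode = "Calculate"
	E Mode = "Expand"
)

// IsKnown reports whether m is one of the five documented modes.
func IsKnown(m Mode) bool {
	switch m {
	case V, S, L, C, E:
		return true
	}
	return false
}

func main() {
	fmt.Println(IsKnown(C), IsKnown("Transform")) // true false
}
```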

2. Function Config

2.1 YAML File Config

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName3_param1
    default2: funcName3_param2
  • kistype: The value func indicates that this file is a Function configuration file.
  • fname: The name of the Function, also the identifier under which the FaaS callback is registered.
  • fmode: The Mode of the Function (see section 1).
  • source: Describes the data source of the Function's data flow. It is descriptive only and has no effect on the logic.
  • option: Optional parameters.
  • default_params: Default parameters carried by the Function (set under option), which developers can retrieve while the Function is being scheduled.

3. New Function Config

import "github.com/aceld/kis-flow/config"

Interface

// NewFuncConfig creates a Function strategy configuration object that describes a KisFunction
func NewFuncConfig(funcName string, mode common.KisMode,
	source *KisSource, option *KisFuncOption) *KisFuncConfig

Invocation

	// Create new function configuration
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)

Related parameter types

// FParam is the type of fixed configuration parameters for the Function in the current Flow
type FParam map[string]string

// KisSource represents the business source of the current Function
type KisSource struct {
	Name string   `yaml:"name"` // Description of the data source for this layer of Function
	Must []string `yaml:"must"` // Required fields for the source
}

// KisFuncOption optional configuration
type KisFuncOption struct {
	CName         string `yaml:"cname"`           // Connector name
	RetryTimes    int    `yaml:"retry_times"`     // Optional, maximum number of retries for Function scheduling (excluding normal scheduling)
	RetryDuration int    `yaml:"return_duration"` // Optional, maximum time interval for each retry of Function scheduling (unit: ms)
	Params        FParam `yaml:"default_params"`  // Optional, fixed configuration parameters for the Function in the current Flow
}
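
To show how these parameter types relate to the YAML in section 2.1, the sketch below fills in the same values in Go. It uses local copies of the types (without the yaml tags) so that it runs without KisFlow installed; `buildFuncName3` is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// Local copies of the parameter types shown above.
type FParam map[string]string

type KisSource struct {
	Name string
	Must []string
}

type KisFuncOption struct {
	CName         string
	RetryTimes    int
	RetryDuration int
	Params        FParam
}

// buildFuncName3 returns the source and option values from the
// funcName3 YAML example in section 2.1.
func buildFuncName3() (*KisSource, *KisFuncOption) {
	source := &KisSource{
		Name: "User Order Error Rate",
		Must: []string{"order_id", "user_id"},
	}
	option := &KisFuncOption{
		Params: FParam{
			"default1": "funcName3_param1",
			"default2": "funcName3_param2",
		},
	}
	return source, option
}

func main() {
	source, option := buildFuncName3()
	fmt.Println(source.Name, source.Must)
	fmt.Println(option.Params["default1"], option.Params["default2"])
}
```

With the real types from the config package, values like these would be passed as the third and fourth arguments of NewFuncConfig in place of the nil values in the invocation above.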

4. Function Interface

// Function is the basic logical computing unit for streaming computation.
// Any number of KisFunctions can be combined into a KisFlow.
type Function interface {
	// Call executes the streaming computation logic
	Call(ctx context.Context, flow Flow) error

	// SetConfig configures the strategy for the current Function instance
	SetConfig(s *config.KisFuncConfig) error
	// GetConfig retrieves the configuration strategy for the current Function instance
	GetConfig() *config.KisFuncConfig

	// SetFlow sets the Flow instance that the current Function instance depends on
	SetFlow(f Flow) error
	// GetFlow retrieves the Flow that the current Function instance depends on
	GetFlow() Flow

	// AddConnector adds a Connector to the current Function instance
	AddConnector(conn Connector) error
	// GetConnector retrieves the Connector associated with the current Function instance
	GetConnector() Connector

	// CreateId generates a random instance KisID for the current Function instance
	CreateId()
	// GetId retrieves the FID of the current Function
	GetId() string
	// GetPrevId retrieves the FID of the previous Function node
	GetPrevId() string
	// GetNextId retrieves the FID of the next Function node
	GetNextId() string

	// Next returns the next layer of computational streaming Function, returns nil if the current layer is the last layer
	Next() Function
	// Prev returns the previous layer of computational streaming Function, returns nil if the current layer is the first layer
	Prev() Function
	// SetN sets the next Function instance
	SetN(f Function)
	// SetP sets the previous Function instance
	SetP(f Function)
	// GetMetaData gets the temporary data of the current Function
	GetMetaData(key string) interface{}
	// SetMetaData sets the temporary data of the current Function
	SetMetaData(key string, value interface{})
}

5. Function & FaaS

FaaS is the computation callback bound to a Function.

[Figure: relationship between FaaS and Function]

5.1 FaaS Function Prototype

type FaaS func(context.Context, Flow, ...interface{}) error
  • context.Context: The standard Go context.
  • Flow: The Flow to which the current Function is attached, including all data, parameters, and hierarchical relationships on that Flow.
  • ...interface{}: A developer-defined parameter, which must be a slice type (for example, []*AvgStuScoreIn in the example that follows).

An example of a FaaS definition:

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"reflect"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

	// Get the original data
	for _, data := range flow.Input() {
		fmt.Printf("data value= %+v, data type = %+v\n", data, reflect.TypeOf(data))
	}

	// Get the deserialized data
	for _, row := range rows {

		out := AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}

		// Submit the result data
		_ = flow.CommitRow(out)
	}

	return nil
}
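
The per-row arithmetic in this FaaS can be tried outside a Flow. The sketch below assumes that a row arrives as JSON matching the struct tags above and decodes it with the standard library; `scoreIn` and `average` are hypothetical names, and KisFlow's own serialization path is not reproduced here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scoreIn mirrors the JSON tags of AvgStuScoreIn without the embedded serializer.
type scoreIn struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

// average decodes one JSON row and returns the student ID together
// with the mean of the three scores.
func average(raw string) (int, float64, error) {
	var in scoreIn
	if err := json.Unmarshal([]byte(raw), &in); err != nil {
		return 0, 0, err
	}
	return in.StuId, float64(in.Score1+in.Score2+in.Score3) / 3, nil
}

func main() {
	id, avg, err := average(`{"stu_id":101,"score_1":100,"score_2":90,"score_3":80}`)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("stu_id=%d avg_score=%.2f\n", id, avg) // stu_id=101 avg_score=90.00
}
```

Converting the integer sum to float64 before dividing keeps the fractional part of the average, as in the FaaS above.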

5.2 Register FaaS

A FaaS can be registered only once per process.

	kis.Pool().FaaS("AvgStuScore", AvgStuScore)

  • Parameter 1: The Function name, the same as the fname defined in the configuration file.
  • Parameter 2: The FaaS function itself.
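
One way to picture name-based registration is a map from fname to callback that refuses a second registration under the same name. This is only a sketch of the idea and not KisFlow's pool implementation: `registry`, `register`, `call`, and the simplified `callback` signature are all assumptions, and how the real pool reacts to a repeated registration is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// callback is a simplified stand-in for the FaaS prototype (no Flow argument).
type callback func(rows []int) error

// registry maps a function name (fname) to its callback.
type registry struct {
	funcs map[string]callback
}

func newRegistry() *registry {
	return &registry{funcs: make(map[string]callback)}
}

// register binds name to fn and refuses a second registration under the same name.
func (r *registry) register(name string, fn callback) error {
	if _, ok := r.funcs[name]; ok {
		return errors.New("already registered: " + name)
	}
	r.funcs[name] = fn
	return nil
}

// call looks up name and invokes the bound callback.
func (r *registry) call(name string, rows []int) error {
	fn, ok := r.funcs[name]
	if !ok {
		return errors.New("not registered: " + name)
	}
	return fn(rows)
}

func main() {
	r := newRegistry()
	noop := func(rows []int) error { return nil }
	fmt.Println(r.register("AvgStuScore", noop)) // <nil>
	fmt.Println(r.register("AvgStuScore", noop)) // already registered: AvgStuScore
	fmt.Println(r.call("AvgStuScore", nil))      // <nil>
}
```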

6. FaaS

Using the following Function configuration as an example:

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: Student Credits
    must:
        - stu_id
option:
    default_params:
        school: TsingHua
        country: China

6.1 Retrieve Function Configuration Information in FaaS

faas_stu_score_avg.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

	// Retrieve Function configuration information
	funcConfig := flow.GetThisFunction().GetConfig()
	fmt.Printf("function config: %+v\n", funcConfig)

	fmt.Printf("function Params: school = %+v\n", flow.GetFuncParam("school"))
	fmt.Printf("function Params: country = %+v\n", flow.GetFuncParam("country"))

	return nil
}

6.2 Retrieve Function Instance in FaaS

Get Function Instance

faas_stu_score_avg.go

// AvgStuScore(FaaS) calculates the average score of students
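func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

	// Get Function instance
	function := flow.GetThisFunction()

	// Use the instance (Go rejects unused variables), for example print its ID
	fmt.Printf("function id = %s\n", function.GetId())

	return nil
}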

Read and Write Temporary Data of Function Instance

A Function instance provides the SetMetaData() and GetMetaData() interfaces, which attach temporary in-memory variables to the instance. Their lifecycle matches that of the Function instance.

faas_stu_score_avg.go

// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

	// Set temporary data
	myTempNum := 1

	// Get Function instance
	function := flow.GetThisFunction()

	if function.GetMetaData("num") == nil {
		function.SetMetaData("num", myTempNum)
	} else {
		myTempNum = function.GetMetaData("num").(int)
		myTempNum++
		function.SetMetaData("num", myTempNum)
	}

	fmt.Printf("myTempNum = %+v\n", myTempNum)

	return nil
}

Modify the Flow to execute 3 times.

main.go

package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Run the flow 3 times
	times := 3
	for times > 0 {
		if err := flow1.Run(ctx); err != nil {
			fmt.Println("err: ", err)
		}
		times--
	}
}

Output

./main
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
myTempNum = 1
myTempNum = 2
myTempNum = 3
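
The counter survives across runs because the metadata lives on the Function instance rather than in the FaaS call. A minimal sketch of such a store, guarded by a read-write lock like the mLock field of BaseFunction, is shown below. `metaStore` and `bump` are hypothetical names; this is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// metaStore is a hypothetical stand-in for the metaData and mLock
// fields of BaseFunction.
type metaStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMetaStore() *metaStore {
	return &metaStore{data: make(map[string]interface{})}
}

// Get returns the value stored under key, or nil if the key is absent.
func (m *metaStore) Get(key string) interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.data[key]
}

// Set stores value under key.
func (m *metaStore) Set(key string, value interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
}

// bump mirrors the counter logic of the FaaS above and returns the new count.
func bump(m *metaStore) int {
	num := 1
	if v, ok := m.Get("num").(int); ok {
		num = v + 1
	}
	m.Set("num", num)
	return num
}

func main() {
	store := newMetaStore()
	for i := 0; i < 3; i++ {
		fmt.Println("myTempNum =", bump(store))
	}
}
```

In this sketch the lock protects each individual read and write, not the whole read-then-write sequence in `bump`, so concurrent callers would need extra coordination to count correctly.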