
09 KisFlow Connector

刘丹冰 edited this page Apr 17, 2024 · 2 revisions

Case Source Code https://github.com/aceld/kis-flow-usage/tree/main/8-connector

A Connector represents a link to an external resource. It marks the point where data must flow through a third-party storage engine, and it holds the basic information about that storage engine.

1. Connector interface

type Connector interface {
	// Init initializes the connection to the associated storage engine for the Connector
	Init() error
	// Call invokes the read/write operations of the external storage logic through the Connector
	Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
	// GetId retrieves the ID of the Connector
	GetId() string
	// GetName retrieves the name of the Connector
	GetName() string
	// GetConfig retrieves the configuration information of the Connector
	GetConfig() *config.KisConnConfig
	// GetMetaData retrieves temporary data of the current Connector
	GetMetaData(key string) interface{}
	// SetMetaData sets temporary data for the current Connector
	SetMetaData(key string, value interface{})
}
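
To make the temporary-data methods more concrete, here is a minimal sketch of a mutex-guarded key/value store that offers the same GetMetaData/SetMetaData shape. The type metaStore and its constructor are hypothetical names for illustration only; this is not KisFlow's implementation, and it assumes a plain map is enough for the purpose.

```go
package main

import (
	"fmt"
	"sync"
)

// metaStore is a hypothetical stand-in for the temporary-data part of a
// Connector. It is an illustration, not KisFlow's implementation.
type metaStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMetaStore() *metaStore {
	return &metaStore{data: make(map[string]interface{})}
}

// SetMetaData stores value under key, replacing any previous value.
func (m *metaStore) SetMetaData(key string, value interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
}

// GetMetaData returns the value stored under key, or nil if none exists.
func (m *metaStore) GetMetaData(key string) interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.data[key]
}

func main() {
	store := newMetaStore()
	store.SetMetaData("rdb", "fake-client-handle")
	fmt.Println(store.GetMetaData("rdb"))
	fmt.Println(store.GetMetaData("missing")) // <nil>
}
```

Because the values are stored as interface{}, callers have to convert them back to a concrete type when reading them.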

2. Connector Config

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
  • kistype: conn indicates that the current configuration file is a Connector configuration file.
  • cname: The name of the Connector, a unique identifier for the Connector, which is also required by the Function associated with the Connector.
  • addrs: Addresses of the third-party medium managed by the Connector.
  • type: Type of the third-party medium (redis, mysql, kafka, tidb, es).
  • key: Identifier for the data storage of the third-party medium, such as the Redis Key, or the MySQL table, or the Kafka topic, etc.
  • params: Default parameters carried by the current Connector, which can be retrieved during the Connector's scheduling process.
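
The addrs value in the example holds several addresses in one comma-separated string. The following sketch shows one way such a string could be split into individual addresses. The helper splitAddrs is a hypothetical name; how KisFlow itself handles addrs is not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddrs breaks a comma-separated addrs value into individual addresses,
// trimming surrounding whitespace and skipping empty entries.
// It is a hypothetical helper, not part of KisFlow.
func splitAddrs(addrs string) []string {
	var out []string
	for _, a := range strings.Split(addrs, ",") {
		if a = strings.TrimSpace(a); a != "" {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	for _, a := range splitAddrs("0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990") {
		fmt.Println(a)
	}
}
```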

3. Connector Init (Connector Initialization)

KisFlow calls a Connector's initialization method only once. Its role is to perform one-time setup, such as establishing network connections. Function prototype:

type ConnInit func(conn Connector) error

3.1 Register Connector Init

You can register a Connector's Init method as follows.

kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
func InitConnDemo1(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitDemo1")
	//config info
	connConf := connector.GetConfig()

	fmt.Println(connConf)

	// init connector

	return nil
}

Note: InitConnDemo1() will only be executed once in the process.
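
The run-once behavior can be illustrated with Go's sync.Once. This is only a sketch of the general technique: the type onceInit is a hypothetical name, and how KisFlow enforces the guarantee internally is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// onceInit is a hypothetical wrapper that runs an init callback at most once.
type onceInit struct {
	once sync.Once
	err  error
}

// Do runs fn on the first call only. Every call returns the error produced
// by that first run.
func (o *onceInit) Do(fn func() error) error {
	o.once.Do(func() { o.err = fn() })
	return o.err
}

func main() {
	var guard onceInit
	calls := 0
	initFn := func() error {
		calls++
		return nil
	}
	for i := 0; i < 3; i++ {
		_ = guard.Do(initFn)
	}
	fmt.Println("init calls:", calls) // init calls: 1
}
```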

4. Connector Call (Encapsulation of Connector Execution Logic)

Developers can also encapsulate storage read/write operations in the Connector. This scheduling encapsulation is optional; if a Connector provides one, its prototype is as follows:

type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)

4.1 Register Connector Call

You can register a Connector Call method as follows.

kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) (interface{}, error) {
	fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
		flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

	fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

	fmt.Printf("===> Call Connector CaasDemoHanler1, args from function: %v\n", args)

	return nil, nil
}

5. Case Demo Application Example (Merge Flow with Connector)

KisFlow can also use Connector to merge two Flows. Here is an example that combines two Flows to introduce the Connector interface and usage.

Data Flow Diagram

The data flow diagram for this case is as follows:

[Figure: data flow diagram for this case]

5.1 Case Introduction

Consider a student with four attributes:

Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3

Define Flow1: CalStuAvgScore12, which calculates the average (avg_score_1_2) of a student's credit 1 (score_1) and credit 2 (score_2).

Define Flow2: CalStuAvgScore3, which calculates the average of a student's credit 3 (score_3) combined with credit 1 and credit 2. Flow2 does not receive score_1 and score_2 directly; it uses avg_score_1_2, which is provided by Flow1.

  • Flow1 Structure: Flow1 consists of 4 Functions. The V (Function: VerifyStu) function checks that the StuId is valid. The C (Function: AvgStuScore12) function calculates the average score of credit 1 and credit 2. The S (Function: SaveScoreAvg12) function saves avg_score_1_2 into Redis. The E (Function: PrintStuAvgScore) function prints the average score of credit 1 and credit 2.

  • Flow2 Structure: Flow2 consists of 4 Functions. The V (Function: VerifyStu) function checks that the StuId is valid. The L (Function: LoadScoreAvg12) function reads the average score of credit 1 and credit 2 calculated by Flow1. The C (Function: AvgStuScore3) function calculates the average score of the three credits from credit 3 and the average of credit 1 and credit 2. The E (Function: PrintStuAvgScore) function prints the average score of credit 1, credit 2, and credit 3.

5.2 Case Project Directory

https://github.com/aceld/kis-flow-usage/tree/main/8-connector

│  conn_init.go
│  faas_load_score_avg_1_2.go
│  faas_save_score_avg_1_2.go
│  faas_stu_score_avg_1_2.go
│  faas_stu_score_avg_3.go
│  faas_stu_score_avg_print.go
│  faas_stu_verify.go
│  main.go
│  Makefile
│  stu_proto.go
│
└─conf/
    ├─conn/
    │      conn-Score12Cache.yml
    │
    ├─flow/
    │      flow-CalStuAvgScore-1-2.yml
    │      flow-CalStuAvgScore-3.yml
    │
    └─func/
            func-AvgStuScore-1-2.yml
            func-AvgStuScore-3.yml
            func-LoadScoreAvg-1-2.yml
            func-PrintStuAvgScore.yml
            func-SaveScoreAvg-1-2.yml
            func-VerifyStu.yml

5.3 Configuration

5.3.1 Connector Configuration

conf/conn/conn-Score12Cache.yml

kistype: conn
cname: Score12Cache
addrs: '127.0.0.1:6379'
type: redis
key: stu_score12_avg_

5.3.2 Function Configuration in Flow1

conf/func/func-VerifyStu.yml

kistype: func
fname: VerifyStu
fmode: Verify
source:
  name: SourceStuScore
  must:
    - stu_id

conf/func/func-AvgStuScore-1-2.yml

kistype: func
fname: AvgStuScore12
fmode: Calculate
source:
    name: SourceStuScore
    must:
        - stu_id

conf/func/func-SaveScoreAvg-1-2.yml

kistype: func
fname: SaveScoreAvg12
fmode: Save
source:
    name: SourceStuScore
    must:
        - stu_id
option:
    cname: Score12Cache

conf/func/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: SourceStuScore
    must:
        - stu_id

5.3.3 Function Configuration in Flow2

In Flow2, Function: VerifyStu and Function: PrintStuAvgScore are shared with Flow1.

conf/func/func-AvgStuScore-3.yml

kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
    name: SourceStuScore
    must:
        - stu_id

conf/func/func-LoadScoreAvg-1-2.yml

kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
    name: SourceStuScore
    must:
        - stu_id
option:
    cname: Score12Cache

5.4 Basic Data Protocol

stu_proto.go

package main

type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
	StuId      int     `json:"stu_id"`
	AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
	Score3     int     `json:"score_3"`
}

5.4.1 Connector Init

The Connector defined in this project, Score12Cache, is a link resource associated with Redis. It needs an initialization method that establishes the Redis connection when KisFlow starts.

conn_init.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/log"
	"github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitScore12Cache")

	// init Redis Conn Client
	rdb := redis.NewClient(&redis.Options{
		Addr:     connector.GetConfig().AddrString, // Redis-Server address
		Password: "",                               // password
		DB:       0,                                // select db
	})

	// Ping test
	pong, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Logger().ErrorF("Failed to connect to Redis: %v", err)
		return err
	}
	fmt.Println("Connected to Redis:", pong)

	// set rdb to connector
	connector.SetMetaData("rdb", rdb)

	return nil
}

Here, the successfully connected Redis client is stored in the Connector's metadata under the key "rdb", so that Functions bound to this Connector can retrieve it later with GetMetaData("rdb").

	// set rdb to connector
	connector.SetMetaData("rdb", rdb)
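
Since metadata values come back as interface{}, the Functions later in this case convert them with a type assertion. An unchecked assertion panics if the key was never set, so a checked form can be safer. The sketch below uses a hypothetical fakeClient type in place of *redis.Client so that it runs without a Redis dependency; clientFromMeta is also a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient is a hypothetical placeholder for a real client type
// such as *redis.Client.
type fakeClient struct{ addr string }

// clientFromMeta converts a metadata value to *fakeClient. It returns an
// error instead of panicking when the value is missing or has another type.
func clientFromMeta(v interface{}) (*fakeClient, error) {
	c, ok := v.(*fakeClient)
	if !ok || c == nil {
		return nil, errors.New("metadata \"rdb\" is missing or not a *fakeClient")
	}
	return c, nil
}

func main() {
	// meta stands in for the Connector's metadata.
	meta := map[string]interface{}{"rdb": &fakeClient{addr: "127.0.0.1:6379"}}

	if c, err := clientFromMeta(meta["rdb"]); err == nil {
		fmt.Println("client for", c.addr)
	}
	if _, err := clientFromMeta(meta["missing"]); err != nil {
		fmt.Println("error:", err)
	}
}
```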

5.5 FaaS Implementation

5.5.1 Function(V): VerifyStu

faas_stu_verify.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow, and will not continue to execute the subsequent Functions of this Flow
			return flow.Next(kis.ActionAbort)
		}
	}

	return flow.Next(kis.ActionDataReuse)
}

VerifyStu() validates the incoming data. If any row has a stu_id outside the range 0 to 999, it aborts the current Flow with flow.Next(kis.ActionAbort), and the subsequent Functions of this Flow are not executed. Otherwise, the data is reused and passed to the next Function through flow.Next(kis.ActionDataReuse).

5.5.2 Function(C): AvgStuScore12

faas_stu_score_avg_1_2.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
	serialize.DefaultSerialize
	StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {

		out := AvgStuScoreOut_1_2{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2) / 2,
			},
		}

		// Commit the result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

AvgStuScore12() calculates the average of score_1 and score_2 and commits the result as avg_score.

5.5.3 Function(S): SaveScoreAvg12

faas_save_score_avg_1_2.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
	"strconv"
)

type SaveStuScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {

	var rdb *redis.Client

	// Get Redis Client
	rdb = conn.GetMetaData("rdb").(*redis.Client)

	// Set data to redis
	pipe := rdb.Pipeline()

	for _, score := range rows {
		// make key
		key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

		pipe.HMSet(ctx, key, map[string]interface{}{
			"avg_score": score.AvgScore,
		})
	}

	_, err := pipe.Exec(ctx)
	if err != nil {
		return err
	}

	return nil
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
	fmt.Printf("->Call Func SaveScoreScore12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("SaveScoreScore12(): GetConnector err = %s\n", err.Error())
		return err
	}

	// Capture the error returned by BatchSetStuScores; the outer err is nil here.
	if err := BatchSetStuScores(ctx, conn, rows); err != nil {
		fmt.Printf("SaveScoreScore12(): BatchSetStuScores err = %s\n", err.Error())
		return err
	}

	return flow.Next(kis.ActionDataReuse)
}

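SaveScoreAvg12() stores the data in Redis through the bound Connector. Each row is written under a key formed from the key prefix configured in the Connector followed by the student ID. Finally, the source data is passed on to the next Function through flow.Next(kis.ActionDataReuse).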
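
As a small illustration of the key construction, the following sketch joins the prefix from conn-Score12Cache.yml with a student ID in the same way as the listing above. The helper name scoreKey is hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// scoreKey joins the Connector's key prefix with the student ID.
// It is a hypothetical helper mirroring the concatenation in the listing.
func scoreKey(prefix string, stuId int) string {
	return prefix + strconv.Itoa(stuId)
}

func main() {
	fmt.Println(scoreKey("stu_score12_avg_", 101)) // stu_score12_avg_101
	fmt.Println(scoreKey("stu_score12_avg_", 102)) // stu_score12_avg_102
}
```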

5.5.4 Function(E): PrintStuAvgScore

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return flow.Next()
}

PrintStuAvgScore() prints the average score of the current student.

5.5.5 Function(L): LoadScoreAvg12

faas_load_score_avg_1_2.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
	"strconv"
)

type LoadStuScoreIn struct {
	serialize.DefaultSerialize
	StuScore3
}

type LoadStuScoreOut struct {
	serialize.DefaultSerialize
	StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {

	var rdb *redis.Client

	// Get Redis Client
	rdb = conn.GetMetaData("rdb").(*redis.Client)

	// make key
	key := conn.GetConfig().Key + strconv.Itoa(stuId)

	// get data from redis
	result, err := rdb.HGetAll(ctx, key).Result()
	if err != nil {
		return 0, err
	}

	// get value
	avgScoreStr, ok := result["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
	}

	// parse to float64
	avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
	if err != nil {
		return 0, err
	}

	return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
	fmt.Printf("->Call Func LoadScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	for _, row := range rows {
		stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
		if err != nil {
			fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
			return err
		}

		out := LoadStuScoreOut{
			StuScore3: StuScore3{
				StuId:      row.StuId,
				Score3:     row.Score3,
				AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
			},
		}

		// commit result
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

LoadScoreAvg12() uses the Redis client held by the bound Connector to read the average of score_1 and score_2, using the key from the Connector configuration. It then sends the upstream source data, together with the newly read average, downstream.
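
The field lookup and parsing step can be tried without a Redis server. The sketch below takes a map[string]string, which is the shape of the value the listing reads from HGetAll, and extracts avg_score from it. The helper name avgFromHash is hypothetical, and the map literals stand in for real Redis replies.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// avgFromHash extracts and parses the avg_score field from a hash result.
// It is a hypothetical helper; the input stands in for a real Redis reply.
func avgFromHash(result map[string]string) (float64, error) {
	s, ok := result["avg_score"]
	if !ok {
		return 0, errors.New("avg_score field not found")
	}
	return strconv.ParseFloat(s, 64)
}

func main() {
	avg, err := avgFromHash(map[string]string{"avg_score": "95"})
	fmt.Println(avg, err) // 95 <nil>

	_, err = avgFromHash(map[string]string{})
	fmt.Println(err) // avg_score field not found
}
```

The empty-map case matters in this example: if Flow1 has not yet stored a value for a student, the field is absent and the load step reports an error.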

5.5.6 Function(C): AvgStuScore3

faas_stu_score_avg_3.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
	serialize.DefaultSerialize
	StuScore3
}

type AvgStuScore3Out struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
	fmt.Printf("->Call Func AvgStuScore3\n")

	for _, row := range rows {

		out := AvgStuScore3Out{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
			},
		}

		// Commit the result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

AvgStuScore3() combines score_3 with the average of score_1 and score_2 that was read from Redis. Because that average stands for two credits, it is weighted by 2 before the sum is divided by 3, which yields the final three-credit average avg_score.
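
The arithmetic of the two Flows can be checked in isolation. The sketch below repeats the two formulas from the listings with the sample data used later in main.go. The function names avg12 and avg3 are hypothetical.

```go
package main

import "fmt"

// avg12 mirrors the Flow1 formula: the mean of score_1 and score_2.
func avg12(s1, s2 int) float64 { return float64(s1+s2) / 2 }

// avg3 mirrors the Flow2 formula: score_3 plus avg12 weighted by 2,
// divided by 3.
func avg3(s3 int, a12 float64) float64 { return (float64(s3) + a12*2) / 3 }

func main() {
	a := avg12(100, 90)
	fmt.Println(a, avg3(80, a)) // 95 90

	b := avg12(100, 80)
	fmt.Println(b, avg3(70, b)) // 90 83.33333333333333
}
```

Weighting the stored average by 2 gives the same result as averaging the three scores directly, for example (100 + 90 + 80) / 3 = 90 for student 101.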

5.6 Register FaaS & CaaSInit/CaaS

main.go

func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
	kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
	kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
	kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

	// Register connectors
	kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}

5.7 Main Process

main.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
	"sync"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {

	// Submit data
	_ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {

	// Submit data
	_ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		// Run flow1
		defer wg.Done()
		// Get the flow
		flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
		if flow1 == nil {
			panic("flow1 is nil")
		}

		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		defer wg.Done()
		// Get the flow
		flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
		if flow2 == nil {
			panic("flow2 is nil")
		}

		// Run flow2
		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()

	return
}

Two Goroutines are started to execute Flow1 and Flow2 respectively to calculate the final average scores of student 101 and student 102.
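
Note that the two goroutines above are not ordered relative to each other, while Flow2 depends on values that Flow1 writes. If Redis does not already hold those values, for example from an earlier run, the load step in Flow2 may fail. One possible way to enforce the order is to signal completion through a channel. The sketch below shows that idea with a hypothetical in-memory cache type instead of Redis and placeholder goroutines instead of real Flows; it is not taken from the case source code.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical in-memory stand-in for the shared Redis store.
type cache struct {
	mu sync.Mutex
	m  map[int]float64
}

func (c *cache) save(id int, avg float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[id] = avg
}

func (c *cache) load(id int) (float64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[id]
	return v, ok
}

// runOrdered runs a producer and a consumer in separate goroutines, but
// makes the consumer wait until the producer has finished writing.
func runOrdered(c *cache) (float64, bool) {
	var wg sync.WaitGroup
	saved := make(chan struct{})
	var got float64
	var ok bool

	wg.Add(2)
	go func() { // stands in for Flow1
		defer wg.Done()
		c.save(101, 95)
		close(saved) // signal that the value is available
	}()
	go func() { // stands in for Flow2
		defer wg.Done()
		<-saved // wait for the producer before loading
		got, ok = c.load(101)
	}()
	wg.Wait()
	return got, ok
}

func main() {
	avg, found := runOrdered(&cache{m: make(map[int]float64)})
	fmt.Println(avg, found) // 95 true
}
```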

5.8 Running Results

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]

We can see that the average scores of score_1, score_2, and score_3 are calculated in Flow[CalStuAvgScore3].

5.9 Dynamically Configuring Connector through Interface

A Connector can also be created and configured in code, and attached to a Function with AddConnConfig(), as shown below:

func main() {
	ctx := context.Background()

	// source
	source := config.KisSource{
		Name: "SourceStuScore",
		Must: []string{"stu_id", "user_id"},
	}

	// Connector: Score12Cache (like: conf/conn/conn-Score12Cache.yml)
	connScore12CacheConf := config.NewConnConfig("Score12Cache", "127.0.0.1:6379", common.REDIS, "stu_score12_avg_", nil)
	if connScore12CacheConf == nil {
		panic("connScore12CacheConf is nil")
	}

	// Function: VerifyStu(like: conf/func/func-VerifyStu.yml)
	funcVerifyStuConf := config.NewFuncConfig("VerifyStu", common.V, &source, nil)
	if funcVerifyStuConf == nil {
		panic("funcVerifyStuConf is nil")
	}

	// Function: AvgStuScore12 (like: conf/func/func-AvgStuScore-1-2.yml)
	funcAvgStuScore12Conf := config.NewFuncConfig("AvgStuScore12", common.C, &source, nil)
	if funcAvgStuScore12Conf == nil {
		panic("funcAvgStuScore12Conf is nil")
	}

	// Function: SaveScoreAvg12 (like: conf/func/func-SaveScoreAvg-1-2.yml)
	funcSaveScoreAvg12Conf := config.NewFuncConfig("SaveScoreAvg12", common.S, &source, nil)
	if funcSaveScoreAvg12Conf == nil {
		panic("funcSaveScoreAvg12Conf is nil")
	}

	// ---> Add connector to function
	if err := funcSaveScoreAvg12Conf.AddConnConfig(connScore12CacheConf); err != nil {
		panic(err)
	}

	// Function: PrintStuAvgScore (like: conf/func/func-PrintStuAvgScore.yml)
	funcPrintStuAvgScoreConf := config.NewFuncConfig("PrintStuAvgScore", common.E, &source, nil)
	if funcPrintStuAvgScoreConf == nil {
		panic("funcPrintStuAvgScoreConf is nil")
	}

	// Function: LoadScoreAvg12 (like: conf/func/func-LoadScoreAvg-1-2.yml)
	funcLoadScoreAvg12Conf := config.NewFuncConfig("LoadScoreAvg12", common.L, &source, nil)
	if funcLoadScoreAvg12Conf == nil {
		panic("funcLoadScoreAvg12Conf is nil")
	}

	// ---> Add connector to function
	if err := funcLoadScoreAvg12Conf.AddConnConfig(connScore12CacheConf); err != nil {
		panic(err)
	}

	// Function: AvgStuScore3 (like: conf/func/func-AvgStuScore-3.yml)
	funcAvgStuScore3Conf := config.NewFuncConfig("AvgStuScore3", common.C, &source, nil)
	if funcAvgStuScore3Conf == nil {
		panic("funcAvgStuScore3Conf is nil")
	}

	// Flow: CalStuAvgScore12 (like: conf/flow/flow-CalStuAvgScore-1-2.yml)
	flowCalStuAvgScore12Conf := config.NewFlowConfig("CalStuAvgScore12", common.FlowEnable)
	if flowCalStuAvgScore12Conf == nil {
		panic("flowCalStuAvgScore12Conf is nil")
	}

	// Flow: CalStuAvgScore3 (like: conf/flow/flow-CalStuAvgScore-3.yml)
	flowCalStuAvgScore3Conf := config.NewFlowConfig("CalStuAvgScore3", common.FlowEnable)
	if flowCalStuAvgScore3Conf == nil {
		panic("flowCalStuAvgScore3Conf is nil")
	}

	// Create a new flow1
	flow1 := flow.NewKisFlow(flowCalStuAvgScore12Conf)

	// Link Functions to flow1
	if err := flow1.Link(funcVerifyStuConf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcAvgStuScore12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcSaveScoreAvg12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcPrintStuAvgScoreConf, nil); err != nil {
		panic(err)
	}

	// Create a new flow2
	flow2 := flow.NewKisFlow(flowCalStuAvgScore3Conf)

	// Link Functions to flow2
	if err := flow2.Link(funcVerifyStuConf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcLoadScoreAvg12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcAvgStuScore3Conf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcPrintStuAvgScoreConf, nil); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		// run flow1
		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		defer wg.Done()
		// run flow2
		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()

	return
}