09 KisFlow Connector
Case source code: https://github.com/aceld/kis-flow-usage/tree/main/8-connector
A Connector represents a link to a resource and is used when data must flow through a third-party storage engine. The Connector holds the basic information about that storage engine. The Connector interface is defined as follows:
type Connector interface {
	// Init initializes the connection to the storage engine associated with the Connector
	Init() error
	// Call invokes the read/write operations of the external storage logic through the Connector
	Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
	// GetId returns the ID of the Connector
	GetId() string
	// GetName returns the name of the Connector
	GetName() string
	// GetConfig returns the configuration of the Connector
	GetConfig() *config.KisConnConfig
	// GetMetaData returns temporary data stored on the current Connector
	GetMetaData(key string) interface{}
	// SetMetaData stores temporary data on the current Connector
	SetMetaData(key string, value interface{})
}
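To make the metadata methods more concrete, here is a minimal sketch of how a Connector might keep its temporary data, assuming values can be set and read from different goroutines. The demoConnector type and its fields are hypothetical and are not KisFlow's implementation. Only GetName, GetMetaData, and SetMetaData from the interface are modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// demoConnector is a hypothetical stand-in that implements only the
// metadata part of the Connector interface; it is not KisFlow's own type.
type demoConnector struct {
	name     string
	mu       sync.RWMutex
	metaData map[string]interface{}
}

func newDemoConnector(name string) *demoConnector {
	return &demoConnector{name: name, metaData: make(map[string]interface{})}
}

func (c *demoConnector) GetName() string { return c.name }

// SetMetaData stores a value under key, guarded by a write lock.
func (c *demoConnector) SetMetaData(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metaData[key] = value
}

// GetMetaData returns the value stored under key, or nil if the key is absent.
func (c *demoConnector) GetMetaData(key string) interface{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.metaData[key]
}

func main() {
	conn := newDemoConnector("ConnName1")
	conn.SetMetaData("rdb", "client-handle")
	fmt.Println(conn.GetName(), conn.GetMetaData("rdb"), conn.GetMetaData("missing"))
	// ConnName1 client-handle <nil>
}
```

A missing key comes back as nil, so callers should check the value before using it.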
A Connector is configured in a YAML file, for example:
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
- kistype: conn marks the file as a Connector configuration file.
- cname: The name of the Connector. It uniquely identifies the Connector and is referenced by any Function associated with the Connector (through option: cname).
- addrs: Addresses of the third-party medium managed by the Connector; multiple addresses are separated by commas.
- type: Type of the third-party medium (redis, mysql, kafka, tidb, es).
- key: Identifier of the data storage in the third-party medium, such as a Redis key, a MySQL table, or a Kafka topic.
- params: Default parameters carried by the Connector, which can be retrieved while the Connector is being scheduled.
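The configuration fields map naturally onto a plain struct. The sketch below is only an illustration: the connConfig type and its addrs helper are hypothetical, and the sketch assumes, as the addrs example suggests, that multiple addresses are separated by commas.

```go
package main

import (
	"fmt"
	"strings"
)

// connConfig mirrors the YAML keys of a Connector file; the struct and
// field names are hypothetical and are not KisFlow's own config type.
type connConfig struct {
	KisType    string
	CName      string
	AddrString string // raw "addrs" value, e.g. "host1:port1,host2:port2"
	Type       string
	Key        string
	Params     map[string]string
}

// addrs splits the comma-separated "addrs" value into individual
// addresses, trimming spaces and skipping empty entries.
func (c connConfig) addrs() []string {
	var out []string
	for _, a := range strings.Split(c.AddrString, ",") {
		if a = strings.TrimSpace(a); a != "" {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	conf := connConfig{
		KisType:    "conn",
		CName:      "ConnName1",
		AddrString: "0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990",
		Type:       "redis",
		Key:        "redis-key",
		Params:     map[string]string{"args1": "value1", "args2": "value2"},
	}
	fmt.Println(conf.addrs())         // [0.0.0.0:9988 0.0.0.0:9999 0.0.0.0:9990]
	fmt.Println(conf.Params["args1"]) // value1
}
```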
KisFlow calls a Connector's initialization method only once; its role is to set up network connections and similar resources. Function prototype:
type ConnInit func(conn Connector) error
You can register a Connector's Init method as follows.
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
func InitConnDemo1(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitDemo1")
	// config info
	connConf := connector.GetConfig()
	fmt.Println(connConf)
	// init connector
	return nil
}
Note: InitConnDemo1() runs only once for a given Connector instance, not on every call or for every data row.
Developers can also wrap storage read/write operations inside the Connector. This is optional: if a Connector provides such a wrapper for its scheduling process, the prototype is as follows:
type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)
You can register a Connector Call method as follows.
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) (interface{}, error) {
	fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
		flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
	fmt.Printf("Params = %+v\n", conn.GetConfig().Params)
	// args is an interface{}, so %v prints it whatever its concrete type
	fmt.Printf("===> Call Connector CaasDemoHanler1, args from function: %v\n", args)
	return nil, nil
}
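KisFlow can also use a Connector to link two Flows: one Flow writes its results to a storage engine through the Connector, and another Flow reads them back. The following example combines two Flows in this way to introduce the Connector interface and its usage.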
Data Flow Diagram
[Figure: data flow diagram for this case]
Consider a student with four attributes:
- Student ID: stu_id
- Credit 1: score_1
- Credit 2: score_2
- Credit 3: score_3
Define Flow1: CalStuAvgScore12, which calculates the average (avg_score_1_2) of a student's credit 1 (score_1) and credit 2 (score_2).
Define Flow2: CalStuAvgScore3, which calculates the student's average over all three credits from credit 3 (score_3) and the average of credit 1 and credit 2 (avg_score_1_2). The average of credit 1 and credit 2 is provided by Flow1.
- Flow1 structure: Flow1 consists of 4 Functions. The V (Function: VerifyStu) function checks that the StuId is valid. The C (Function: AvgStuScore12) function calculates the average of credit 1 and credit 2. The S (Function: SaveScoreAvg12) function saves avg_score_1_2 into Redis. The E (Function: PrintStuAvgScore) function prints the average of credit 1 and credit 2.
- Flow2 structure: Flow2 consists of 4 Functions. The V (Function: VerifyStu) function checks that the StuId is valid. The L (Function: LoadScoreAvg12) function reads the average of credit 1 and credit 2 computed by Flow1. The C (Function: AvgStuScore3) function calculates the average over all three credits from credit 3 and the average of credit 1 and credit 2. The E (Function: PrintStuAvgScore) function prints the average of credit 1, credit 2, and credit 3.
Source code: https://github.com/aceld/kis-flow-usage/tree/main/8-connector
Project layout:
│ conn_init.go
│ faas_load_score_avg_1_2.go
│ faas_save_score_avg_1_2.go
│ faas_stu_score_avg_1_2.go
│ faas_stu_score_avg_3.go
│ faas_stu_score_avg_print.go
│ faas_stu_verify.go
│ main.go
│ Makefile
│ stu_proto.go
│
└─conf/
├─conn/
│ conn-Score12Cache.yml
│
├─flow/
│ flow-CalStuAvgScore-1-2.yml
│ flow-CalStuAvgScore-3.yml
│
└─func/
func-AvgStuScore-1-2.yml
func-AvgStuScore-3.yml
func-LoadScoreAvg-1-2.yml
func-PrintStuAvgScore.yml
func-SaveScoreAvg-1-2.yml
func-VerifyStu.yml
conf/conn/conn-Score12Cache.yml
kistype: conn
cname: Score12Cache
addrs: '127.0.0.1:6379'
type: redis
key: stu_score12_avg_
conf/func/func-VerifyStu.yml
kistype: func
fname: VerifyStu
fmode: Verify
source:
  name: SourceStuScore
  must:
    - stu_id
conf/func/func-AvgStuScore-1-2.yml
kistype: func
fname: AvgStuScore12
fmode: Calculate
source:
  name: SourceStuScore
  must:
    - stu_id
conf/func/func-SaveScoreAvg-1-2.yml
kistype: func
fname: SaveScoreAvg12
fmode: Save
source:
  name: SourceStuScore
  must:
    - stu_id
option:
  cname: Score12Cache
conf/func/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
  name: SourceStuScore
  must:
    - stu_id
Flow2 reuses Function: VerifyStu and Function: PrintStuAvgScore from Flow1, so only the following two Functions need new configuration files.
conf/func/func-AvgStuScore-3.yml
kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
  name: SourceStuScore
  must:
    - stu_id
conf/func/func-LoadScoreAvg-1-2.yml
kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
  name: SourceStuScore
  must:
    - stu_id
option:
  cname: Score12Cache
stu_proto.go
package main

type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
	StuId      int     `json:"stu_id"`
	AvgScore12 float64 `json:"avg_score_1_2"` // average of score_1 and score_2
	Score3     int     `json:"score_3"`
}
The Connector defined in this project, Score12Cache, is a link resource associated with Redis. It must provide an initialization method that KisFlow calls at startup to establish the Redis connection.
conn_init.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/log"
	"github.com/go-redis/redis/v8"
)

// InitScore12Cache follows the prototype: type ConnInit func(conn Connector) error
func InitScore12Cache(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitScore12Cache")

	// Create the Redis client from the Connector's configured address
	rdb := redis.NewClient(&redis.Options{
		Addr:     connector.GetConfig().AddrString, // Redis server address
		Password: "",                               // password
		DB:       0,                                // select db
	})

	// Ping to verify the connection
	pong, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Logger().ErrorF("Failed to connect to Redis: %v", err)
		return err
	}
	fmt.Println("Connected to Redis:", pong)

	// Store the client on the Connector for later use by Functions
	connector.SetMetaData("rdb", rdb)

	return nil
}
Here, the connected Redis client is stored in the Connector's metadata under the key "rdb", so that Functions bound to this Connector can retrieve it later:
// set rdb to connector
connector.SetMetaData("rdb", rdb)
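The Functions that use this Connector later read the client back with a single-value type assertion, conn.GetMetaData("rdb").(*redis.Client), which panics if the metadata is missing or holds another type. The comma-ok form turns that into an ordinary error. Because a real *redis.Client needs a running server, this sketch substitutes a hypothetical fakeClient type and a map-backed metaGetter.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient is a hypothetical stand-in for *redis.Client.
type fakeClient struct{ addr string }

// metaGetter is the subset of the Connector interface used here.
type metaGetter interface {
	GetMetaData(key string) interface{}
}

// mapMeta is a hypothetical map-backed metadata holder.
type mapMeta map[string]interface{}

func (m mapMeta) GetMetaData(key string) interface{} { return m[key] }

// clientFrom retrieves the client stored under "rdb", returning an error
// instead of panicking when the value is missing, nil, or of another type.
func clientFrom(conn metaGetter) (*fakeClient, error) {
	rdb, ok := conn.GetMetaData("rdb").(*fakeClient)
	if !ok || rdb == nil {
		return nil, errors.New(`connector metadata "rdb" is missing or not a client`)
	}
	return rdb, nil
}

func main() {
	c, err := clientFrom(mapMeta{"rdb": &fakeClient{addr: "127.0.0.1:6379"}})
	fmt.Println(c.addr, err) // 127.0.0.1:6379 <nil>

	_, err = clientFrom(mapMeta{})
	fmt.Println(err)
}
```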
faas_stu_verify.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			// Abort the current Flow; its subsequent Functions will not run
			return flow.Next(kis.ActionAbort)
		}
	}

	// Pass the input data unchanged to the next Function
	return flow.Next(kis.ActionDataReuse)
}
VerifyStu() validates the data. If any row has a StuId outside the range 0 to 999, it aborts the current Flow with flow.Next(kis.ActionAbort), so none of the remaining Functions run. Otherwise, it reuses the input data and passes it to the next Function through flow.Next(kis.ActionDataReuse).
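VerifyStu's check is all-or-nothing for a batch: one invalid row stops the whole Flow run. A standalone sketch of the same rule makes the boundary values explicit; validStuID and verifyBatch are hypothetical helpers, not part of KisFlow.

```go
package main

import "fmt"

// validStuID mirrors VerifyStu's check: IDs outside 0..999 are rejected.
func validStuID(id int) bool {
	return id >= 0 && id <= 999
}

// verifyBatch reports whether the whole batch may continue. Like VerifyStu,
// a single invalid row stops the entire batch rather than just that row.
func verifyBatch(ids []int) (ok bool, badID int) {
	for _, id := range ids {
		if !validStuID(id) {
			return false, id
		}
	}
	return true, 0
}

func main() {
	fmt.Println(verifyBatch([]int{101, 102}))  // true 0
	fmt.Println(verifyBatch([]int{101, 1000})) // false 1000
}
```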
faas_stu_score_avg_1_2.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
	serialize.DefaultSerialize
	StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {
		out := AvgStuScoreOut_1_2{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2) / 2,
			},
		}
		// Commit the result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
AvgStuScore12() calculates the average of score_1 and score_2 and commits it as avg_score.
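The float64 conversion happens before the division on purpose. With integer division the fractional part would be lost, as this small comparison shows. The helpers are hypothetical; the first output line reproduces the averages of students 101 and 102 from main.go.

```go
package main

import "fmt"

// avg12 follows AvgStuScore12: convert to float64 before dividing.
func avg12(score1, score2 int) float64 {
	return float64(score1+score2) / 2
}

// avg12Truncated shows the pitfall of dividing the ints first.
func avg12Truncated(score1, score2 int) float64 {
	return float64((score1 + score2) / 2)
}

func main() {
	fmt.Println(avg12(100, 90), avg12(100, 80))          // 95 90
	fmt.Println(avg12(100, 85), avg12Truncated(100, 85)) // 92.5 92
}
```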
faas_save_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type SaveStuScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Queue all writes in a pipeline and send them together
	pipe := rdb.Pipeline()
	for _, score := range rows {
		// Key = configured prefix + student ID, e.g. stu_score12_avg_101
		key := conn.GetConfig().Key + strconv.Itoa(score.StuId)
		pipe.HMSet(ctx, key, map[string]interface{}{
			"avg_score": score.AvgScore,
		})
	}

	_, err := pipe.Exec(ctx)
	return err
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
	fmt.Printf("->Call Func SaveScoreScore12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("SaveScoreScore12(): GetConnector err = %s\n", err.Error())
		return err
	}

	// Keep the error returned by BatchSetStuScores (err is nil at this point otherwise)
	if err = BatchSetStuScores(ctx, conn, rows); err != nil {
		fmt.Printf("SaveScoreScore12(): BatchSetStuScores err = %s\n", err.Error())
		return err
	}

	// Pass the source data unchanged to the next Function
	return flow.Next(kis.ActionDataReuse)
}
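SaveScoreAvg12() writes each student's avg_score to Redis through the bound Connector, using the Connector's configured key (stu_score12_avg_) followed by the student ID as the Redis key. It then passes the source data unchanged to the next Function with kis.ActionDataReuse.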
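The write path builds one Redis key per student from the Connector's configured prefix and queues all writes in a pipeline before executing them. The sketch below imitates that with an in-memory map of hashes. hashStore and batch are hypothetical stand-ins for Redis and a go-redis pipeline, and formatting the float with strconv.FormatFloat is an assumption of the sketch, not a claim about how go-redis encodes values.

```go
package main

import (
	"fmt"
	"strconv"
)

// hashStore is a hypothetical in-memory stand-in for Redis hashes.
type hashStore map[string]map[string]string

// batch queues writes and applies them together, loosely imitating a pipeline.
type batch struct {
	queued []func(hashStore)
}

// hset queues a single hash-field write; nothing is applied until exec.
func (b *batch) hset(key, field, value string) {
	b.queued = append(b.queued, func(s hashStore) {
		if s[key] == nil {
			s[key] = map[string]string{}
		}
		s[key][field] = value
	})
}

// exec applies all queued writes in order and clears the queue.
func (b *batch) exec(s hashStore) {
	for _, op := range b.queued {
		op(s)
	}
	b.queued = nil
}

func main() {
	const prefix = "stu_score12_avg_" // key from conn-Score12Cache.yml
	avgs := map[int]float64{101: 95, 102: 90}

	store := hashStore{}
	var b batch
	for stuID, avg := range avgs {
		key := prefix + strconv.Itoa(stuID)
		b.hset(key, "avg_score", strconv.FormatFloat(avg, 'f', -1, 64))
	}
	b.exec(store)

	fmt.Println(store["stu_score12_avg_101"]["avg_score"]) // 95
}
```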
faas_stu_score_avg_print.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return flow.Next()
}
PrintStuAvgScore() prints the average score of each student in the current Flow.
faas_load_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type LoadStuScoreIn struct {
	serialize.DefaultSerialize
	StuScore3
}

type LoadStuScoreOut struct {
	serialize.DefaultSerialize
	StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Key = configured prefix + student ID
	key := conn.GetConfig().Key + strconv.Itoa(stuId)

	// Read all hash fields; a missing key yields an empty map
	result, err := rdb.HGetAll(ctx, key).Result()
	if err != nil {
		return 0, err
	}

	avgScoreStr, ok := result["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
	}

	// Parse the stored string back into a float64
	avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
	if err != nil {
		return 0, err
	}
	return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
	fmt.Printf("->Call Func LoadScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	for _, row := range rows {
		stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
		if err != nil {
			fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
			return err
		}

		out := LoadStuScoreOut{
			StuScore3: StuScore3{
				StuId:      row.StuId,
				Score3:     row.Score3,
				AvgScore12: stuScoreAvg1_2, // average of score_1 and score_2 (loaded from Redis)
			},
		}
		// Commit the result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
LoadScoreAvg12() uses the Redis client held by the bound Connector to read the average of score_1 and score_2 stored under the configured key, then sends the upstream source data together with this average (avg_score_1_2) downstream.
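Redis's HGETALL returns an empty hash for a key that does not exist, so GetStuScoresByStuId relies on the avg_score lookup to detect missing data. The sketch below reproduces that lookup and parse against an in-memory map; loadAvg12 and the map layout are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// loadAvg12 mirrors GetStuScoresByStuId against an in-memory map of hashes,
// a hypothetical stand-in for the result of HGetAll.
func loadAvg12(store map[string]map[string]string, prefix string, stuID int) (float64, error) {
	fields := store[prefix+strconv.Itoa(stuID)] // nil map if the key is missing
	raw, ok := fields["avg_score"]              // reading a nil map is safe
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuID)
	}
	return strconv.ParseFloat(raw, 64)
}

func main() {
	store := map[string]map[string]string{
		"stu_score12_avg_101": {"avg_score": "95"},
	}

	v, err := loadAvg12(store, "stu_score12_avg_", 101)
	fmt.Println(v, err) // 95 <nil>

	_, err = loadAvg12(store, "stu_score12_avg_", 102)
	fmt.Println(err) // avg_score not found for stuId: 102
}
```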
faas_stu_score_avg_3.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
	serialize.DefaultSerialize
	StuScore3
}

type AvgStuScore3Out struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
	fmt.Printf("->Call Func AvgStuScore3\n")

	for _, row := range rows {
		out := AvgStuScore3Out{
			StuScoreAvg: StuScoreAvg{
				StuId: row.StuId,
				// AvgScore12 stands for two scores, so it is weighted by 2
				AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
			},
		}
		// Commit the result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
AvgStuScore3() combines score_3 with the average of score_1 and score_2 loaded by LoadScoreAvg12. Because avg_score_1_2 represents two scores, it is weighted by 2, so the final avg_score equals (score_1 + score_2 + score_3) / 3.
main.go
func init() {
	// Register Functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
	kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
	kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
	kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

	// Register Connector initializers
	kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}
main.go
package main

import (
	"context"
	"sync"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {
	// Submit data
	_ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

	// Run the flow
	return flow.Run(ctx)
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {
	// Submit data
	_ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

	// Run the flow
	return flow.Run(ctx)
}

func main() {
	ctx := context.Background()

	// Load configuration from the conf/ directory
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		// Get and run flow1
		flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
		if flow1 == nil {
			panic("flow1 is nil")
		}
		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		defer wg.Done()
		// Get and run flow2
		flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
		if flow2 == nil {
			panic("flow2 is nil")
		}
		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()
}
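Two goroutines run Flow1 and Flow2 concurrently to calculate the final average scores of students 101 and 102. Because Flow2 loads the avg_score_1_2 values that Flow1 saves, this only succeeds if Flow1 has already written them to Redis (or an earlier run left them there); otherwise GetStuScoresByStuId reports "avg_score not found" and LoadScoreAvg12 returns that error. Running Flow1 to completion before starting Flow2 avoids this. The output is as follows: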
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]
The output shows that Flow[CalStuAvgScore3] computes each student's average over score_1, score_2, and score_3: (80 + 2 × 95) / 3 = 90 for student 101 and (70 + 2 × 90) / 3 ≈ 83.33 for student 102.
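In main.go, Flow1 and Flow2 start at the same time, yet Flow2's LoadScoreAvg12 needs the values that Flow1's SaveScoreAvg12 writes. One stdlib-only way to keep the goroutines but enforce the order is to close a channel once the save step is done and have the second goroutine wait on it. The sketch below is hypothetical: a map stands in for Redis, and the two goroutines only mimic the Flows' save and load steps. With the real API, calling RunFlowCalStuAvgScore12 before RunFlowCalStuAvgScore3 would likely be the simplest equivalent.

```go
package main

import (
	"fmt"
	"sync"
)

// runBothFlows mimics the two Flows: the first goroutine saves avg_score_1_2,
// the second waits for that signal before loading it.
func runBothFlows() map[int]float64 {
	cache := map[int]float64{}        // hypothetical stand-in for Redis
	results := map[int]float64{}      // final averages per student
	flow1Saved := make(chan struct{}) // closed once the save step is done

	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // "Flow1": compute and save avg_score_1_2
		defer wg.Done()
		cache[101] = float64(100+90) / 2
		cache[102] = float64(100+80) / 2
		close(flow1Saved)
	}()

	go func() { // "Flow2": load only after Flow1 has saved
		defer wg.Done()
		<-flow1Saved
		results[101] = (80 + cache[101]*2) / 3
		results[102] = (70 + cache[102]*2) / 3
	}()

	wg.Wait()
	return results
}

func main() {
	r := runBothFlows()
	fmt.Println(r[101], r[102]) // 90 83.33333333333333
}
```

Closing the channel also establishes a happens-before edge in the Go memory model, so the second goroutine sees the first goroutine's map writes without an extra mutex.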
Instead of YAML files, the Connector, Functions, and Flows can also be created and configured in code, as shown below:
// Programmatic alternative to main() in main.go, which loads conf/ with
// file.ConfigImportYaml. init() and the RunFlow* helpers stay unchanged.
// This version of main() needs the following imports:
import (
	"context"
	"sync"

	"github.com/aceld/kis-flow/common"
	"github.com/aceld/kis-flow/config"
	"github.com/aceld/kis-flow/flow"
)

func main() {
	ctx := context.Background()

	// Source shared by all Functions (like: source in conf/func/*.yml)
	source := config.KisSource{
		Name: "SourceStuScore",
		Must: []string{"stu_id"},
	}

	// Connector: Score12Cache (like: conf/conn/conn-Score12Cache.yml)
	connScore12CacheConf := config.NewConnConfig("Score12Cache", "127.0.0.1:6379", common.REDIS, "stu_score12_avg_", nil)
	if connScore12CacheConf == nil {
		panic("connScore12CacheConf is nil")
	}

	// Function: VerifyStu (like: conf/func/func-VerifyStu.yml)
	funcVerifyStuConf := config.NewFuncConfig("VerifyStu", common.V, &source, nil)
	if funcVerifyStuConf == nil {
		panic("funcVerifyStuConf is nil")
	}

	// Function: AvgStuScore12 (like: conf/func/func-AvgStuScore-1-2.yml)
	funcAvgStuScore12Conf := config.NewFuncConfig("AvgStuScore12", common.C, &source, nil)
	if funcAvgStuScore12Conf == nil {
		panic("funcAvgStuScore12Conf is nil")
	}

	// Function: SaveScoreAvg12 (like: conf/func/func-SaveScoreAvg-1-2.yml)
	funcSaveScoreAvg12Conf := config.NewFuncConfig("SaveScoreAvg12", common.S, &source, nil)
	if funcSaveScoreAvg12Conf == nil {
		panic("funcSaveScoreAvg12Conf is nil")
	}
	// ---> Add connector to function
	if err := funcSaveScoreAvg12Conf.AddConnConfig(connScore12CacheConf); err != nil {
		panic(err)
	}

	// Function: PrintStuAvgScore (like: conf/func/func-PrintStuAvgScore.yml)
	funcPrintStuAvgScoreConf := config.NewFuncConfig("PrintStuAvgScore", common.E, &source, nil)
	if funcPrintStuAvgScoreConf == nil {
		panic("funcPrintStuAvgScoreConf is nil")
	}

	// Function: LoadScoreAvg12 (like: conf/func/func-LoadScoreAvg-1-2.yml)
	funcLoadScoreAvg12Conf := config.NewFuncConfig("LoadScoreAvg12", common.L, &source, nil)
	if funcLoadScoreAvg12Conf == nil {
		panic("funcLoadScoreAvg12Conf is nil")
	}
	// ---> Add connector to function
	if err := funcLoadScoreAvg12Conf.AddConnConfig(connScore12CacheConf); err != nil {
		panic(err)
	}

	// Function: AvgStuScore3 (like: conf/func/func-AvgStuScore-3.yml)
	funcAvgStuScore3Conf := config.NewFuncConfig("AvgStuScore3", common.C, &source, nil)
	if funcAvgStuScore3Conf == nil {
		panic("funcAvgStuScore3Conf is nil")
	}

	// Flow: CalStuAvgScore12 (like: conf/flow/flow-CalStuAvgScore-1-2.yml)
	flowCalStuAvgScore12Conf := config.NewFlowConfig("CalStuAvgScore12", common.FlowEnable)
	if flowCalStuAvgScore12Conf == nil {
		panic("flowCalStuAvgScore12Conf is nil")
	}

	// Flow: CalStuAvgScore3 (like: conf/flow/flow-CalStuAvgScore-3.yml)
	flowCalStuAvgScore3Conf := config.NewFlowConfig("CalStuAvgScore3", common.FlowEnable)
	if flowCalStuAvgScore3Conf == nil {
		panic("flowCalStuAvgScore3Conf is nil")
	}

	// Create flow1 and link its Functions: V -> C -> S -> E
	flow1 := flow.NewKisFlow(flowCalStuAvgScore12Conf)
	if err := flow1.Link(funcVerifyStuConf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcAvgStuScore12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcSaveScoreAvg12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(funcPrintStuAvgScoreConf, nil); err != nil {
		panic(err)
	}

	// Create flow2 from its own Flow config and link its Functions: V -> L -> C -> E
	flow2 := flow.NewKisFlow(flowCalStuAvgScore3Conf)
	if err := flow2.Link(funcVerifyStuConf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcLoadScoreAvg12Conf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcAvgStuScore3Conf, nil); err != nil {
		panic(err)
	}
	if err := flow2.Link(funcPrintStuAvgScoreConf, nil); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		// Run flow1
		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		defer wg.Done()
		// Run flow2
		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()
}