a DAG task engine based on asynq
- prepare asynq server mux
please refer to asynq's doc to figure out more config options.
srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, // See the godoc for other configuration options }, ) mux := asynq.NewServeMux()
- create dagflow service
svc, err := service.New(&types.Config{ Redis: types.RedisConfig{ Addr: "127.0.0.1:6379", Expire: 120, }, Store: types.StoreConfig{ Type: "redis", }, }, nil) if err != nil { log.Fatal("failed to create service", err) }
- create a flow object and register it to dagflow service
f, err := svc.NewFlow("f1") if err != nil { log.Fatal("failed to create flow", err) } if err = f.Node("n1", incOp); err != nil { log.Fatal("failed to create node", err) } // for complex dag, you can use RegisterFlowsWithDefinitor svc.RegisterFlows(mux, f)
- start asynq server
if err := srv.Run(mux); err != nil { log.Fatalf("could not run server: %v", err) }
- create dagflow service, same as step 2 in server side
svc, err := service.New(&types.Config{ Redis: types.RedisConfig{ Addr: "127.0.0.1:6379", Expire: 120, }, Store: types.StoreConfig{ Type: "redis", }, }, nil) if err != nil { log.Fatal("failed to create service", err) }
- create a flow object and register it to dagflow service, same as step 3 in server side except
mux
should be set tonil
f, err := svc.NewFlow("f1") if err != nil { log.Fatal("failed to create flow", err) } if err = f.Node("n1", incOp); err != nil { log.Fatal("failed to create node", err) } // for complex dag, you can use RegisterFlowsWithDefinitor svc.RegisterFlows(nil, f)
- submit dagflow tasks
svc.Submit("f1", []byte(`1`))
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
func prepareFlow(f *flow.Flow) error {
if err := f.Node("l1n1", incOp); err != nil {
return err
}
if err := f.Node("l2n1", incOp); err != nil {
return err
}
if err := f.Node("l2n2", decOp); err != nil {
return err
}
if err := f.Node("l3n1", mulOp, flow.WithAggregator(func (dataMap map[string][]byte) ([]byte, error) {
l2n1Result := dataMap["l2n1"]
l2n2Result := dataMap["l2n2"]
// do anything you want to construct input data for node l3n1
})); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n1"); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n2"); err != nil {
return err
}
if err := f.Edge("l2n1", "l3n1"); err != nil {
return err
}
if err := f.Edge("l2n2", "l3n1"); err != nil {
return err
}
return nil
}
SwitchNode is a special type node which works like switch case statment in golang
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.SwitchNode("n1", func(data []byte) string {
return "+"
}, map[string]flow.NodeFunc{
"+": incOp,
"-": decOp,
}); err != nil {
log.Fatal("failed to create node", err)
}