Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Refactor component #66

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/xhit/go-str2duration/v2 v2.0.0
go.uber.org/automaxprocs v0.0.0-20200415073007-b685be8c1c23
golang.org/x/net v0.0.0-20220111093109-d55c255bac03
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.26.0-rc.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
183 changes: 183 additions & 0 deletions pkg/core/source/abstract/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package abstract

import (
"fmt"
"github.com/loggie-io/loggie/pkg/core/api"
"github.com/loggie-io/loggie/pkg/core/event"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/pipeline"
)

type Source struct {
name string
typeName api.Type
eventPool *event.Pool
pipelineInfo pipeline.Info
context api.Context
productFunc api.ProductFunc

startFunc func()
stopFunc func()
commitFunc func(events []api.Event)
internalProductFunc func()
}

func ExtendsAbstractSource(info pipeline.Info, typeName api.Type) *Source {
return &Source{
typeName: typeName,
eventPool: info.EventPool,
pipelineInfo: info,
}
}

// ------------------------------------------------------------------------
// extension methods
// ------------------------------------------------------------------------

func (as *Source) Name() string {
return as.name
}

func (as *Source) PipelineName() string {
return as.pipelineInfo.PipelineName
}

func (as *Source) Epoch() pipeline.Epoch {
return as.pipelineInfo.Epoch
}

func (as *Source) PipelineInfo() pipeline.Info {
return as.pipelineInfo
}

func (as *Source) NewEvent() api.Event {
return as.eventPool.Get()
}

// ProductFunc only use in DoProduct()
func (as *Source) ProductFunc() api.ProductFunc {
return as.productFunc
}

// Send only use in DoProduct()
func (as *Source) Send(e api.Event) api.Result {
return as.productFunc(e)
}

// SendWithBody only use in DoProduct()
func (as *Source) SendWithBody(body []byte) api.Result {
e := as.NewEvent()
e.Fill(e.Meta(), e.Header(), body)
return as.Send(e)
}

// ------------------------------------------------------------------------
// implement methods of api.Source
// do not override
// ------------------------------------------------------------------------

func (as *Source) Category() api.Category {
return api.SOURCE
}

func (as *Source) Type() api.Type {
return as.typeName
}

func (as *Source) String() string {
return fmt.Sprintf("%s/%s/%s/%s", as.PipelineName(), as.Category(), as.Type(), as.Name())
}

func (as *Source) Init(context api.Context) {
as.name = context.Name()
as.context = context
}

func (as *Source) Start() {
log.Info("start source: %s", as.String())
if as.startFunc != nil {
as.startFunc()
}
log.Info("source has started: %s", as.String())
}

func (as *Source) Stop() {
log.Info("start stop source: %s", as.String())
if as.stopFunc != nil {
as.stopFunc()
}
log.Info("source has stopped: %s", as.String())
}

func (as *Source) Commit(events []api.Event) {
if as.commitFunc != nil {
as.commitFunc(events)
}
if len(events) == 0 {
return
}
// release events
as.eventPool.PutAll(events)
}

func (as *Source) ProductLoop(productFunc api.ProductFunc) {
as.productFunc = productFunc
log.Info("[%s] start product loop", as.String())
if as.internalProductFunc != nil {
go as.internalProductFunc()
}
}

// ------------------------------------------------------------------------
// optional override methods
// ------------------------------------------------------------------------

// Config A pointer to config or nil should be returned
func (as *Source) Config() interface{} {
return nil
}

func (as *Source) DoStart() {
}

func (as *Source) DoStop() {
}

func (as *Source) DoProduct() {
}

func (as *Source) DoCommit(events []api.Event) {
}

// ------------------------------------------------------------------------
// internal methods
// do not override
// ------------------------------------------------------------------------

func (as *Source) AbstractSource() *Source {
return as
}

type SourceConvert interface {
api.Component
AbstractSource() *Source

DoStart()
DoStop()
DoProduct()
DoCommit(events []api.Event)
}

type SourceRegisterFactory func(info pipeline.Info) SourceConvert

func SourceRegister(t api.Type, factory SourceRegisterFactory) {
pipeline.Register(api.SOURCE, t, func(info pipeline.Info) api.Component {
convert := factory(info)
source := convert.AbstractSource()
source.startFunc = convert.DoStart
source.stopFunc = convert.DoStop
source.internalProductFunc = convert.DoProduct
source.commitFunc = convert.DoCommit
return convert
})
}
64 changes: 14 additions & 50 deletions pkg/source/dev/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package dev

import (
"context"
"fmt"
"math/rand"

"github.com/loggie-io/loggie/pkg/core/api"
"github.com/loggie-io/loggie/pkg/core/event"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/core/source/abstract"
"github.com/loggie-io/loggie/pkg/pipeline"
"golang.org/x/time/rate"
)
Expand All @@ -33,75 +30,42 @@ const Type = "dev"
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func init() {
pipeline.Register(api.SOURCE, Type, makeSource)
abstract.SourceRegister(Type, makeSource)
}

func makeSource(info pipeline.Info) api.Component {
func makeSource(info pipeline.Info) abstract.SourceConvert {
return &Dev{
stop: info.Stop,
config: &Config{},
eventPool: info.EventPool,
Source: abstract.ExtendsAbstractSource(info, Type),
stop: info.Stop,
config: &Config{},
}
}

type Dev struct {
name string
stop bool
eventPool *event.Pool
config *Config
limiter *rate.Limiter
content []byte
*abstract.Source
stop bool
config *Config
limiter *rate.Limiter
content []byte
}

func (d *Dev) Config() interface{} {
return d.config
}

func (d *Dev) Category() api.Category {
return api.SOURCE
}

func (d *Dev) Type() api.Type {
return Type
}

func (d *Dev) String() string {
return fmt.Sprintf("%s/%s", api.SOURCE, Type)
}

func (d *Dev) Init(context api.Context) {
d.name = context.Name()
}

func (d *Dev) Start() {
func (d *Dev) DoStart() {
d.limiter = rate.NewLimiter(rate.Limit(d.config.Qps), d.config.Qps)
d.content = make([]byte, d.config.ByteSize)
for i := range d.content {
d.content[i] = letterBytes[rand.Intn(len(letterBytes))]
}
}

func (d *Dev) Stop() {

}

func (d *Dev) Product() api.Event {
return nil
}

func (d *Dev) ProductLoop(productFunc api.ProductFunc) {
func (d *Dev) DoProduct() {
ctx := context.Background()
log.Info("%s start product loop", d.String())
content := d.content
for !d.stop {
header := make(map[string]interface{})
e := d.eventPool.Get()
e.Fill(e.Meta(), header, content)
d.limiter.Wait(ctx)
productFunc(e)
d.SendWithBody(content)
}
}

func (d *Dev) Commit(events []api.Event) {
d.eventPool.PutAll(events)
}
Loading