Skip to content

Commit

Permalink
add a drop regex
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed May 18, 2016
1 parent 31bff18 commit e3383c9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 16 deletions.
22 changes: 15 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ A sample TOML config file looks like this:
resume = true
resume-name = "default"
namespace-regex = "^mydb.mycollection$"
namespace-exclude-regex = "^mydb.ignorecollection$"
gtm-channel-size = 200


Expand All @@ -54,24 +55,31 @@ The following defaults are used for missing config values:
resume -> false
resume-name -> default
namespace-regex -> nil
namespace-exclude-regex -> nil
gtm-channel-size -> 100

When `resume` is true, monstache writes the timestamp of mongodb operations it has succefully synched to elasticsearch
to the collection monstache.monstache. It also reads this value from that collection when it starts in order to replay
events which it might have missed because monstache was stopped. monstache uses the value of resume-name as a key when
storing and retrieving timestamps. If resume is true but resume-name is not supplied this key defaults to 'default'.
to the collection `monstache.monstache`. It also reads this value from that collection when it starts in order to replay
events which it might have missed because monstache was stopped. monstache uses the value of `resume-name` as a key when
storing and retrieving timestamps. If resume is true but `resume-name` is not supplied this key defaults to `default`.

When `replay` is true, monstache replays all events from the beginning of the mongodb oplog and synchs them to elasticsearch.

When `resume` and `replay` are both true, monstache replays all events from the beginning of the mongodb oplog and synchs them
to elasticsearch and also writes the timestamp of processed event to monstache.monstache.
to elasticsearch and also writes the timestamp of processed event to `monstache.monstache`.

When neither `resume` nor `replay` are true, monstache reads the last timestamp in the oplog and starts listening for events
occurring after this timestamp. Timestamps are not written to monstache.monstache. This is the default behavior.
occurring after this timestamp. Timestamps are not written to `monstache.monstache`. This is the default behavior.

When `namespace-regex` is supplied this regex is tested against the namespace, `database.collection`, of the event. If
the regex matches monstache propogates the event to elasticsearch, otherwise it drops the event. By default monstache
processes events in all database and all collections with the exception of the reserverd database `monstache`.
the regex matches continues processing event filters, otherwise it drops the event. By default monstache
processes events in all database and all collections with the exception of the reserved database `monstache`, any
collections suffixed with `.chunks` and the system collections.

When `namespace-exclude-regex` is supplied this regex is tested against the namespace, `database.collection`, of the event. If
the regex matches monstache ignores the event, otherwise it continues processing event filters. By default monstache
processes events in all database and all collections with the exception of the reserved database `monstache`, any
collections suffixed with `.chunks` and the system collections.

When `gtm-channel-size` is supplied it controls the size of the go channels created for processing events. When many events
are processed at once a larger channel size may prevent blocking in gtm.
Expand Down
41 changes: 32 additions & 9 deletions monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"syscall"
)

var chunksRegex = regexp.MustCompile("\\.chunks$")
var systemsRegex = regexp.MustCompile("system\\..+$")
const mongoUrlDefault string = "localhost"
const resumeNameDefault string = "default"
const elasticMaxConnsDefault int = 10
Expand All @@ -23,7 +25,8 @@ type configOptions struct {
MongoUrl string `toml:"mongo-url"`
ElasticUrl string `toml:"elasticsearch-url"`
ResumeName string `toml:"resume-name"`
NamespaceRegex string `toml:"namespace-regex"`
NsRegex string `toml:"namespace-regex"`
NsExcludeRegex string `toml:"namespace-exclude-regex"`
Resume bool
Replay bool
ElasticMaxConns int `toml:"elasticsearch-max-conns"`
Expand Down Expand Up @@ -54,13 +57,28 @@ func NotMonstache(op *gtm.Op) bool {
return op.GetDatabase() != "monstache"
}

func NotChunks(op *gtm.Op) bool {
return !chunksRegex.MatchString(op.GetCollection())
}

func NotSystem(op *gtm.Op) bool {
return !systemsRegex.MatchString(op.GetCollection())
}

func FilterWithRegex(regex string) gtm.OpFilter {
var validNameSpace = regexp.MustCompile(regex)
return func(op *gtm.Op) bool {
var validNameSpace = regexp.MustCompile(regex)
return validNameSpace.MatchString(op.Namespace)
}
}

func FilterInverseWithRegex(regex string) gtm.OpFilter {
var invalidNameSpace = regexp.MustCompile(regex)
return func(op *gtm.Op) bool {
return !invalidNameSpace.MatchString(op.Namespace)
}
}

func SaveTimestamp(session *mgo.Session, op *gtm.Op, resumeName string) error {
col := session.DB("monstache").C("monstache")
doc := make(map[string]interface{})
Expand All @@ -78,7 +96,8 @@ func (configuration *configOptions) ParseCommandLineFlags() *configOptions {
flag.BoolVar(&configuration.Resume, "resume", false, "True to capture the last timestamp of this run and resume on a subsequent run")
flag.BoolVar(&configuration.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch")
flag.StringVar(&configuration.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'")
flag.StringVar(&configuration.NamespaceRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synched to elasticsearch")
flag.StringVar(&configuration.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synched to elasticsearch")
flag.StringVar(&configuration.NsRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which do not match are synched to elasticsearch")
flag.Parse()
return configuration
}
Expand Down Expand Up @@ -110,8 +129,8 @@ func (configuration *configOptions) LoadConfigFile() *configOptions {
if configuration.Resume && configuration.ResumeName == "" {
configuration.ResumeName = tomlConfig.ResumeName
}
if configuration.NamespaceRegex == "" {
configuration.NamespaceRegex = tomlConfig.NamespaceRegex
if configuration.NsRegex == "" {
configuration.NsRegex = tomlConfig.NsRegex
}
}
return configuration
Expand Down Expand Up @@ -162,6 +181,7 @@ func main() {
go func(mongo *mgo.Session, indexer *elastigo.BulkIndexer) {
<-sigs
mongo.Close()
indexer.Flush()
indexer.Stop()
done <- true
}(mongo, indexer)
Expand Down Expand Up @@ -189,11 +209,14 @@ func main() {
}

var filter gtm.OpFilter = nil
if configuration.NamespaceRegex != "" {
filter = gtm.ChainOpFilters(NotMonstache, FilterWithRegex(configuration.NamespaceRegex))
} else {
filter = NotMonstache
filterChain := []gtm.OpFilter{ NotMonstache, NotSystem, NotChunks }
if configuration.NsRegex != "" {
filterChain = append(filterChain, FilterWithRegex(configuration.NsRegex))
}
if configuration.NsExcludeRegex != "" {
filterChain = append(filterChain, FilterInverseWithRegex(configuration.NsExcludeRegex))
}
filter = gtm.ChainOpFilters(filterChain...)

ops, errs := gtm.Tail(mongo, &gtm.Options{
After: after,
Expand Down

0 comments on commit e3383c9

Please sign in to comment.