From fdd7abf7bfe85d0143425eeb83176c5eb82e1eab Mon Sep 17 00:00:00 2001
From: zzt
Date: Sat, 8 Sep 2018 17:14:05 +0800
Subject: [PATCH] update readme & sample configs.

---
 README.md                                     | 342 +++++++++++++-----
 .../input/mysql/meta/ConsumerSchemaMeta.java  |   2 +-
 src/main/resources/application.yml            |   4 -
 src/main/resources/config.yml                 |   1 -
 src/test/resources/sample-consumer.yml        |  18 +-
 src/test/resources/sample-consumer2.yml       |  65 +++-
 src/test/resources/sample-producer.yml        |  25 +-
 src/test/resources/syncer-config.yml          |  17 +
 8 files changed, 350 insertions(+), 124 deletions(-)
 create mode 100644 src/test/resources/syncer-config.yml

diff --git a/README.md b/README.md
index eac8f626..833ada8b 100644
--- a/README.md
+++ b/README.md
@@ -18,15 +18,19 @@
- Checkpoint: the Consumer module remembers where we left off, trying not to miss data if syncer shuts down unexpectedly
- Retry: if an output channel fails to send to its output target, retry until success or write to the failure log
- Failure Log: if retries exceed the configured number, write the item to the failure log for human recheck
-- Event Scheduler: to solve *Order problem* between events
+- Event Scheduler: to solve the *Order Problem* between events whose primary key is unchanged (a config sketch follows this list)
  - `mod`: `mod` the integral primary key to make changes to the same row always handled in order;
  - `hash`: hash the primary key of the data row, then `mod` the hash value to schedule -- the default value now;
  - `direct`:
-    - If your datasource has only insert operation, you can choose this scheduler, which is faster;
-    - *No order promise* for datasource with insert/update/delete, higher output rate if you can endure some inconsistency;
+    - If your data source has only insert operations, you can choose this scheduler, which is faster;
+    - *No order promise* for a data source with insert/update/delete; higher output rate if you can endure some inconsistency;
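
A minimal sketch of picking a scheduler per master in a consumer config (the `scheduler` key and its `mod`/`hash`/`direct` values come from the sample configs below; the address and schema names are illustrative):

```yml
input:
  masters:
    - connection:
        address: ${HOST_ADDRESS}
        port: 3306
      # mod | hash | direct -- prefer `direct` only for insert-only sources
      scheduler: hash
      schemas:
        - name: "test"
          tables:
            - name: user
              rowName: [user_id, title]
```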
+---
+
+If you change the `id` of an event, it usually means you are doing a join like I do; there is no
+way to make a consistency promise then, because Syncer can only provide 'at least once' semantics.

-### Input
+### Input -- DataSource

- Support listening to both MySQL & MongoDB & DRDS of Aliyun (https://www.aliyun.com/product/drds)
- MySQL master source filter:
@@ -34,11 +38,13 @@
  - Table name filter
  - Interested column filter
  - Automatic primary key detection and set into `id`
-  - If a table match multiple schema & table (because the usage of regex), a exception will be thrown
+  - If a table matches multiple schema & table configs (because of regex usage), an error message will be logged and
+    syncer will use any one of them that matches the filter columns
  - If an event goes through the column filter and only the primary key is left:
-    - If event type is UPDATE_ROWS, then discard this event -- because not support update id now;
+    - If the event type is `UPDATE`, discard this event -- updating `id` is not supported now;
    - For any other event type, keep it.
-  - Support reading from binlog file to do data recovering in case of loss of data
+  - Support reading from a binlog file to recover data in case of data loss (`syncer.producer.input.masters[x].file`)
+  - Support specifying the binlog file/position to start reading from (`input.masters[x].syncMeta`)
- MongoDB master source filter:
  - Version: 3.x
  - Schema filter, support regex
  - Collection name filter
  - Field filter
  - If an event matches multiple schema & table configs, we will use the first specific match to filter/output, i.e.
    the specific schema config will override the regex schema config
  - If an event goes through the column filter and only the primary key is left:
-    - If event type is `UPDATE_ROWS`, then discard this event -- because not support update id now;
+    - If the event type is `UPDATE`, discard this event -- updating `id` is not supported now;
    - For any other event type, keep it.
-- Remember start file/position of binlog/oplog, and resume from where we leave so as to avoid any data loss
-  - More than once: we can ensure the at least once semantics now, so you need to make sure your `SyncData`
+- DRDS:
+  - Same config as MySQL, but you need to connect directly to RDS's MySQL, because DRDS does not support binlog dump
+  - Remember to fetch the partition key in `rowName`
+
+- Remember where we left off last time by writing the file/position of the binlog/oplog, and resume from there so as to avoid any data loss
+  - More than once (at-least-once): we can only ensure at-least-once semantics now, so you need to make sure your `SyncData`
    is idempotent and your destination can handle it. Counterexample: a table without a primary key definitely can't
    handle it and will cause duplicate data sooner or later.
-- Multiple consumer can share a common connection to same data source, i.e. MySQL/MongoDB
+- Multiple consumers can share a common connection to the same data source, i.e. MySQL/MongoDB, to reduce the
+  burden on the remote master
+
+---
+
+After data items come out of the `Input` module, each is converted to `SyncData`(s) -- the abstraction of
+a single data change. In other words, a single binlog item may contain multiple row changes and be converted
+into multiple `SyncData`s.

-### Filter
+### Filter -- Operation

-Manipulate `SyncData` through (for more details, see input part of *Pipelinie Config*):
+Manipulate `SyncData` via the following (for more details, see the input part of *[Consumer Pipeline Config](#consumer_config)*):

- `if`
- `switcher`
@@ -80,12 +96,15 @@ Manipulate `SyncData` through (for more details, see input part of *Pipelinie Co
  - `records`
  - `extra`

-### Output
+### Output -- DataSink
+
- Elasticsearch
  - Version: 5.x
  - Bulk operation
  - Update/Delete documents by `UpdateByQuery` or `DeleteByQuery`
  - Join/merge documents from different sources when pushing to ES[1](#join_in_es)
+  - ExtraQuery: do an extra query to fetch extra needed info (see the sketch just below)
+    - Support multiple dependent extra queries via the special mark `$var$`
- One-to-many relationship (parent-child relationship in ES) for documents in different indices
- Self-referential relationship handling
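
A hedged sketch of `ExtraQuery` in a filter statement, modeled on the `extraQuery(...)` chains shown in the samples below (the schema, table, and field names are hypothetical, and the exact `$var$` syntax may differ in your version):

```yml
filter:
  - statement:
      # Look up `name` in test.user by this event's id and add it as record `userName`
      - "extraQuery('test', 'user').filter('id', id).select('name').addRecord('userName')"
      # A later, dependent query could reference that result via the `$var$` mark,
      # e.g. .filter('name', '$userName$') -- hypothetical usage
```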
@@ -96,17 +115,6 @@ Manipulate `SyncData` through (for more details, see input part of *Pipelinie Co

[1]: Be careful about this feature, it may affect your performance

-## Usage
-
-- MySQL config
-  - binlog_format: row
-  - binlog_row_image: full
-- MongoDB config:
-  - (optional) update `bind_ip` to allow listens for connections from applications on configured addresses.
-  - start with enable replication set:
-    - `mongod --replSet myapp`
-    - Or use docker: `docker run -d --name mongodb -p 27017:27017 -v /root/mongodb-container/db:/data/db mongo:3.2 mongod --replSet chat`
-  - init replication set in shell: `rs.initiate()`

### Limitation
- MySQL:
@@ -115,87 +123,231 @@ Manipulate `SyncData` through (for more details, see input part of *Pipelinie Co
  - Not support updating the primary key
  - Only support update/delete by querying the exact value, i.e.
    no support for querying analyzed fields (a `text` query when updating)

-## Notice
+### Notice

- Don't update/delete via both `syncer` and another way (REST API or Java API) at the same time; it may cause a version conflict and fail the change
- Update/Delete by query will be executed at once, i.e. it will not be buffered or batched

+## Use Syncer

-## Producer Data Source Config
-- MySQL master connection
-- Mongo master connection
+### Preparation

-## Consumer Pipeline Config
+- MySQL config
+  - binlog_format: row
+  - binlog_row_image: full
+- MongoDB config:
+  - (optional) update `bind_ip` to allow listening for connections from applications on the configured addresses.
+  - start with replication set enabled:
+    - `mongod --replSet myapp`
+    - Or use docker: `docker run -d --name mongodb -p 27017:27017 -v /root/mongodb-container/db:/data/db mongo:3.2 mongod --replSet chat`
+  - init the replication set in the shell: `rs.initiate()`

-### Input
-- MySQL master connection: specify the master wanting to listen to
-- Mongo master connection: specify the master wanting to listen to

-### Filter
+### Producer Data Source Config
+- `syncer.producer.input.masters[x]`
+  - `type`: MySQL, Mongo
+  - `connection`: `ip`, `address`, `port`, `user`, `password`, `passwordFile`
+  - `file`: absolute path to the binlog file

-- statement: implemented by Spring EL, can call all public method in `SyncData`
-- switcher

```yml
syncer.producer.input:
  masters:
    - connection:
        address: ${HOST_ADDRESS}
        port: 3306
        user: xxx
        password: yyy

    - connection:
        address: ${HOST_ADDRESS}
        port: 27018
      type: mongo
```

### Consumer Pipeline Config

#### Input
- `input.masters[x]`:
  - `type`: same as producer
  - `connection`: [same as producer](#connection)
  - `syncMeta`:
    - `binlogFilename`: name of the remote master's binlog file
    - `binlogPosition`: the position you want to start listening from
  - `schemas[x]`:
    - `name`: schema name, allows regex
    - `tables[x]`:
      - `name`: entity name
      - `rowName`: list of entity fields
  - `scheduler`: event scheduler to use -- `mod`, `hash`, or `direct` (see the sketch after this list)
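
A minimal `Input` sketch combining the fields above (values are illustrative; `syncMeta` may be omitted to resume from the last recorded position):

```yml
input:
  masters:
    - connection:
        address: ${HOST_ADDRESS}
        port: 3306
      syncMeta:
        # hypothetical binlog coordinates -- start reading from here
        binlogFilename: mysql-bin.000002
        binlogPosition: 4
      scheduler: hash
      schemas:
        - name: "test_.*"
          tables:
            - name: user
              rowName: [user_id, title]
```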
+ ```yml + - switcher: + switch: "table" + case: + "file": ["#docType='plain'", "renameRecord('uploader_id', 'uploaderId').renameRecord('parent_id', 'parentId')"] + "user": ["#suffix='' ", "renameRecord('superid', 'superId')"] + + ``` +- `foreach`: in most cases, you can use [Spring EL's collection projection](https://docs.spring.io/spring/docs/5.0.0.M5/spring-framework-reference/html/expressions.html#expressions-collection-projection) rather than this feature +- `if` + - `create`: create a new event (or a bunch) and cp value & execute post creation statement + - `drop`: drop this event + - `statement`: same with outer `statement` + - `switcher`: same as above + - `foreach` + ```yml + - if: + condition: "table == 'user' && isUpdate()" + ifBody: + - create: + copy: ["id", "table", "#suffix", "#title", "#docType"] + postCreation: ["addRecord('ownerTitle', #title)", "syncByQuery().filter('ownerId', id)", "id = null"] + elseBody: + - drop: {} + ``` - all public method in `SyncData`: - - `addRecord(String key, Object value)` - - `renameRecord(String oldKey, String newKey)` + - `isWrite()` + - `isUpdate()` + - `isDelete()` + - `toWrite()` + - `toUpdate()` + - `toDelete()` + - `getRecordValue(String key)` + - `addExtra(String key, Object value)` + - `addRecord(String key, Object value)` + - `renameRecord(String oldKey, String newKey)` - `removeRecord(String key)` - `removeRecords(String... keys)` - `containRecord(String key)` - `updateRecord(String key, Object value)` - - ... - - `syncByQuery()`: update/delete by query, now only support ES + - `syncByQuery()`: update/delete by query, supported by ES/MySQL output channel - `SyncByQueryES` - - `extraQuery(String schemaName, String tableName)`: usually work with `new` & `dup` to convert one event to multiple events + - `extraQuery(String schemaName, String tableName)`: usually work with `create` to convert one event to multiple events - `ExtraQuery`: enhance syncer with multiple dependent query; - all data field in `SyncData`: - `schema`: schema/db/index - `table`: table or collection - `id`: data primary key or similar thing - - `records`: data content of this sync event converted from log content according to your config. - **Notice**: - - if your interested column config (`rowName`) has name of `id`, records will have it. Otherwise, it will only in `id` field; + - `records`: data content of this sync event converted from log content according to your `schema` config + **Notice**: + - if your interested column config (`rowName`) has name of `primary key`, records will have it. Otherwise, it will only in `id` field; - `extra`: an extra map to store extra info -### Output Choice - -- Elasticsearch +#### Output + +- Special expression to do output mapping: + - "records.*": all key value in `records` + - "records.*.flatten": + - "extra.*" + - "extra.*.flatten" +- `batch`: support output change in batch + - `size`: flush if reach this size + - `delay`: flush if every this time in `MILLISECONDS` + - `maxRetry`: max retry if met error +- `failureLog`: failure log config + - `countLimit`: failure + - `timeLimit`: failure log item in this time range +- `requestMapping`, `rowMapping`: how to convert `SyncData` to suitable format +and send to where +- `elasticsearch` - When using this channel, you may prefer to not include `id` like field in interested column config (`rowName`), because it is always no need to include it in data field for ES and we will auto detect it and set it for you. -- MySQL + - e.g. 
+ ```yml + elasticsearch: + connection: + clusterName: test-${ACTIVE_PROFILE} + clusterNodes: ["${HOST_ADDRESS}:9300"] + requestMapping: # mapping from input data to es request + enableExtraQuery: true + retryOnUpdateConflict: 3 + index: "table + #suffix" # default: schema + type: "#docType" # default: table + documentId: "id" # default: id + fieldsMapping: # default: records.*.flatten + "records": "records.*.flatten" + batch: + size: 100 + delay: 1000 + maxRetry: 5 + refreshInMillis: 1000 + failureLog: + countLimit: 1000 + + ``` +- `mysql` + - Use + - e.g.: + ```yml + mysql: + connection: + address: ${HOST_ADDRESS} + port: 3306 + user: xxx + password: xxx + rowMapping: + schema: "'test'" + table: "table" + id: "id" + rows: + "records": "records.*.flatten" + batch: + size: 100 + delay: 100 + maxRetry: 5 + failureLog: + countLimit: 1000 + ``` - Http endpoint -### Sample +#### In All More samples can be found under `src/test/resource/` ```yml +version: 1.1 + +consumerId: todomsg input: masters: - connection: address: ${HOST_ADDRESS} port: 27017 - user: root - passwordFile: password - type: mongo + type: Mongo + scheduler: direct + schemas: + - name: "test" + tables: + - name: test + rowName: [createTime, content] + - connection: + address: ${HOST_ADDRESS} + port: 3306 + scheduler: mod schemas: - name: "test_${ACTIVE_PROFILE}.*" tables: - - name: user - rowName: [id, name, email] + - name: user + rowName: [user_id, title] + - name: addr + rowName: [address] - name: "file_${ACTIVE_PROFILE}.*" tables: - name: file - rowName: [id, uploader, public_type, state] - - name: folder - rowName: [id, uploader, public_type, state] + rowName: [name] @@ -206,21 +358,15 @@ filter: switch: "table" case: "user": ["renameRecord('xxx', 'yyy')"] - - if: - condition: "table == 'test_table' && action == 'WRITE_ROWS'" - ifBody: - - dup: - copy: ["id"] - new: - - ["table='permission_identity'", "addRecord('name', '盟主') - .addRecord('permission_category', '[105,205,305,405,505,605,705,805]') - .addRecord('permission_identity_type', 5) - .addRecord('allianceId', id)" ] - - ["table='role_permission'", "addRecord('role_id', records['owner_role_id']) - .addRecord('affair_id', records['root_affair_id']) - .addRecord('alliance_id', records['id'])", - "extraQuery(schema, 'permission_identity').filter('is_super', 1).filter('allianceId', records['id']) - .select('id').addRecord('identity_id')"] + - if: + condition: "table == 'user' && isUpdate()" + ifBody: + - create: + copy: ["id", "table", "#suffix", "#title", "#docType"] + postCreation: ["addRecord('ownerTitle', #title)", "syncByQuery().filter('ownerId', id)", "id = null"] + elseBody: + - drop: {} + # filter result class: com.github.zzt93.syncer.common.data.SyncData @@ -243,14 +389,37 @@ output: delay: 100 maxRetry: 5 ``` -## Run + +### Syncer Config + +```yml +syncer: + ack: + flushPeriod: 100 + input: + max-retry: 5 + input-meta: + last-run-metadata-dir: /data/syncer/input/last_position/ + + filter: + worker: 6 + + output: + worker: 2 + batch: + worker: 2 + output-meta: + failure-log-dir: /data/syncer/output/failure/ + +``` +### Run ``` git clone https://github.com/zzt93/syncer mvn package # /path/to/config/: producer.yml, consumer.yml, password-file # use `-XX:+UseParallelOldGC` if you have less memory and lower input pressure # use `-XX:+UseG1GC` if you have at least 4g memory and event input rate larger than 2*10^4/s -java -server -XX:+UseG1GC -jar syncer.jar --producerConfig=/absolute/path/to/producer.yml 
#### In All

More samples can be found under `src/test/resources/`

```yml
+version: 1.1
+
+consumerId: todomsg
 input:
   masters:
     - connection:
         address: ${HOST_ADDRESS}
         port: 27017
-      user: root
-      passwordFile: password
-      type: mongo
+      type: Mongo
+      scheduler: direct
+      schemas:
+        - name: "test"
+          tables:
+            - name: test
+              rowName: [createTime, content]
+    - connection:
+        address: ${HOST_ADDRESS}
+        port: 3306
+      scheduler: mod
       schemas:
         - name: "test_${ACTIVE_PROFILE}.*"
           tables:
-            - name: user
-              rowName: [id, name, email]
+            - name: user
+              rowName: [user_id, title]
+            - name: addr
+              rowName: [address]
         - name: "file_${ACTIVE_PROFILE}.*"
           tables:
             - name: file
-              rowName: [id, uploader, public_type, state]
-            - name: folder
-              rowName: [id, uploader, public_type, state]
+              rowName: [name]

@@ -206,21 +358,15 @@ filter:
     switch: "table"
     case:
       "user": ["renameRecord('xxx', 'yyy')"]
-  - if:
-      condition: "table == 'test_table' && action == 'WRITE_ROWS'"
-      ifBody:
-        - dup:
-            copy: ["id"]
-            new:
-              - ["table='permission_identity'", "addRecord('name', '盟主')
-                .addRecord('permission_category', '[105,205,305,405,505,605,705,805]')
-                .addRecord('permission_identity_type', 5)
-                .addRecord('allianceId', id)" ]
-              - ["table='role_permission'", "addRecord('role_id', records['owner_role_id'])
-                .addRecord('affair_id', records['root_affair_id'])
-                .addRecord('alliance_id', records['id'])",
-                "extraQuery(schema, 'permission_identity').filter('is_super', 1).filter('allianceId', records['id'])
-                .select('id').addRecord('identity_id')"]
+  - if:
+      condition: "table == 'user' && isUpdate()"
+      ifBody:
+        - create:
+            copy: ["id", "table", "#suffix", "#title", "#docType"]
+            postCreation: ["addRecord('ownerTitle', #title)", "syncByQuery().filter('ownerId', id)", "id = null"]
+      elseBody:
+        - drop: {}

 # filter result class: com.github.zzt93.syncer.common.data.SyncData
@@ -243,14 +389,37 @@ output:
       delay: 100
       maxRetry: 5
 ```

-## Run
+### Syncer Config
+
+```yml
+syncer:
+  ack:
+    flushPeriod: 100
+  input:
+    max-retry: 5
+    input-meta:
+      last-run-metadata-dir: /data/syncer/input/last_position/
+
+  filter:
+    worker: 6
+
+  output:
+    worker: 2
+    batch:
+      worker: 2
+    output-meta:
+      failure-log-dir: /data/syncer/output/failure/
+```
+
+### Run

```
git clone https://github.com/zzt93/syncer
mvn package
# /path/to/config/: producer.yml, consumer.yml, password-file
# use `-XX:+UseParallelOldGC` if you have less memory and lower input pressure
# use `-XX:+UseG1GC` if you have at least 4g memory and an event input rate larger than 2*10^4/s
-java -server -XX:+UseG1GC -jar syncer.jar --producerConfig=/absolute/path/to/producer.yml --consumerConfig=/absolute/path/to/consumer1.yml,/absolute/path/to/consumer2.yml
+java -server -XX:+UseG1GC -jar syncer.jar --config=/absolute/path/to/syncerConfig.yml --producerConfig=/absolute/path/to/producer.yml --consumerConfig=/absolute/path/to/consumer1.yml,/absolute/path/to/consumer2.yml
```

## Test
@@ -314,4 +483,11 @@ java -server -XX:+UseG1GC -jar syncer.jar --producerConfig=/absolute/path/to/pro

#### Json Mapper
- For now, document values are mapped using `toString()`: {@link XContentBuilder#unknownValue}
- java.sql.Timestamp format: 'yyyy-MM-dd HH:mm:ss.SSS'. For now, if you need another format, you have to format it to a string yourself (a hedged sketch follows)
-  - Maybe change to jackson
\ No newline at end of file
+  - Maybe change to jackson
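
Regarding the timestamp note above, a sketch of formatting it yourself in a filter statement (the column name is hypothetical; Spring EL allows `new`, as in the samples):

```yml
filter:
  - statement:
      # convert java.sql.Timestamp to a custom string before output (hypothetical column)
      - "updateRecord('create_time', new java.text.SimpleDateFormat('yyyy-MM-dd').format(records['create_time']))"
```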
+
+---
+
+## How To
+
+If you have any problem with how to use `Syncer`, or find a bug in it, please write an issue.
+I will handle it as soon as I can.
\ No newline at end of file
diff --git a/src/main/java/com/github/zzt93/syncer/producer/input/mysql/meta/ConsumerSchemaMeta.java b/src/main/java/com/github/zzt93/syncer/producer/input/mysql/meta/ConsumerSchemaMeta.java
index 215ea118..1065afc8 100644
--- a/src/main/java/com/github/zzt93/syncer/producer/input/mysql/meta/ConsumerSchemaMeta.java
+++ b/src/main/java/com/github/zzt93/syncer/producer/input/mysql/meta/ConsumerSchemaMeta.java
@@ -52,7 +52,7 @@ public TableMeta findTable(String database, String table) {
       }
     }
     if (count > 1) {
-      logger.warn("Multiple configured schema match `{}`.`{}`. Check your config", database, table);
+      logger.error("Multiple configured schema match `{}`.`{}`. Check your config", database, table);
     }
     return res;
   }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 563004bb..451df213 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,9 +1,5 @@
 spring.application.name: syncer

-spring:
-  main:
-    web-application-type: none
-
 spring:
   profiles:
diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml
index dc080e0d..7610c6ac 100644
--- a/src/main/resources/config.yml
+++ b/src/main/resources/config.yml
@@ -7,7 +7,6 @@ syncer:
       last-run-metadata-dir: //Users/xiong/data/syncer/input/last_position/

   filter:
-    # should be at least twice of core
     worker: 12

   output:
diff --git a/src/test/resources/sample-consumer.yml b/src/test/resources/sample-consumer.yml
index 31f136a3..08eb7da2 100644
--- a/src/test/resources/sample-consumer.yml
+++ b/src/test/resources/sample-consumer.yml
@@ -1,10 +1,14 @@
+version: 1.1
+
+consumerId: sample1
+
+
 input:
   masters:
     - connection:
         address: 192.168.1.100
         port: 1234
-      user: root
-      passwordFile: password
+      scheduler: mod
       schemas:
         - name: "test_*"
           tables:
@@ -22,8 +26,6 @@ input:
     - connection:
         address: 192.168.1.100
         port: 1235
-      user: root
-      password-file: password
       schemas:
         - name: "*_test"
           tables:
@@ -52,21 +54,21 @@ filter:
   - switcher:
       switch: "table"
       case: # support default branch
-        "affair": ["#suffix = '-' + row['id']","#type = 'INDEX_AFFAIR'", "renameColumn('xx', 'yy')" ]
+        "affair": ["#suffix = '-' + row['id']","#type = 'INDEX_AFFAIR'", "renameRecord('xx', 'yy')" ]
         "file": ["#suffix = '-' + row['id']","#type = 'INDEX_FILE'", "addRow('type', '0')"]
         "folder": ["#suffix = '-' + row['id']","#type = 'INDEX_FILE'", "addRow('type', '1')"]
-  - statement: [ "#tags = row['tags']", "updateColumn('tags', new java.util.ArrayList())", "removeColumns('id', 'xid')"]
+  - statement: [ "#tags = row['tags']", "updateRecord('tags', new java.util.ArrayList())", "removeRecords('id', 'xid')"]
   - foreach:
       var: "tag"
       in: "#tags?.split('\n')"
       statement: ["#map = new java.util.HashMap()", "#map.put('des', #tag)", "row.get('tags').add(#map)"]
   - if:
       condition: "table == 'affair'"
-      if-body:
+      ifBody:
         create:
           copy: [row]
           postCreation: ["table = 'role'", "row['id']"]
-      else-body:
+      elseBody:
         drop: {}
 # filter result class: com.github.zzt93.syncer.common.data.SyncData
diff --git a/src/test/resources/sample-consumer2.yml b/src/test/resources/sample-consumer2.yml
index 56430171..c5081e77 100644
--- a/src/test/resources/sample-consumer2.yml
+++ b/src/test/resources/sample-consumer2.yml
@@ -1,10 +1,12 @@
+version: 1.1
+
+consumerId: searcher
+
 input:
   masters:
     - connection:
         address: 192.168.1.100
         port: 1234
-      user: root
-      passwordFile: password
       schemas:
         - name: "test_*"
           tables:
@@ -22,8 +24,6 @@ input:
     - connection:
         address: 192.168.1.200
         port: 1234
-      user: root
-      password-file: password
       schemas:
         - name: "test_*"
           tables:
@@ -53,13 +53,24 @@ input:
 #}

 filter:
+  - if:
+      condition: "containRecord('tags')"
+      ifBody:
+        - statement: ["updateRecord('tags', T(SyncUtil).fromJson(records['tags'], T(String[])))"]
+  - statement: ["#docType=table"]
+
+  - if:
+      condition: "table == 'task' && isWrite() && records['state'] != 0"
+      ifBody:
+        - drop: {}
+
   - switcher:
       switch: "table"
       case: # support default branch
-        "affair": ["#suffix = '-' + row['id']","#type = 'INDEX_AFFAIR'", "renameColumn('xx', 'yy')" ]
+        "affair": ["#suffix = '-' + row['id']","#type = 'INDEX_AFFAIR'", "renameRecord('xx', 'yy')" ]
         "file": ["#suffix = '-' + row['id']","#type = 'INDEX_FILE'", "addRow('type', '0')"]
         "folder": ["#suffix = '-' + row['id']","#type = 'INDEX_FILE'", "addRow('type', '1')"]
-  - statement: [ "#tags = row['tags']", "updateColumn('tags', new java.util.ArrayList())", "removeColumns('id', 'xid')"]
+  - statement: [ "#tags = row['tags']", "updateRecord('tags', new java.util.ArrayList())", "removeRecords('id', 'xid')"]
   - foreach:
       var: "tag"
       in: "#tags?.split('\n')"
@@ -93,18 +104,36 @@ output:
     clusterNodes: ["192.168.1.100:9300"]
     user: elastic-user # optional if security is not enabled
     passwordFile: es-password # optional if security is not enabled
-  documentMapping: # mapping from input data to document json
-    index: "table + #suffix" # default schema
-    type: "table" # default table
-    documentId: "id" # default id
-    fieldsMapping: # rest row.* => row.*
-      "row": "row.*.flatten"
+  requestMapping: # mapping from input data to es request
+    enableExtraQuery: true
+    retryOnUpdateConflict: 3
+    index: "table + #suffix" # default: schema
+    type: "#docType" # default: table
+    documentId: "id" # default: id
+    fieldsMapping: # default: records.*.flatten
+      "records": "records.*.flatten"
+  batch:
+    size: 100
+    delay: 1000
+    maxRetry: 5
+  refreshInMillis: 1000

-  http:
+  mysql:
     connection:
-      address: 192.168.1.100
-      port: 9700
-    jsonMapping:
-      "data": "row.*"
-      "type": "extra['type']"
+      address: ${HOST_ADDRESS}
+      port: 3306
+      user: xxx
+      password: xxx
+    rowMapping:
+      schema: "'auth'"
+      table: "table"
+      id: "id"
+      rows:
+        "records": "records.*.flatten"
+    batch:
+      size: 100
+      delay: 100
+      maxRetry: 5
+    failureLog:
+      countLimit: 1000
diff --git a/src/test/resources/sample-producer.yml b/src/test/resources/sample-producer.yml
index 17bba6df..2d5ef9de 100644
--- a/src/test/resources/sample-producer.yml
+++ b/src/test/resources/sample-producer.yml
@@ -1,15 +1,22 @@
+version: 1.1

-input:
+syncer.producer.input:
   masters:
-    - connection:
-        address: localhost
-        port: 3308
-        user: root
-        passwordFile: password
-    - connection:
-        address: localhost
-        port: 27017
+    - connection:
+        address: ${HOST_ADDRESS}
+        port: 3306
+        user: xxx
+        password: yyy
+    - connection:
+        address: ${HOST_ADDRESS}
+        port: 27018
+      type: mongo
+
+    - connection:
+        address: ${HOST_ADDRESS}
+        port: 27017
+      type: mongo
diff --git a/src/test/resources/syncer-config.yml b/src/test/resources/syncer-config.yml
new file mode 100644
index 00000000..ffb08b57
--- /dev/null
+++ b/src/test/resources/syncer-config.yml
@@ -0,0 +1,17 @@
+syncer:
+  ack:
+    flushPeriod: 100
+  input:
+    max-retry: 5
+    input-meta:
+      last-run-metadata-dir: /data/syncer/input/last_position/
+
+  filter:
+    worker: 12
+
+  output:
+    worker: 2
+    batch:
+      worker: 2
+    output-meta:
+      failure-log-dir: /data/syncer/output/failure/