下面的例子都可以参考Fountain-examples
- fountain内部支持配置多个数据源做高可用,当一个数据源不可用时,自动切换。但是这仅限于java进程不挂的情况,这里的高可用指的是进程级别的多个实例高可用。
- 多个fountain实例运行,有且仅有一个实例同步MySQL binlog增量,其他实例做standby热备。
- 当fountain实例挂掉(例如java进程被kill或者该实例所在主机网络故障时)时候,可以从分布式存储中获取最近记录成功的同步点,这里采用了zookeeper。
- 由于zookeeper使用Paxos协议做数据一致性保证,因此性能不佳,QPS几百而已,因此fountain实例使用异步化+定时持久化的策略,隔一段时间自动存储同步点,避免打爆Zookeeper。
- 为了避免在切换fountain实例时候,因为slaveId重复,导致同步失败,所以建议使用随机生成slave的策略。
- 另外,正在研发高性能的分布式同步点服务,可以不做定时同步,实现实时同步点同步。
需要额外加入如下依赖:
Maven:
<dependency>
<groupId>net.neoremind</groupId>
<artifactId>fountain-zk-support</artifactId>
<version>1.0.0</version>
</dependency>
以MySQL5.6用GTID做同步点为例说明。
# mysql用户名密码
mysql_username=beidou
mysql_password=u7i8o9p0
# 当同步点存储文件不存在时候,使用该配置来进行binlog dump,如果置空,则从MySQL Server最后的点开始
mysql_binlogdump_gtidset=
# 主mysql地址,端口,slaveId(与其他slave不能重复,由于主备不会同一时间连,则配置中可以相同)
mysql_shard_0_server=10.94.37.23
mysql_shard_0_port=8769
# 备mysql
mysql_shard_0_ha1_server=10.94.37.23
mysql_shard_0_ha1_port=8769
# 随机slaveId起始和结束区间
mysql_random_slaveId_start=10
mysql_random_slaveId_end=5000
# 下面是mysql的一些高级设置,一般情况下不建议修改
# replication或者query socket的一些初始化参数
mysql_wait_timeout=999999
mysql_net_write_timeout=240
mysql_net_read_timeout=240
mysql_charset=binary
# 当主mysql长时间接收不到任何event时,切换到另外一个备mysql的超时时间,单位为毫秒
mysql_replication_socket_so_timeout=120000
# mysql复制线程的一些基础socket参数,timeout单位为毫秒
#mysql_replication_socket_in_buf=16384
#mysql_replication_socket_out_buf=16384
#mysql_replication_socket_connect_timeout=3000
# zookeeper配置
zk_string=127.0.0.1:2181
zk_connection_timeout=4000
zk_session_timeout=50000
# 存储同步点的根路径,叶子节点是instanceName,相当于同步线程的名称,例如producer00,data是同步点
zk_save_position_root_path=/fountain/beidou/eventposition/testha
# 后台异步同步点线程的配置,默认6s延迟启动,20s做一次持久化
async_event_position_thread_initdelay=6000
async_event_position_thread_period=20000
# 多个fountain实力做高可用方案,需要依赖一个leader选举机制,这个超时时间是尝试获取leader latch的毫秒数
# 如果没有获得latch则会阻塞同步线程,然后再次尝试,周而复始,知道leader权转移(一般是down了),其他standby
# 的实例可以竞争获取leader latch,path就是分布式锁的全路径。
zk_leader_latch_timeout=10000
zk_leader_latch_path=/fountain/beidou/leader/testha
# fountain-producer和consumer直接的缓冲队列长度
memq_limit=60000
### 按照表名进行过滤时,表名格式为database.table(可以为正则),以逗号分隔
### 当白名单和黑名单同时存在时,只有不在黑名单中同时在白名单中存在的才起作用
filter_shard_table_white=
filter_shard_table_black=
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<description>Fountain的Spring配置</description>
<!-- properties配置文件 -->
<bean id="producerConfig"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
<property name="ignoreResourceNotFound" value="true"/>
<property name="ignoreUnresolvablePlaceholders" value="true"/>
<property name="locations">
<list>
<value>classpath:zk.ha.baidu.ares.mysql.rowbase51-nontrx/fountain-config.properties</value>
<value>classpath:zk.ha.baidu.ares.mysql.rowbase51-nontrx/jdbc-mysql.properties</value>
</list>
</property>
</bean>
<!-- 根据gtid(Global Transaction ID)得到对应的BinlogFileName和Position的扩展器,
一般用于SyncPoint扩展成binlog dump命名需要的同步点,例如从gtid类型的同步点扩展成binlogFileNameOffset的同步点 -->
<bean id="groupExtender"
class="net.neoremind.fountain.producer.datasource.eventpositionext.GtId2BinPositionEventPositionExtender"/>
<!-- 基于表名的匹配器,支持黑白名单,支持正则表达式。黑名单的表直接忽略,白名单如果置空,则表示不启用,否则必须符合。
符合表过滤匹配条件的binlog event会被继续处理。可以自己实现EventMatcher接口,定制匹配 -->
<bean id="shardTableMatcher" class="net.neoremind.fountain.producer.matcher.TableMatcher">
<property name="tableWhite" value="${filter_shard_table_white}"></property>
<property name="tableBlack" value="${filter_shard_table_black}"></property>
</bean>
<!-- Row based binlog解析器 -->
<bean id="defaultParser"
class="net.neoremind.fountain.producer.parser.impl.DefaultParser"
scope="prototype">
<constructor-arg ref="shardTableMatcher"/>
</bean>
<!-- 对message queue起作用,用于保存在消费时保存gt id,但即使使用其他传输器时也不要删除该对象 -->
<bean id="disposeEventPositionBridge"
class="net.neoremind.fountain.eventposition.DisposeEventPositionBridgeImpl"></bean>
<!-- zookeeper重试策略,参数为重试一次之间的时间间隔毫秒 -->
<bean id="zkRetryPolicy" class="org.apache.curator.retry.RetryOneTime">
<constructor-arg value="0"/>
</bean>
<!-- zookeeper的单例client,如果不使用ZkHaGuard,那么必须初始化方法填init,否则不会初始化连接zk,这里使用了zkHaGuard,所以不用init-method -->
<bean id="singletonZkClientProvider" class="net.neoremind.simplezkclient.SingletonZkClientProvider">
<property name="zookeeperConnectionString" value="${zk_string}"/>
<property name="retryPolicy" ref="zkRetryPolicy"/>
<property name="connectionTimeoutMs" value="${zk_connection_timeout}"/>
<property name="sessionTimeoutMs" value="${zk_session_timeout}"/>
</bean>
<bean id="gtIdSyncPointFactory"
class="net.neoremind.fountain.eventposition.factory.GtIdSyncPointFactory"/>
<!-- 记录已处理事件的位置,当fountain重新启动或者切换master时会首先找JVM ThreadLocal获取同步点,然后通过本地或者远程服务寻找,
这里是使用Zookeeper作为同步点处理存取策略,可以使用装饰模式来不断的扩展保存、读取同步点的方法,
封装一个AsyncFixedRateDisposeEventPosition可以保证不把zk压垮,异步的每隔一段时间自动同步一次,
保存的根路径在配置中的zkRootPath,叶子节点保存的是instanceName,data为同步点 -->
<bean id="disposeEventPosition" class="net.neoremind.fountain.eventposition.ReadonlyDisposeEventPosition"
scope="prototype">
<property name="delegate">
<bean class="net.neoremind.fountain.eventposition.AsyncFixedRateDisposeEventPosition" scope="prototype"
init-method="init">
<property name="delegate">
<bean class="net.neoremind.fountain.zk.eventposition.ZkDisposeEventPosition" scope="prototype">
<property name="zkRootPath" value="${zk_save_position_root_path}"/>
<property name="zkClientProvider" ref="singletonZkClientProvider"/>
<property name="syncPointFactory" ref="gtIdSyncPointFactory"/>
</bean>
</property>
<property name="initDelayMs" value="${async_event_position_thread_initdelay}"></property>
<property name="periodMs" value="${async_event_position_thread_period}"></property>
<property name="disposeEventPositionBridge" ref="disposeEventPositionBridge"></property>
</bean>
</property>
</bean>
<!-- 对于RowsLogEvent的处理策略,NonTransactionPolicy只要接收到RowsLogEvent,
便会通过dispatcher下发到下游 -->
<bean id="nonTransactionPolicy"
class="net.neoremind.fountain.producer.dispatch.transcontrol.NonTransactionPolicy"
scope="prototype">
</bean>
<!-- fountain-producer和consumer直接的缓冲队列 -->
<bean id="fountainMQ" class="net.neoremind.fountain.common.mq.MultiPermitsMemFountainMQ">
<constructor-arg index="0">
<value>${memq_limit}</value>
</constructor-arg>
</bean>
<!-- 通过MQ为通道传输变化的数据传输层 -->
<bean id="memqTransport"
class="net.neoremind.fountain.producer.dispatch.fountainmq.FoutainMQTransport">
<property name="fmq" ref="fountainMQ"/>
</bean>
<!-- 变化的数据的下发流程,包括转换、序列化、和传输 -->
<bean id="dispatchWorkflow"
class="net.neoremind.fountain.producer.dispatch.DefaultDispatchWorkflow">
<!-- 转化ChangeDataSet对象到其他的java对象的转化器,用户可以实现EventConverter接口定制转化器,以期转化为对用户更为友好的java对象
缺省是DefaultEventConverter,它不做任何转化,直接输出ChangeDataSet -->
<!-- 配置数据传输层 -->
<property name="tranport" ref="memqTransport"/>
<!-- <property name="packProtocol" ref="packProtocol"/> -->
<!-- <property name="transFilter" ref="transFilter"/> -->
</bean>
<!-- 真正用于处理消息即变化数据的对象,实现Consumer接口。这一部分涉及使用方的业务逻辑,必须要使用方自行实现,
如果使用方不使用ConsumerWorkflow接口的缺省实现net.neoremind.fountain.consumer.DefaultConsumerWorkflow, 这部分可以忽略 -->
<bean id="consumerFromMemMQ" class="net.neoremind.fountain.test.consumer.TestConsumer">
<property name="bridge" ref="disposeEventPositionBridge"></property>
</bean>
<!-- 最终消费、使用变化数据的流程,内置缺省实现是DefaultConsumerWorkflow,它需要反序列化之后调用Consumer接口实现,使用方可以
通过实现ConsumerWorkflow接口来实现自己的消费流程,此时实现Consumer接口就不是必要的 -->
<bean id="consumerWorkflowFromMemMQ"
class="net.neoremind.fountain.consumer.spi.def.DefaultConsumerWorkflow">
<property name="consumer" ref="consumerFromMemMQ"></property>
<!-- <property name="recievedDataConverter" ref=""></property> -->
<!-- <property name="unPackProtocol" ref="unPackProtocol"></property> -->
</bean>
<!-- 消费者监听线程 -->
<bean id="fountainMQMessageListener"
class="net.neoremind.fountain.consumer.support.fountainmq.FountainMQMessageListener"
init-method="start" destroy-method="destroy">
<property name="fmq" ref="fountainMQ"/>
<property name="workflow" ref="consumerWorkflowFromMemMQ"></property>
</bean>
<!-- MySQL binlog dump的策略,针对不同的MySQL版本可以采用如下策略,
BinlogGtIdAresV51DumpStrategy,针对百度自己的MySQL5.1版本使用gtid
BinlogFileNamePositionDumpStrategy,所有版本MySQL都支持的传统通过binlog filename + position
BinlogGtIdV56DumpStrategy,MySQL5.6之后支持的gtid
这里注意,默认isChecksumSupport=false,BinlogGtIdV56DumpStrategy则恒为true,
对于如果其他版本的MySQL master支持checksum,需要设置isChecksumSupport为true,目前fountain对于4个byte的校验和做忽略处理-->
<bean id="binlogGtIdV56DumpStrategy"
class="net.neoremind.fountain.producer.datasource.binlogdump.BinlogGtIdV56DumpStrategy">
<property name="gtIdset" value="${mysql_binlogdump_gtidset}"/>
<!--<property name="isChecksumSupport" value="false"/>-->
</bean>
<bean id="randomSlaveIdGenerateStrategy"
class="net.neoremind.fountain.producer.datasource.slaveid.RandomSlaveIdGenerateStrategy">
<property name="start" value="${mysql_random_slaveId_start}"></property>
<property name="end" value="${mysql_random_slaveId_end}"></property>
</bean>
<!-- 配置数据源和数据监控器,它们成对出现,数据监控器称之为fountain-producer,每个fountain-producer绑定一个数据源,
一般数据源是ha数据源。每个fountain-producer是一个线程。如果有多个数据源,可以配置多个配置数据源和数据监控器对 -->
<!-- 配置需要监控的mysql数据源,支持ha,可以定制多个具体的数据源,一般定制2个,一主一备 -->
<bean id="groupIdHAMysqlBinlogDataSource00"
class="net.neoremind.fountain.producer.datasource.ha.HAMysqlBinlogDataSource"
init-method="init">
<property name="disposeEventPosition" ref="disposeEventPosition"></property>
<property name="datasourceChoosePolicy">
<bean class="net.neoremind.fountain.datasource.RoundRobinDatasourceChoosePolicy">
<property name="tryInterval" value="3000"></property>
</bean>
</property>
<property name="mysqlDataSourceList">
<list>
<bean class="net.neoremind.fountain.producer.datasource.MysqlBinlogDataSource">
<property name="conf">
<bean class="net.neoremind.fountain.datasource.DatasourceConfigure">
<property name="mysqlServer" value="${mysql_shard_0_server}"></property>
<property name="mysqlPort" value="${mysql_shard_0_port}"></property>
<property name="waitTimeout" value="${mysql_wait_timeout}"/>
<property name="netWriteTimeout" value="${mysql_net_write_timeout}"/>
<property name="netReadTimeout" value="${mysql_net_read_timeout}"/>
<property name="soTimeout" value="${mysql_replication_socket_so_timeout}"/>
<property name="charset" value="${mysql_charset}"/>
<property name="userName" value="${mysql_username}"></property>
<property name="password" value="${mysql_password}"></property>
</bean>
</property>
<property name="binlogDumpStrategy" ref="binlogGtIdV56DumpStrategy"></property>
<property name="slaveIdGenerateStrategy" ref="randomSlaveIdGenerateStrategy"></property>
</bean>
<bean class="net.neoremind.fountain.producer.datasource.MysqlBinlogDataSource">
<property name="conf">
<bean class="net.neoremind.fountain.datasource.DatasourceConfigure">
<property name="mysqlServer" value="${mysql_shard_0_ha1_server}"></property>
<property name="mysqlPort" value="${mysql_shard_0_ha1_port}"></property>
<property name="waitTimeout" value="${mysql_wait_timeout}"/>
<property name="netWriteTimeout" value="${mysql_net_write_timeout}"/>
<property name="netReadTimeout" value="${mysql_net_read_timeout}"/>
<property name="soTimeout" value="${mysql_replication_socket_so_timeout}"/>
<property name="charset" value="${mysql_charset}"/>
<property name="userName" value="${mysql_username}"></property>
<property name="password" value="${mysql_password}"></property>
</bean>
</property>
<property name="binlogDumpStrategy" ref="binlogGtIdV56DumpStrategy"></property>
<property name="slaveIdGenerateStrategy" ref="randomSlaveIdGenerateStrategy"></property>
</bean>
</list>
</property>
</bean>
<!-- 多个fountain实例共存时候,只有一个实例是leader,可以同步增量,其他都是standby热备,使用zk的leader latch实现-->
<bean id="zkHaGuard" class="net.neoremind.haguard.zk.ZkHaGuard">
<property name="zkClientProvider" ref="singletonZkClientProvider"></property>
<property name="defaultTimeoutMs" value="${zk_leader_latch_timeout}"></property>
<property name="latchPath" value="${zk_leader_latch_path}"></property>
</bean>
<!-- 监控mysql数据变化的监控器,称之为fountain-producer -->
<bean id="producer-zk-ha" class="net.neoremind.fountain.producer.DefaultProducer"
init-method="start" destroy-method="destroy">
<!-- 事务控制器,缺省使用net.neoremind.fountain.producer.dispatch.transcontrol.NonTransactionPolicy,
这不是代表没事务,而是事务不完整,当一个RowsLogEvent的数据解析完后就下发,
一个event只是事务中一张表或一张表的部分数据. 一个完成的事务可能是多张表或者全部数据 -->
<property name="transactionPolicy" ref="nonTransactionPolicy"></property>
<!-- 绑定要监控的数据源 -->
<property name="dataSource" ref="groupIdHAMysqlBinlogDataSource00"></property>
<!-- 数据解析器 -->
<property name="parser" ref="defaultParser"></property>
<!-- event匹配器 -->
<property name="matcher" ref="shardTableMatcher"></property>
<!-- 配置数据下发,支持多个下发, 每个下发支持不同的下发流程 -->
<property name="dispatcher" ref="dispatchWorkflow"/>
<!-- 使用zookeeper来做高可用保证,同一时间只有一个实例是leader,处于同步状态,其他实例都是standby做热备 -->
<property name="haGuard" ref="zkHaGuard"/>
</bean>
</beans>