Skip to content

Commit

Permalink
Merge pull request #16 from zendesk/ben/store_position_in_database
Browse files Browse the repository at this point in the history
Store binlog position in database
  • Loading branch information
Ben Osheroff committed Mar 16, 2015
2 parents 1686cb3 + 34fc1da commit 67aa9a3
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 44 deletions.
5 changes: 3 additions & 2 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public class Maxwell {
private void initFirstRun() throws SQLException, IOException {
Connection connection = this.config.getMasterConnection();

SchemaStore.createMaxwellSchema(connection);

LOGGER.info("Maxwell is capturing initial schema");
SchemaCapturer capturer = new SchemaCapturer(connection);
this.schema = capturer.capture();

Expand All @@ -35,6 +34,8 @@ private void initFirstRun() throws SQLException, IOException {
private void run(String[] args) throws Exception {
this.config = MaxwellConfig.buildConfig("config.properties", args);

SchemaStore.ensureMaxwellSchema(this.config.getMasterConnection());

if ( this.config.getInitialPosition() != null ) {
LOGGER.info("Maxwell is booting, starting at " + this.config.getInitialPosition());
SchemaStore store = SchemaStore.restore(this.config.getMasterConnection(), this.config.getInitialPosition());
Expand Down
55 changes: 27 additions & 28 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;
Expand All @@ -17,6 +17,7 @@
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.FileProducer;
import com.zendesk.maxwell.producer.MaxwellKafkaProducer;
import com.zendesk.maxwell.schema.SchemaPosition;

public class MaxwellConfig {
public String mysqlHost;
Expand All @@ -30,44 +31,31 @@ public class MaxwellConfig {
private final Properties kafkaProperties;
private String producerType;
private String outputFile;
private Long serverID;
private Connection connection;

public MaxwellConfig() {
this.kafkaProperties = new Properties();
}

public Connection getMasterConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://" + mysqlHost + ":" + mysqlPort, mysqlUser, mysqlPassword);
if ( this.connection != null )
return this.connection;

this.connection = DriverManager.getConnection("jdbc:mysql://" + mysqlHost + ":" + mysqlPort, mysqlUser, mysqlPassword);
return this.connection;
}

public BinlogPosition getInitialPosition() throws FileNotFoundException, IOException {
public BinlogPosition getInitialPosition() throws FileNotFoundException, IOException, SQLException {
if ( this.initialPosition != null )
return this.initialPosition;

File f = new File(this.currentPositionFile);
if ( !f.exists() ) {
return null;
} else {
Properties p = new Properties();
p.load(new FileReader(f));

this.initialPosition = new BinlogPosition(Integer.valueOf((String) p.get("offset")), p.getProperty("file"));
return this.initialPosition;
}
this.initialPosition = SchemaPosition.get(this.getMasterConnection(), this.getServerID());
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws IOException {
Properties p = new Properties();
p.setProperty("offset", String.valueOf(position.getOffset()));
p.setProperty("file", position.getFile());

File f = new File(this.currentPositionFile);
FileWriter fw = new FileWriter(f);
try {
p.store(fw, "");
this.initialPosition = position;
} finally {
fw.close();
}
public void setInitialPosition(BinlogPosition position) throws IOException, SQLException {
SchemaPosition.set(this.getMasterConnection(), this.getServerID(), position);
}

private void parseOptions(String [] argv) {
Expand Down Expand Up @@ -108,13 +96,11 @@ private void parseFile(String filename) throws IOException {
FileReader reader = new FileReader(file);
p.load(reader);


this.mysqlHost = p.getProperty("host", "127.0.0.1");
this.mysqlPassword = p.getProperty("password");
this.mysqlUser = p.getProperty("user");
this.mysqlPort = Integer.valueOf(p.getProperty("port", "3306"));

this.currentPositionFile = p.getProperty("position_file");
this.producerType = p.getProperty("producer");
this.outputFile = p.getProperty("output_file");

Expand Down Expand Up @@ -161,4 +147,17 @@ public AbstractProducer getProducer() throws IOException {
return new StdoutProducer(this);
}
}

public Long getServerID() throws SQLException {
if ( this.serverID != null)
return this.serverID;

ResultSet rs = this.getMasterConnection().createStatement().executeQuery("SELECT @@server_id as server_id");
if ( !rs.next() ) {
throw new RuntimeException("Could not retrieve server_id!");
}
this.serverID = rs.getLong("server_id");
return this.serverID;
}

}
20 changes: 13 additions & 7 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setBinlogPosition(BinlogPosition p) {
this.replicator.setBinlogPosition(p.getOffset());
}

public void setConfig(MaxwellConfig c) throws FileNotFoundException, IOException {
public void setConfig(MaxwellConfig c) throws FileNotFoundException, IOException, SQLException {
this.replicator.setHost(c.mysqlHost);
this.replicator.setUser(c.mysqlUser);
this.replicator.setPassword(c.mysqlPassword);
Expand Down Expand Up @@ -92,17 +92,23 @@ public void run() throws Exception {
if ( event == null )
continue;

producer.push(event);
if ( !skipEvent(event)) {
producer.push(event);

// TODO: this isn't quite right: we need to only stop on table map events,
// although it's unclear to me whether you can have two row events in a row.
// I think maybe you can.
// TODO: this isn't quite right: we need to only stop on table map events,
// although it's unclear to me whether you can have two row events in a row.
// I think maybe you can.

BinlogPosition p = eventBinlogPosition(event);
this.config.setInitialPosition(p);
BinlogPosition p = eventBinlogPosition(event);
this.config.setInitialPosition(p);
}
}
}

private boolean skipEvent(MaxwellAbstractRowsEvent event) {
return event.getTable().getDatabase().getName().equals("maxwell");
}

private BinlogPosition eventBinlogPosition(AbstractBinlogEventV4 event) {
BinlogPosition p = new BinlogPosition(event.getHeader().getNextPosition(), event.getBinlogFilename());
return p;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.zendesk.maxwell.producer;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;

import com.zendesk.maxwell.BinlogPosition;
Expand All @@ -12,11 +13,13 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaCallback implements Callback {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
private final MaxwellConfig config;
private final MaxwellAbstractRowsEvent event;

public KafkaCallback(MaxwellAbstractRowsEvent e, MaxwellConfig c) {
this.config = c;
this.event = e;
Expand All @@ -27,17 +30,23 @@ public void onCompletion(RecordMetadata md, Exception e) {
if ( e != null ) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + md.offset());
try {
config.setInitialPosition(new BinlogPosition(event.getHeader().getNextPosition(), event.getBinlogFilename()));
} catch (IOException e1) {
// TODO Auto-generated catch block
BinlogPosition p = new BinlogPosition(event.getHeader().getNextPosition(), event.getBinlogFilename());
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug("-> " + md.topic() + ":" + md.offset());
LOGGER.debug(" " + event.toJSON());
LOGGER.debug(" " + p);
LOGGER.debug("");
}
config.setInitialPosition(p);
} catch (IOException | SQLException e1) {
e1.printStackTrace();
}
}
}
}
public class MaxwellKafkaProducer extends AbstractProducer {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
private final KafkaProducer<byte[], byte[]> kafka;

public MaxwellKafkaProducer(MaxwellConfig config, Properties kafkaProperties) {
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/com/zendesk/maxwell/schema/SchemaPosition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.zendesk.maxwell.schema;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.zendesk.maxwell.BinlogPosition;

public class SchemaPosition {
public static BinlogPosition get(Connection c, Long serverID) throws SQLException {
PreparedStatement s = c.prepareStatement("SELECT * from `maxwell`.`positions` where server_id = ?");
s.setLong(1, serverID);

ResultSet rs = s.executeQuery();
if ( !rs.next() )
return null;

return new BinlogPosition(rs.getLong("binlog_position"), rs.getString("binlog_file"));
}

public static void set(Connection c, Long serverID, BinlogPosition p) throws SQLException {
String sql = "INSERT INTO `maxwell`.`positions` set "
+ "server_id = ?, "
+ "binlog_file = ?, "
+ "binlog_position = ? "
+ "ON DUPLICATE KEY UPDATE binlog_file=?, binlog_position=?";
PreparedStatement s = c.prepareStatement(sql);

s.setLong(1, serverID);
s.setString(2, p.getFile());
s.setLong(3, p.getOffset());
s.setString(4, p.getFile());
s.setLong(5, p.getOffset());

s.execute();
}

}
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/schema/SchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void executeColumnInsert(ArrayList<Object> columnData) throws SQLExcepti
columnData.clear();
}

public static void createMaxwellSchema(Connection connection) throws SQLException, IOException {
public static void ensureMaxwellSchema(Connection connection) throws SQLException, IOException {
if ( SchemaStore.storeDatabaseExists(connection) )
return;

Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/sql/maxwell_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ CREATE TABLE IF NOT EXISTS `maxwell`.`columns` (
index (schema_id),
index (table_id)
);

CREATE TABLE IF NOT EXISTS `maxwell`.`positions` (
server_id int unsigned not null primary key,
binlog_file varchar(255),
binlog_position int unsigned
);
2 changes: 1 addition & 1 deletion src/test/java/com/zendesk/maxwell/AbstractMaxwellTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class AbstractMaxwellTest {
public static void setUpBeforeClass() throws Exception {
server = new MysqlIsolatedServer();
server.boot();
SchemaStore.createMaxwellSchema(server.getConnection());
SchemaStore.ensureMaxwellSchema(server.getConnection());
}

public String getSQLDir() {
Expand Down

0 comments on commit 67aa9a3

Please sign in to comment.