Skip to content

Commit

Permalink
Merge pull request #17 from zendesk/ben/position_flush_thread
Browse files Browse the repository at this point in the history
binlog position flush thread
  • Loading branch information
Ben Osheroff committed Mar 18, 2015
2 parents 67aa9a3 + 11b3c9d commit a049fa7
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 19 deletions.
23 changes: 21 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class MaxwellConfig {
private String outputFile;
private Long serverID;
private Connection connection;
private SchemaPosition schemaPosition;

public MaxwellConfig() {
this.kafkaProperties = new Properties();
Expand All @@ -46,16 +47,34 @@ public Connection getMasterConnection() throws SQLException {
return this.connection;
}

public void terminate() {
try {
this.connection.close();
} catch (SQLException e) {
}
this.connection = null;
this.schemaPosition.stop();
this.schemaPosition = null;
}

private SchemaPosition getSchemaPosition() throws SQLException {
if ( this.schemaPosition == null ) {
this.schemaPosition = new SchemaPosition(this.getMasterConnection(), this.getServerID());
this.schemaPosition.start();
}
return this.schemaPosition;
}

public BinlogPosition getInitialPosition() throws FileNotFoundException, IOException, SQLException {
if ( this.initialPosition != null )
return this.initialPosition;

this.initialPosition = SchemaPosition.get(this.getMasterConnection(), this.getServerID());
this.initialPosition = getSchemaPosition().get();
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws IOException, SQLException {
SchemaPosition.set(this.getMasterConnection(), this.getServerID(), position);
this.getSchemaPosition().set(position);
}

private void parseOptions(String [] argv) {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public void stop() throws Exception {
}




public void run() throws Exception {
MaxwellAbstractRowsEvent event;

Expand Down
98 changes: 83 additions & 15 deletions src/main/java/com/zendesk/maxwell/schema/SchemaPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,104 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zendesk.maxwell.BinlogPosition;

public class SchemaPosition {
public static BinlogPosition get(Connection c, Long serverID) throws SQLException {
PreparedStatement s = c.prepareStatement("SELECT * from `maxwell`.`positions` where server_id = ?");
s.setLong(1, serverID);
public class SchemaPosition implements Runnable {
static final Logger LOGGER = LoggerFactory.getLogger(SchemaPosition.class);
private final Connection connection;
private final Long serverID;
private BinlogPosition lastPosition;
private final AtomicReference<BinlogPosition> position;
private final AtomicBoolean run;
private Thread thread;

ResultSet rs = s.executeQuery();
if ( !rs.next() )
return null;
public SchemaPosition(Connection c, Long serverID) {
this.connection = c;
this.serverID = serverID;
this.lastPosition = null;
this.position = new AtomicReference<>();
this.run = new AtomicBoolean(false);
}

return new BinlogPosition(rs.getLong("binlog_position"), rs.getString("binlog_file"));
public void start() {
this.thread = new Thread(this, "Position Flush Thread");
this.run.set(true);
thread.start();
}

public static void set(Connection c, Long serverID, BinlogPosition p) throws SQLException {
public void stop() {
this.run.set(false);

thread.interrupt();

while ( thread.isAlive() ) {
try {
Thread.sleep(10);
} catch (InterruptedException e) { }
}
}

@Override
public void run() {
while ( true && run.get() ) {
BinlogPosition newPosition = position.get();

if ( newPosition != null && !newPosition.equals(lastPosition) ) {
try {
store(newPosition);
lastPosition = newPosition;
} catch (SQLException e) {
e.printStackTrace();
}
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) { }
}
}


private void store(BinlogPosition newPosition) throws SQLException {
String sql = "INSERT INTO `maxwell`.`positions` set "
+ "server_id = ?, "
+ "binlog_file = ?, "
+ "binlog_position = ? "
+ "ON DUPLICATE KEY UPDATE binlog_file=?, binlog_position=?";
PreparedStatement s = c.prepareStatement(sql);
PreparedStatement s = this.connection.prepareStatement(sql);

LOGGER.debug("Writing initial position: " + newPosition);
s.setLong(1, serverID);
s.setString(2, p.getFile());
s.setLong(3, p.getOffset());
s.setString(4, p.getFile());
s.setLong(5, p.getOffset());
s.setString(2, newPosition.getFile());
s.setLong(3, newPosition.getOffset());
s.setString(4, newPosition.getFile());
s.setLong(5, newPosition.getOffset());

s.execute();
}

}
public void set(BinlogPosition p) {
position.set(p);
}

public BinlogPosition get() throws SQLException {
BinlogPosition p = position.get();
if ( p != null )
return p;

PreparedStatement s = this.connection.prepareStatement("SELECT * from `maxwell`.`positions` where server_id = ?");
s.setLong(1, serverID);

ResultSet rs = s.executeQuery();
if ( !rs.next() )
return null;

return new BinlogPosition(rs.getLong("binlog_position"), rs.getString("binlog_file"));
}
}
1 change: 1 addition & 0 deletions src/test/java/com/zendesk/maxwell/AbstractMaxwellTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void setupMysql() throws SQLException, IOException, InterruptedException
}

p.stop();
config.terminate();

return list;
}
Expand Down

0 comments on commit a049fa7

Please sign in to comment.