Skip to content

Commit

Permalink
Merge pull request #49 from zendesk/ben/flush_schema_pos_on_exit
Browse files Browse the repository at this point in the history
flush schema pos on exit
  • Loading branch information
Ben Osheroff committed Apr 27, 2015
2 parents 8ce5ff0 + 3b6195b commit 743dd26
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 6 deletions.
14 changes: 14 additions & 0 deletions src/main/java/com/zendesk/maxwell/BinlogPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ public String getFile() {
public String toString() {
return "BinlogPosition[" + file + ":" + offset + "]";
}

public boolean newerThan(BinlogPosition other) {
if ( other == null )
return true;

int cmp = this.file.compareTo(other.file);
if ( cmp > 0 ) {
return true;
} else if ( cmp == 0 ) {
return this.offset > other.offset;
} else {
return false;
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,19 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio

private void run(String[] args) throws Exception {
this.config = MaxwellConfig.buildConfig("config.properties", args);

if ( this.config.log_level != null )
MaxwellLogging.setLevel(this.config.log_level);

this.context = new MaxwellContext(this.config);

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
context.terminate();
}
});

try ( Connection connection = this.context.getConnectionPool().getConnection() ) {
MaxwellMysqlStatus.ensureMysqlState(connection);
SchemaStore.ensureMaxwellSchema(connection);
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class MaxwellConfig {
public String kafkaTopic;
public String producerType;
public String outputFile;

public String log_level;

public MaxwellConfig() {
this.kafkaProperties = new Properties();
Expand All @@ -40,6 +40,7 @@ public String getConnectionURI() {

private OptionParser getOptionParser() {
OptionParser parser = new OptionParser();
parser.accepts( "log_level", "log level, one of DEBUG|INFO|WARN|ERROR" ).withRequiredArg();
parser.accepts( "host", "mysql host" ).withRequiredArg();
parser.accepts( "user", "mysql username" ).withRequiredArg();
parser.accepts( "password", "mysql password" ).withRequiredArg();
Expand All @@ -52,12 +53,22 @@ private OptionParser getOptionParser() {
return parser;
}

private String parseLogLevel(String level) {
level = level.toLowerCase();
if ( !( level.equals("debug") || level.equals("info") || level.equals("warn") || level.equals("error")))
usage("unknown log level: " + level);
return level;
}

private void parseOptions(String [] argv) {
OptionSet options = getOptionParser().parse(argv);

if ( options.has("help") )
usage("Help for Maxwell:");

if ( options.has("log_level")) {
this.log_level = parseLogLevel((String) options.valueOf("log_level"));
}
if ( options.has("host"))
this.mysqlHost = (String) options.valueOf("host");
if ( options.has("password"))
Expand Down Expand Up @@ -94,6 +105,9 @@ private void parseFile(String filename) throws IOException {
this.outputFile = p.getProperty("output_file");
this.kafkaTopic = p.getProperty("kafka_topic");

if ( p.containsKey("log_level") )
this.log_level = parseLogLevel(p.getProperty("log_level"));

for ( Enumeration<Object> e = p.keys(); e.hasMoreElements(); ) {
String k = (String) e.nextElement();
if ( k.startsWith("kafka.")) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public void setInitialPosition(BinlogPosition position) throws SQLException {
this.getSchemaPosition().set(position);
}

public void setInitialPositionSync(BinlogPosition position) throws SQLException {
this.getSchemaPosition().setSync(position);
}

public Long getServerID() throws SQLException {
if ( this.serverID != null)
return this.serverID;
Expand All @@ -85,4 +89,5 @@ public AbstractProducer getProducer() throws IOException {
return new StdoutProducer(this);
}
}

}
17 changes: 17 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellLogging.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.zendesk.maxwell;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

public class MaxwellLogging {
public static void setLevel(String level) {
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
Configuration config = ctx.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME);
loggerConfig.setLevel(Level.valueOf(level));
ctx.updateLoggers(); // This causes all Loggers to refetch information from their LoggerConfig.
}
}
1 change: 1 addition & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce
try ( Connection c = this.context.getConnectionPool().getConnection() ) {
new SchemaStore(c, schema, p).save();
}
this.context.setInitialPositionSync(p);
}
}

Expand Down
28 changes: 23 additions & 5 deletions src/main/java/com/zendesk/maxwell/schema/SchemaPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
public class SchemaPosition implements Runnable {
static final Logger LOGGER = LoggerFactory.getLogger(SchemaPosition.class);
private final Long serverID;
private BinlogPosition lastPosition;
private final AtomicReference<BinlogPosition> position;
private final AtomicReference<BinlogPosition> storedPosition;
private final AtomicBoolean run;
private Thread thread;
private final ConnectionPool connectionPool;

public SchemaPosition(ConnectionPool pool, Long serverID) {
this.connectionPool = pool;
this.serverID = serverID;
this.lastPosition = null;
this.position = new AtomicReference<>();
this.storedPosition = new AtomicReference<>();
this.run = new AtomicBoolean(false);
}

Expand All @@ -55,9 +55,8 @@ public void run() {
while ( true && run.get() ) {
BinlogPosition newPosition = position.get();

if ( newPosition != null && !newPosition.equals(lastPosition) ) {
if ( newPosition != null && newPosition.newerThan(storedPosition.get()) ) {
store(newPosition);
lastPosition = newPosition;
}

try {
Expand All @@ -70,6 +69,9 @@ public void run() {


private void store(BinlogPosition newPosition) {
if ( newPosition == null )
return;

String sql = "INSERT INTO `maxwell`.`positions` set "
+ "server_id = ?, "
+ "binlog_file = ?, "
Expand All @@ -78,14 +80,15 @@ private void store(BinlogPosition newPosition) {
try(Connection c = connectionPool.getConnection() ){
PreparedStatement s = c.prepareStatement(sql);

LOGGER.debug("Writing initial position: " + newPosition);
LOGGER.debug("Writing binlog position to maxwell.positions: " + newPosition);
s.setLong(1, serverID);
s.setString(2, newPosition.getFile());
s.setLong(3, newPosition.getOffset());
s.setString(4, newPosition.getFile());
s.setLong(5, newPosition.getOffset());

s.execute();
storedPosition.set(newPosition);
} catch (SQLException e) {
e.printStackTrace();
}
Expand All @@ -95,6 +98,20 @@ public void set(BinlogPosition p) {
position.set(p);
}

public void setSync(BinlogPosition p) {
LOGGER.debug("syncing binlog position: " + p);
position.set(p);
while ( true ) {
thread.interrupt();
BinlogPosition s = storedPosition.get();
if ( p.newerThan(s) ) {
try { Thread.sleep(50); } catch (InterruptedException e) { }
} else {
break;
}
}
}

public BinlogPosition get() throws SQLException {
BinlogPosition p = position.get();
if ( p != null )
Expand All @@ -111,4 +128,5 @@ public BinlogPosition get() throws SQLException {
return new BinlogPosition(rs.getLong("binlog_position"), rs.getString("binlog_file"));
}
}

}

0 comments on commit 743dd26

Please sign in to comment.