diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index c3941a6923..e775d3c766 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -308,7 +308,7 @@ public Void call() throws Exception {
    * sink that owns it. This method should be used only when size or count
    * based rolling closes this file.
    */
-  public void close() throws InterruptedException {
+  public void close() throws InterruptedException, IOException {
     close(false);
   }
 
@@ -338,7 +338,7 @@ public Void call() throws Exception {
    * If all close attempts were unsuccessful we try to recover the lease.
    * @param immediate An immediate close is required
    */
-  public void close(boolean immediate) {
+  public void close(boolean immediate) throws IOException, InterruptedException {
     closeTries++;
     boolean shouldRetry = closeTries < maxRetries && !immediate;
     try {
@@ -349,7 +349,11 @@ public void close(boolean immediate) {
           "retry again in " + retryInterval + " seconds.", e);
       if (timedRollerPool != null && !timedRollerPool.isTerminated()) {
         if (shouldRetry) {
-          timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
+          close(false);
+        } else {
+          LOG.error("Closing file: " + path + " failed; giving up after " +
+              maxRetries + " close attempts.", e);
+          throw e;
         }
       } else {
         LOG.warn("Cannot retry close any more timedRollerPool is null or terminated");
@@ -406,7 +410,7 @@ private synchronized void recoverLease() {
     }
   }
 
-  public void close(boolean callCloseCallback) throws InterruptedException {
+  public void close(boolean callCloseCallback) throws InterruptedException, IOException {
     close(callCloseCallback, false);
   }
 
@@ -415,7 +419,7 @@ public void close(boolean callCloseCallback) throws InterruptedException {
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
    */
   public void close(boolean callCloseCallback, boolean immediate)
-      throws InterruptedException {
+      throws InterruptedException, IOException {
     if (callCloseCallback) {
       if (closed.compareAndSet(false, true)) {
         runCloseAction(); //remove from the cache as soon as possible
@@ -427,7 +431,7 @@ public void close(boolean callCloseCallback, boolean immediate)
   }
 
   private synchronized void doClose(boolean immediate)
-      throws InterruptedException {
+      throws InterruptedException, IOException {
     checkAndThrowInterruptedException();
     try {
       flush();
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 4393b6d46a..e44ebc916a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -81,7 +81,7 @@ public interface WriterCallback {
   // Time between close retries, in seconds
   private static final long defaultRetryInterval = 180;
-  // Retry forever.
-  private static final int defaultTryCount = Integer.MAX_VALUE;
+  // Give up once the configured number of close attempts is exhausted.
+  private static final int defaultTryCount = 3;
 
   public static final String IN_USE_SUFFIX_PARAM_NAME = "hdfs.inUseSuffix";
 
@@ -163,7 +163,7 @@ protected boolean removeEldestEntry(Entry eldest) {
       // return true
       try {
         eldest.getValue().close();
-      } catch (InterruptedException e) {
+      } catch (InterruptedException | IOException e) {
         LOG.warn(eldest.getKey().toString(), e);
         Thread.currentThread().interrupt();
       }
@@ -412,14 +412,22 @@ public void run(String bucketPath) {
           bucketWriter.append(event);
         } catch (BucketClosedException ex) {
           LOG.info("Bucket was closed while trying to append, " +
-              "reinitializing bucket and writing event.");
+                   "reinitializing bucket and writing event.");
           hdfsWriter = writerFactory.getWriter(fileType);
           bucketWriter = initializeBucketWriter(realPath, realName,
-              lookupPath, hdfsWriter, closeCallback);
+            lookupPath, hdfsWriter, closeCallback);
           synchronized (sfWritersLock) {
             sfWriters.put(lookupPath, bucketWriter);
           }
           bucketWriter.append(event);
+        } catch (IOException e) {
+          transaction.rollback();
+          LOG.warn("HDFS IO error", e);
+          sinkCounter.incrementEventWriteFail();
+          synchronized (sfWritersLock) {
+            sfWriters.remove(lookupPath);
+          }
+          return Status.BACKOFF;
         }
 
         // track the buckets getting written in this transaction
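
For reference, a minimal standalone sketch of the bounded close-retry policy this patch introduces. It is not part of the patch, and the names CloseRetrySketch, closeWithRetries, and doClose are illustrative placeholders. The recursive close(false) call above terminates the same way this loop does: closeTries is incremented on every invocation and checked against maxRetries before each retry.

import java.io.IOException;

// Sketch only: models the retry policy of the patched BucketWriter.close(boolean),
// which retries a failing close a bounded number of times and then rethrows.
public class CloseRetrySketch {

  private static final int MAX_RETRIES = 3; // mirrors the new defaultTryCount

  private int closeTries = 0;

  void closeWithRetries() throws IOException {
    while (true) {
      closeTries++;
      try {
        doClose(); // stand-in for the underlying HDFSWriter close
        return;
      } catch (IOException e) {
        if (closeTries >= MAX_RETRIES) {
          throw e; // retries exhausted: surface the error, as close(boolean) now does
        }
        // otherwise fall through and retry, like the recursive close(false)
      }
    }
  }

  // Hypothetical close that always fails, to exercise the retry path.
  void doClose() throws IOException {
    throw new IOException("simulated HDFS close failure");
  }

  public static void main(String[] args) {
    try {
      new CloseRetrySketch().closeWithRetries();
    } catch (IOException e) {
      // With defaultTryCount = 3 the failure surfaces after three attempts instead
      // of being rescheduled forever; the sink can then roll back and back off.
      System.err.println("close failed after " + MAX_RETRIES + " attempts: " + e.getMessage());
    }
  }
}

Because close() now declares IOException, every caller that previously handled only InterruptedException must handle it as well; that is why the eviction path in removeEldestEntry switches to a multi-catch and why the append path gains the catch (IOException e) branch that rolls back the transaction and returns Status.BACKOFF.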