diff --git a/.gitignore b/.gitignore index c708c36..1baf13f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,16 @@ /.settings /.classpath /.project +.idea/.gitignore +.idea/codeStyles/ +.idea/compiler.xml +.idea/encodings.xml +.idea/jarRepositories.xml +.idea/libraries/ +.idea/misc.xml +.idea/modules.xml +.idea/sbt.xml +.idea/sonarlint/ +.idea/vcs.xml +.java-version +bigqueue.iml diff --git a/pom.xml b/pom.xml index e85e829..b981e30 100644 --- a/pom.xml +++ b/pom.xml @@ -2,9 +2,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.leansoft + org.clojars.nitishgoyal13 bigqueue - 0.7.2 + 0.7.6 jar bigqueue @@ -37,15 +37,10 @@ - github.release.repo - Release Repository - file:///dev/bulldog-repo/repo/releases + clojars + Clojars repository + https://clojars.org/repo - - github.snapshot.repo - Snapshot Repository - file:///dev/bulldog-repo/repo/snapshots - @@ -101,11 +96,33 @@ org.apache.maven.plugins maven-compiler-plugin - - 1.6 - 1.6 - UTF-8 - + 3.8.1 + + + compile-java-8 + + compile + + + 1.8 + 1.8 + + + + compile-java-11 + compile + + compile + + + 11 + + ${project.basedir}/src/main/java11 + + ${project.build.outputDirectory}/META-INF/versions/11 + + + org.apache.maven.plugins @@ -134,6 +151,9 @@ src/main/resources/META-INF/MANIFEST.MF + + true + diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index 6fb1249..02e8b65 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -3,6 +3,7 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Method; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -10,44 +11,44 @@ import org.slf4j.LoggerFactory; public class MappedPageImpl implements IMappedPage, Closeable { - + private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); - + private ThreadLocalByteBuffer threadLocalBuffer; private volatile boolean dirty = false; private volatile boolean closed = false; private String pageFile; private long index; - + public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) { this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb); this.pageFile = pageFile; this.index = index; } - + public void close() throws IOException { synchronized(this) { if (closed) return; flush(); - + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); unmap(srcBuf); - + this.threadLocalBuffer = null; // hint GC - + closed = true; if (logger.isDebugEnabled()) { logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed."); } } } - + @Override public void setDirty(boolean dirty) { this.dirty = dirty; } - + @Override public void flush() { synchronized(this) { @@ -69,19 +70,19 @@ public byte[] getLocal(int position, int length) { buf.get(data); return data; } - + @Override public ByteBuffer getLocal(int position) { ByteBuffer buf = this.threadLocalBuffer.get(); - buf.position(position); + ((Buffer)buf).position(position); return buf; } - + private static void unmap(MappedByteBuffer buffer) { Cleaner.clean(buffer); } - + /** * Helper class allowing to clean direct buffers. */ @@ -120,18 +121,18 @@ public static void clean(ByteBuffer buffer) { } } } - + private static class ThreadLocalByteBuffer extends ThreadLocal { private ByteBuffer _src; - + public ThreadLocalByteBuffer(ByteBuffer src) { _src = src; } - + public ByteBuffer getSourceBuffer() { return _src; } - + @Override protected synchronized ByteBuffer initialValue() { ByteBuffer dup = _src.duplicate(); @@ -143,7 +144,7 @@ protected synchronized ByteBuffer initialValue() { public boolean isClosed() { return closed; } - + public String toString() { return "Mapped page for " + this.pageFile + ", index = " + this.index + "."; } diff --git a/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java new file mode 100644 index 0000000..110c44a --- /dev/null +++ b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -0,0 +1,150 @@ +package com.leansoft.bigqueue.page; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +public class MappedPageImpl implements IMappedPage, Closeable { + + private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); + + private ThreadLocalByteBuffer threadLocalBuffer; + private volatile boolean dirty = false; + private volatile boolean closed = false; + private String pageFile; + private long index; + + public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) { + this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb); + this.pageFile = pageFile; + this.index = index; + } + + public void close() throws IOException { + synchronized(this) { + if (closed) return; + + flush(); + + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); + unmap(srcBuf); + + this.threadLocalBuffer = null; // hint GC + + closed = true; + if (logger.isDebugEnabled()) { + logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed."); + } + } + } + + @Override + public void setDirty(boolean dirty) { + this.dirty = dirty; + } + + @Override + public void flush() { + synchronized(this) { + if (closed) return; + if (dirty) { + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); + srcBuf.force(); // flush the changes + dirty = false; + if (logger.isDebugEnabled()) { + logger.debug("Mapped page for " + this.pageFile + " was just flushed."); + } + } + } + } + + @Override + public ByteBuffer getLocal(int position) { + ByteBuffer buf = this.threadLocalBuffer.get(); + buf.position(position); + return buf; + } + + public byte[] getLocal(int position, int length) { + ByteBuffer buf = this.getLocal(position); + byte[] data = new byte[length]; + buf.get(data); + return data; + } + + private static void unmap(MappedByteBuffer buffer) + { + Cleaner.clean(buffer); + } + + /** + * Helper class allowing to clean direct buffers. + */ + private static class Cleaner { + + private static Unsafe unsafe; + + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); + } catch (Exception e) { + logger.warn("Unsafe Not support {}", e.getMessage(), e); + } + } + + public static void clean(ByteBuffer buffer) { + if (buffer == null) return; + if (buffer.isDirect()) { + if (unsafe != null) { + unsafe.invokeCleaner(buffer); + } else { + logger.warn("Unable to clean bytebuffer"); + } + } + } + } + + private static class ThreadLocalByteBuffer extends ThreadLocal { + private ByteBuffer _src; + + public ThreadLocalByteBuffer(ByteBuffer src) { + _src = src; + } + + public ByteBuffer getSourceBuffer() { + return _src; + } + + @Override + protected synchronized ByteBuffer initialValue() { + ByteBuffer dup = _src.duplicate(); + return dup; + } + } + + @Override + public boolean isClosed() { + return closed; + } + + public String toString() { + return "Mapped page for " + this.pageFile + ", index = " + this.index + "."; + } + + @Override + public String getPageFile() { + return this.pageFile; + } + + @Override + public long getPageIndex() { + return this.index; + } +}