Skip to content

Commit

Permalink
Add new support classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mnlipp committed Apr 29, 2024
1 parent 0f83a4f commit 6260e9d
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 0 deletions.
74 changes: 74 additions & 0 deletions org.jgrapes.io/src/org/jgrapes/io/util/ManagedBufferStreamer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* JGrapes Event Driven Framework
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program; if not, see <http://www.gnu.org/licenses/>.
*/

package org.jgrapes.io.util;

import java.io.IOException;
import java.io.Reader;
import java.nio.Buffer;
import java.nio.charset.Charset;
import java.util.function.Consumer;

/**
*/
public class ManagedBufferStreamer {

private ManagedBufferReader reader = new ManagedBufferReader();

public ManagedBufferStreamer(Consumer<Reader> processor) {
Thread thread = new Thread(() -> {
processor.accept(reader);
});
thread.start();
ThreadCleaner.watch(this, thread);
}

/**
* Sets the charset to be used if {@link #feed(ManagedBuffer)}
* is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8.
* Must be set before the first invocation of
* {@link #feed(ManagedBuffer)}.
*
* @param charset the charset
* @return the managed buffer streamer
*/
public ManagedBufferStreamer charset(Charset charset) {
reader.charset(charset);
return this;
}

/**
* Feed data to the reader. The call blocks while data from a previous
* invocation has not been fully read. The buffer passed as argument
* is locked (see {@link ManagedBuffer#lockBuffer()}) until all
* data has been read.
*
* Calling this method with `null` as argument closes the feed.
* After consuming any data still available from a previous
* invocation, further calls to {@link #read} therefore return -1.
*
* @param buffer the buffer
* @throws IOException Signals that an I/O exception has occurred.
*/
@SuppressWarnings({ "PMD.PreserveStackTrace" })
public <W extends Buffer> void feed(ManagedBuffer<W> buffer)
throws IOException {
reader.feed(buffer);
}

}
88 changes: 88 additions & 0 deletions org.jgrapes.io/src/org/jgrapes/io/util/ThreadCleaner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* JGrapes Event Driven Framework
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program; if not, see <http://www.gnu.org/licenses/>.
*/

package org.jgrapes.io.util;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;

/**
* Cleans up threads if some object has been garbage collected.
*
* Sometimes it is necessary to create a thread that delivers data
* to a synchronous consumer. This thread keeps the consumer running
* until the consumer expects no more input and thus terminates
* the thread.
*
* Due to an error condition it may happen, however, that the terminating
* event never occurs and the thread runs forever. As a possible remedy,
* this class allows the thread to be associated with the lifetime of an
* arbitrary object. When the object is garbage collected, the thread is
* terminated automatically.
*/
public class ThreadCleaner {

private static Set<RefWithThread> watched = new HashSet<>();
private static ReferenceQueue<Object> abandoned
= new ReferenceQueue<>();

/**
* Weak references to an object that interrupts the associated
* thread if the object has been garbage collected.
*
* @param <T> the generic type
*/
private static class RefWithThread extends WeakReference<Object> {
public Thread watched;

/**
* Creates a new instance.
*
* @param referent the referent
* @param thread the thread
*/
public RefWithThread(Object referent, Thread thread) {
super(referent, abandoned);
watched = thread;
}
}

static {
Thread watchdog = new Thread(() -> {
Thread.currentThread().setName("ThreadCleaner");
while (true) {
try {
ThreadCleaner.RefWithThread ref
= (ThreadCleaner.RefWithThread) abandoned.remove();
ref.watched.interrupt();
watched.remove(ref);
} catch (InterruptedException e) {
// Nothing to do
}
}
});
watchdog.setDaemon(true);
watchdog.start();
}

public static void watch(Object referent, Thread thread) {
watched.add(new RefWithThread(referent, thread));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.jgrapes.io.test;

import java.io.IOException;
import java.nio.CharBuffer;
import org.jgrapes.io.util.ManagedBuffer;
import org.jgrapes.io.util.ManagedBufferPool;
import org.jgrapes.io.util.ManagedBufferStreamer;
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class ManagedBufferStreamerTests {

private String result;

@Test(timeout = 1000)
public void test() throws InterruptedException, IOException {
var charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
() -> CharBuffer.allocate(4096), 2).setName("Test");

ManagedBufferStreamer streamer = new ManagedBufferStreamer(rdr -> {
StringBuffer sb = new StringBuffer();
try {
while (true) {
int ch = rdr.read();
if (ch == -1) {
break;
}
sb.append((char) ch);
}
} catch (IOException e) {
// ignore
}
result = sb.toString();
});

// Feed
var data = charBufferPool.acquire();
data.backingBuffer().append("Hello World!");
data.backingBuffer().flip();
streamer.feed(data);
data.unlockBuffer();

// End of feed
streamer.feed(null);

while (result == null) {
Thread.sleep(10);
}
assertEquals("Hello World!", result);
}

}
39 changes: 39 additions & 0 deletions org.jgrapes.io/test/org/jgrapes/io/test/ThreadCleanerTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.jgrapes.io.test;

import org.jgrapes.io.util.ThreadCleaner;
import org.junit.Test;

public class ThreadCleanerTests {

protected boolean terminated;

private void startThread() {
Object referent = new Object();
Thread thread = new Thread() {

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
interrupt();
}
}
terminated = true;
}

};
thread.start();
ThreadCleaner.watch(referent, thread);
}

@Test(timeout = 10000)
public void test() {
startThread();
while (!terminated) {
System.gc();
}
}

}

0 comments on commit 6260e9d

Please sign in to comment.