From 0fb89153ef4f99a0a1447b655857f682c18b98e0 Mon Sep 17 00:00:00 2001 From: bmalinowsky Date: Sat, 3 Aug 2024 16:40:29 +0200 Subject: [PATCH] Override quit to close selector and channel --- src/io/calimero/knxnetip/Discoverer.java | 33 +++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/io/calimero/knxnetip/Discoverer.java b/src/io/calimero/knxnetip/Discoverer.java index 3a2afc78..a2b01401 100644 --- a/src/io/calimero/knxnetip/Discoverer.java +++ b/src/io/calimero/knxnetip/Discoverer.java @@ -739,6 +739,7 @@ private CompletableFuture> receiveAsync(final DatagramCha private final class ReceiverLoop extends UdpSocketLooper implements Runnable { + private final DatagramChannel dc; private final boolean multicast; private final InetSocketAddress server; private final NetworkInterface nif; @@ -764,6 +765,7 @@ private final class ReceiverLoop extends UdpSocketLooper implements Runnable throws IOException { super(null, false, receiveBufferSize, 0, (int) timeout.toMillis()); + this.dc = dc; final var mcastIf = dc.getOption(StandardSocketOptions.IP_MULTICAST_IF); nif = mcastIf == null ? Net.defaultNetif() : mcastIf; this.localEndpoint = localEndpoint; @@ -784,6 +786,8 @@ private final class ReceiverLoop extends UdpSocketLooper implements Runnable ReceiverLoop(final DatagramChannel dc, final int receiveBufferSize, final Duration timeout, final InetSocketAddress queriedServer) throws IOException { super(null, true, receiveBufferSize, 0, (int) timeout.toMillis()); + + this.dc = dc; nif = null; localEndpoint = null; multicast = false; @@ -877,21 +881,42 @@ else if (!multicast && svc == KNXnetIPHeader.SearchResponse) { protected void receive(final byte[] buf) throws IOException { var remaining = timeout; final var end = Instant.now().plus(remaining); - while (remaining.toMillis() > 0) { + while (selector.isOpen() && remaining.toMillis() > 0) { if (selector.select(remaining.toMillis()) > 0) { - for (final var i = selector.selectedKeys().iterator(); i.hasNext();) { - final var key = i.next(); + final Set selectedKeys; + // synchronize and copy to avoid CME if quit() closes selector + synchronized (this) { + selectedKeys = Set.copyOf(selector.selectedKeys()); + selector.selectedKeys().clear(); + } + for (final var key : selectedKeys) { final var channel = key.channel(); final ByteBuffer buffer = ByteBuffer.wrap(buf); final var source = ((DatagramChannel) channel).receive(buffer); buffer.flip(); onReceive((InetSocketAddress) source, buf, buffer.position(), buffer.remaining()); - i.remove(); } return; } remaining = Duration.between(Instant.now(), end); } } + + @Override + public void quit() { + super.quit(); + try { + synchronized (this) { + selector.close(); + } + } + catch (final IOException ignore) {} + finally { + try { + dc.close(); + } + catch (final IOException ignore) {} + } + } } }