diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 7864c0dff79..7ef10f4e31a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -97,7 +97,7 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv @SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes. private final Runnable processNewLocationsUpdateRoot; - private final UpdateCommitter removedLocationsComitter; + private final UpdateCommitter removedLocationsCommitter; private List removedConstituents = null; private UnderlyingTableMaintainer( @@ -156,12 +156,12 @@ protected void instrumentedRefresh() { }; refreshCombiner.addSource(processNewLocationsUpdateRoot); - this.removedLocationsComitter = new UpdateCommitter<>( + this.removedLocationsCommitter = new UpdateCommitter<>( this, result.getUpdateGraph(), ignored -> { Assert.neqNull(removedConstituents, "removedConstituents"); - removedConstituents.forEach(result::unmanage); + result.unmanage(removedConstituents.stream()); removedConstituents = null; }); processPendingLocations(false); @@ -170,7 +170,7 @@ protected void instrumentedRefresh() { pendingLocationStates = null; readyLocationStates = null; processNewLocationsUpdateRoot = null; - removedLocationsComitter = null; + removedLocationsCommitter = null; tableLocationProvider.refresh(); final Collection locations = new ArrayList<>(); @@ -203,7 +203,8 @@ private QueryTable result() { private RowSet sortAndAddLocations(@NotNull final Stream locations) { final long initialLastRowKey = resultRows.lastRowKey(); final MutableLong lastInsertedRowKey = new MutableLong(initialLastRowKey); - locations.sorted(Comparator.comparing(TableLocation::getKey)).forEach(tl -> { + // Note that makeConstituentTable expects us to subsequently unmanage the TableLocations + unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> { final long constituentRowKey = lastInsertedRowKey.incrementAndGet(); final Table constituentTable = makeConstituentTable(tl); @@ -216,7 +217,7 @@ private RowSet sortAndAddLocations(@NotNull final Stream location if (result.isRefreshing()) { result.manage(constituentTable); } - }); + })); return initialLastRowKey == lastInsertedRowKey.get() ? RowSetFactory.empty() : RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get()); @@ -235,7 +236,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) { // Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed // after the CSM adds the location to the table, but that's acceptable. constituent.columnSourceManager.manage(tableLocation); - unmanage(tableLocation); + // Note that the caller is now responsible for unmanaging tableLocation on behalf of this. // Be careful to propagate the systemic attribute properly to child tables constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject()); @@ -293,8 +294,12 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp readyLocationStates.offer(pendingLocationState); } } - final RowSet added = sortAndAddLocations(readyLocationStates.stream() - .map(PendingLocationState::release)); + + if (readyLocationStates.isEmpty()) { + return RowSetFactory.empty(); + } + + final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release)); readyLocationStates.clearFast(); return added; } @@ -312,14 +317,23 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd } // Iterate through the pending locations and remove any that are in the removed set. + List toUnmanage = null; for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { final PendingLocationState pendingLocationState = iter.next(); if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) { iter.remove(); - // Release the state and unmanage the location - unmanage(pendingLocationState.release()); + // Release the state and plan to unmanage the location + if (toUnmanage == null) { + toUnmanage = new ArrayList<>(); + } + toUnmanage.add(pendingLocationState.release()); } } + if (toUnmanage != null) { + unmanage(toUnmanage.stream()); + // noinspection UnusedAssignment + toUnmanage = null; + } // At the end of the cycle we need to make sure we unmanage any removed constituents. this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size()); @@ -350,7 +364,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd removedConstituents = null; return RowSetFactory.empty(); } - this.removedLocationsComitter.maybeActivate(); + this.removedLocationsCommitter.maybeActivate(); final WritableRowSet deletedRows = deleteBuilder.build(); resultTableLocationKeys.setNull(deletedRows); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java index 071b5b7d980..576767dae22 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java @@ -331,7 +331,9 @@ protected void endTransaction(@NotNull final Object token) { } // Release the keys that were removed after we have delivered the notifications and the // subscribers have had a chance to process them - removedKeys.forEach(livenessManager::unmanage); + if (!removedKeys.isEmpty()) { + livenessManager.unmanage(removedKeys.stream()); + } } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java index 3fbf3aefc3b..9dcd940d496 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java @@ -54,6 +54,10 @@ private LocationUpdate() { private void processAdd(@NotNull final LiveSupplier addedKeySupplier) { final ImmutableTableLocationKey addedKey = addedKeySupplier.get(); + // Note that we might have a remove for this key if it previously existed and is being replaced. Hence, we + // don't look for an existing remove, which is apparently asymmetric w.r.t. processRemove but still correct. + // Consumers of a LocationUpdate must process removes before adds. + // Need to verify that we don't have stacked adds (without intervening removes). if (added.containsKey(addedKey)) { throw new IllegalStateException("TableLocationKey " + addedKey @@ -99,10 +103,16 @@ private void processTransaction( } } + /** + * @return The pending location keys to add. Note that removes should be processed before adds. + */ public Collection> getPendingAddedLocationKeys() { return added.values(); } + /** + * @return The pending location keys to remove. Note that removes should be processed before adds. + */ public Collection> getPendingRemovedLocationKeys() { return removed.values(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index f737fe32ee2..3f301c8ef32 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -5,6 +5,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; @@ -323,12 +324,16 @@ private final class ChangeProcessingContext implements SafeCloseable { * {@link #resultRows}. The truncating constituent and following will need to insert their entire shifted row * set, and must update the next slot in {@link #currFirstRowKeys}. */ - boolean slotAllocationChanged; + private boolean slotAllocationChanged; /** * The first key after which we began inserting shifted constituent row sets instead of trying for piecemeal * updates. */ - long firstTruncatedResultKey; + private long firstTruncatedResultKey; + /** + * Removed constituent listeners to bulk-unmanage. + */ + private List toUnmanage; private ChangeProcessingContext(@NotNull final TableUpdate constituentChanges) { modifiedColumnSet.clear(); @@ -388,7 +393,10 @@ public void close() { final SafeCloseable ignored3 = removedValues; final SafeCloseable ignored4 = addedKeys; final SafeCloseable ignored5 = modifiedKeys; - final SafeCloseable ignored6 = modifiedPreviousValues) { + final SafeCloseable ignored6 = modifiedPreviousValues; + final SafeCloseable ignored7 = toUnmanage == null + ? null + : () -> mergedListener.unmanage(toUnmanage.stream())) { } // @formatter:on } @@ -504,7 +512,10 @@ private void processRemove(@NotNull final Table removedConstituent) { listeners.remove(); } removedConstituent.removeUpdateListener(nextListener); - mergedListener.unmanage(nextListener); + if (toUnmanage == null) { + toUnmanage = new ArrayList<>(); + } + toUnmanage.add(nextListener); advanceListener(); } final long firstRemovedKey = prevFirstRowKeys[nextPreviousSlot]; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java index 0a131275a7b..f33e85514ec 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java @@ -239,8 +239,10 @@ protected void destroy() { private synchronized void invalidateAndRelease() { invalidatedLocations.forEach(IncludedTableLocationEntry::invalidate); invalidatedLocations.clear(); - releasedLocations.forEach(this::unmanage); - releasedLocations.clear(); + if (!releasedLocations.isEmpty()) { + unmanage(releasedLocations.stream()); + releasedLocations.clear(); + } } @Override diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java index 4bcfdfc4af6..7c6307403ff 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java @@ -42,6 +42,8 @@ default void manage(@NotNull final LivenessReferent referent) { /** * If this manager manages {@code referent} one or more times, drop one such reference. If this manager is also a * {@link LivenessReferent}, then it must also be live. + *

+ * Strongly prefer using {@link #unmanage(Stream)} when multiple referents should be unmanaged. * * @param referent The referent to drop */ @@ -55,6 +57,8 @@ default void unmanage(@NotNull LivenessReferent referent) { /** * If this manager manages referent one or more times, drop one such reference. If this manager is also a * {@link LivenessReferent}, then this method is a no-op if {@code this} is not live. + *

+ * Strongly prefer using {@link #tryUnmanage(Stream)}} when multiple referents should be unmanaged. * * @param referent The referent to drop * @return If this node is also a {@link LivenessReferent}, whether this node was live and thus in fact tried to