diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 9e5478f3c89..59c575c98ff 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -100,7 +100,7 @@ public IteratorWriter.ItemWriter add(Object o) throws IOException {
           long lastOutputCounter = 0;
           for (int count = 0; count < totalHits; ) {
             // log.debug("--- filler fillOutDocs in {}", fillBuffer);
-            exportWriter.fillOutDocs(mergeIterator, buffer);
+            exportWriter.fillNextBuffer(mergeIterator, buffer);
             count += (buffer.outDocsIndex + 1);
             // log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
             try {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index f2d0b9ba21f..087a1b0e7d0 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,9 +24,9 @@ import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.TreeSet;
+import java.util.*;
 
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
@@ -37,6 +37,7 @@ import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -52,6 +53,7 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequest;
@@ -71,9 +73,7 @@ import org.apache.solr.schema.SchemaField;
 import org.apache.solr.schema.SortableTextField;
 import org.apache.solr.schema.StrField;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.search.SortSpec;
-import org.apache.solr.search.SyntaxError;
+import org.apache.solr.search.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +97,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   public static final String BATCH_SIZE_PARAM = "batchSize";
   public static final String QUEUE_SIZE_PARAM = "queueSize";
 
+  public static final String SOLR_CACHE_KEY = "exportCache";
+
   public static final int DEFAULT_BATCH_SIZE = 30000;
   public static final int DEFAULT_QUEUE_SIZE = 150000;
 
@@ -109,6 +111,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   final String metricsPath;
   //The batch size for the output writer thread.
   final int batchSize;
+  final private String wt;
+  final int numWorkers;
+  final int workerId;
+  final String fieldList;
+  final List<String> partitionKeys;
+  final String partitionCacheKey;
   //The max combined size of the segment level priority queues.
   private int priorityQueueSize;
   StreamExpression streamExpression;
@@ -117,13 +125,19 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   int totalHits = 0;
   FixedBitSet[] sets = null;
   PushWriter writer;
-  private String wt;
+  // per-segment caches for already populated partitioning filters when parallel() is in use
+  final SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>> partitionCaches;
+
+  // local per-segment partitioning filters that are incomplete (still being updated from the current request)
+  final Map<IndexReader.CacheKey, FixedBitSet> tempPartitionCaches;
 
+  @SuppressWarnings("unchecked")
   public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
                       StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
-                      String metricsPath) {
+                      String metricsPath) throws Exception {
     this.req = req;
     this.res = res;
     this.wt = wt;
@@ -131,14 +145,35 @@ public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
     this.solrMetricsContext = solrMetricsContext;
     this.metricsPath = metricsPath;
     this.priorityQueueSize = req.getParams().getInt(QUEUE_SIZE_PARAM, DEFAULT_QUEUE_SIZE);
+    this.numWorkers = req.getParams().getInt(ParallelStream.NUM_WORKERS_PARAM, 1);
+    this.workerId = req.getParams().getInt(ParallelStream.WORKER_ID_PARAM, 0);
+    boolean useHashQuery = req.getParams().getBool(ParallelStream.USE_HASH_QUERY_PARAM, false);
+    if (numWorkers > 1 && !useHashQuery) {
+      String keysList = req.getParams().get(ParallelStream.PARTITION_KEYS_PARAM);
+      if (keysList == null || keysList.trim().equals("none")) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when numWorkers > 1 partitionKeys MUST be specified!");
+      }
+      partitionKeys = StrUtils.splitSmart(keysList, ',', true);
+      // we have to use ALL parameters as a cache key to account for different queries
+      partitionCacheKey = req.getParamString();
+      tempPartitionCaches = new HashMap<>();
+    } else {
+      partitionKeys = null;
+      partitionCacheKey = null;
+      tempPartitionCaches = null;
+    }
+    this.fieldList = req.getParams().get(CommonParams.FL);
     this.batchSize = DEFAULT_BATCH_SIZE;
+    this.partitionCaches = (SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>>) req.getSearcher().getCache(SOLR_CACHE_KEY);
   }
 
   @Override
   public String getContentType() {
     if ("javabin".equals(wt)) {
       return BinaryResponseParser.BINARY_CONTENT_TYPE;
-    } else return "json";
+    } else {
+      return "json";
+    }
   }
 
   @Override
@@ -237,15 +272,14 @@ private void _write(OutputStream os) throws IOException {
       }
     }
     SolrParams params = req.getParams();
-    String fl = params.get("fl");
 
     String[] fields = null;
 
-    if (fl == null) {
+    if (fieldList == null) {
       writeException((new IOException(new SyntaxError("export field list (fl) must be specified."))), writer, true);
       return;
     } else {
-      fields = fl.split(",");
+      fields = fieldList.split(",");
 
       for (int i = 0; i < fields.length; i++) {
 
@@ -265,6 +299,8 @@ private void _write(OutputStream os) throws IOException {
       return;
     }
 
+    outputDoc = new OutputDocMapWriter(fields, partitionKeys);
+
     String expr = params.get(StreamParams.EXPR);
     if (expr != null) {
       StreamFactory streamFactory = initialStreamContext.getStreamFactory();
@@ -321,34 +357,12 @@ private TupleStream createTupleStream() throws IOException {
     return tupleStream;
   }
 
-  private void transferBatchToBufferForOutput(MergeIterator mergeIterator,
-                                              ExportBuffers.Buffer destination) throws IOException {
-    try {
-      int outDocsIndex = -1;
-      for (int i = 0; i < batchSize; i++) {
-        SortDoc sortDoc = mergeIterator.next();
-        if (sortDoc != null) {
-          destination.outDocs[++outDocsIndex].setValues(sortDoc);
-        } else {
-          break;
-        }
-      }
-      destination.outDocsIndex = outDocsIndex;
-    } catch (Throwable t) {
-      log.error("transfer", t);
-      if (t instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw t;
-    } finally {
-
-    }
-  }
-
   protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
     final int queueSize = Math.min(batchSize, totalHits);
-
+    if (tempPartitionCaches != null && partitionCaches != null) {
+      initTempPartitionCaches(leaves);
+    }
     ExportBuffers buffers = new ExportBuffers(this,
                                               leaves,
@@ -417,7 +431,10 @@ protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
               // reduction in the number of output items, unlike when using
               // streaming expressions
               final SortDoc currentDoc = buffer.outDocs[i];
-              writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
+              MapWriter outputDoc = fillOutputDoc(currentDoc, leaves, fieldWriters);
+              if (outputDoc != null) {
+                writer.add(outputDoc);
+              }
             }
           } finally {
           }
@@ -437,24 +454,196 @@ protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
           return true;
         });
       }
+      transferTempPartitionCaches();
   }
 
-  void fillOutDocs(MergeIterator mergeIterator,
-                   ExportBuffers.Buffer buffer) throws IOException {
-    transferBatchToBufferForOutput(mergeIterator, buffer);
+  /**
+   * This method transfers the newly built per-segment partitioning bitsets to the global cache,
+   * keyed by the current query.
+   */
+  private void transferTempPartitionCaches() {
+    if (tempPartitionCaches == null || partitionCaches == null) {
+      return;
+    }
+    tempPartitionCaches.forEach((cacheKey, partitionSet) -> {
+      SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.computeIfAbsent(cacheKey, k -> {
+        CaffeineCache<String, FixedBitSet> cache = new CaffeineCache<>();
+        cache.init(
+            Map.of(
+                // 100 unique queries should be enough for anyone ;)
+                SolrCache.SIZE_PARAM, "100",
+                // evict entries after 600 sec
+                SolrCache.MAX_IDLE_TIME_PARAM, "600"),
+            null, null);
+        return cache;
+      });
+      // use our unique query+numWorkers+worker key
+      perSegmentCache.put(partitionCacheKey, partitionSet);
+    });
   }
 
-  void writeDoc(SortDoc sortDoc,
-                List<LeafReaderContext> leaves,
-                EntryWriter ew, FieldWriter[] writers) throws IOException {
+  // this inits only those sets that are not already present in the global cache
+  // which were populated for these segments in previous runs
+  private void initTempPartitionCaches(List<LeafReaderContext> leaves) {
+    tempPartitionCaches.clear();
+    for (LeafReaderContext leaf : leaves) {
+      IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+      if (cacheHelper == null) {
+        continue;
+      }
+      IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+      // check if a bitset was computed earlier for this segment and this query and can be skipped
+      SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.get(cacheKey);
+      if (perSegmentCache != null && perSegmentCache.get(partitionCacheKey) != null) {
+        // already computed earlier
+        continue;
+      }
+      tempPartitionCaches.put(cacheKey, new FixedBitSet(leaf.reader().maxDoc()));
+    }
+  }
+
+  void fillNextBuffer(MergeIterator mergeIterator,
+                      ExportBuffers.Buffer buffer) throws IOException {
+    try {
+      int outDocsIndex = -1;
+      for (int i = 0; i < batchSize; i++) {
+        SortDoc sortDoc = mergeIterator.next();
+        if (sortDoc != null) {
+          buffer.outDocs[++outDocsIndex].setValues(sortDoc);
+        } else {
+          break;
+        }
+      }
+      buffer.outDocsIndex = outDocsIndex;
+    } catch (Throwable t) {
+      log.error("transfer", t);
+      if (t instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw t;
+    } finally {
+
+    }
+  }
+
+  // not sure about this class - it somewhat reduces object allocation as compared to LinkedHashMap
+  // NOTE: the lookup of values associated with partition keys uses an int lookup table that is
+  // indexed by the ord of the partition key in the list of partition keys.
+  private static final class OutputDocMapWriter implements MapWriter, EntryWriter {
+    final CharSequence[] keys;
+    final Object[] values;
+    final int[] partitionKeyToFieldIdx;
+    int pos;
+
+    OutputDocMapWriter(String[] fields, List<String> partitionKeys) {
+      keys = new CharSequence[fields.length];
+      values = new Object[fields.length];
+      if (partitionKeys != null) {
+        partitionKeyToFieldIdx = new int[partitionKeys.size()];
+OUTER:  for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+          for (int fieldIdx = 0; fieldIdx < fields.length; fieldIdx++) {
+            if (fields[fieldIdx].equals(partitionKeys.get(keyIdx))) {
+              partitionKeyToFieldIdx[keyIdx] = fieldIdx;
+              continue OUTER;
+            }
+          }
+          partitionKeyToFieldIdx[keyIdx] = -1;
+        }
+      } else {
+        partitionKeyToFieldIdx = null;
+      }
+      pos = 0;
+    }
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      keys[pos] = k;
+      values[pos] = v;
+      pos++;
+      return this;
+    }
+
+    public void clear() {
+      for (int i = 0; i < pos; i++) {
+        keys[i] = null;
+        values[i] = null;
+      }
+      pos = 0;
+    }
+
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      for (int i = 0; i < pos; i++) {
+        ew.put(keys[i], values[i]);
+      }
+    }
+
+    /**
+     * Get the value associated with the partition key
+     * @param keyIdx index of the partition key in the list of keys
+     * @return associated value or null if missing
+     */
+    public Object get(int keyIdx) {
+      final int fieldIdx = partitionKeyToFieldIdx[keyIdx];
+      if (fieldIdx == -1) {
+        return null;
+      } else {
+        return values[fieldIdx];
+      }
+    }
+  }
+
+  // we materialize this document so that we can potentially do hash partitioning
+  private OutputDocMapWriter outputDoc;
+
+  // WARNING: single-thread only! shared var outputDoc
+  MapWriter fillOutputDoc(SortDoc sortDoc,
+                          List<LeafReaderContext> leaves,
+                          FieldWriter[] writers) throws IOException {
     int ord = sortDoc.ord;
     LeafReaderContext context = leaves.get(ord);
+    // reuse
+    outputDoc.clear();
     int fieldIndex = 0;
     for (FieldWriter fieldWriter : writers) {
-      if (fieldWriter.write(sortDoc, context, ew, fieldIndex)) {
+      if (fieldWriter.write(sortDoc, context, outputDoc, fieldIndex)) {
        ++fieldIndex;
       }
     }
+    if (partitionKeys == null) {
+      return outputDoc;
+    } else {
+      // if we use partitioning then filter out unwanted docs
+      return partitionFilter(sortDoc, context, outputDoc);
+    }
+  }
+
+  MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, OutputDocMapWriter doc) {
+    // calculate hash
+    int hash = 0;
+    for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+      Object value = doc.get(keyIdx);
+      if (value != null) {
+        hash += value.hashCode();
+      }
+    }
+    if ((hash & 0x7FFFFFFF) % numWorkers == workerId) {
+      // our partition
+      // check if we should mark it in the partitionSet
+      IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+      if (cacheHelper != null) {
+        IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+        FixedBitSet partitionSet = tempPartitionCaches.get(cacheKey);
+        if (partitionSet != null) {
+          // not computed before - mark it
+          partitionSet.set(sortDoc.docId);
+        }
+      }
+      return doc;
+    } else {
+      // not our partition - skip it
+      return null;
+    }
   }
 
   public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
@@ -702,7 +891,9 @@ public MergeIterator getMergeIterator(List<LeafReaderContext> leaves, FixedBitSet
       SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];
       for (int i = 0; i < segmentIterators.length; i++) {
         SortQueue sortQueue = new SortQueue(sizes[i], sortDoc.copy());
-        segmentIterators[i] = new SegmentIterator(bits[i], leaves.get(i), sortQueue, sortDoc.copy());
+        // check if we have an existing partition filter and use it if present
+        FixedBitSet myPartitionSet = partitionCacheKey != null ? getMyPartitionSet(leaves.get(i)) : null;
+        segmentIterators[i] = new SegmentIterator(bits[i], myPartitionSet, leaves.get(i), sortQueue, sortDoc.copy());
       }
 
       return new MergeIterator(segmentIterators, sortDoc);
@@ -710,6 +901,24 @@ public MergeIterator getMergeIterator(List<LeafReaderContext> leaves, FixedBitSet
     }
   }
 
+  private FixedBitSet getMyPartitionSet(LeafReaderContext leaf) {
+    if (partitionCaches == null) {
+      return null;
+    }
+    IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+    if (cacheHelper == null) {
+      return null;
+    }
+    IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+
+    SolrCache<String, FixedBitSet> perSegmentCaches = partitionCaches.get(cacheKey);
+    if (perSegmentCaches == null) {
+      // no queries yet for this segment
+      return null;
+    }
+    return perSegmentCaches.get(partitionCacheKey);
+  }
+
   private static class SegmentIterator {
 
     private final FixedBitSet bits;
@@ -722,8 +931,20 @@ private static class SegmentIterator {
     private int index;
 
-    public SegmentIterator(FixedBitSet bits, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
+    /**
+     * Construct per-segment iterator for matching docs.
+ * @param bits matching document id-s in the segment + * @param myPartitionSet filter to match only the docs in the current worker's partition, may be + * null if not partitioning + * @param context segment context + * @param sortQueue sort queue + * @param sortDoc proto sort document + */ + public SegmentIterator(FixedBitSet bits, FixedBitSet myPartitionSet, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException { this.bits = bits; + if (myPartitionSet != null) { + this.bits.and(myPartitionSet); + } this.queue = sortQueue; this.sortDoc = sortDoc; this.nextDoc = sortDoc.copy(); diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java index 3d0b3b13ada..f923a3dafe4 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java +++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java @@ -137,81 +137,86 @@ public void close() throws IOException { @Override public Tuple read() throws IOException { Tuple res = null; - if (pos < 0) { - - try { - buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY; - //log.debug("--- ews exchange empty buffer {}", buffer); - boolean exchanged = false; - while (!exchanged) { - try { - long startExchangeBuffers = System.nanoTime(); - exportBuffers.exchangeBuffers(); - long endExchangeBuffers = System.nanoTime(); - if(log.isDebugEnabled()) { - log.debug("Waited for reader thread:{}", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000))); - } - exchanged = true; - } catch (TimeoutException e) { - log.debug("--- ews timeout loop"); - if (exportBuffers.isShutDown()) { - log.debug("--- ews - the other end is shutdown, returning EOF"); - res = Tuple.EOF(); - break; - } - continue; - } catch (InterruptedException e) { - log.debug("--- ews interrupted"); - exportBuffers.error(e); - res = Tuple.EXCEPTION(e, true); - break; - } catch (BrokenBarrierException e) { - if (exportBuffers.getError() != null) { - res = Tuple.EXCEPTION(exportBuffers.getError(), true); - } else { + do { + if (pos < 0) { + + try { + buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY; + //log.debug("--- ews exchange empty buffer {}", buffer); + boolean exchanged = false; + while (!exchanged) { + try { + long startExchangeBuffers = System.nanoTime(); + exportBuffers.exchangeBuffers(); + long endExchangeBuffers = System.nanoTime(); + if(log.isDebugEnabled()) { + log.debug("Waited for reader thread:{}", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000))); + } + exchanged = true; + } catch (TimeoutException e) { + log.debug("--- ews timeout loop"); + if (exportBuffers.isShutDown()) { + log.debug("--- ews - the other end is shutdown, returning EOF"); + res = Tuple.EOF(); + break; + } + continue; + } catch (InterruptedException e) { + log.debug("--- ews interrupted"); + exportBuffers.error(e); res = Tuple.EXCEPTION(e, true); + break; + } catch (BrokenBarrierException e) { + if (exportBuffers.getError() != null) { + res = Tuple.EXCEPTION(exportBuffers.getError(), true); + } else { + res = Tuple.EXCEPTION(e, true); + } + break; + } finally { } - break; - } finally { } + } catch (InterruptedException e) { + log.debug("--- ews interrupt"); + exportBuffers.error(e); + res = Tuple.EXCEPTION(e, true); + } catch (Exception e) { + log.debug("--- ews exception", e); + exportBuffers.error(e); + res = Tuple.EXCEPTION(e, true); + } + buffer = exportBuffers.getOutputBuffer(); + if (buffer == null) { + res = 
Tuple.EOF(); + } + if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) { + log.debug("--- ews EOF"); + res = Tuple.EOF(); + } else { + pos = buffer.outDocsIndex; + index = -1; //restart index. + log.debug("--- ews new pos={}", pos); } - } catch (InterruptedException e) { - log.debug("--- ews interrupt"); - exportBuffers.error(e); - res = Tuple.EXCEPTION(e, true); - } catch (Exception e) { - log.debug("--- ews exception", e); - exportBuffers.error(e); - res = Tuple.EXCEPTION(e, true); } - buffer = exportBuffers.getOutputBuffer(); - if (buffer == null) { + if (pos < 0) { + log.debug("--- ews EOF?"); res = Tuple.EOF(); } - if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) { - log.debug("--- ews EOF"); - res = Tuple.EOF(); - } else { - pos = buffer.outDocsIndex; - index = -1; //restart index. - log.debug("--- ews new pos={}", pos); + if (res != null) { + // only errors or EOF assigned result so far + return res; } - } - if (pos < 0) { - log.debug("--- ews EOF?"); - res = Tuple.EOF(); - } - if (res != null) { - // only errors or EOF assigned result so far - return res; - } - - SortDoc sortDoc = buffer.outDocs[++index]; - tupleEntryWriter.tuple = new Tuple(); - exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters); - pos--; - return tupleEntryWriter.tuple; + SortDoc sortDoc = buffer.outDocs[++index]; + MapWriter outputDoc = exportBuffers.exportWriter.fillOutputDoc(sortDoc, exportBuffers.leaves, exportBuffers.exportWriter.fieldWriters); + pos--; + if (outputDoc != null) { + tupleEntryWriter.tuple = new Tuple(); + outputDoc.writeMap(tupleEntryWriter); + return tupleEntryWriter.tuple; + } + } while (res == null); + return res; } @Override diff --git a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java index 756718c6175..e9ade62640c 100644 --- a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java +++ b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java @@ -25,11 +25,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -225,6 +221,10 @@ public void clear() { ramBytes.reset(); } + public ConcurrentMap asMap() { + return cache.asMap(); + } + @Override public int size() { return cache.asMap().size(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index 0d48b0bae3b..4f081556f5b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -32,7 +32,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.StreamParams; import static org.apache.solr.common.params.CommonParams.DISTRIB; import static 
org.apache.solr.common.params.CommonParams.SORT; @@ -45,6 +47,11 @@ **/ public class ParallelStream extends CloudSolrStream implements Expressible { + public static final String NUM_WORKERS_PARAM = "numWorkers"; + public static final String WORKER_ID_PARAM = "workerID"; + public static final String PARTITION_KEYS_PARAM = "partitionKeys"; + public static final String USE_HASH_QUERY_PARAM = "useHashQuery"; + private TupleStream tupleStream; private int workers; private transient StreamFactory streamFactory; @@ -81,54 +88,52 @@ public ParallelStream(StreamExpression expression, StreamFactory factory) throws // validate expression contains only what we want. - if(expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)){ + if (expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)) { throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); } // Collection Name - if(null == collectionName){ + if (null == collectionName) { throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); } // Workers - if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){ + if (null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression)); } - String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue(); + String workersStr = ((StreamExpressionValue) workersParam.getParameter()).getValue(); int workersInt = 0; try{ workersInt = Integer.parseInt(workersStr); - if(workersInt <= 0){ + if (workersInt <= 0) { throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' must be greater than 0.",expression, workersStr)); } - } - catch(NumberFormatException e){ + } catch(NumberFormatException e) { throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' is not a valid integer.",expression, workersStr)); } // Stream - if(1 != streamExpressions.size()){ + if (1 != streamExpressions.size()) { throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); } // Sort - if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){ + if (null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)) { throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to join the parallel streams but didn't find one",expression)); } // zkHost, optional - if not provided then will look into factory list to get String zkHost = null; - if(null == zkHostExpression){ + if (null == zkHostExpression) { zkHost = factory.getCollectionZkHost(collectionName); - if(zkHost == null) { + if (zkHost == null) { zkHost = factory.getDefaultZkHost(); } - } - else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ + } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) { zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); } - if(null == zkHost){ - throw new 
IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + if (null == zkHost) { + throw new IOException(String.format(Locale.ROOT, "invalid expression %s - zkHost not found for collection '%s'", expression, collectionName)); } // We've got all the required items @@ -138,7 +143,7 @@ else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ init(zkHost,collectionName,stream,workersInt,comp); } - private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{ + private void init(String zkHost, String collection, TupleStream tupleStream, int workers, StreamComparator comp) throws IOException{ this.zkHost = zkHost; this.collection = collection; this.workers = workers; @@ -146,7 +151,7 @@ private void init(String zkHost,String collection,TupleStream tupleStream,int wo this.tupleStream = tupleStream; // requires Expressible stream and comparator - if(! (tupleStream instanceof Expressible)){ + if (! (tupleStream instanceof Expressible)) { throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream."); } } @@ -166,15 +171,13 @@ private StreamExpression toExpression(StreamFactory factory, boolean includeStre // workers expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers))); - if(includeStreams){ - if(tupleStream instanceof Expressible){ - expression.addParameter(((Expressible)tupleStream).toExpression(factory)); - } - else{ + if (includeStreams) { + if (tupleStream instanceof Expressible) { + expression.addParameter(((Expressible) tupleStream).toExpression(factory)); + } else { throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } - } - else{ + } else { expression.addParameter(""); } @@ -198,7 +201,7 @@ public Explanation toExplanation(StreamFactory factory) throws IOException { explanation.setExpression(toExpression(factory, false).toString()); // add a child for each worker - for(int idx = 0; idx < workers; ++idx){ + for (int idx = 0; idx < workers; ++idx) { explanation.addChild(tupleStream.toExplanation(factory)); } @@ -214,7 +217,7 @@ public List children() { public Tuple read() throws IOException { Tuple tuple = _read(); - if(tuple.EOF) { + if (tuple.EOF) { /* Map metrics = new HashMap(); Iterator> it = this.eofTuples.entrySet().iterator(); @@ -237,7 +240,7 @@ public Tuple read() throws IOException { public void setStreamContext(StreamContext streamContext) { this.streamContext = streamContext; - if(streamFactory == null) { + if (streamFactory == null) { this.streamFactory = streamContext.getStreamFactory(); } this.tupleStream.setStreamContext(streamContext); @@ -249,14 +252,15 @@ protected void constructStreams() throws IOException { List shardUrls = getShards(this.zkHost, this.collection, this.streamContext); - for(int w=0; w 1) { - String partitionFilter = getPartitionFilter(); - solrParams.add("fq", partitionFilter); + if (!params.get("partitionKeys").equals("none") && numWorkers > 1) { + // turn on ExportWriter partitioning only for /export and only when requested + String qt = params.get(CommonParams.QT); + if (qt != null && qt.equals("/export")) { + solrParams.add(ParallelStream.WORKER_ID_PARAM, String.valueOf(workerID)); + solrParams.add(ParallelStream.NUM_WORKERS_PARAM, String.valueOf(numWorkers)); + } else { + String partitionFilter = getPartitionFilter(); + 
solrParams.add(CommonParams.FQ, partitionFilter); + } } - } else if(numWorkers > 1) { - throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker."); + } else if (numWorkers > 1) { + throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker."); } - if(checkpoint > 0) { - solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_"); + if (checkpoint > 0) { + solrParams.add(CommonParams.FQ, "{!frange cost=100 incl=false l="+checkpoint+"}_version_"); } return solrParams; @@ -276,10 +283,10 @@ private Map mapFields(Map fields, Map mappings) { } private TupleStreamParser constructParser(SolrParams requestParams) throws IOException, SolrServerException { - String p = requestParams.get("qt"); + String p = requestParams.get(CommonParams.QT); if (p != null) { ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams; - modifiableSolrParams.remove("qt"); + modifiableSolrParams.remove(CommonParams.QT); //performance optimization - remove extra whitespace by default when streaming modifiableSolrParams.set("indent", modifiableSolrParams.get("indent", "off")); }
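For reference, the worker-assignment rule that the new ExportWriter.partitionFilter() applies can be summarized in a small standalone sketch: the hashCode() values of a document's partition-key fields are summed, masked to a non-negative int, and taken modulo numWorkers, and the document is emitted only by the worker whose workerId matches. The class and method names below are illustrative only and are not part of the patch.

import java.util.List;

// Minimal sketch of the hash-partitioning rule used by ExportWriter.partitionFilter().
public final class PartitionRuleSketch {

  // Returns true if a doc with these partition-key values belongs to the given worker.
  static boolean ownedByWorker(List<Object> partitionKeyValues, int numWorkers, int workerId) {
    int hash = 0;
    for (Object value : partitionKeyValues) {
      if (value != null) {
        hash += value.hashCode(); // additive combination, as in the patch
      }
    }
    // the mask keeps the modulo argument non-negative, mirroring (hash & 0x7FFFFFFF) % numWorkers
    return (hash & 0x7FFFFFFF) % numWorkers == workerId;
  }

  public static void main(String[] args) {
    // e.g. partitionKeys=id with 4 workers: every doc maps to exactly one worker
    System.out.println(ownedByWorker(List.of("doc-42"), 4, 1));
  }
}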