parts = com.microsoft.azure.documentdb.internal.PathParser.getPathParts(path);
+ if (parts.size() >= 1) {
+ Object value = resource.getObjectByPath(parts);
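+ // A null value or a nested JSON object is not a usable partition key value, so it is treated as Undefined.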
+ if (value == null || value.getClass() == JSONObject.class) {
+ value = Undefined.Value();
+ }
+
+ return PartitionKeyInternal.fromObjectArray(Arrays.asList(value), false);
+ }
+ }
+
+ return null;
+ }
+
+ public static PartitionKeyInternal fromPartitionKeyvalue(Object partitionKeyValue) {
+ try {
+ return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true);
+ } catch (Exception e) {
+ LOGGER.error("Failed to instantiate PartitionKeyInternal from {}", partitionKeyValue, e);
+ throw toRuntimeException(e);
+ }
+ }
+
+ public static RuntimeException toRuntimeException(Exception e) {
+ if (e instanceof RuntimeException) {
+ return (RuntimeException) e;
+ } else {
+ return new RuntimeException(e);
+ }
+ }
+ }
+
+ final static class PathParser
+ {
+ private final static char SEGMENT_SEPARATOR = '/';
+ private final static String ERROR_MESSAGE_FORMAT = "Invalid path \"%s\", failed at %d";
+
+ /**
+ * Extracts the parts of a given path, using '/' as the separator.
+ *
+ * This code does less validation than the backend, as it assumes that the IndexingPolicy path coming from the backend is valid.
+ *
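+ * For example, the path "/address/'zip code'/" yields the parts ["address", "zip code"].
+ *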
+ * @param path specifies a partition key given as a path.
+ * @return a list of the path parts, split on '/'.
+ */
+ public static List<String> getPathParts(String path)
+ {
+ List<String> tokens = new ArrayList<String>();
+ AtomicInteger currentIndex = new AtomicInteger();
+
+ while (currentIndex.get() < path.length())
+ {
+ char currentChar = path.charAt(currentIndex.get());
+ if (currentChar != SEGMENT_SEPARATOR)
+ {
+ throw new IllegalArgumentException(
+ String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get()));
+ }
+
+ if (currentIndex.incrementAndGet() == path.length())
+ {
+ break;
+ }
+
+ currentChar = path.charAt(currentIndex.get());
+ if (currentChar == '\"' || currentChar == '\'')
+ {
+ // Handles the partial path given in quotes such as "'abc/def'"
+ tokens.add(getEscapedToken(path, currentIndex));
+ }
+ else
+ {
+ tokens.add(getToken(path, currentIndex));
+ }
+ }
+
+ return tokens;
+ }
+
+ private static String getEscapedToken(String path, AtomicInteger currentIndex)
+ {
+ char quote = path.charAt(currentIndex.get());
+ int newIndex = currentIndex.incrementAndGet();
+
+ while (true)
+ {
+ newIndex = path.indexOf(quote, newIndex);
+ if (newIndex == -1)
+ {
+ throw new IllegalArgumentException(
+ String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get()));
+ }
+
+ // Ignore escaped quote in the partial path we look at such as "'abc/def \'12\'/ghi'"
+ if (path.charAt(newIndex - 1) != '\\')
+ {
+ break;
+ }
+
+ ++newIndex;
+ }
+
+ String token = path.substring(currentIndex.get(), newIndex);
+ currentIndex.set(newIndex + 1);
+
+ return token;
+ }
+
+ private static String getToken(String path, AtomicInteger currentIndex)
+ {
+ int newIndex = path.indexOf(SEGMENT_SEPARATOR, currentIndex.get());
+ String token = null;
+ if (newIndex == -1)
+ {
+ token = path.substring(currentIndex.get());
+ currentIndex.set(path.length());
+ }
+ else
+ {
+ token = path.substring(currentIndex.get(), newIndex);
+ currentIndex.set(newIndex);
+ }
+
+ token = token.trim();
+ return token;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
index 02e0cd71..3c0e4210 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
@@ -27,8 +27,8 @@ import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.internal._
import java.lang.management.ManagementFactory
-
import scala.collection.JavaConversions._
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
@@ -50,7 +50,11 @@ private[spark] case class ClientConfiguration(
consistencyLevel: String,
database: String,
container: String,
- bulkConfig: BulkExecutorSettings) {
+ bulkConfig: BulkExecutorSettings,
+ countLoggingPath: Option[String],
+ queryLoggingPath: Option[String],
+ queryLoggingCorrelationId: Option[String],
+ hadoopConfig: mutable.Map[String, String]) {
def getCollectionLink(): String = {
ClientConfiguration.getCollectionLink(database, container)
@@ -59,14 +63,25 @@ private[spark] case class ClientConfiguration(
def getDatabaseLink() : String = {
ClientConfiguration.getDatabaseLink(database)
}
+
+ def getQueryLoggingPath(): Option[String] = {
+ queryLoggingPath
+ }
+
+ def getCountLoggingPath(): Option[String] = {
+ countLoggingPath
+ }
}
object ClientConfiguration extends CosmosDBLoggingTrait {
- def apply(config: Config): ClientConfiguration = {
+ def apply(config: Config, hadoopConfig: mutable.Map[String, String]): ClientConfiguration = {
val database : String = config.get(CosmosDBConfig.Database).get
val collection : String = config.get(CosmosDBConfig.Collection).get
val authConfig : AuthConfig = validateAndCreateAuthConfig(config, database, collection)
val connectionPolicySettings : ConnectionPolicySettings = createConnectionPolicySettings(config)
+ val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath)
+ val queryLoggingPath = config.get(CosmosDBConfig.QueryLoggingPath)
+ val queryLoggingCorrelationId = config.get(CosmosDBConfig.QueryLoggingCorrelationId)
val bulkExecutorSettings : BulkExecutorSettings = createBulkExecutorSettings(config)
// Get consistency level
@@ -81,7 +96,12 @@ object ClientConfiguration extends CosmosDBLoggingTrait {
consistencyLevel,
database,
collection,
- bulkExecutorSettings)
+ bulkExecutorSettings,
+ countLoggingPath,
+ queryLoggingPath,
+ queryLoggingCorrelationId,
+ hadoopConfig
+ )
}
private def validateAndCreateAuthConfig(config: Config, database: String, collection: String) : AuthConfig = {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index 61d58bda..75b0783b 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark
object Constants {
- val currentVersion = "2.4.0_2.11-3.7.0"
+ val currentVersion = "2.4.0_2.11-3.7.1-SNAPSHOT"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
index 6e89a672..3c1ab8a0 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
@@ -23,6 +23,7 @@
package com.microsoft.azure.cosmosdb.spark
import java.net.SocketTimeoutException
+import java.util.concurrent.Callable
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.documentdb._
@@ -34,6 +35,7 @@ import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.control.Breaks._
+import org.apache.hadoop.conf.Configuration
private object CosmosDBConnection {
private val rnd = scala.util.Random
@@ -43,10 +45,10 @@ private object CosmosDBConnection {
}
}
-private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait with Serializable {
+private[spark] case class CosmosDBConnection(config: Config, hadoopConfig: mutable.Map[String, String]) extends CosmosDBLoggingTrait with Serializable {
private val maxPagesPerBatch =
config.getOrElse[String](CosmosDBConfig.ChangeFeedMaxPagesPerBatch, CosmosDBConfig.DefaultChangeFeedMaxPagesPerBatch.toString).toInt
- private val clientConfig = ClientConfiguration(config)
+ val clientConfig = ClientConfiguration(config, hadoopConfig)
def getCollectionLink: String = {
executeWithRetryOnCollectionRecreate(
@@ -58,6 +60,11 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
CosmosDBConnectionCache.reinitializeClient(clientConfig)
}
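+ // Flushes any log entries buffered by the cached DocumentClient's log writer; invoked
+ // from the task-completion listener in CosmosDBRDDIterator.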
+ def flushLogWriter(): Unit = {
+ val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
+ documentClient.flushLogWriter()
+ }
+
private def getAllPartitionsInternal: List[PartitionKeyRange] = {
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
val ranges = documentClient.readPartitionKeyRanges(getCollectionLink, null.asInstanceOf[FeedOptions])
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
index aca2ec7d..3da20ee5 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
@@ -23,14 +23,19 @@
package com.microsoft.azure.cosmosdb.spark
import java.util.concurrent.ConcurrentHashMap
-import java.util.{Timer, TimerTask}
+import java.util.{Timer, TimerTask, UUID}
import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.SparkSession
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration
import scala.collection.JavaConversions._
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
@@ -418,12 +423,38 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
val consistencyLevel = ConsistencyLevel.valueOf(config.consistencyLevel)
lastConsistencyLevel = Some(consistencyLevel)
- new DocumentClient(
+ val client = new DocumentClient(
config.host,
config.authConfig.authKey,
lastConnectionPolicy,
consistencyLevel
)
+
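+ // If a query or count logging path is configured, attach an HdfsLogWriter so the client
+ // persists those diagnostics to the configured location.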
+ config.getQueryLoggingPath() match {
+ case Some(path) =>
+ val logger = new HdfsLogWriter(
+ config.queryLoggingCorrelationId.getOrElse(""),
+ config.hadoopConfig.toMap,
+ path)
+
+ client.setLogWriter(logger)
+ case None =>
+ }
+
+ config.getCountLoggingPath() match {
+ case Some(path) =>
+ val logger = new HdfsLogWriter(
+ config.queryLoggingCorrelationId.getOrElse(""),
+ config.hadoopConfig.toMap,
+ path)
+
+ client.setCountLogWriter(logger)
+ case None =>
+ }
+
+ client
}
private def createConnectionPolicy(settings: ConnectionPolicySettings): ConnectionPolicy = {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
index 306fe693..c93a0ec6 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -144,6 +144,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = {
var numPartitions = 0
+ val hadoopConfig = HdfsUtils.getConfigurationMap(rdd.sparkContext.hadoopConfiguration)
var rddNumPartitions = 0
try {
numPartitions = rdd.getNumPartitions
@@ -180,7 +181,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
// In this case, users can set maxIngestionTaskParallelism to 32 and will help with the RU consumption based on writeThroughputBudget.
if (maxIngestionTaskParallelism.exists(_ > 0)) numPartitions = maxIngestionTaskParallelism.get
- val cosmosPartitionsCount = CosmosDBConnection(writeConfig).getAllPartitions.length
+ val cosmosPartitionsCount = CosmosDBConnection(writeConfig, hadoopConfig).getAllPartitions.length
// writeThroughputBudget per cosmos db physical partition
writeThroughputBudgetPerCosmosPartition = Some((writeThroughputBudget.get / cosmosPartitionsCount).ceil.toInt)
val baseMiniBatchSizeAdjustmentFactor: Double = (baseMiniBatchRUConsumption.toDouble * numPartitions) / writeThroughputBudgetPerCosmosPartition.get
@@ -195,10 +196,10 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
}
val mapRdd = if (numPartitions < rddNumPartitions && numPartitions > 0) {
- rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions,
+ rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, hadoopConfig, numPartitions,
baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
} else {
- rdd.mapPartitions(savePartition(_, writeConfig, numPartitions,
+ rdd.mapPartitions(savePartition(_, writeConfig, hadoopConfig, numPartitions,
baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
}
@@ -449,10 +450,11 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
private def savePartition[D: ClassTag](iter: Iterator[D],
config: Config,
+ hadoopConfig: mutable.Map[String, String],
partitionCount: Int,
baseMaxMiniBatchImportSize: Int,
writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
index b86b5c31..921d2341 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
@@ -24,6 +24,7 @@ package com.microsoft.azure.cosmosdb.spark
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRelation
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
@@ -62,7 +63,7 @@ class DefaultSource extends RelationProvider
data: DataFrame): BaseRelation = {
val config: Config = Config(sqlContext.sparkContext.getConf, parameters)
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, HdfsUtils.getConfigurationMap(sqlContext.sparkSession.sparkContext.hadoopConfiguration))
val isEmptyCollection: Boolean = connection.isDocumentCollectionEmpty
mode match{
case Append =>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
new file mode 100644
index 00000000..7b22b6f6
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
@@ -0,0 +1,144 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark
+
+import java.io.Closeable
+import java.util.{Timer, TimerTask, UUID}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
+import com.microsoft.azure.documentdb.CosmosLogWriter
+import org.apache.spark.SparkEnv
+import org.joda.time.Instant
+
+import scala.collection.concurrent.TrieMap
+import scala.util.Properties
+
+private object HdfsLogWriter extends CosmosDBLoggingTrait {
+ private val timerName = "hdfsLogWriter-cleanup-Timer"
+ private val timer: Timer = new Timer(timerName, true)
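+ // Writers that still hold buffered data but have not flushed within this interval are flushed by the cleanup timer.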
+ private val cleanupIntervalInMs = 60000
+ private val writerCount = new AtomicInteger(0)
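+ // A writer's in-memory buffer is flushed to a new file once appending would exceed roughly 50 MB.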
+ val targetedMemoryBufferSizeInBytes = 50000000
+
+ val lineSeparator = Properties.lineSeparator
+ val logWriters = new TrieMap[String, HdfsLogWriter]
+
+ def registerWriter(writer: HdfsLogWriter): Unit = {
+ logWriters.put(writer.id, writer) match {
+ case Some(existingWriter) =>
+ throw new IllegalStateException(s"A writer '${writer.id}' is already registered.")
+ case None => if (writerCount.incrementAndGet() == 1) {
+ startCleanupTimer()
+ }
+ }
+ }
+
+ def deregisterWriter(writer: HdfsLogWriter): Unit = {
+ logWriters.remove(writer.id)
+ }
+
+ private def startCleanupTimer() : Unit = {
+ logInfo(s"$timerName: scheduling timer - delay: $cleanupIntervalInMs ms, period: $cleanupIntervalInMs ms")
+ timer.schedule(
+ new TimerTask { def run(): Unit = onCleanup() },
+ cleanupIntervalInMs,
+ cleanupIntervalInMs)
+ }
+
+ private def onCleanup() : Unit = {
+ logInfo(s"$timerName: onCleanup")
+ val snapshot = logWriters.readOnlySnapshot()
+ val threshold = Instant.now().getMillis - cleanupIntervalInMs
+ snapshot.foreach(writerHolder => {
+ val lastFlushed = writerHolder._2.lastFlushed.get()
+ if (lastFlushed > 0 && lastFlushed < threshold && writerHolder._2.hasData) {
+ writerHolder._2.flush()
+ }
+ })
+ }
+}
+
+private case class HdfsLogWriter
+(
+ correlationId: String,
+ configMap: Map[String, String],
+ loggingLocation: String
+) extends CosmosLogWriter with Closeable with CosmosDBLoggingTrait {
+
+ private[this] val inMemoryLock = new Object()
+ val executorId: String = SparkEnv.get.executorId
+ private[this] val fileId = new AtomicInteger(0)
+ private[this] val sb: StringBuilder = new StringBuilder()
+ private[this] lazy val hdfsUtils = new HdfsUtils(configMap, loggingLocation)
+ val lastFlushed = new AtomicLong(-1)
+
+ val id = s"${correlationId}_${executorId}_${loggingLocation}_${UUID.randomUUID()}"
+ HdfsLogWriter.registerWriter(this)
+ logInfo("HdfsBulkLogWriter instantiated.")
+
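+ // Strips control characters, appends the platform line separator and buffers the line in
+ // memory; the buffer is flushed to a new log file once appending would exceed the target size.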
+ override def writeLine(line: String): Unit = {
+ if (line != null) {
+ val prettyLine = line.filter(_ >= ' ') + HdfsLogWriter.lineSeparator
+ logDebug(s"PrettyLine: $prettyLine")
+ this.inMemoryLock.synchronized {
+ if (sb.length + prettyLine.length >= HdfsLogWriter.targetedMemoryBufferSizeInBytes) {
+ this.flush()
+ }
+
+ this.sb.append(prettyLine)
+ }
+ }
+ }
+
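+ // Swaps the buffered content out under the lock, then writes it to a uniquely named log
+ // file outside the lock so concurrent writers are not blocked during the HDFS write.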
+ override def flush(): Unit = {
+ logInfo(s"Flush: ${sb.size}")
+ var contentToFlush: Option[String] = None
+ this.inMemoryLock.synchronized {
+ if (this.sb.size > 0) {
+ contentToFlush = Some(this.sb.toString())
+ this.sb.clear()
+ }
+ }
+
+ contentToFlush match {
+ case Some(content) => {
+ val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}_${UUID.randomUUID().toString()}.log"
+ logInfo(s"WriteLogFile: ${fileName} - ${content.length} bytes")
+ hdfsUtils.writeLogFile(this.loggingLocation, fileName, content)
+ lastFlushed.set(Instant.now().getMillis)
+ }
+ case None =>
+ }
+ }
+
+ def hasData = {
+ this.sb.length > 0
+ }
+
+ override def close(): Unit = {
+ logInfo("Close")
+ this.flush()
+ HdfsLogWriter.deregisterWriter(this)
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index 8891815f..9b1bfa19 100755
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -95,7 +95,11 @@ object CosmosDBConfig {
val DefaultMaxTransientRetryDelayInMs = 100 // 0.1 second
val DefaultPoisonMessageLocation = ""
val DefaultTreatUnknownExceptionsAsTransient = true
-
+
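+ // Optional diagnostics settings. When queryLoggingPath / countLoggingPath are set, the
+ // connector attaches an HdfsLogWriter to the DocumentClient and persists query / count logs
+ // to those HDFS-compatible paths; queryLoggingCorrelationId, if provided, prefixes the
+ // generated log file names. Illustrative usage (the path and id below are placeholders):
+ //   Map("queryLoggingPath" -> "wasbs://diagnostics@myaccount.blob.core.windows.net/query-logs",
+ //       "queryLoggingCorrelationId" -> "nightly-run-42")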
+ val QueryLoggingPath = "queryLoggingPath"
+ val QueryLoggingCorrelationId = "queryLoggingCorrelationId"
+ val CountLoggingPath = "countLoggingPath"
+
// Not a config, constant
val StreamingTimestampToken = "tsToken"
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
index dad76eff..bb5193a7 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
@@ -26,19 +26,20 @@ import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.schema.FilterConverter
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.Partition
import org.apache.spark.sql.sources.Filter
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingTrait {
+class CosmosDBPartitioner(hadoopConfig: mutable.Map[String, String]) extends Partitioner[Partition] with CosmosDBLoggingTrait {
/**
* @param config Partition configuration
*/
override def computePartitions(config: Config): Array[Partition] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
val partitionKeyRanges = connection.getAllPartitions
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
Array.tabulate(partitionKeyRanges.length){
@@ -47,7 +48,7 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingT
}
def computePartitions(config: Config, requiredColumns: Array[String] = Array()): Array[Partition] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
connection.reinitializeClient()
// CosmosDB source
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
index 63f0db55..2b848ec2 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
@@ -25,7 +25,7 @@ package com.microsoft.azure.cosmosdb.spark.rdd
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.partitioner.{CosmosDBPartition, CosmosDBPartitioner}
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
-import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
+import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnectionCache, CosmosDBSpark}
import com.microsoft.azure.documentdb._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.Filter
@@ -40,7 +40,7 @@ class CosmosDBRDD(
spark: SparkSession,
config: Config,
maxItems: Option[Long] = None,
- partitioner: CosmosDBPartitioner = new CosmosDBPartitioner(),
+ partitionerRaw: CosmosDBPartitioner = null,
requiredColumns: Array[String] = Array(),
filters: Array[Filter] = Array())
extends RDD[Document](spark.sparkContext, deps = Nil) {
@@ -48,6 +48,7 @@ class CosmosDBRDD(
// Keep a copy of hadoop config for hdfs file handling
// It's a Map because Configuration is not serializable
private val hadoopConfig: mutable.Map[String, String] = HdfsUtils.getConfigurationMap(sparkContext.hadoopConfiguration)
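+ // The partitioner is built here, rather than as a constructor default, so it can capture the serializable hadoop config above.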
+ private val effectivePartitioner: CosmosDBPartitioner = Option(partitionerRaw).getOrElse(new CosmosDBPartitioner(hadoopConfig))
private def cosmosDBSpark = {
@@ -57,7 +58,7 @@ class CosmosDBRDD(
override def toJavaRDD(): JavaCosmosDBRDD = JavaCosmosDBRDD(this)
override def getPartitions: Array[Partition] = {
- partitioner.computePartitions(config)
+ effectivePartitioner.computePartitions(config)
}
/**
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 7906ab71..2f5b1298 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -53,12 +53,14 @@ object CosmosDBRDDIterator {
// For verification purpose
var lastFeedOptions: FeedOptions = _
var hdfsUtils: HdfsUtils = _
+ var hadoopConfig: mutable.Map[String, String] = _
def initializeHdfsUtils(hadoopConfig: Map[String, String], changeFeedCheckpointLocation: String): Any = {
if (hdfsUtils == null) {
this.synchronized {
if (hdfsUtils == null) {
hdfsUtils = HdfsUtils(hadoopConfig, changeFeedCheckpointLocation)
+ this.hadoopConfig = collection.mutable.Map(hadoopConfig.toSeq: _*)
}
}
}
@@ -79,7 +81,7 @@ object CosmosDBRDDIterator {
* @return the corresponding global continuation token
*/
def getCollectionTokens(config: Config, shouldGetCurrentToken: Boolean = false): String = {
- val connection = CosmosDBConnection(config)
+ val connection = CosmosDBConnection(config, this.hadoopConfig)
val collectionLink = connection.getCollectionLink
val queryName = config
.get[String](CosmosDBConfig.ChangeFeedQueryName).get
@@ -155,9 +157,15 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
private val maxRetryCountOnServiceUnavailable: Int = 100
private val rnd = scala.util.Random
+ logInfo(s"CosmosDBRDDIterator initialized for PK range id ${partition.partitionKeyRangeId}")
+
lazy val reader: Iterator[Document] = {
initialized = true
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, this.hadoopConfig)
+ taskContext.addTaskCompletionListener((ctx: TaskContext) => {
+ logInfo(s"CosmosDBRDDIterator: Flushing LogWriter after completing task for partition key range id ${partition.partitionKeyRangeId}")
+ connection.flushLogWriter()
+ })
val readingChangeFeed: Boolean = config
.get[String](CosmosDBConfig.ReadChangeFeed)
@@ -447,6 +455,7 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
})
if (!readingChangeFeed) {
+ logInfo(s"--> query document for pk range id ${partition.partitionKeyRangeId}")
queryDocuments
} else {
readChangeFeed
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
index 9f713929..ee53c1ec 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
@@ -22,13 +22,13 @@
*/
package com.microsoft.azure.cosmosdb.spark.util
-import java.io.{FileNotFoundException, PrintWriter, StringWriter}
+import java.io.{BufferedOutputStream, FileNotFoundException, OutputStream, PrintWriter, StringWriter}
import java.util
import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, LocatedFileStatus, Path, RemoteIterator}
import scala.collection.mutable
import java.net.URI
@@ -54,6 +54,16 @@ case class HdfsUtils(configMap: Map[String, String], changeFeedCheckpointLocatio
}
}
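+ // Writes content as a new file at base/filePath, retrying transient failures up to maxRetryCount times.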
+ def writeLogFile(base: String, filePath: String, content: String): Unit = {
+ val path = new Path(base + "/" + filePath)
+ retry(maxRetryCount) {
+ val os = fs.create(path)
+ val bos = new BufferedOutputStream(os)
+ bos.write(content.getBytes("UTF-8"))
+ bos.close()
+ }
+ }
+
def read(base: String, filePath: String, alternateQueryName: String, collectionRid: String): String = {
val path = new Path(base + "/" + filePath)
read(path, base, alternateQueryName, collectionRid)