From 0ac3b7178f71e0ffb37a9dd98742cdd1eb60f4e1 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Fri, 8 Oct 2021 19:54:12 +0200 Subject: [PATCH] Add additional query logs --- pom.xml | 6 +- .../azure/cosmosdb/spark/IteratorLogger.java | 381 ++++++++++++++++++ .../cosmosdb/spark/ClientConfiguration.scala | 28 +- .../azure/cosmosdb/spark/Constants.scala | 2 +- .../cosmosdb/spark/CosmosDBConnection.scala | 11 +- .../spark/CosmosDBConnectionCache.scala | 35 +- .../azure/cosmosdb/spark/CosmosDBSpark.scala | 10 +- .../azure/cosmosdb/spark/DefaultSource.scala | 3 +- .../azure/cosmosdb/spark/HdfsLogWriter.scala | 144 +++++++ .../spark/config/CosmosDBConfig.scala | 6 +- .../partitioner/CosmosDBPartitioner.scala | 7 +- .../cosmosdb/spark/rdd/CosmosDBRDD.scala | 7 +- .../spark/rdd/CosmosDBRDDIterator.scala | 13 +- .../azure/cosmosdb/spark/util/HdfsUtils.scala | 14 +- 14 files changed, 639 insertions(+), 28 deletions(-) create mode 100644 src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala diff --git a/pom.xml b/pom.xml index 89d696cd..40ceffdf 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ limitations under the License. com.microsoft.azure azure-cosmosdb-spark_2.4.0_2.11 jar - 3.7.0 + 3.7.1-SNAPSHOT ${project.groupId}:${project.artifactId} Spark Connector for Microsoft Azure CosmosDB http://azure.microsoft.com/en-us/services/documentdb/ @@ -51,7 +51,7 @@ limitations under the License. com.microsoft.azure azure-documentdb - 2.6.4 + 2.6.5-SNAPSHOT org.scala-lang @@ -77,7 +77,7 @@ limitations under the License. com.microsoft.azure documentdb-bulkexecutor - 2.12.4 + 2.12.5-SNAPSHOT com.microsoft.azure diff --git a/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java b/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java new file mode 100644 index 00000000..ecce466d --- /dev/null +++ b/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java @@ -0,0 +1,381 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2017 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.microsoft.azure.cosmosdb.spark; + +import com.microsoft.azure.documentdb.*; +import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal; +import org.joda.time.Instant; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.io.StringWriter; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class IteratorLogger implements Serializable { + private final static String headerLine = "Timestamp|Level|Event|PKRangeId|UserAgentSuffix|Message|Exception|" + + "IteratedDocuments|Count"; + + private final static String LOG_LEVEL_INFO = "I"; + private final static String LOG_LEVEL_ERROR = "E"; + + private final static String EVENT_NAME_LOG = "Log"; + private final static String EVENT_NAME_ITERATOR_NEXT = "ConsumedFromIterator"; + private final CosmosLogWriter writer; + private final String userAgentSuffix; + private final String pkRangeId; + private final String iteratorName; + private final StringBuilder iteratedDocuments = new StringBuilder().append("["); + private final AtomicInteger iteratedDocumentCount = new AtomicInteger(0); + + public IteratorLogger( + CosmosLogWriter writer, + String userAgentSuffix, + String pkRangeId, + String iteratorName) { + this.userAgentSuffix = userAgentSuffix; + this.pkRangeId = pkRangeId; + this.iteratorName = iteratorName; + this.writer = writer; + + if (writer != null) { + this.writer.writeLine(headerLine); + } + } + + public void logError(String message, Throwable throwable) { + if (writer == null) { + return; + } + + logLogEvent(LOG_LEVEL_ERROR, message, throwable); + } + + public <T extends Resource> void onIteratorNext(T document, PartitionKeyDefinition pkDefinition) { + if (writer == null) { + return; + } + + String contentToFlush = null; + int countSnapshot; + + synchronized (this.iteratedDocuments) { + if (this.iteratedDocuments.length() > 1) { + this.iteratedDocuments.append(", "); + } + this.iteratedDocuments.append( + formatDocumentIdentity( + DocumentAnalyzer.extractDocumentIdentity( + document, + pkDefinition))); + countSnapshot = this.iteratedDocumentCount.incrementAndGet(); + + if (this.iteratedDocuments.length() > 1024) { + this.iteratedDocuments.append("]"); + contentToFlush = this.iteratedDocuments.toString(); + this.iteratedDocuments.setLength(0); + this.iteratedDocuments.append("["); + this.iteratedDocumentCount.set(0); + } + } + + if (contentToFlush != null) { + this.logLine( + LOG_LEVEL_INFO, + EVENT_NAME_ITERATOR_NEXT, + this.iteratorName, + null, + contentToFlush, + countSnapshot); + } + } + + public void flush() { + if (this.writer == null) { + return; + } + + synchronized (this.iteratedDocuments) { + if (this.iteratedDocuments.length() > 1) { + this.logLine( + LOG_LEVEL_INFO, + EVENT_NAME_ITERATOR_NEXT, + this.iteratorName, + null, + this.iteratedDocuments.toString() + "]", + this.iteratedDocumentCount.get()); + } + } + } + + private <T extends Resource> String extractDocumentIdentities(List<T> resources, PartitionKeyDefinition pkDefinition) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < resources.size(); i++) { + if (i > 0) { + sb.append(", "); + } + + String[] identity = DocumentAnalyzer.extractDocumentIdentity( + resources.get(i), + pkDefinition); + sb.append(formatDocumentIdentity(identity)); + } + sb.append("]"); + + return sb.toString(); + } + + private String formatDocumentIdentity(String[] identity) { + return "(" + identity[0] + "/" + identity[1] + ")"; + } + +
private void logLogEvent( + String logLevel, + String message, + Throwable exception) { + + logLine( + logLevel, + EVENT_NAME_LOG, + message, + exception, + null, + null); + } + + // "Timestamp|Level|Event|PKRangeId|UserAgentSuffix|Message|Exception|" + + // "IteratedDocuments|Count"; + private void logLine( + String logLevel, + String eventName, + String message, + Throwable exception, + String iteratedDocuments, + Integer count) { + + writer.writeLine( + join( + Instant.now().toString(), + logLevel, + eventName, + pkRangeId, + this.userAgentSuffix, + message, + throwableToString(exception), + iteratedDocuments, + count != null ? count.toString() : "") + ); + } + + private String join(String... args) { + StringBuilder sb = new StringBuilder(); + String separator = "|"; + for (String c : args) { + if (sb.length() > 0) { + sb.append(separator); + } + + sb.append(c); + } + + return sb.toString(); + } + + private String throwableToString(Throwable throwable) { + String exceptionText = null; + if (throwable == null) { + return null; + } + + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + throwable.printStackTrace(pw); + return sw.toString(); + } + + private static class DocumentAnalyzer { + private final static Logger LOGGER = LoggerFactory.getLogger(DocumentAnalyzer.class); + + /** + * Extracts effective {@link PartitionKeyInternal} from serialized document. + * @param partitionKeyDefinition Information about partition key. + * @return PartitionKeyInternal + */ + public static String[] extractDocumentIdentity( + Resource root, + PartitionKeyDefinition partitionKeyDefinition) { + + String pk = "n/a"; + if (partitionKeyDefinition != null && partitionKeyDefinition.getPaths().size() > 0) { + pk = DocumentAnalyzer + .extractPartitionKeyValueInternal( + root, + partitionKeyDefinition).toJson(); + } + + String id = "n/a"; + if (root.getId() != null){ + id = root.getId(); + } + + return new String[] { pk, id }; + } + + private static PartitionKeyInternal extractPartitionKeyValueInternal( + Resource resource, + PartitionKeyDefinition partitionKeyDefinition) { + if (partitionKeyDefinition != null) { + String path = partitionKeyDefinition.getPaths().iterator().next(); + Collection parts = com.microsoft.azure.documentdb.internal.PathParser.getPathParts(path); + if (parts.size() >= 1) { + Object value = resource.getObjectByPath(parts); + if (value == null || value.getClass() == JSONObject.class) { + value = Undefined.Value(); + } + + return PartitionKeyInternal.fromObjectArray(Arrays.asList(value), false); + } + } + + return null; + } + + public static PartitionKeyInternal fromPartitionKeyvalue(Object partitionKeyValue) { + try { + return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true); + } catch (Exception e) { + LOGGER.error("Failed to instantiate ParitionKeyInternal from {}", partitionKeyValue, e); + throw toRuntimeException(e); + } + } + + public static RuntimeException toRuntimeException(Exception e) { + if (e instanceof RuntimeException) { + return (RuntimeException) e; + } else { + return new RuntimeException(e); + } + } + } + + final static class PathParser + { + private final static char SEGMENT_SEPARATOR = '/'; + private final static String ERROR_MESSAGE_FORMAT = "Invalid path \"%s\", failed at %d"; + + /** + * Extract parts from a given path for '/' as the separator. + *
+ * This code doesn't do as much validation as the backend, as it assumes that IndexingPolicy path coming from the backend is valid. + * + * @param path specifies a partition key given as a path. + * @return a list of all the parts for '/' as the separator. + */ + public static List getPathParts(String path) + { + List tokens = new ArrayList(); + AtomicInteger currentIndex = new AtomicInteger(); + + while (currentIndex.get() < path.length()) + { + char currentChar = path.charAt(currentIndex.get()); + if (currentChar != SEGMENT_SEPARATOR) + { + throw new IllegalArgumentException( + String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get())); + } + + if (currentIndex.incrementAndGet() == path.length()) + { + break; + } + + currentChar = path.charAt(currentIndex.get()); + if (currentChar == '\"' || currentChar == '\'') + { + // Handles the partial path given in quotes such as "'abc/def'" + tokens.add(getEscapedToken(path, currentIndex)); + } + else + { + tokens.add(getToken(path, currentIndex)); + } + } + + return tokens; + } + + private static String getEscapedToken(String path, AtomicInteger currentIndex) + { + char quote = path.charAt(currentIndex.get()); + int newIndex = currentIndex.incrementAndGet(); + + while (true) + { + newIndex = path.indexOf(quote, newIndex); + if (newIndex == -1) + { + throw new IllegalArgumentException( + String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get())); + } + + // Ignore escaped quote in the partial path we look at such as "'abc/def \'12\'/ghi'" + if (path.charAt(newIndex - 1) != '\\') + { + break; + } + + ++newIndex; + } + + String token = path.substring(currentIndex.get(), newIndex); + currentIndex.set(newIndex + 1); + + return token; + } + + private static String getToken(String path, AtomicInteger currentIndex) + { + int newIndex = path.indexOf(SEGMENT_SEPARATOR, currentIndex.get()); + String token = null; + if (newIndex == -1) + { + token = path.substring(currentIndex.get()); + currentIndex.set(path.length()); + } + else + { + token = path.substring(currentIndex.get(), newIndex); + currentIndex.set(newIndex); + } + + token = token.trim(); + return token; + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala index 02e0cd71..3c0e4210 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala @@ -27,8 +27,8 @@ import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.internal._ import java.lang.management.ManagementFactory - import scala.collection.JavaConversions._ +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.language.implicitConversions @@ -50,7 +50,11 @@ private[spark] case class ClientConfiguration( consistencyLevel: String, database: String, container: String, - bulkConfig: BulkExecutorSettings) { + bulkConfig: BulkExecutorSettings, + countLoggingPath: Option[String], + queryLoggingPath: Option[String], + queryLoggingCorrelationId: Option[String], + hadoopConfig: mutable.Map[String, String]) { def getCollectionLink(): String = { ClientConfiguration.getCollectionLink(database, container) @@ -59,14 +63,25 @@ private[spark] case class ClientConfiguration( def getDatabaseLink() : String = { ClientConfiguration.getDatabaseLink(database) } + + def getQueryLoggingPath(): Option[String] = { + queryLoggingPath + } + + def 
getCountLoggingPath(): Option[String] = { + countLoggingPath + } } object ClientConfiguration extends CosmosDBLoggingTrait { - def apply(config: Config): ClientConfiguration = { + def apply(config: Config, hadoopConfig: mutable.Map[String, String]): ClientConfiguration = { val database : String = config.get(CosmosDBConfig.Database).get val collection : String = config.get(CosmosDBConfig.Collection).get val authConfig : AuthConfig = validateAndCreateAuthConfig(config, database, collection) val connectionPolicySettings : ConnectionPolicySettings = createConnectionPolicySettings(config) + val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath) + val queryLoggingPath = config.get(CosmosDBConfig.QueryLoggingPath) + val queryLoggingCorrelationId = config.get(CosmosDBConfig.QueryLoggingCorrelationId) val bulkExecutorSettings : BulkExecutorSettings = createBulkExecutorSettings(config) // Get consistency level @@ -81,7 +96,12 @@ object ClientConfiguration extends CosmosDBLoggingTrait { consistencyLevel, database, collection, - bulkExecutorSettings) + bulkExecutorSettings, + countLoggingPath, + queryLoggingPath, + queryLoggingCorrelationId, + hadoopConfig + ) } private def validateAndCreateAuthConfig(config: Config, database: String, collection: String) : AuthConfig = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala index 61d58bda..75b0783b 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala @@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark object Constants { - val currentVersion = "2.4.0_2.11-3.7.0" + val currentVersion = "2.4.0_2.11-3.7.1-SNAPSHOT" val userAgentSuffix = s" SparkConnector/$currentVersion" } \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index 6e89a672..3c1ab8a0 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -23,6 +23,7 @@ package com.microsoft.azure.cosmosdb.spark import java.net.SocketTimeoutException +import java.util.concurrent.Callable import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.documentdb._ @@ -34,6 +35,7 @@ import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.control.Breaks._ +import org.apache.hadoop.conf.Configuration private object CosmosDBConnection { private val rnd = scala.util.Random @@ -43,10 +45,10 @@ private object CosmosDBConnection { } } -private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait with Serializable { +private[spark] case class CosmosDBConnection(config: Config, hadoopConfig: mutable.Map[String, String]) extends CosmosDBLoggingTrait with Serializable { private val maxPagesPerBatch = config.getOrElse[String](CosmosDBConfig.ChangeFeedMaxPagesPerBatch, CosmosDBConfig.DefaultChangeFeedMaxPagesPerBatch.toString).toInt - private val clientConfig = ClientConfiguration(config) + val clientConfig = ClientConfiguration(config, hadoopConfig) def getCollectionLink: String = { executeWithRetryOnCollectionRecreate( @@ -58,6 +60,11 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog 
CosmosDBConnectionCache.reinitializeClient(clientConfig) } + def flushLogWriter = { + val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig) + documentClient.flushLogWriter() + } + private def getAllPartitionsInternal: List[PartitionKeyRange] = { val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig) val ranges = documentClient.readPartitionKeyRanges(getCollectionLink, null.asInstanceOf[FeedOptions]) diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala index aca2ec7d..3da20ee5 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala @@ -23,14 +23,19 @@ package com.microsoft.azure.cosmosdb.spark import java.util.concurrent.ConcurrentHashMap -import java.util.{Timer, TimerTask} +import java.util.{Timer, TimerTask, UUID} import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration import scala.collection.JavaConversions._ +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.language.implicitConversions @@ -418,12 +423,38 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { val consistencyLevel = ConsistencyLevel.valueOf(config.consistencyLevel) lastConsistencyLevel = Some(consistencyLevel) - new DocumentClient( + var client = new DocumentClient( config.host, config.authConfig.authKey, lastConnectionPolicy, consistencyLevel ) + + client = config.getQueryLoggingPath() match { + case Some(path) => { + val logger = new HdfsLogWriter( + config.queryLoggingCorrelationId.getOrElse(""), + config.hadoopConfig.toMap, + path) + + client.setLogWriter(logger); + } + case None => client + } + + client = config.getCountLoggingPath() match { + case Some(path) => { + val logger = new HdfsLogWriter( + config.queryLoggingCorrelationId.getOrElse(""), + config.hadoopConfig.toMap, + path) + + client.setCountLogWriter(logger); + } + case None => client + } + + client } private def createConnectionPolicy(settings: ConnectionPolicySettings): ConnectionPolicy = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala index 306fe693..c93a0ec6 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala @@ -144,6 +144,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { */ def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = { var numPartitions = 0 + val hadoopConfig = HdfsUtils.getConfigurationMap(rdd.sparkContext.hadoopConfiguration) var rddNumPartitions = 0 try { numPartitions = rdd.getNumPartitions @@ -180,7 +181,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { // In this case, users can set maxIngestionTaskParallelism to 32 and will help with the RU consumption based on writeThroughputBudget. 
if (maxIngestionTaskParallelism.exists(_ > 0)) numPartitions = maxIngestionTaskParallelism.get - val cosmosPartitionsCount = CosmosDBConnection(writeConfig).getAllPartitions.length + val cosmosPartitionsCount = CosmosDBConnection(writeConfig, hadoopConfig).getAllPartitions.length // writeThroughputBudget per cosmos db physical partition writeThroughputBudgetPerCosmosPartition = Some((writeThroughputBudget.get / cosmosPartitionsCount).ceil.toInt) val baseMiniBatchSizeAdjustmentFactor: Double = (baseMiniBatchRUConsumption.toDouble * numPartitions) / writeThroughputBudgetPerCosmosPartition.get @@ -195,10 +196,10 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { } val mapRdd = if (numPartitions < rddNumPartitions && numPartitions > 0) { - rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions, + rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, hadoopConfig, numPartitions, baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true) } else { - rdd.mapPartitions(savePartition(_, writeConfig, numPartitions, + rdd.mapPartitions(savePartition(_, writeConfig, hadoopConfig, numPartitions, baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true) } @@ -449,10 +450,11 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { private def savePartition[D: ClassTag](iter: Iterator[D], config: Config, + hadoopConfig: mutable.Map[String, String], partitionCount: Int, baseMaxMiniBatchImportSize: Int, writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config) val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport). 
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala index b86b5c31..921d2341 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala @@ -24,6 +24,7 @@ package com.microsoft.azure.cosmosdb.spark import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRelation +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider} import org.apache.spark.sql.types.StructType @@ -62,7 +63,7 @@ class DefaultSource extends RelationProvider data: DataFrame): BaseRelation = { val config: Config = Config(sqlContext.sparkContext.getConf, parameters) - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, HdfsUtils.getConfigurationMap(sqlContext.sparkSession.sparkContext.hadoopConfiguration)) val isEmptyCollection: Boolean = connection.isDocumentCollectionEmpty mode match{ case Append => diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala new file mode 100644 index 00000000..7b22b6f6 --- /dev/null +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala @@ -0,0 +1,144 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2016 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ +package com.microsoft.azure.cosmosdb.spark + +import java.io.Closeable +import java.util.{Timer, TimerTask, UUID} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils +import com.microsoft.azure.documentdb.CosmosLogWriter +import org.apache.spark.SparkEnv +import org.joda.time.Instant + +import scala.collection.concurrent.TrieMap +import scala.util.Properties + +private object HdfsLogWriter extends CosmosDBLoggingTrait { + private val timerName = "hdfsLogWriter-cleanup-Timer" + private val timer: Timer = new Timer(timerName, true) + private val cleanupIntervalInMs = 60000 + private val writerCount = new AtomicInteger(0) + val targetedMemoryBufferSizeInBytes = 50000000 + + val lineSeparator = Properties.lineSeparator + val logWriters = new TrieMap[String, HdfsLogWriter] + + def registerWriter(writer: HdfsLogWriter): Unit = { + logWriters.put(writer.id, writer) match { + case Some(existingWriter) => + throw new IllegalStateException(s"Already a writer '${writer.id}' registered.") + case None => if (writerCount.incrementAndGet() == 1) { + startCleanupTimer() + } + } + } + + def deregisterWriter(writer: HdfsLogWriter): Unit = { + logWriters.remove(writer.id) + } + + private def startCleanupTimer() : Unit = { + logInfo(s"$timerName: scheduling timer - delay: $cleanupIntervalInMs ms, period: $cleanupIntervalInMs ms") + timer.schedule( + new TimerTask { def run(): Unit = onCleanup() }, + cleanupIntervalInMs, + cleanupIntervalInMs) + } + + private def onCleanup() : Unit = { + logInfo(s"$timerName: onCleanup") + val snapshot = logWriters.readOnlySnapshot() + val threshold = Instant.now().getMillis - cleanupIntervalInMs + snapshot.foreach(writerHolder => { + val lastFlushed = writerHolder._2.lastFlushed.get() + if (lastFlushed > 0 && lastFlushed < threshold && writerHolder._2.hasData) { + writerHolder._2.flush() + } + }) + } +} + +private case class HdfsLogWriter +( + correlationId: String, + configMap: Map[String, String], + loggingLocation: String +) extends CosmosLogWriter with Closeable with CosmosDBLoggingTrait { + + private[this] val inMemoryLock = new Object() + val executorId: String = SparkEnv.get.executorId + private[this] val fileId = new AtomicInteger(0) + private[this] val sb: StringBuilder = new StringBuilder() + private[this] lazy val hdfsUtils = new HdfsUtils(configMap, loggingLocation) + val lastFlushed = new AtomicLong(-1) + + val id = s"${correlationId}_${executorId}_${loggingLocation}_${UUID.randomUUID()}" + HdfsLogWriter.registerWriter(this) + logInfo("HdfsLogWriter instantiated.") + + override def writeLine(line: String): Unit = { + if (line != null) { + val prettyLine = line.filter(_ >= ' ') + HdfsLogWriter.lineSeparator + logDebug(s"PrettyLine: $prettyLine") + this.inMemoryLock.synchronized { + if (sb.length + prettyLine.length >= HdfsLogWriter.targetedMemoryBufferSizeInBytes) { + this.flush() + } + + this.sb.append(prettyLine) + } + } + } + + override def flush(): Unit = { + logInfo(s"Flush: ${sb.size}") + var contentToFlush: Option[String] = None + this.inMemoryLock.synchronized { + if (this.sb.size > 0) { + contentToFlush = Some(this.sb.toString()) + this.sb.clear() + } + } + + contentToFlush match { + case Some(content) => { + val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}_${UUID.randomUUID().toString()}.log" + logInfo(s"WriteLogFile: ${fileName} - ${content.length} bytes") + hdfsUtils.writeLogFile(this.loggingLocation, fileName, content) +
lastFlushed.set(Instant.now().getMillis) + } + case None => + } + } + + def hasData = { + this.sb.length > 0 + } + + override def close(): Unit = { + logInfo("Close") + this.flush + HdfsLogWriter.deregisterWriter(this) + } +} \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala index 8891815f..9b1bfa19 100755 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala @@ -95,7 +95,11 @@ object CosmosDBConfig { val DefaultMaxTransientRetryDelayInMs = 100 // 0.1 second val DefaultPoisonMessageLocation = "" val DefaultTreatUnknownExceptionsAsTransient = true - + + val QueryLoggingPath = "queryLoggingPath" + val QueryLoggingCorrelationId = "queryLoggingCorrelationId" + val CountLoggingPath = "countLoggingPath" + // Not a config, constant val StreamingTimestampToken = "tsToken" diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala index dad76eff..bb5193a7 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala @@ -26,19 +26,20 @@ import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.cosmosdb.spark.schema.FilterConverter import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait} +import org.apache.hadoop.conf.Configuration import org.apache.spark.Partition import org.apache.spark.sql.sources.Filter import scala.collection.mutable import scala.collection.mutable.ListBuffer -class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingTrait { +class CosmosDBPartitioner(hadoopConfig: mutable.Map[String, String]) extends Partitioner[Partition] with CosmosDBLoggingTrait { /** * @param config Partition configuration */ override def computePartitions(config: Config): Array[Partition] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) val partitionKeyRanges = connection.getAllPartitions logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions") Array.tabulate(partitionKeyRanges.length){ @@ -47,7 +48,7 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingT } def computePartitions(config: Config, requiredColumns: Array[String] = Array()): Array[Partition] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) connection.reinitializeClient() // CosmosDB source diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index 63f0db55..2b848ec2 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -25,7 +25,7 @@ package com.microsoft.azure.cosmosdb.spark.rdd import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} import com.microsoft.azure.cosmosdb.spark.partitioner.{CosmosDBPartition, 
CosmosDBPartitioner} import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils -import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark +import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnectionCache, CosmosDBSpark} import com.microsoft.azure.documentdb._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.Filter @@ -40,7 +40,7 @@ class CosmosDBRDD( spark: SparkSession, config: Config, maxItems: Option[Long] = None, - partitioner: CosmosDBPartitioner = new CosmosDBPartitioner(), + partitionerRaw: CosmosDBPartitioner = null, requiredColumns: Array[String] = Array(), filters: Array[Filter] = Array()) extends RDD[Document](spark.sparkContext, deps = Nil) { @@ -48,6 +48,7 @@ class CosmosDBRDD( // Keep a copy of hadoop config for hdfs file handling // It's a Map because Configuration is not serializable private val hadoopConfig: mutable.Map[String, String] = HdfsUtils.getConfigurationMap(sparkContext.hadoopConfiguration) + private val effectivePartitioner: CosmosDBPartitioner = Option.apply(partitionerRaw).getOrElse(new CosmosDBPartitioner(hadoopConfig)) private def cosmosDBSpark = { @@ -57,7 +58,7 @@ class CosmosDBRDD( override def toJavaRDD(): JavaCosmosDBRDD = JavaCosmosDBRDD(this) override def getPartitions: Array[Partition] = { - partitioner.computePartitions(config) + effectivePartitioner.computePartitions(config) } /** diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 7906ab71..2f5b1298 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -53,12 +53,14 @@ object CosmosDBRDDIterator { // For verification purpose var lastFeedOptions: FeedOptions = _ var hdfsUtils: HdfsUtils = _ + var hadoopConfig: mutable.Map[String, String] = _ def initializeHdfsUtils(hadoopConfig: Map[String, String], changeFeedCheckpointLocation: String): Any = { if (hdfsUtils == null) { this.synchronized { if (hdfsUtils == null) { hdfsUtils = HdfsUtils(hadoopConfig, changeFeedCheckpointLocation) + this.hadoopConfig = collection.mutable.Map(hadoopConfig.toSeq: _*) } } } @@ -79,7 +81,7 @@ object CosmosDBRDDIterator { * @return the corresponding global continuation token */ def getCollectionTokens(config: Config, shouldGetCurrentToken: Boolean = false): String = { - val connection = CosmosDBConnection(config) + val connection = CosmosDBConnection(config, this.hadoopConfig) val collectionLink = connection.getCollectionLink val queryName = config .get[String](CosmosDBConfig.ChangeFeedQueryName).get @@ -155,9 +157,15 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], private val maxRetryCountOnServiceUnavailable: Int = 100 private val rnd = scala.util.Random + logInfo(s"CosmosDBRDDIterator initialized for PK range id ${partition.partitionKeyRangeId}") + lazy val reader: Iterator[Document] = { initialized = true - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, this.hadoopConfig) + taskContext.addTaskCompletionListener((ctx: TaskContext) => { + logInfo(s"CosmosDBRDDIterator: Flushing LogWriter after completing task for partition key range id ${partition.partitionKeyRangeId}") + connection.flushLogWriter + }) val readingChangeFeed: Boolean = config .get[String](CosmosDBConfig.ReadChangeFeed) @@ -447,6 +455,7 @@ class CosmosDBRDDIterator(hadoopConfig: 
mutable.Map[String, String], }) if (!readingChangeFeed) { + logInfo(s"--> query document for pk range id ${partition.partitionKeyRangeId}") queryDocuments } else { readChangeFeed diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala index 9f713929..ee53c1ec 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala @@ -22,13 +22,13 @@ */ package com.microsoft.azure.cosmosdb.spark.util -import java.io.{FileNotFoundException, PrintWriter, StringWriter} +import java.io.{BufferedOutputStream, FileNotFoundException, OutputStream, PrintWriter, StringWriter} import java.util import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, LocatedFileStatus, Path, RemoteIterator} import scala.collection.mutable import java.net.URI @@ -54,6 +54,16 @@ case class HdfsUtils(configMap: Map[String, String], changeFeedCheckpointLocatio } } + def writeLogFile(base: String, filePath: String, content: String): Unit = { + val path = new Path(base + "/" + filePath) + retry(maxRetryCount) { + val os = fs.create(path) + val bos = new BufferedOutputStream(os) + bos.write(content.getBytes("UTF-8")) + bos.close() + } + } + def read(base: String, filePath: String, alternateQueryName: String, collectionRid: String): String = { val path = new Path(base + "/" + filePath) read(path, base, alternateQueryName, collectionRid)
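For context, the three options introduced in CosmosDBConfig above (queryLoggingPath, countLoggingPath, queryLoggingCorrelationId) are picked up by ClientConfiguration.apply and, when present, cause CosmosDBConnectionCache to attach an HdfsLogWriter to the DocumentClient. A minimal usage sketch (not part of the patch) follows; it assumes the connector's usual option names (Endpoint, Masterkey, Database, Collection) and its DefaultSource package as the format name, and the account endpoint, key, database, container, correlation id, and abfss:// log paths are placeholders:

import org.apache.spark.sql.SparkSession

object QueryLoggingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cosmosdb-query-logging").getOrCreate()

    // Standard connector options plus the new logging settings added by this patch.
    val readConfig = Map(
      "Endpoint" -> "https://<account>.documents.azure.com:443/",
      "Masterkey" -> "<account-key>",
      "Database" -> "<database>",
      "Collection" -> "<container>",
      // Directories the per-executor HdfsLogWriter flushes its log files into (placeholders).
      "queryLoggingPath" -> "abfss://<container>@<storage>.dfs.core.windows.net/cosmos/query-logs",
      "countLoggingPath" -> "abfss://<container>@<storage>.dfs.core.windows.net/cosmos/count-logs",
      // Correlation id used as the prefix of the generated log file names.
      "queryLoggingCorrelationId" -> "nightly-load-2021-10-08")

    val df = spark.read
      .format("com.microsoft.azure.cosmosdb.spark")
      .options(readConfig)
      .load()

    println(s"Read ${df.count()} documents")
    spark.stop()
  }
}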