From 83f96befdbeee9279b87777bfaa1c352507d2b19 Mon Sep 17 00:00:00 2001 From: maasg Date: Mon, 6 Jul 2020 15:53:27 +0200 Subject: [PATCH] refresh branch from upstream --- pom.xml | 6 +++--- .../microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala | 7 ++++--- .../azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala | 9 ++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index c1bd7a9e..be8abd5d 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ limitations under the License. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.microsoft.azure - azure-cosmosdb-spark_2.4.0_2.11 + azure-cosmosdb-spark_2.4.0_2.12 jar 3.0.6 ${project.groupId}:${project.artifactId} @@ -32,8 +32,8 @@ limitations under the License. - 2.11.12 - 2.11 + 2.12.11 + 2.12 azure-cosmosdb-spark 3.1.1 2.4.4 diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index de712337..f58a033b 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -115,10 +115,11 @@ class CosmosDBRDD( case cosmosDBPartition: CosmosDBPartition => logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => { - logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") + val completionListener: TaskCompletionListener = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = + logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") } - context.addTaskCompletionListener(taskCompletionListener) + context.addTaskCompletionListener(completionListener) new CosmosDBRDDIterator( hadoopConfig, diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 56a24d78..bdd05e84 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -425,10 +425,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], }) // Register an on-task-completion callback to close the input stream. - val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() { - override def onTaskCompletion(context: TaskContext): Unit = closeIfNeeded() + val taskCompletionListerner = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = { + closeIfNeeded() + } } - taskContext.addTaskCompletionListener(taskCompletionListener) + + taskContext.addTaskCompletionListener(taskCompletionListerner) if (!readingChangeFeed) { queryDocuments