Skip to content

Commit

Permalink
refresh branch from upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
maasg committed Jul 6, 2020
1 parent 4309f32 commit 83f96be
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ limitations under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
<artifactId>azure-cosmosdb-spark_2.4.0_2.12</artifactId>
<packaging>jar</packaging>
<version>3.0.6</version>
<name>${project.groupId}:${project.artifactId}</name>
Expand All @@ -32,8 +32,8 @@ limitations under the License.
</license>
</licenses>
<properties>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.12.11</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<sonar.projectBaseDir>azure-cosmosdb-spark</sonar.projectBaseDir>
<scala.test.version>3.1.1</scala.test.version>
<spark.version>2.4.4</spark.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ class CosmosDBRDD(
case cosmosDBPartition: CosmosDBPartition =>
logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")

val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => {
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
val completionListener: TaskCompletionListener = new TaskCompletionListener() {
override def onTaskCompletion(context: TaskContext): Unit =
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
}
context.addTaskCompletionListener(taskCompletionListener)
context.addTaskCompletionListener(completionListener)

new CosmosDBRDDIterator(
hadoopConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
})

// Register an on-task-completion callback to close the input stream.
val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() {
override def onTaskCompletion(context: TaskContext): Unit = closeIfNeeded()
val taskCompletionListerner = new TaskCompletionListener() {
override def onTaskCompletion(context: TaskContext): Unit = {
closeIfNeeded()
}
}
taskContext.addTaskCompletionListener(taskCompletionListener)

taskContext.addTaskCompletionListener(taskCompletionListerner)

if (!readingChangeFeed) {
queryDocuments
Expand Down

0 comments on commit 83f96be

Please sign in to comment.