Spark 3.1.1 #2

Draft: wants to merge 1 commit into master
10 changes: 5 additions & 5 deletions build.sbt
@@ -3,17 +3,17 @@ lazy val root = (project in file("."))
Seq(
organization := "org.apache.spark",
name := "spark-streaming-kinesis-asl",
version := "2.4.4",
scalaVersion := "2.11.12",
version := "3.1.1",
scalaVersion := "2.12.12",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided",
"org.apache.spark" %% "spark-streaming" % "3.1.1" % "provided",
"com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2" % "provided",
"com.amazonaws" % "aws-java-sdk-sts" % "1.11.728" % "provided",
"com.amazonaws" % "amazon-kinesis-producer" % "0.14.0" % "test",
"org.scalatest" %% "scalatest" % "3.0.3" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.5" % "test",
"org.apache.spark" %% "spark-core" % "2.4.4" % "test" classifier "tests",
"org.apache.spark" %% "spark-streaming" % "2.4.4" % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % "3.1.1" % "test" classifier "tests",
"org.apache.spark" %% "spark-streaming" % "3.1.1" % "test" classifier "tests",
"junit" % "junit" % "4.12" % "test",
"org.mockito" % "mockito-core" % "1.10.19" % "test"
),
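For context on why the Scala version moves together with the Spark version: Spark 3.1.x artifacts are published only for Scala 2.12, and sbt's %% operator appends the Scala binary version to the artifact name. A minimal sketch, illustrative rather than part of this diff:

    // With scalaVersion := "2.12.12", "%%" resolves spark-streaming_2.12:3.1.1.
    // Under the old scalaVersion (2.11.12) the same line would look for
    // spark-streaming_2.11:3.1.1, which is not published.
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.1.1" % "provided"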
JavaKinesisWordCountASL.java
@@ -32,13 +32,14 @@
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;

import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import scala.Tuple2;
import scala.reflect.ClassTag$;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

/**
* Consumes messages from an Amazon Kinesis stream and does wordcount.
@@ -48,15 +49,15 @@
*
* Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name]
* [app-name] is the name of the consumer app, used to track the read data in DynamoDB
* [stream-name] name of the Kinesis stream (ie. mySparkStream)
* [stream-name] name of the Kinesis stream (i.e. mySparkStream)
* [endpoint-url] endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
*
*
* Example:
* # export AWS keys if necessary
* $ export AWS_ACCESS_KEY_ID=[your-access-key]
* $ export AWS_SECRET_KEY=<your-secret-key>
* $ export AWS_SECRET_ACCESS_KEY=<your-secret-key>
*
* # run the example
* $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
@@ -67,7 +68,7 @@
*
* This code uses the DefaultAWSCredentialsProviderChain to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
* Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
* Instance profile credentials - delivered through the Amazon EC2 metadata service
@@ -135,17 +136,25 @@ public static void main(String[] args) throws Exception {
// Create the Kinesis DStreams
List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
streamsList.add(
KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval,
StorageLevel.MEMORY_AND_DISK_2())
);
streamsList.add(JavaDStream.fromDStream(
KinesisInputDStream.builder()
.streamingContext(jssc)
.checkpointAppName(kinesisAppName)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPosition(new KinesisInitialPositions.Latest())
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2())
.build(),
ClassTag$.MODULE$.apply(byte[].class)
));
}

// Union all the streams if there is more than 1 stream
JavaDStream<byte[]> unionStreams;
if (streamsList.size() > 1) {
unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0]));
} else {
// Otherwise, just use the 1 stream
unionStreams = streamsList.get(0);
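The main change in this file replaces KinesisUtils.createStream, which was removed from the kinesis-asl module in Spark 3.x, with the KinesisInputDStream builder. A minimal Scala sketch of the same builder; the app name, stream name, endpoint, region, and intervals below are placeholder values, not taken from this PR:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

    // Sketch only: mirrors the builder calls shown in the Java diff above.
    val conf = new SparkConf().setAppName("KinesisBuilderSketch")
    val ssc = new StreamingContext(conf, Seconds(2))             // batch interval chosen arbitrarily here
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .checkpointAppName("myAppName")                            // DynamoDB table used for lease/checkpoint state
      .streamName("mySparkStream")
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
      .regionName("us-east-1")
      .initialPosition(new KinesisInitialPositions.Latest())
      .checkpointInterval(Seconds(2))                            // how often the KCL checkpoints to DynamoDB
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()                                                   // yields a DStream[Array[Byte]]

The union call further down the same hunk passes an array because the old (first, rest) overload used in the 2.4 code is gone; the array form matches the varargs union in Spark 3.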

This file was deleted.

This file was deleted.

15 changes: 7 additions & 8 deletions src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -23,28 +23,29 @@

Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
<app-name> is the name of the consumer app, used to track the read data in DynamoDB
<stream-name> name of the Kinesis stream (ie. mySparkStream)
<stream-name> name of the Kinesis stream (i.e. mySparkStream)
<endpoint-url> endpoint of the Kinesis service
(e.g. https://kinesis.us-east-1.amazonaws.com)
<region-name> region name of the Kinesis endpoint (e.g. us-east-1)


Example:
# export AWS keys if necessary
$ export AWS_ACCESS_KEY_ID=<your-access-key>
$ export AWS_SECRET_KEY=<your-secret-key>
$ export AWS_SECRET_ACCESS_KEY=<your-secret-key>

# run the example
$ bin/spark-submit -jars external/kinesis-asl/target/scala-*/\
spark-streaming-kinesis-asl-assembly_*.jar \
$ bin/spark-submit --jars \
'external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com
myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1

There is a companion helper class called KinesisWordProducerASL which puts dummy data
onto the Kinesis stream.

This code uses the DefaultAWSCredentialsProviderChain to find credentials
in the following order:
Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
Java System Properties - aws.accessKeyId and aws.secretKey
Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
Instance profile credentials - delivered through the Amazon EC2 metadata service
@@ -54,8 +55,6 @@
See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
the Kinesis Spark Streaming integration.
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
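Both word-count examples resolve AWS credentials through the DefaultAWSCredentialsProviderChain order described in their doc comments. A minimal Scala sketch of the same lookup, assuming the AWS SDK v1 dependencies already declared in build.sbt:

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

    // Sketch only: the chain tries, in order, the AWS_ACCESS_KEY_ID /
    // AWS_SECRET_ACCESS_KEY environment variables, the aws.accessKeyId /
    // aws.secretKey system properties, the ~/.aws/credentials profile file,
    // and finally EC2 instance profile credentials.
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials
    println(s"Resolved access key id: ${credentials.getAWSAccessKeyId}")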
4 changes: 2 additions & 2 deletions src/main/resources/log4j.properties
@@ -31,7 +31,7 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
KinesisWordCountASL.scala
@@ -43,15 +43,15 @@ import org.apache.spark.streaming.kinesis.KinesisInputDStream
*
* Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
* <app-name> is the name of the consumer app, used to track the read data in DynamoDB
* <stream-name> name of the Kinesis stream (ie. mySparkStream)
* <stream-name> name of the Kinesis stream (i.e. mySparkStream)
* <endpoint-url> endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
*
*
* Example:
* # export AWS keys if necessary
* $ export AWS_ACCESS_KEY_ID=<your-access-key>
* $ export AWS_SECRET_KEY=<your-secret-key>
* $ export AWS_SECRET_ACCESS_KEY=<your-secret-key>
*
* # run the example
* $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
@@ -62,7 +62,7 @@ import org.apache.spark.streaming.kinesis.KinesisInputDStream
*
* This code uses the DefaultAWSCredentialsProviderChain to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
* Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
* Instance profile credentials - delivered through the Amazon EC2 metadata service
@@ -73,12 +73,12 @@ import org.apache.spark.streaming.kinesis.KinesisInputDStream
* the Kinesis Spark Streaming integration.
*/
object KinesisWordCountASL extends Logging {
def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
// Check that all required args were passed in.
if (args.length != 3) {
System.err.println(
"""
|Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
|Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
|
| <app-name> is the name of the consumer app, used to track the read data in DynamoDB
| <stream-name> is the name of the Kinesis stream
@@ -167,29 +167,29 @@ object KinesisWordCountASL extends Logging {
* Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
* <records-per-sec> <words-per-record>
*
* <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
* <stream-name> is the name of the Kinesis stream (i.e. mySparkStream)
* <endpoint-url> is the endpoint of the Kinesis service
* (ie. https://kinesis.us-east-1.amazonaws.com)
* (i.e. https://kinesis.us-east-1.amazonaws.com)
* <records-per-sec> is the rate of records per second to put onto the stream
* <words-per-record> is the rate of records per second to put onto the stream
* <words-per-record> is the number of words per record
*
* Example:
* $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
* https://kinesis.us-east-1.amazonaws.com us-east-1 10 5
* https://kinesis.us-east-1.amazonaws.com 10 5
*/
object KinesisWordProducerASL {
def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
if (args.length != 4) {
System.err.println(
"""
|Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
<words-per-record>
| <words-per-record>
|
| <stream-name> is the name of the Kinesis stream
| <endpoint-url> is the endpoint of the Kinesis service
| (e.g. https://kinesis.us-east-1.amazonaws.com)
| <records-per-sec> is the rate of records per second to put onto the stream
| <words-per-record> is the rate of records per second to put onto the stream
| <words-per-record> is the number of words per record
|
""".stripMargin)

@@ -269,7 +269,7 @@ object KinesisWordProducerASL {
*/
private[streaming] object StreamingExamples extends Logging {
// Set reasonable logging levels for streaming if the user has not configured log4j.
def setStreamingLogLevels() {
def setStreamingLogLevels(): Unit = {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
KinesisBackedBlockRDD.scala
@@ -17,6 +17,8 @@

package org.apache.spark.streaming.kinesis

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -89,7 +91,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
require(_blockIds.length == arrayOfseqNumberRanges.length,
"Number of blockIds is not equal to the number of sequence number ranges")

override def isValid(): Boolean = true
override def isValid: Boolean = true

override def getPartitions: Array[Partition] = {
Array.tabulate(_blockIds.length) { i =>
@@ -251,13 +253,16 @@ class KinesisSequenceRangeIterator(

/** Helper method to retry Kinesis API request with exponential backoff and timeouts */
private def retryOrTimeout[T](message: String)(body: => T): T = {
val startTimeMs = System.currentTimeMillis()
val startTimeNs = System.nanoTime()
var retryCount = 0
var result: Option[T] = None
var lastError: Throwable = null
var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs

def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
def isTimedOut = {
val retryTimeoutNs = TimeUnit.MILLISECONDS.toNanos(kinesisReadConfigs.retryTimeoutMs)
(System.nanoTime() - startTimeNs) >= retryTimeoutNs
}
def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries

while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
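The retry hunk above moves the timeout check from System.currentTimeMillis to System.nanoTime, which is monotonic and therefore unaffected by wall-clock adjustments during a long retry loop. A standalone Scala sketch of the same pattern; the names and the retry policy here are illustrative only, not the Spark implementation:

    import java.util.concurrent.TimeUnit

    // Sketch only: elapsed time measured with the monotonic clock.
    def retryWithTimeout[T](timeoutMs: Long, maxRetries: Int)(attempt: => Option[T]): Option[T] = {
      val startNs = System.nanoTime()
      val timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      def timedOut: Boolean = (System.nanoTime() - startNs) >= timeoutNs

      var retries = 0
      var result: Option[T] = None
      while (result.isEmpty && !timedOut && retries < maxRetries) {
        result = attempt   // None signals "try again"
        retries += 1
      }
      result
    }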
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent._
import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason

import org.apache.spark.internal.Logging
import org.apache.spark.streaming.Duration