diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index cc639869a91..d448a10c18d 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -216,7 +216,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
- ./mvnw install -DskipTests -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114 ${MAVEN_ARGS}
+ ./mvnw install -DskipTests -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pflink-117 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.7 and R
uses: conda-incubator/setup-miniconda@v2
@@ -243,7 +243,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- flink: [113, 114, 115, 116, 117]
+ flink: [115, 116, 117]
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -265,13 +265,7 @@ jobs:
key: ${{ runner.os }}-zeppelin-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-zeppelin-
- - name: install environment for flink before 1.15 (exclusive)
- if: matrix.flink < '115'
- run: |
- ./mvnw install -DskipTests -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS}
- ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- - name: install environment for flink after 1.15 (inclusive)
- if: matrix.flink >= '115'
+ - name: install environment for flink
run: |
./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS}
./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
@@ -286,11 +280,7 @@ jobs:
channel-priority: true
auto-activate-base: false
use-mamba: true
- - name: run tests for flink before 1.15 (exclusive)
- if: matrix.flink < '115'
- run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
- - name: run tests for flink after 1.15 (inclusive)
- if: matrix.flink >= '115'
+ - name: run tests for flink
run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -am -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
- name: Print zeppelin logs
if: always()
diff --git a/.gitignore b/.gitignore
index 2d58c179ade..29bb190ebca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,7 @@ zeppelin-web/yarn.lock
/warehouse/
/notebook/
/local-repo/
+/notebook_*/
**/sessions/
**/data/
@@ -97,6 +98,9 @@ Thumbs.db
.idea/
*.iml
+# JetBrains Fleet project files
+.fleet/
+
# vscode project files
.vscode/
@@ -132,3 +136,6 @@ tramp
# jEnv file
.java-version
+
+# pyenv file
+.python-version
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index c5a6af957ab..cc40d03a7aa 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -27,7 +27,7 @@ limitations under the License.
[Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
-In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Only Flink 1.10+ is supported, old versions of flink won't work.**
+In Zeppelin 0.9, we refactored the Flink interpreter to support the latest versions of Flink. **Currently, only Flink 1.15+ is supported; older versions of Flink won't work.**
Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.
@@ -74,10 +74,6 @@ Apache Flink is supported in Zeppelin with the Flink interpreter group which con
     <td>Support multiple versions of Flink</td>
     <td>You can run different versions of Flink in one Zeppelin instance</td>
   </tr>
-  <tr>
-    <td>Support multiple versions of Scala</td>
-    <td>You can run different Scala versions of Flink in on Zeppelin instance</td>
-  </tr>
   <tr>
     <td>Support multiple languages</td>
     <td>Scala, Python, SQL are supported, besides that you can also collaborate across languages, e.g. you can write Scala UDF and use it in PyFlink</td>
@@ -142,11 +138,11 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep
## Prerequisites
-Download Flink 1.10 or afterwards (Scala 2.11 & 2.12 are both supported)
+Download Flink 1.15 or later (only Scala 2.12 is supported)
-### Specific for Flink 1.15 and above
+### Version-specific notes for Flink
-Flink 1.15 is scala free and has changed its binary distribution. If you would like to make Zeppelin work with Flink 1.15, you need to do the following extra steps.
+Flink 1.15 is Scala-free and has changed its binary distribution, so the following extra steps are required (scripted below):
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
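+
+For example, assuming Flink 1.15.0 is unpacked under `$FLINK_HOME`, the steps above can be scripted as follows (a sketch; adjust the version suffix to match your Flink distribution):
+
+```bash
+# Swap the planner loader for the Scala planner, as the Zeppelin Flink interpreter requires
+mv $FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar $FLINK_HOME/lib
+mv $FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar $FLINK_HOME/opt
+
+# Fetch the Scala bridge jars from Maven Central into FLINK_HOME/lib
+cd $FLINK_HOME/lib
+curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_2.12/1.15.0/flink-table-api-scala-bridge_2.12-1.15.0.jar
+curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_2.12/1.15.0/flink-table-api-scala_2.12-1.15.0.jar
+```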
diff --git a/flink/README.md b/flink/README.md
index bb8aa74a064..12ce38d6349 100644
--- a/flink/README.md
+++ b/flink/README.md
@@ -8,45 +8,26 @@ This is the doc for Zeppelin developers who want to work on flink interpreter.
-Flink interpreter is more complex than other interpreter (such as jdbc, shell). Currently it has following 8 modules
+Flink interpreter is more complex than other interpreters (such as jdbc, shell). Currently it has the following 6 modules:
* flink-shims
-* flink1.13-shims
-* flink1.14-shims
* flink1.15-shims
* flink1.16-shims
+* flink1.17-shims
* flink-scala-parent
-* flink-scala-2.11
* flink-scala-2.12
-The first 5 modules are to adapt different flink versions because there're some api changes between different versions of flink.
+The first 4 modules adapt to different flink versions because there are API changes between different versions of flink.
`flink-shims` is parent module for other shims modules.
At runtime Flink interpreter will load the FlinkShims based on the current flink versions (See `FlinkShims#loadShims`).
-The remaining 3 modules are to adapt different scala versions (Apache Flink supports 2 scala versions: 2.11 & 2.12).
-`flink-scala-parent` is a parent module for `flink-scala-2.11` and `flink-scala-2.12`. It contains common code for both `flink-scala-2.11` and `flink-scala-2.12`.
-There's symlink folder `flink-scala-parent` under `flink-scala-2.11` and `flink-scala-2.12`.
+The remaining 2 modules handle the Scala build (Apache Flink now only supports Scala 2.12).
+`flink-scala-parent` is the parent module of `flink-scala-2.12` and contains the common interpreter code.
+There's a symlink folder `flink-scala-parent` under `flink-scala-2.12`.
When you run maven command to build flink interpreter, the source code in `flink-scala-parent` won't be compiled directly, instead
-they will be compiled against different scala versions when building `flink-scala-2.11` & `flink-scala-2.12`. (See `build-helper-maven-plugin` in `pom.xml`)
-Both `flink-scala-2.11` and `flink-scala-2.12` build a flink interpreter jar and `FlinkInterpreterLauncher` in `zeppelin-plugins/launcher/flink` will choose the right jar based
+it is compiled when building `flink-scala-2.12`. (See `build-helper-maven-plugin` in `pom.xml`)
+`flink-scala-2.12` builds a flink interpreter jar, and `FlinkInterpreterLauncher` in `zeppelin-plugins/launcher/flink` will choose the right jar based
on the scala version of flink.
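+
+A quick way to inspect the symlink and build the interpreter jar from the repository root (a sketch; `-Pflink-117` is one of the supported profile choices):
+
+```bash
+# The symlinked common sources that build-helper-maven-plugin compiles into this module
+ls -l flink/flink-scala-2.12/flink-scala-parent
+
+# Build only the flink interpreter modules
+./mvnw install -DskipTests -pl flink/flink-scala-2.12 -am -Pflink-117
+```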
### Work in IDE
-Because of the complex project structure of flink interpreter, we need to do more configuration to make it work in IDE.
-Here we take Intellij as an example (other IDE should be similar).
-
-The key point is that we can only make flink interpreter work with one scala version at the same time in IDE.
-So we have to disable the other module when working with one specific scala version module.
-
-#### Make it work with scala-2.11
-
-1. Exclude the source code folder (java/scala) of `flink-scala-parent` (Right click these folder -> Mark directory As -> Excluded)
-2. Include the source code folder (java/scala) of `flink/flink-scala-2.11/flink-scala-parent` (Right click these folder -> Mark directory As -> Source root)
-
-#### Make it work with scala-2.12
-
-1. Exclude the source code folder (java/scala) of `flink-scala-parent` (Right click these folder -> Mark directory As -> Excluded)
-2. Include the source code folder (java/scala) of `flink/flink-scala-2.12/flink-scala-parent` (Right click these folder -> Mark directory As -> Source root)
-
-
#### How to run unit test in IDE
Take `FlinkInterpreterTest` as an example, you need to specify environment variables `FLINK_HOME`, `FLINK_CONF_DIR`, `ZEPPELIN_HOME`.
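+
+For example, a minimal setup could look like this (a sketch; the paths are placeholders for your local Flink distribution and Zeppelin checkout):
+
+```bash
+export FLINK_HOME=/path/to/flink          # unpacked Flink binary distribution
+export FLINK_CONF_DIR=$FLINK_HOME/conf    # Flink configuration directory
+export ZEPPELIN_HOME=/path/to/zeppelin    # root of the Zeppelin source tree
+```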
diff --git a/flink/flink-scala-2.11/flink-scala-parent b/flink/flink-scala-2.11/flink-scala-parent
deleted file mode 120000
index 3dfa859ba33..00000000000
--- a/flink/flink-scala-2.11/flink-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../flink-scala-parent
\ No newline at end of file
diff --git a/flink/flink-scala-2.11/pom.xml b/flink/flink-scala-2.11/pom.xml
deleted file mode 100644
index 33c40a3b733..00000000000
--- a/flink/flink-scala-2.11/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-
-
-
-  <parent>
-    <groupId>org.apache.zeppelin</groupId>
-    <artifactId>flink-scala-parent</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
-    <relativePath>../flink-scala-parent/pom.xml</relativePath>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.zeppelin</groupId>
-  <artifactId>flink-scala-2.11</artifactId>
-  <version>0.11.0-SNAPSHOT</version>
-  <packaging>jar</packaging>
-  <name>Zeppelin: Flink Interpreter Scala_2.11</name>
-
-  <properties>
-    <flink.scala.version>2.11.12</flink.scala.version>
-    <flink.scala.binary.version>2.11</flink.scala.binary.version>
-    <scala.version>${flink.scala.version}</scala.version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>com.googlecode.maven-download-plugin</groupId>
-        <artifactId>download-maven-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-resources-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
deleted file mode 100644
index d61bcbcc4f2..00000000000
--- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-import scala.tools.nsc.interpreter.{ExprTyper, IR}
-
-trait FlinkExprTyper extends ExprTyper {
-
- import repl._
- import global.{Import => _, reporter => _, _}
- import naming.freshInternalVarName
-
- def doInterpret(code: String): IR.Result = {
- // interpret/interpretSynthetic may change the phase,
- // which would have unintended effects on types.
- val savedPhase = phase
- try interpretSynthetic(code) finally phase = savedPhase
- }
-
- override def symbolOfLine(code: String): Symbol = {
- def asExpr(): Symbol = {
- val name = freshInternalVarName()
- // Typing it with a lazy val would give us the right type, but runs
- // into compiler bugs with things like existentials, so we compile it
- // behind a def and strip the NullaryMethodType which wraps the expr.
- val line = "def " + name + " = " + code
-
- doInterpret(line) match {
- case IR.Success =>
- val sym0 = symbolOfTerm(name)
- // drop NullaryMethodType
- sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType)
- case _ => NoSymbol
- }
- }
-
- def asDefn(): Symbol = {
- val old = repl.definedSymbolList.toSet
-
- doInterpret(code) match {
- case IR.Success =>
- repl.definedSymbolList filterNot old match {
- case Nil => NoSymbol
- case sym :: Nil => sym
- case syms => NoSymbol.newOverloaded(NoPrefix, syms)
- }
- case _ => NoSymbol
- }
- }
-
- def asError(): Symbol = {
- doInterpret(code)
- NoSymbol
- }
-
- beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError()
- }
-
-}
diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
deleted file mode 100644
index 08cb0c0db12..00000000000
--- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-import scala.collection.mutable
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter._
-
-class FlinkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
- self =>
-
- override lazy val memberHandlers = new {
- val intp: self.type = self
- } with MemberHandlers {
- import intp.global._
-
- override def chooseHandler(member: intp.global.Tree): MemberHandler = member match {
- case member: Import => new FlinkImportHandler(member)
- case _ => super.chooseHandler(member)
- }
-
- class FlinkImportHandler(imp: Import) extends ImportHandler(imp: Import) {
-
- override def targetType: Type = intp.global.rootMirror.getModuleIfDefined("" + expr) match {
- case NoSymbol => intp.typeOfExpression("" + expr)
- case sym => sym.tpe
- }
-
- private def safeIndexOf(name: Name, s: String): Int = fixIndexOf(name, pos(name, s))
- private def fixIndexOf(name: Name, idx: Int): Int = if (idx == name.length) -1 else idx
- private def pos(name: Name, s: String): Int = {
- var i = name.pos(s.charAt(0), 0)
- val sLen = s.length()
- if (sLen == 1) return i
- while (i + sLen <= name.length) {
- var j = 1
- while (s.charAt(j) == name.charAt(i + j)) {
- j += 1
- if (j == sLen) return i
- }
- i = name.pos(s.charAt(0), i + 1)
- }
- name.length
- }
-
- private def isFlattenedSymbol(sym: Symbol): Boolean =
- sym.owner.isPackageClass &&
- sym.name.containsName(nme.NAME_JOIN_STRING) &&
- sym.owner.info.member(sym.name.take(
- safeIndexOf(sym.name, nme.NAME_JOIN_STRING))) != NoSymbol
-
- private def importableTargetMembers =
- importableMembers(exitingTyper(targetType)).filterNot(isFlattenedSymbol).toList
-
- def isIndividualImport(s: ImportSelector): Boolean =
- s.name != nme.WILDCARD && s.rename != nme.WILDCARD
- def isWildcardImport(s: ImportSelector): Boolean =
- s.name == nme.WILDCARD
-
- // non-wildcard imports
- private def individualSelectors = selectors filter isIndividualImport
-
- override val importsWildcard: Boolean = selectors exists isWildcardImport
-
- lazy val importableSymbolsWithRenames: List[(Symbol, Name)] = {
- val selectorRenameMap =
- individualSelectors.flatMap(x => x.name.bothNames zip x.rename.bothNames).toMap
- importableTargetMembers flatMap (m => selectorRenameMap.get(m.name) map (m -> _))
- }
-
- override lazy val individualSymbols: List[Symbol] = importableSymbolsWithRenames map (_._1)
- override lazy val wildcardSymbols: List[Symbol] =
- if (importsWildcard) importableTargetMembers else Nil
-
- }
-
- }
-
- object expressionTyper extends {
- val repl: FlinkILoopInterpreter.this.type = self
- } with FlinkExprTyper { }
-
- override def symbolOfLine(code: String): global.Symbol =
- expressionTyper.symbolOfLine(code)
-
- override def typeOfExpression(expr: String, silent: Boolean): global.Type =
- expressionTyper.typeOfExpression(expr, silent)
-
-
- import global.Name
- override def importsCode(wanted: Set[Name], wrapper: Request#Wrapper,
- definesClass: Boolean, generousImports: Boolean): ComputedImports = {
-
- import global._
- import definitions.PredefModule
- import memberHandlers._
-
- val header, code, trailingBraces, accessPath = new StringBuilder
- val currentImps = mutable.HashSet[Name]()
- // only emit predef import header if name not resolved in history, loosely
- var predefEscapes = false
-
- /**
- * Narrow down the list of requests from which imports
- * should be taken. Removes requests which cannot contribute
- * useful imports for the specified set of wanted names.
- */
- case class ReqAndHandler(req: Request, handler: MemberHandler)
-
- def reqsToUse: List[ReqAndHandler] = {
- /**
- * Loop through a list of MemberHandlers and select which ones to keep.
- * 'wanted' is the set of names that need to be imported.
- */
- def select(reqs: List[ReqAndHandler], wanted: Set[Name]): List[ReqAndHandler] = {
- // Single symbol imports might be implicits! See bug #1752. Rather than
- // try to finesse this, we will mimic all imports for now.
- def keepHandler(handler: MemberHandler) = handler match {
- // While defining classes in class based mode - implicits are not needed.
- case h: ImportHandler if isClassBased && definesClass =>
- h.importedNames.exists(x => wanted.contains(x))
- case _: ImportHandler => true
- case x if generousImports => x.definesImplicit ||
- (x.definedNames exists (d => wanted.exists(w => d.startsWith(w))))
- case x => x.definesImplicit ||
- (x.definedNames exists wanted)
- }
-
- reqs match {
- case Nil =>
- predefEscapes = wanted contains PredefModule.name ; Nil
- case rh :: rest if !keepHandler(rh.handler) => select(rest, wanted)
- case rh :: rest =>
- import rh.handler._
- val augment = rh match {
- case ReqAndHandler(_, _: ImportHandler) => referencedNames
- case _ => Nil
- }
- val newWanted = wanted ++ augment -- definedNames -- importedNames
- rh :: select(rest, newWanted)
- }
- }
-
- /** Flatten the handlers out and pair each with the original request */
- select(allReqAndHandlers reverseMap { case (r, h) => ReqAndHandler(r, h) }, wanted).reverse
- }
-
- // add code for a new object to hold some imports
- def addWrapper() {
- import nme.{INTERPRETER_IMPORT_WRAPPER => iw}
- code append (wrapper.prewrap format iw)
- trailingBraces append wrapper.postwrap
- accessPath append s".$iw"
- currentImps.clear()
- }
-
- def maybeWrap(names: Name*) = if (names exists currentImps) addWrapper()
-
- def wrapBeforeAndAfter[T](op: => T): T = {
- addWrapper()
- try op finally addWrapper()
- }
-
- // imports from Predef are relocated to the template header to allow hiding.
- def checkHeader(h: ImportHandler) = h.referencedNames contains PredefModule.name
-
- // loop through previous requests, adding imports for each one
- wrapBeforeAndAfter {
- // Reusing a single temporary value when import from a line with multiple definitions.
- val tempValLines = mutable.Set[Int]()
- for (ReqAndHandler(req, handler) <- reqsToUse) {
- val objName = req.lineRep.readPathInstance
- handler match {
- case h: ImportHandler if checkHeader(h) =>
- header.clear()
- header append f"${h.member}%n"
- // If the user entered an import, then just use it; add an import wrapping
- // level if the import might conflict with some other import
- case x: ImportHandler if x.importsWildcard =>
- wrapBeforeAndAfter(code append (x.member + "\n"))
- case x: ImportHandler =>
- maybeWrap(x.importedNames: _*)
- code append (x.member + "\n")
- currentImps ++= x.importedNames
-
- case x if isClassBased =>
- for (sym <- x.definedSymbols) {
- maybeWrap(sym.name)
- x match {
- case _: ClassHandler =>
- code.append(s"import ${objName}${req.accessPath}.`${sym.name}`\n")
- case _ =>
- val valName = s"${req.lineRep.packageName}${req.lineRep.readName}"
- if (!tempValLines.contains(req.lineRep.lineId)) {
- code.append(s"val $valName: ${objName}.type = $objName\n")
- tempValLines += req.lineRep.lineId
- }
- code.append(s"import ${valName}${req.accessPath}.`${sym.name}`\n")
- }
- currentImps += sym.name
- }
- // For other requests, import each defined name.
- // import them explicitly instead of with _, so that
- // ambiguity errors will not be generated. Also, quote
- // the name of the variable, so that we don't need to
- // handle quoting keywords separately.
- case x =>
- for (sym <- x.definedSymbols) {
- maybeWrap(sym.name)
- code append s"import ${x.path}\n"
- currentImps += sym.name
- }
- }
- }
- }
-
- val computedHeader = if (predefEscapes) header.toString else ""
- ComputedImports(computedHeader, code.toString, trailingBraces.toString, accessPath.toString)
- }
-
- private def allReqAndHandlers =
- prevRequestList flatMap (req => req.handlers map (req -> _))
-
-}
diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala
deleted file mode 100644
index fc40df029de..00000000000
--- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-import java.io.File
-import java.net.URLClassLoader
-import java.util.Properties
-
-import org.apache.zeppelin.interpreter.InterpreterContext
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{IMain, JPrintWriter}
-
-
-class FlinkScala211Interpreter(override val properties: Properties,
- override val flinkScalaClassLoader: ClassLoader)
- extends FlinkScalaInterpreter(properties, flinkScalaClassLoader) {
-
- override def completion(buf: String,
- cursor: Int,
- context: InterpreterContext): java.util.List[InterpreterCompletion] = {
- val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
- .map(e => new InterpreterCompletion(e, e, null))
- scala.collection.JavaConversions.seqAsJavaList(completions)
- }
-
- override def createIMain(settings: Settings, out: JPrintWriter): IMain = new FlinkILoopInterpreter(settings, out)
-
- override def createSettings(): Settings = {
- val settings = new Settings()
- // Don't call settings#embeddedDefaults for scala-2.11, otherwise it could cause weird error
- settings.usejavacp.value = true
- settings.Yreplsync.value = true
- settings.classpath.value = userJars.mkString(File.pathSeparator)
- settings
- }
-}
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index 79b839fefb4..e0969cc6fee 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -35,13 +35,11 @@
     <interpreter.name>flink</interpreter.name>
-    <flink.version>${flink1.13.version}</flink.version>
+    <flink.version>${flink1.17.version}</flink.version>
     <flink.hadoop.version>${hadoop2.7.version}</flink.hadoop.version>
     <hive.version>2.3.4</hive.version>
     <hiverunner.version>4.0.0</hiverunner.version>
     <grpc.version>1.15.0</grpc.version>
-    <flink.library.scala.suffix>_${flink.scala.binary.version}</flink.library.scala.suffix>
     <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz</flink.bin.download.url>
@@ -55,18 +53,6 @@
       <version>${project.version}</version>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink1.13-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink1.14-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>flink1.15-shims</artifactId>
@@ -153,14 +139,14 @@
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients${flink.library.scala.suffix}</artifactId>
+      <artifactId>flink-clients</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-yarn${flink.library.scala.suffix}</artifactId>
+      <artifactId>flink-yarn</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
@@ -195,7 +181,7 @@
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-java-bridge${flink.library.scala.suffix}</artifactId>
+      <artifactId>flink-table-api-java-bridge</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
@@ -209,7 +195,7 @@
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java${flink.library.scala.suffix}</artifactId>
+      <artifactId>flink-streaming-java</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
@@ -874,86 +860,12 @@
-    <profile>
-      <id>flink-113</id>
-      <properties>
-        <flink.version>${flink1.13.version}</flink.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.reflections</groupId>
-              <artifactId>reflections</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>flink-114</id>
-      <properties>
-        <flink.version>${flink1.14.version}</flink.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-runtime</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
     <profile>
       <id>flink-115</id>
       <properties>
         <flink.version>${flink1.15.version}</flink.version>
         <flink.scala.version>2.12.7</flink.scala.version>
         <flink.scala.binary.version>2.12</flink.scala.binary.version>
-        <flink.library.scala.suffix></flink.library.scala.suffix>
@@ -983,7 +895,6 @@
         <flink.version>${flink1.16.version}</flink.version>
         <flink.scala.version>2.12.7</flink.scala.version>
         <flink.scala.binary.version>2.12</flink.scala.binary.version>
-        <flink.library.scala.suffix></flink.library.scala.suffix>
@@ -1019,7 +930,6 @@
         <flink.version>${flink1.17.version}</flink.version>
         <flink.scala.version>2.12.7</flink.scala.version>
         <flink.scala.binary.version>2.12</flink.scala.binary.version>
-        <flink.library.scala.suffix></flink.library.scala.suffix>
diff --git a/flink/flink1.13-shims/pom.xml b/flink/flink1.13-shims/pom.xml
deleted file mode 100644
index 8f4765ed8cc..00000000000
--- a/flink/flink1.13-shims/pom.xml
+++ /dev/null
@@ -1,213 +0,0 @@
-
-
-
-
-  <parent>
-    <artifactId>flink-parent</artifactId>
-    <groupId>org.apache.zeppelin</groupId>
-    <version>0.11.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.zeppelin</groupId>
-  <artifactId>flink1.13-shims</artifactId>
-  <version>0.11.0-SNAPSHOT</version>
-  <packaging>jar</packaging>
-  <name>Zeppelin: Flink1.13 Shims</name>
-
-  <properties>
-    <flink.version>${flink1.13.version}</flink.version>
-  </properties>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-java</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.reflections</groupId>
-          <artifactId>reflections</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>eclipse-add-source</id>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-compile-first</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-test-compile-first</id>
-            <phase>process-test-resources</phase>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <scalaVersion>${flink.scala.version}</scalaVersion>
-          <args>
-            <arg>-unchecked</arg>
-            <arg>-deprecation</arg>
-            <arg>-feature</arg>
-            <arg>-nobootcp</arg>
-          </args>
-          <jvmArgs>
-            <jvmArg>-Xms1024m</jvmArg>
-            <jvmArg>-Xmx1024m</jvmArg>
-            <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
-          </jvmArgs>
-          <javacArgs>
-            <javacArg>-source</javacArg>
-            <javacArg>${java.version}</javacArg>
-            <javacArg>-target</javacArg>
-            <javacArg>${java.version}</javacArg>
-            <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
-          </javacArgs>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>copy-interpreter-setting</id>
-            <phase>none</phase>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
deleted file mode 100644
index 440b2457964..00000000000
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
-import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.PrintUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Shims for flink 1.13
- */
-public class Flink113Shims extends FlinkShims {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
-
- private Flink113SqlInterpreter batchSqlInterpreter;
- private Flink113SqlInterpreter streamSqlInterpreter;
-
- public Flink113Shims(FlinkVersion flinkVersion, Properties properties) {
- super(flinkVersion, properties);
- }
-
- public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.batchSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, true);
- }
-
- public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false);
- }
-
- @Override
-  public Object createResourceManager(List<URL> jars, Object tableConfig) {
- return null;
- }
-
- @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
- return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
- }
-
- @Override
- public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
- // do nothing
- }
-
- @Override
- public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
- Object senvObj,
- Object tableConfigObj,
- Object moduleManagerObj,
- Object functionCatalogObj,
- Object catalogManagerObj,
-                                                       List<URL> jars,
- ClassLoader classLoader) {
- EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
- StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
- TableConfig tableConfig = (TableConfig) tableConfigObj;
- ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
- FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
- CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
- ImmutablePair