From 9dadde36d821094f1c4b48d22f75c5a41314faa5 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 9 Nov 2023 01:33:48 -0600 Subject: [PATCH] [ZEPPELIN-5978] Remove support for old Flink 1.13 and 1.14 (#4688) * [ZEPPELIN-5978] Remove support for old Flink 1.13 and 1.14 --- .github/workflows/core.yml | 18 +- .gitignore | 7 + docs/interpreter/flink.md | 12 +- flink/README.md | 33 +- flink/flink-scala-2.11/flink-scala-parent | 1 - flink/flink-scala-2.11/pom.xml | 83 --- .../zeppelin/flink/FlinkExprTyper.scala | 75 --- .../flink/FlinkILoopInterpreter.scala | 240 ------- .../flink/FlinkScala211Interpreter.scala | 54 -- flink/flink-scala-parent/pom.xml | 100 +-- flink/flink1.13-shims/pom.xml | 213 ------- .../apache/zeppelin/flink/Flink113Shims.java | 380 ----------- .../flink/Flink113SqlInterpreter.java | 562 ----------------- .../shims113/CollectStreamTableSink.java | 97 --- .../flink/shims113/Flink113ScalaShims.scala | 36 -- flink/flink1.14-shims/pom.xml | 206 ------ .../apache/zeppelin/flink/Flink114Shims.java | 378 ----------- .../flink/Flink114SqlInterpreter.java | 590 ------------------ .../shims114/CollectStreamTableSink.java | 97 --- flink/pom.xml | 31 +- testing/env_python_3_with_flink_113.yml | 30 - testing/env_python_3_with_flink_114.yml | 30 - .../integration/ZSessionIntegrationTest.java | 2 +- .../integration/DownloadUtils.java | 19 + .../integration/SemanticVersion.java | 106 ++++ 25 files changed, 158 insertions(+), 3242 deletions(-) delete mode 120000 flink/flink-scala-2.11/flink-scala-parent delete mode 100644 flink/flink-scala-2.11/pom.xml delete mode 100644 flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala delete mode 100644 flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala delete mode 100644 flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala delete mode 100644 flink/flink1.13-shims/pom.xml delete mode 100644 flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java delete mode 100644 flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java delete mode 100644 flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java delete mode 100644 flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala delete mode 100644 flink/flink1.14-shims/pom.xml delete mode 100644 flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java delete mode 100644 flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java delete mode 100644 flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java delete mode 100644 testing/env_python_3_with_flink_113.yml delete mode 100644 testing/env_python_3_with_flink_114.yml create mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index cc639869a91..d448a10c18d 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -216,7 +216,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114 ${MAVEN_ARGS} + ./mvnw install -DskipTests 
-Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pflink-117 ${MAVEN_ARGS} ./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -243,7 +243,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [113, 114, 115, 116, 117] + flink: [115, 116, 117] steps: - name: Checkout uses: actions/checkout@v3 @@ -265,13 +265,7 @@ jobs: key: ${{ runner.os }}-zeppelin-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-zeppelin- - - name: install environment for flink before 1.15 (exclusive) - if: matrix.flink < '115' - run: | - ./mvnw install -DskipTests -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS} - ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - - name: install environment for flink after 1.15 (inclusive) - if: matrix.flink >= '115' + - name: install environment for flink run: | ./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS} ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} @@ -286,11 +280,7 @@ jobs: channel-priority: true auto-activate-base: false use-mamba: true - - name: run tests for flink before 1.15 (exclusive) - if: matrix.flink < '115' - run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS} - - name: run tests for flink after 1.15 (inclusive) - if: matrix.flink >= '115' + - name: run tests for flink run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -am -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS} - name: Print zeppelin logs if: always() diff --git a/.gitignore b/.gitignore index 2d58c179ade..29bb190ebca 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ zeppelin-web/yarn.lock /warehouse/ /notebook/ /local-repo/ +/notebook_*/ **/sessions/ **/data/ @@ -97,6 +98,9 @@ Thumbs.db .idea/ *.iml +# Jetbrains Fleet project files +.fleet/ + # vscode project files .vscode/ @@ -132,3 +136,6 @@ tramp # jEnv file .java-version + +# pyenv file +.python-version diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index c5a6af957ab..cc40d03a7aa 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -27,7 +27,7 @@ limitations under the License. [Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale. -In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Only Flink 1.10+ is supported, old versions of flink won't work.** +In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. 
**Currently, only Flink 1.15+ is supported; older versions of Flink won't work.**
 
 Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.
 
@@ -74,10 +74,6 @@ Apache Flink is supported in Zeppelin with the Flink interpreter group which con
     
     
   
-  
-    
-    
-  
   
     
     
@@ -142,11 +138,11 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep
 
 ## Prerequisites
 
-Download Flink 1.10 or afterwards (Scala 2.11 & 2.12 are both supported)
+Download Flink 1.15 or later (only Scala 2.12 is supported)
 
-### Specific for Flink 1.15 and above
+### Version-specific notes for Flink
 
-Flink 1.15 is scala free and has changed its binary distribution. If you would like to make Zeppelin work with Flink 1.15, you need to do the following extra steps.
+Flink 1.15 is Scala-free and has changed its binary distribution, so the following extra steps are required.
 * Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
 * Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
 * Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
diff --git a/flink/README.md b/flink/README.md
index bb8aa74a064..12ce38d6349 100644
--- a/flink/README.md
+++ b/flink/README.md
@@ -8,45 +8,26 @@ This is the doc for Zeppelin developers who want to work on flink interpreter.
 Flink interpreter is more complex than other interpreter (such as jdbc, shell). Currently it has following 8 modules
 
 * flink-shims
-* flink1.13-shims
-* flink1.14-shims
 * flink1.15-shims
 * flink1.16-shims
+* flink1.17-shims
 * flink-scala-parent
-* flink-scala-2.11
 * flink-scala-2.12
 
-The first 5 modules are to adapt different flink versions because there're some api changes between different versions of flink.
+The first 4 modules adapt different flink versions because there are some api changes between different versions of flink.
 `flink-shims` is parent module for other shims modules. At runtime Flink interpreter will load the FlinkShims based on the current flink versions (See `FlinkShims#loadShims`).
 
-The remaining 3 modules are to adapt different scala versions (Apache Flink supports 2 scala versions: 2.11 & 2.12).
-`flink-scala-parent` is a parent module for `flink-scala-2.11` and `flink-scala-2.12`. It contains common code for both `flink-scala-2.11` and `flink-scala-2.12`.
-There's symlink folder `flink-scala-parent` under `flink-scala-2.11` and `flink-scala-2.12`.
+The remaining 2 modules contain the Scala-specific code (Apache Flink now only supports Scala 2.12).
+`flink-scala-parent` is a parent module for `flink-scala-2.12`.
+There's a symlink folder `flink-scala-parent` under `flink-scala-2.12`.
 When you run maven command to build flink interpreter, the source code in `flink-scala-parent` won't be compiled directly, instead
-they will be compiled against different scala versions when building `flink-scala-2.11` & `flink-scala-2.12`. (See `build-helper-maven-plugin` in `pom.xml`)
-Both `flink-scala-2.11` and `flink-scala-2.12` build a flink interpreter jar and `FlinkInterpreterLauncher` in `zeppelin-plugins/launcher/flink` will choose the right jar based
+it will be compiled when building `flink-scala-2.12`. (See `build-helper-maven-plugin` in `pom.xml`)
+`flink-scala-2.12` builds a flink interpreter jar, and `FlinkInterpreterLauncher` in `zeppelin-plugins/launcher/flink` will choose the right jar based
 on the scala version of flink.
### Work in IDE -Because of the complex project structure of flink interpreter, we need to do more configuration to make it work in IDE. -Here we take Intellij as an example (other IDE should be similar). - -The key point is that we can only make flink interpreter work with one scala version at the same time in IDE. -So we have to disable the other module when working with one specific scala version module. - -#### Make it work with scala-2.11 - -1. Exclude the source code folder (java/scala) of `flink-scala-parent` (Right click these folder -> Mark directory As -> Excluded) -2. Include the source code folder (java/scala) of `flink/flink-scala-2.11/flink-scala-parent` (Right click these folder -> Mark directory As -> Source root) - -#### Make it work with scala-2.12 - -1. Exclude the source code folder (java/scala) of `flink-scala-parent` (Right click these folder -> Mark directory As -> Excluded) -2. Include the source code folder (java/scala) of `flink/flink-scala-2.12/flink-scala-parent` (Right click these folder -> Mark directory As -> Source root) - - #### How to run unit test in IDE Take `FlinkInterpreterTest` as an example, you need to specify environment variables `FLINK_HOME`, `FLINK_CONF_DIR`, `ZEPPELIN_HOME`. diff --git a/flink/flink-scala-2.11/flink-scala-parent b/flink/flink-scala-2.11/flink-scala-parent deleted file mode 120000 index 3dfa859ba33..00000000000 --- a/flink/flink-scala-2.11/flink-scala-parent +++ /dev/null @@ -1 +0,0 @@ -../flink-scala-parent \ No newline at end of file diff --git a/flink/flink-scala-2.11/pom.xml b/flink/flink-scala-2.11/pom.xml deleted file mode 100644 index 33c40a3b733..00000000000 --- a/flink/flink-scala-2.11/pom.xml +++ /dev/null @@ -1,83 +0,0 @@ - - - - - org.apache.zeppelin - flink-scala-parent - 0.11.0-SNAPSHOT - ../flink-scala-parent/pom.xml - - - 4.0.0 - org.apache.zeppelin - flink-scala-2.11 - 0.11.0-SNAPSHOT - jar - Zeppelin: Flink Interpreter Scala_2.11 - - - 2.11.12 - 2.11 - ${flink.scala.version} - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - - net.alchim31.maven - scala-maven-plugin - - - - com.googlecode.maven-download-plugin - download-maven-plugin - - - - org.apache.maven.plugins - maven-surefire-plugin - - - - org.apache.maven.plugins - maven-resources-plugin - - - - org.scalatest - scalatest-maven-plugin - - - org.apache.maven.plugins - maven-jar-plugin - - - - org.apache.maven.plugins - maven-shade-plugin - - - - - diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala deleted file mode 100644 index d61bcbcc4f2..00000000000 --- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink - -import scala.tools.nsc.interpreter.{ExprTyper, IR} - -trait FlinkExprTyper extends ExprTyper { - - import repl._ - import global.{Import => _, reporter => _, _} - import naming.freshInternalVarName - - def doInterpret(code: String): IR.Result = { - // interpret/interpretSynthetic may change the phase, - // which would have unintended effects on types. - val savedPhase = phase - try interpretSynthetic(code) finally phase = savedPhase - } - - override def symbolOfLine(code: String): Symbol = { - def asExpr(): Symbol = { - val name = freshInternalVarName() - // Typing it with a lazy val would give us the right type, but runs - // into compiler bugs with things like existentials, so we compile it - // behind a def and strip the NullaryMethodType which wraps the expr. - val line = "def " + name + " = " + code - - doInterpret(line) match { - case IR.Success => - val sym0 = symbolOfTerm(name) - // drop NullaryMethodType - sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType) - case _ => NoSymbol - } - } - - def asDefn(): Symbol = { - val old = repl.definedSymbolList.toSet - - doInterpret(code) match { - case IR.Success => - repl.definedSymbolList filterNot old match { - case Nil => NoSymbol - case sym :: Nil => sym - case syms => NoSymbol.newOverloaded(NoPrefix, syms) - } - case _ => NoSymbol - } - } - - def asError(): Symbol = { - doInterpret(code) - NoSymbol - } - - beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError() - } - -} diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala deleted file mode 100644 index 08cb0c0db12..00000000000 --- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.flink - -import scala.collection.mutable -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter._ - -class FlinkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) { - self => - - override lazy val memberHandlers = new { - val intp: self.type = self - } with MemberHandlers { - import intp.global._ - - override def chooseHandler(member: intp.global.Tree): MemberHandler = member match { - case member: Import => new FlinkImportHandler(member) - case _ => super.chooseHandler(member) - } - - class FlinkImportHandler(imp: Import) extends ImportHandler(imp: Import) { - - override def targetType: Type = intp.global.rootMirror.getModuleIfDefined("" + expr) match { - case NoSymbol => intp.typeOfExpression("" + expr) - case sym => sym.tpe - } - - private def safeIndexOf(name: Name, s: String): Int = fixIndexOf(name, pos(name, s)) - private def fixIndexOf(name: Name, idx: Int): Int = if (idx == name.length) -1 else idx - private def pos(name: Name, s: String): Int = { - var i = name.pos(s.charAt(0), 0) - val sLen = s.length() - if (sLen == 1) return i - while (i + sLen <= name.length) { - var j = 1 - while (s.charAt(j) == name.charAt(i + j)) { - j += 1 - if (j == sLen) return i - } - i = name.pos(s.charAt(0), i + 1) - } - name.length - } - - private def isFlattenedSymbol(sym: Symbol): Boolean = - sym.owner.isPackageClass && - sym.name.containsName(nme.NAME_JOIN_STRING) && - sym.owner.info.member(sym.name.take( - safeIndexOf(sym.name, nme.NAME_JOIN_STRING))) != NoSymbol - - private def importableTargetMembers = - importableMembers(exitingTyper(targetType)).filterNot(isFlattenedSymbol).toList - - def isIndividualImport(s: ImportSelector): Boolean = - s.name != nme.WILDCARD && s.rename != nme.WILDCARD - def isWildcardImport(s: ImportSelector): Boolean = - s.name == nme.WILDCARD - - // non-wildcard imports - private def individualSelectors = selectors filter isIndividualImport - - override val importsWildcard: Boolean = selectors exists isWildcardImport - - lazy val importableSymbolsWithRenames: List[(Symbol, Name)] = { - val selectorRenameMap = - individualSelectors.flatMap(x => x.name.bothNames zip x.rename.bothNames).toMap - importableTargetMembers flatMap (m => selectorRenameMap.get(m.name) map (m -> _)) - } - - override lazy val individualSymbols: List[Symbol] = importableSymbolsWithRenames map (_._1) - override lazy val wildcardSymbols: List[Symbol] = - if (importsWildcard) importableTargetMembers else Nil - - } - - } - - object expressionTyper extends { - val repl: FlinkILoopInterpreter.this.type = self - } with FlinkExprTyper { } - - override def symbolOfLine(code: String): global.Symbol = - expressionTyper.symbolOfLine(code) - - override def typeOfExpression(expr: String, silent: Boolean): global.Type = - expressionTyper.typeOfExpression(expr, silent) - - - import global.Name - override def importsCode(wanted: Set[Name], wrapper: Request#Wrapper, - definesClass: Boolean, generousImports: Boolean): ComputedImports = { - - import global._ - import definitions.PredefModule - import memberHandlers._ - - val header, code, trailingBraces, accessPath = new StringBuilder - val currentImps = mutable.HashSet[Name]() - // only emit predef import header if name not resolved in history, loosely - var predefEscapes = false - - /** - * Narrow down the list of requests from which imports - * should be taken. Removes requests which cannot contribute - * useful imports for the specified set of wanted names. 
- */ - case class ReqAndHandler(req: Request, handler: MemberHandler) - - def reqsToUse: List[ReqAndHandler] = { - /** - * Loop through a list of MemberHandlers and select which ones to keep. - * 'wanted' is the set of names that need to be imported. - */ - def select(reqs: List[ReqAndHandler], wanted: Set[Name]): List[ReqAndHandler] = { - // Single symbol imports might be implicits! See bug #1752. Rather than - // try to finesse this, we will mimic all imports for now. - def keepHandler(handler: MemberHandler) = handler match { - // While defining classes in class based mode - implicits are not needed. - case h: ImportHandler if isClassBased && definesClass => - h.importedNames.exists(x => wanted.contains(x)) - case _: ImportHandler => true - case x if generousImports => x.definesImplicit || - (x.definedNames exists (d => wanted.exists(w => d.startsWith(w)))) - case x => x.definesImplicit || - (x.definedNames exists wanted) - } - - reqs match { - case Nil => - predefEscapes = wanted contains PredefModule.name ; Nil - case rh :: rest if !keepHandler(rh.handler) => select(rest, wanted) - case rh :: rest => - import rh.handler._ - val augment = rh match { - case ReqAndHandler(_, _: ImportHandler) => referencedNames - case _ => Nil - } - val newWanted = wanted ++ augment -- definedNames -- importedNames - rh :: select(rest, newWanted) - } - } - - /** Flatten the handlers out and pair each with the original request */ - select(allReqAndHandlers reverseMap { case (r, h) => ReqAndHandler(r, h) }, wanted).reverse - } - - // add code for a new object to hold some imports - def addWrapper() { - import nme.{INTERPRETER_IMPORT_WRAPPER => iw} - code append (wrapper.prewrap format iw) - trailingBraces append wrapper.postwrap - accessPath append s".$iw" - currentImps.clear() - } - - def maybeWrap(names: Name*) = if (names exists currentImps) addWrapper() - - def wrapBeforeAndAfter[T](op: => T): T = { - addWrapper() - try op finally addWrapper() - } - - // imports from Predef are relocated to the template header to allow hiding. - def checkHeader(h: ImportHandler) = h.referencedNames contains PredefModule.name - - // loop through previous requests, adding imports for each one - wrapBeforeAndAfter { - // Reusing a single temporary value when import from a line with multiple definitions. - val tempValLines = mutable.Set[Int]() - for (ReqAndHandler(req, handler) <- reqsToUse) { - val objName = req.lineRep.readPathInstance - handler match { - case h: ImportHandler if checkHeader(h) => - header.clear() - header append f"${h.member}%n" - // If the user entered an import, then just use it; add an import wrapping - // level if the import might conflict with some other import - case x: ImportHandler if x.importsWildcard => - wrapBeforeAndAfter(code append (x.member + "\n")) - case x: ImportHandler => - maybeWrap(x.importedNames: _*) - code append (x.member + "\n") - currentImps ++= x.importedNames - - case x if isClassBased => - for (sym <- x.definedSymbols) { - maybeWrap(sym.name) - x match { - case _: ClassHandler => - code.append(s"import ${objName}${req.accessPath}.`${sym.name}`\n") - case _ => - val valName = s"${req.lineRep.packageName}${req.lineRep.readName}" - if (!tempValLines.contains(req.lineRep.lineId)) { - code.append(s"val $valName: ${objName}.type = $objName\n") - tempValLines += req.lineRep.lineId - } - code.append(s"import ${valName}${req.accessPath}.`${sym.name}`\n") - } - currentImps += sym.name - } - // For other requests, import each defined name. 
- // import them explicitly instead of with _, so that - // ambiguity errors will not be generated. Also, quote - // the name of the variable, so that we don't need to - // handle quoting keywords separately. - case x => - for (sym <- x.definedSymbols) { - maybeWrap(sym.name) - code append s"import ${x.path}\n" - currentImps += sym.name - } - } - } - } - - val computedHeader = if (predefEscapes) header.toString else "" - ComputedImports(computedHeader, code.toString, trailingBraces.toString, accessPath.toString) - } - - private def allReqAndHandlers = - prevRequestList flatMap (req => req.handlers map (req -> _)) - -} diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala deleted file mode 100644 index fc40df029de..00000000000 --- a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.flink - -import java.io.File -import java.net.URLClassLoader -import java.util.Properties - -import org.apache.zeppelin.interpreter.InterpreterContext -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion - -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{IMain, JPrintWriter} - - -class FlinkScala211Interpreter(override val properties: Properties, - override val flinkScalaClassLoader: ClassLoader) - extends FlinkScalaInterpreter(properties, flinkScalaClassLoader) { - - override def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates - .map(e => new InterpreterCompletion(e, e, null)) - scala.collection.JavaConversions.seqAsJavaList(completions) - } - - override def createIMain(settings: Settings, out: JPrintWriter): IMain = new FlinkILoopInterpreter(settings, out) - - override def createSettings(): Settings = { - val settings = new Settings() - // Don't call settings#embeddedDefaults for scala-2.11, otherwise it could cause weird error - settings.usejavacp.value = true - settings.Yreplsync.value = true - settings.classpath.value = userJars.mkString(File.pathSeparator) - settings - } -} diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index 79b839fefb4..e0969cc6fee 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -35,13 +35,11 @@ flink - ${flink1.13.version} + ${flink1.17.version} ${hadoop2.7.version} 2.3.4 4.0.0 1.15.0 - - _${flink.scala.binary.version} https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz @@ -55,18 +53,6 @@ ${project.version} - - org.apache.zeppelin - flink1.13-shims - ${project.version} - - - - org.apache.zeppelin - flink1.14-shims - ${project.version} - - org.apache.zeppelin flink1.15-shims @@ -153,14 +139,14 @@ org.apache.flink - flink-clients${flink.library.scala.suffix} + flink-clients ${flink.version} provided org.apache.flink - flink-yarn${flink.library.scala.suffix} + flink-yarn ${flink.version} provided @@ -195,7 +181,7 @@ org.apache.flink - flink-table-api-java-bridge${flink.library.scala.suffix} + flink-table-api-java-bridge ${flink.version} provided @@ -209,7 +195,7 @@ org.apache.flink - flink-streaming-java${flink.library.scala.suffix} + flink-streaming-java ${flink.version} provided @@ -874,86 +860,12 @@ - - - flink-113 - - ${flink1.13.version} - - - - org.apache.flink - flink-runtime_${flink.scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-table-runtime-blink_${flink.scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-table-planner_${flink.scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-table-planner-blink_${flink.scala.binary.version} - ${flink.version} - provided - - - org.reflections - reflections - - - - - org.apache.flink - flink-python_${flink.scala.binary.version} - ${flink.version} - provided - - - - - - flink-114 - - ${flink1.14.version} - - - - org.apache.flink - flink-runtime - ${flink.version} - provided - - - org.apache.flink - flink-table-planner_${flink.scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-python_${flink.scala.binary.version} - ${flink.version} - provided - - - - flink-115 ${flink1.15.version} 2.12.7 2.12 - @@ -983,7 +895,6 @@ 
${flink1.16.version} 2.12.7 2.12 - @@ -1019,7 +930,6 @@ ${flink1.17.version} 2.12.7 2.12 - diff --git a/flink/flink1.13-shims/pom.xml b/flink/flink1.13-shims/pom.xml deleted file mode 100644 index 8f4765ed8cc..00000000000 --- a/flink/flink1.13-shims/pom.xml +++ /dev/null @@ -1,213 +0,0 @@ - - - - - - flink-parent - org.apache.zeppelin - 0.11.0-SNAPSHOT - ../pom.xml - - - 4.0.0 - org.apache.zeppelin - flink1.13-shims - 0.11.0-SNAPSHOT - jar - Zeppelin: Flink1.13 Shims - - - ${flink1.13.version} - - - - - - org.apache.zeppelin - flink-shims - ${project.version} - - - - org.apache.flink - flink-core - ${flink.version} - provided - - - - org.apache.flink - flink-clients_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-runtime_${flink.scala.binary.version} - ${flink.version} - provided - - - - - org.apache.flink - flink-table-api-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-scala-bridge_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-java-bridge_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-java_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-java - ${flink.version} - provided - - - - org.apache.flink - flink-table-planner-blink_${flink.scala.binary.version} - ${flink.version} - provided - - - org.reflections - reflections - - - - - - org.apache.flink - flink-table-planner_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-python_${flink.scala.binary.version} - ${flink.version} - provided - - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - eclipse-add-source - - add-source - - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile-first - process-test-resources - - testCompile - - - - - ${flink.scala.version} - - -unchecked - -deprecation - -feature - -nobootcp - - - -Xms1024m - -Xmx1024m - -XX:MaxMetaspaceSize=${MaxMetaspace} - - - -source - ${java.version} - -target - ${java.version} - -Xlint:all,-serial,-path,-options - - - - - - maven-resources-plugin - - - copy-interpreter-setting - none - - true - - - - - - - - diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java deleted file mode 100644 index 440b2457964..00000000000 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.compress.utils.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.scala.DataSet; -import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.table.api.*; -import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; -import org.apache.flink.table.api.config.TableConfigOptions; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.PrintUtils; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims113.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims113.Flink113ScalaShims; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.URL; -import java.time.ZoneId; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; - - -/** - * Shims for flink 1.13 - */ -public class Flink113Shims extends FlinkShims { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class); - - private Flink113SqlInterpreter batchSqlInterpreter; - private Flink113SqlInterpreter 
streamSqlInterpreter; - - public Flink113Shims(FlinkVersion flinkVersion, Properties properties) { - super(flinkVersion, properties); - } - - public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, true); - } - - public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false); - } - - @Override - public Object createResourceManager(List jars, Object tableConfig) { - return null; - } - - @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List jars) { - return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); - } - - @Override - public void disableSysoutLogging(Object batchConfig, Object streamConfig) { - // do nothing - } - - @Override - public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, - moduleManager, - functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), - planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - - @Override - public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new StreamTableEnvironmentImpl(catalogManager, moduleManager, - functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - @Override - public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { - return new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment 
createExecutionEnvironment(Configuration configuration) { - return (StreamExecutionEnvironment) streamExecutionEnvironment; - } - }; - } - - @Override - public Object createCatalogManager(Object config) { - return CatalogManager.newBuilder() - .classLoader(Thread.currentThread().getContextClassLoader()) - .config((ReadableConfig) config) - .defaultCatalog("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")) - .build(); - } - - @Override - public String getPyFlinkPythonPath(Properties properties) throws IOException { - String mode = properties.getProperty("flink.execution.mode"); - if ("yarn-application".equalsIgnoreCase(mode)) { - // for yarn application mode, FLINK_HOME is container working directory - String flinkHome = new File(".").getAbsolutePath(); - return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); - } - - String flinkHome = System.getenv("FLINK_HOME"); - if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - - private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { - LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { - throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", - pyFlinkFolder.getAbsolutePath())); - } - List depFiles = Arrays.asList(pyFlinkFolder.listFiles()); - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } - - @Override - public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { - return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); - } - - @Override - public List collectToList(Object table) throws Exception { - return Lists.newArrayList(((Table) table).execute().collect()); - } - - @Override - public boolean rowEquals(Object row1, Object row2) { - Row r1 = (Row) row1; - Row r2 = (Row) row2; - r1.setKind(RowKind.INSERT); - r2.setKind(RowKind.INSERT); - return r1.equals(r2); - } - - @Override - public Object fromDataSet(Object btenv, Object ds) { - return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); - } - - @Override - public Object toDataSet(Object btenv, Object table) { - return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); - } - - @Override - public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { - ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) - .registerTableSinkInternal(tableName, (TableSink) collectTableSink); - } - - @Override - public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); - } - - @Override - public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); - } - - @Override - public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); - } - - @Override - public void 
registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); - } - - /** - * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. - * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. - * @param catalogManager - * @param parserObject - * @param environmentSetting - */ - @Override - public void setCatalogManagerSchemaResolver(Object catalogManager, - Object parserObject, - Object environmentSetting) { - - } - - @Override - public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); - try { - ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); - return effectiveConfig; - } catch (FlinkException e) { - throw new RuntimeException("Fail to call addAll", e); - } - } - - @Override - public void setBatchRuntimeMode(Object tableConfig) { - ((TableConfig) tableConfig).getConfiguration() - .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); - } - - @Override - public void setOldPlanner(Object tableConfig) { - ((TableConfig) tableConfig).getConfiguration() - .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD); - } - - @Override - public String[] rowToString(Object row, Object table, Object tableConfig) { - final String zone = ((TableConfig) tableConfig).getConfiguration() - .get(TableConfigOptions.LOCAL_TIME_ZONE); - ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) - ? ZoneId.systemDefault() - : ZoneId.of(zone); - - ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); - return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); - } - - public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation) type); - } - - private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { - try { - Map executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return createMethod.invoke( - executorFactory, - executorProperties, - sEnv); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. 
Make sure a planner module is on the classpath", - e); - } - } - - @Override - public ImmutablePair createPlannerAndExecutor( - ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { - EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; - Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, (TableConfig) tableConfig, - (FunctionCatalog) functionCatalog, - (CatalogManager) catalogManager); - return ImmutablePair.of(planner, executor); - } - - @Override - public Object createBlinkPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useBlinkPlanner(); - } - - @Override - public Object createOldPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useOldPlanner(); - } - - @Override - public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { - if (isBatch) { - return batchSqlInterpreter.runSqlList(st, context); - } else { - return streamSqlInterpreter.runSqlList(st, context); - } - } -} diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java deleted file mode 100644 index fa01fac460a..00000000000 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java +++ /dev/null @@ -1,562 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobListener; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.operations.*; -import org.apache.flink.table.operations.command.HelpOperation; -import org.apache.flink.table.operations.command.SetOperation; -import org.apache.flink.table.operations.ddl.*; -import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.CollectionUtil; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.ZeppelinContext; -import org.apache.zeppelin.interpreter.util.SqlSplitter; -import org.jline.utils.AttributedString; -import org.jline.utils.AttributedStringBuilder; -import org.jline.utils.AttributedStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -public class Flink113SqlInterpreter { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink113SqlInterpreter.class); - - private static final AttributedString MESSAGE_HELP = - new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append( - formatCommand( - "CREATE TABLE", - "Create table under current catalog and database.")) - .append( - formatCommand( - "DROP TABLE", - "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] ;'")) - .append( - formatCommand( - "CREATE VIEW", - "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW AS ;'")) - .append( - formatCommand( - "DESCRIBE", - "Describes the schema of a table with the given name.")) - .append( - formatCommand( - "DROP VIEW", - "Deletes a previously created virtual table. Syntax: 'DROP VIEW ;'")) - .append( - formatCommand( - "EXPLAIN", - "Describes the execution plan of a query or table with the given name.")) - .append(formatCommand("HELP", "Prints the available commands.")) - .append( - formatCommand( - "INSERT INTO", - "Inserts the results of a SQL SELECT query into a declared table sink.")) - .append( - formatCommand( - "INSERT OVERWRITE", - "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) - .append( - formatCommand( - "SELECT", "Executes a SQL SELECT query on the Flink cluster.")) - .append( - formatCommand( - "SET", - "Sets a session configuration property. 
Syntax: 'SET =;'. Use 'SET;' for listing all properties.")) - .append( - formatCommand( - "SHOW FUNCTIONS", - "Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'")) - .append(formatCommand("SHOW TABLES", "Shows all registered tables.")) - .append( - formatCommand( - "USE CATALOG", - "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG ;'")) - .append( - formatCommand( - "USE", - "Sets the current default database. Experimental! Syntax: 'USE ;'")) - .append( - formatCommand( - "LOAD MODULE", - "Load a module. Syntax: 'LOAD MODULE [WITH ('' = " - + "'' [, '' = '', ...])];'")) - .append( - formatCommand( - "UNLOAD MODULE", - "Unload a module. Syntax: 'UNLOAD MODULE ;'")) - .append( - formatCommand( - "USE MODULES", - "Enable loaded modules. Syntax: 'USE MODULES [, , ...];'")) - .append( - formatCommand( - "BEGIN STATEMENT SET", - "Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'")) - .append(formatCommand("END", "Ends a statement set. Syntax: 'END;'")) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append( - ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString(); - - private static AttributedString formatCommand(String cmd, String description) { - return new AttributedStringBuilder() - .style(AttributedStyle.DEFAULT.bold()) - .append(cmd) - .append("\t\t") - .style(AttributedStyle.DEFAULT) - .append(description) - .append('\n') - .toAttributedString(); - } - - private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; - - private FlinkSqlContext flinkSqlContext; - private TableEnvironment tbenv; - private ZeppelinContext z; - private Parser sqlParser; - private SqlSplitter sqlSplitter; - // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: - // 1. runAsOne= true - // 2. begin statement set; - // ... 
- // end; - private Map> statementOperationsMap = new HashMap<>(); - private boolean isBatch; - private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); - - - public Flink113SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { - this.flinkSqlContext = flinkSqlContext; - this.isBatch = isBatch; - if (isBatch) { - this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); - } else { - this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); - } - this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); - this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); - this.sqlSplitter = new SqlSplitter(); - JobListener jobListener = new JobListener() { - @Override - public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - LOGGER.info("UnLock JobSubmitLock"); - } - } - - @Override - public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { - - } - }; - - ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); - ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); - } - - public InterpreterResult runSqlList(String st, InterpreterContext context) { - try { - boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - if (runAsOne) { - statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); - } - - String jobName = context.getLocalProperties().get("jobName"); - if (StringUtils.isNotBlank(jobName)) { - tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); - } - - List sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); - for (String sql : sqls) { - List operations = null; - try { - operations = sqlParser.parse(sql); - } catch (SqlParserException e) { - context.out.write("%text Invalid Sql statement: " + sql + "\n"); - context.out.write(MESSAGE_HELP.toString()); - return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); - } - - try { - callOperation(sql, operations.get(0), context); - context.out.flush(); - } catch (Throwable e) { - LOGGER.error("Fail to run sql:" + sql, e); - try { - context.out.write("%text Fail to run sql command: " + - sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); - } catch (IOException ex) { - LOGGER.warn("Unexpected exception:", ex); - return new InterpreterResult(InterpreterResult.Code.ERROR, - ExceptionUtils.getStackTrace(e)); - } - return new InterpreterResult(InterpreterResult.Code.ERROR); - } - } - - if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { - try { - lock.lock(); - List modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); - if (!modifyOperations.isEmpty()) { - callInserts(modifyOperations, context); - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql as one job", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - statementOperationsMap.remove(context.getParagraphId()); - } - - return new InterpreterResult(InterpreterResult.Code.SUCCESS); - } - 
- private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { - if (operation instanceof HelpOperation) { - callHelp(context); - } else if (operation instanceof SetOperation) { - callSet((SetOperation) operation, context); - } else if (operation instanceof CatalogSinkModifyOperation) { - // INSERT INTO/OVERWRITE - callInsert((CatalogSinkModifyOperation) operation, context); - } else if (operation instanceof QueryOperation) { - // SELECT - callSelect(sql, (QueryOperation) operation, context); - } else if (operation instanceof ExplainOperation) { - callExplain((ExplainOperation) operation, context); - } else if (operation instanceof BeginStatementSetOperation) { - callBeginStatementSet(context); - } else if (operation instanceof EndStatementSetOperation) { - callEndStatementSet(context); - } else if (operation instanceof ShowCatalogsOperation) { - callShowCatalogs(context); - } else if (operation instanceof ShowCurrentCatalogOperation) { - callShowCurrentCatalog(context); - } else if (operation instanceof UseCatalogOperation) { - callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); - } else if (operation instanceof CreateCatalogOperation) { - callDDL(sql, context, "Catalog has been created."); - } else if (operation instanceof DropCatalogOperation) { - callDDL(sql, context, "Catalog has been dropped."); - } else if (operation instanceof UseDatabaseOperation) { - UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; - callUseDatabase(useDBOperation.getDatabaseName(), context); - } else if (operation instanceof CreateDatabaseOperation) { - callDDL(sql, context, "Database has been created."); - } else if (operation instanceof DropDatabaseOperation) { - callDDL(sql, context, "Database has been removed."); - } else if (operation instanceof AlterDatabaseOperation) { - callDDL(sql, context, "Alter database succeeded!"); - } else if (operation instanceof ShowDatabasesOperation) { - callShowDatabases(context); - } else if (operation instanceof ShowCurrentDatabaseOperation) { - callShowCurrentDatabase(context); - } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { - callDDL(sql, context, "Table has been created."); - } else if (operation instanceof AlterTableOperation) { - callDDL(sql, context, "Alter table succeeded!"); - } else if (operation instanceof DropTableOperation) { - callDDL(sql, context, "Table has been dropped."); - } else if (operation instanceof DescribeTableOperation) { - DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; - callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); - } else if (operation instanceof ShowTablesOperation) { - callShowTables(context); - } else if (operation instanceof CreateViewOperation) { - callDDL(sql, context, "View has been created."); - } else if (operation instanceof DropViewOperation) { - callDDL(sql, context, "View has been dropped."); - } else if (operation instanceof AlterViewOperation) { - callDDL(sql, context, "Alter view succeeded!"); - } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { - callDDL(sql, context, "Function has been created."); - } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { - callDDL(sql, context, "Function has been removed."); - } else if (operation instanceof 
AlterCatalogFunctionOperation) { - callDDL(sql, context, "Alter function succeeded!"); - } else if (operation instanceof ShowFunctionsOperation) { - callShowFunctions(context); - } else if (operation instanceof ShowModulesOperation) { - callShowModules(context); - } else if (operation instanceof ShowPartitionsOperation) { - ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; - callShowPartitions(showPartitionsOperation.asSummaryString(), context); - } else { - throw new IOException(operation.getClass().getName() + " is not supported"); - } - } - - private void callHelp(InterpreterContext context) throws IOException { - context.out.write(MESSAGE_HELP.toString()); - } - - private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException { - if (statementOperationsMap.containsKey(context.getParagraphId())) { - List modifyOperations = statementOperationsMap.get(context.getParagraphId()); - modifyOperations.add(operation); - } else { - callInserts(Collections.singletonList(operation), context); - } - } - - private void callInserts(List operations, InterpreterContext context) throws IOException { - if (!isBatch) { - context.getLocalProperties().put("flink.streaming.insert_into", "true"); - } - TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); - checkState(tableResult.getJobClient().isPresent()); - try { - tableResult.await(); - JobClient jobClient = tableResult.getJobClient().get(); - if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { - context.out.write("Insertion successfully.\n"); - } else { - throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); - } - } catch (InterruptedException e) { - throw new IOException("Flink job is interrupted", e); - } catch (ExecutionException e) { - throw new IOException("Flink job is failed", e); - } - } - - private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { - try { - lock.lock(); - TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); - String explanation = - Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); - context.out.write(explanation + "\n"); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { - try { - lock.lock(); - if (isBatch) { - callBatchInnerSelect(sql, context); - } else { - callStreamInnerSelect(sql, context); - } - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { - Table table = this.tbenv.sqlQuery(sql); - String result = z.showData(table); - context.out.write(result); - } - - public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { - flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); - } - - public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { - if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { - // set a property - String key = setOperation.getKey().get().trim(); - String value = setOperation.getValue().get().trim(); - this.tbenv.getConfig().getConfiguration().setString(key, value); - LOGGER.info("Set table config: {}={}", key, value); - } else { - // show all 
properties - final Map properties = this.tbenv.getConfig().getConfiguration().toMap(); - List prettyEntries = new ArrayList<>(); - for (String key : properties.keySet()) { - prettyEntries.add( - String.format( - "'%s' = '%s'", - EncodingUtils.escapeSingleQuotes(key), - EncodingUtils.escapeSingleQuotes(properties.get(key)))); - } - prettyEntries.sort(String::compareTo); - prettyEntries.forEach(entry -> { - try { - context.out.write(entry + "\n"); - } catch (IOException e) { - LOGGER.warn("Fail to write output", e); - } - }); - } - } - - private void callBeginStatementSet(InterpreterContext context) throws IOException { - statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); - } - - private void callEndStatementSet(InterpreterContext context) throws IOException { - List modifyOperations = statementOperationsMap.get(context.getParagraphId()); - if (modifyOperations != null && !modifyOperations.isEmpty()) { - callInserts(modifyOperations, context); - } else { - context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); - } - } - - private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { - tbenv.executeSql("USE CATALOG `" + catalog + "`"); - } - - private void callUseDatabase(String databaseName, - InterpreterContext context) throws IOException { - this.tbenv.executeSql("USE `" + databaseName + "`"); - } - - private void callShowCatalogs(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); - List catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); - } - - private void callShowCurrentCatalog(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); - String catalog = tableResult.collect().next().getField(0).toString(); - context.out.write("%text current catalog: " + catalog + "\n"); - } - - private void callShowDatabases(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); - List databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table database\n" + StringUtils.join(databases, "\n") + "\n"); - } - - private void callShowCurrentDatabase(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); - String database = tableResult.collect().next().getField(0).toString(); - context.out.write("%text current database: " + database + "\n"); - } - - private void callShowTables(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); - List tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .filter(tbl -> !tbl.startsWith("UnnamedTable")) - .collect(Collectors.toList()); - context.out.write( - "%table table\n" + StringUtils.join(tables, "\n") + "\n"); - } - - private void callShowFunctions(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); - List functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - 
.collect(Collectors.toList()); - context.out.write( - "%table function\n" + StringUtils.join(functions, "\n") + "\n"); - } - - private void callShowModules(InterpreterContext context) throws IOException { - String[] modules = this.tbenv.listModules(); - context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); - } - - private void callShowPartitions(String sql, InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql(sql); - List partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); - } - - private void callDDL(String sql, InterpreterContext context, String message) throws IOException { - try { - lock.lock(); - this.tbenv.executeSql(sql); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - context.out.write(message + "\n"); - } - - private void callDescribe(String name, InterpreterContext context) throws IOException { - TableResult tableResult = null; - try { - tableResult = tbenv.executeSql("DESCRIBE " + name); - } catch (Exception e) { - throw new IOException("Fail to describe table: " + name, e); - } - CloseableIterator result = tableResult.collect(); - StringBuilder builder = new StringBuilder(); - builder.append("Column\tType\n"); - while (result.hasNext()) { - Row row = result.next(); - builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); - } - context.out.write("%table\n" + builder); - } -} diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java deleted file mode 100644 index 3cbcc75d71c..00000000000 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.flink.shims113; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.UUID; - -/** - * Table sink for collecting the results locally using sockets. - */ -public class CollectStreamTableSink implements RetractStreamTableSink { - - private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); - - private final InetAddress targetAddress; - private final int targetPort; - private final TypeSerializer> serializer; - - private String[] fieldNames; - private TypeInformation[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, - int targetPort, - TypeSerializer> serializer) { - LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); - this.targetAddress = targetAddress; - this.targetPort = targetPort; - this.serializer = serializer; - } - - @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation[] getFieldTypes() { - return fieldTypes; - } - - @Override - public CollectStreamTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - final CollectStreamTableSink copy = - new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; - } - - @Override - public TypeInformation getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public DataStreamSink consumeDataStream(DataStream> stream) { - // add sink - return stream - .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) - .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) - .setParallelism(1); - } - - @Override - public TupleTypeInfo> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } -} diff --git a/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala b/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala deleted file mode 100644 index 10250b0d077..00000000000 --- a/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims113 - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.table.api.Table -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment -import org.apache.flink.types.Row - -object Flink113ScalaShims { - - def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { - btenv.fromDataSet(ds) - } - - def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { - btenv.toDataSet[Row](table) - } -} diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml deleted file mode 100644 index 8e7246f3d71..00000000000 --- a/flink/flink1.14-shims/pom.xml +++ /dev/null @@ -1,206 +0,0 @@ - - - - - - flink-parent - org.apache.zeppelin - 0.11.0-SNAPSHOT - ../pom.xml - - - 4.0.0 - org.apache.zeppelin - flink1.14-shims - 0.11.0-SNAPSHOT - jar - Zeppelin: Flink1.14 Shims - - - ${flink1.14.version} - - - - - - org.apache.zeppelin - flink-shims - ${project.version} - - - - org.apache.flink - flink-core - ${flink.version} - provided - - - - org.apache.flink - flink-clients_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-runtime - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-scala-bridge_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-java-bridge_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-java_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-scala_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-java - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-java - ${flink.version} - provided - - - - org.apache.flink - flink-table-planner_${flink.scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-python_${flink.scala.binary.version} - ${flink.version} - provided - - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - eclipse-add-source - - add-source - - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile-first - process-test-resources - - testCompile - - - - - ${flink.scala.version} - - -unchecked - -deprecation - -feature - -nobootcp - - - -Xms1024m - -Xmx1024m - -XX:MaxMetaspaceSize=${MaxMetaspace} - - - -source - ${java.version} - -target - ${java.version} - -Xlint:all,-serial,-path,-options - - - - - - maven-resources-plugin - - - copy-interpreter-setting - none - - true - - - - - - - - diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java deleted file mode 100644 index 475be0da7e9..00000000000 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.compress.utils.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.table.api.*; -import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.config.TableConfigOptions; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.PlannerFactoryUtil; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.PrintUtils; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims114.CollectStreamTableSink; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.URL; -import java.time.ZoneId; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - - -/** - * Shims for flink 1.14 - */ -public class Flink114Shims extends FlinkShims { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class); - - private Flink114SqlInterpreter batchSqlInterpreter; - private Flink114SqlInterpreter streamSqlInterpreter; - - public 
Flink114Shims(FlinkVersion flinkVersion, Properties properties) { - super(flinkVersion, properties); - } - - public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, true); - } - - public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false); - } - - @Override - public Object createResourceManager(List jars, Object tableConfig) { - return null; - } - - @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List jars) { - return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); - } - - @Override - public void disableSysoutLogging(Object batchConfig, Object streamConfig) { - // do nothing - } - - @Override - public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, - moduleManager, - functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), - planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - - @Override - public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new StreamTableEnvironmentImpl(catalogManager, moduleManager, - functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - - @Override - public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { - return new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) { - 
return (StreamExecutionEnvironment) streamExecutionEnvironment; - } - }; - } - - @Override - public Object createCatalogManager(Object config) { - return CatalogManager.newBuilder() - .classLoader(Thread.currentThread().getContextClassLoader()) - .config((ReadableConfig) config) - .defaultCatalog("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")) - .build(); - } - - @Override - public String getPyFlinkPythonPath(Properties properties) throws IOException { - String mode = properties.getProperty("flink.execution.mode"); - if ("yarn-application".equalsIgnoreCase(mode)) { - // for yarn application mode, FLINK_HOME is container working directory - String flinkHome = new File(".").getAbsolutePath(); - return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); - } - - String flinkHome = System.getenv("FLINK_HOME"); - if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - - private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { - LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { - throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", - pyFlinkFolder.getAbsolutePath())); - } - List depFiles = Arrays.asList(pyFlinkFolder.listFiles()); - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } - - @Override - public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { - return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); - } - - @Override - public List collectToList(Object table) throws Exception { - return Lists.newArrayList(((Table) table).execute().collect()); - } - - @Override - public boolean rowEquals(Object row1, Object row2) { - Row r1 = (Row) row1; - Row r2 = (Row) row2; - r1.setKind(RowKind.INSERT); - r2.setKind(RowKind.INSERT); - return r1.equals(r2); - } - - @Override - public Object fromDataSet(Object btenv, Object ds) { - return null; - //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); - } - - @Override - public Object toDataSet(Object btenv, Object table) { - return null; - //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); - } - - @Override - public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { - ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) - .registerTableSinkInternal(tableName, (TableSink) collectTableSink); - } - - @Override - public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); - } - - @Override - public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); - } - - @Override - public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); - } - - @Override - public void 
registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); - } - - /** - * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. - * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. - * @param catalogManager - * @param parserObject - * @param environmentSetting - */ - @Override - public void setCatalogManagerSchemaResolver(Object catalogManager, - Object parserObject, - Object environmentSetting) { - - } - - @Override - public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); - try { - ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); - return effectiveConfig; - } catch (FlinkException e) { - throw new RuntimeException("Fail to call addAll", e); - } - } - - @Override - public void setBatchRuntimeMode(Object tableConfig) { - ((TableConfig) tableConfig).getConfiguration() - .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); - } - - @Override - public void setOldPlanner(Object tableConfig) { - ((TableConfig) tableConfig).getConfiguration() - .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD); - } - - @Override - public String[] rowToString(Object row, Object table, Object tableConfig) { - final String zone = ((TableConfig) tableConfig).getConfiguration() - .get(TableConfigOptions.LOCAL_TIME_ZONE); - ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) - ? ZoneId.systemDefault() - : ZoneId.of(zone); - - ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); - return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); - } - - @Override - public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation) type); - } - - private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { - try { - final ExecutorFactory executorFactory = - FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor()); - final Method createMethod = - executorFactory - .getClass() - .getMethod("create", StreamExecutionEnvironment.class); - - return createMethod.invoke(executorFactory, sEnv); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. 
Make sure a planner module is on the classpath", - e); - } - } - - @Override - public ImmutablePair createPlannerAndExecutor( - ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { - EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; - Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); - Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor, - (TableConfig) tableConfig, - (CatalogManager) catalogManager, - (FunctionCatalog) functionCatalog); - return ImmutablePair.of(planner, executor); - } - - @Override - public Object createBlinkPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useBlinkPlanner(); - } - - @Override - public Object createOldPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useOldPlanner(); - } - - public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { - if (isBatch) { - return batchSqlInterpreter.runSqlList(st, context); - } else { - return streamSqlInterpreter.runSqlList(st, context); - } - } -} diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java deleted file mode 100644 index eb0d6848010..00000000000 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java +++ /dev/null @@ -1,590 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobListener; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.operations.*; -import org.apache.flink.table.operations.command.HelpOperation; -import org.apache.flink.table.operations.command.SetOperation; -import org.apache.flink.table.operations.ddl.*; -import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.CollectionUtil; -import org.apache.flink.util.Preconditions; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.ZeppelinContext; -import org.apache.zeppelin.interpreter.util.SqlSplitter; -import org.jline.utils.AttributedString; -import org.jline.utils.AttributedStringBuilder; -import org.jline.utils.AttributedStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -public class Flink114SqlInterpreter { - - protected static final Logger LOGGER = LoggerFactory.getLogger(Flink114SqlInterpreter.class); - - private static final String CMD_DESC_DELIMITER = "\t\t"; - - /** - * SQL Client HELP command helper class. 
- */ - private static final class SQLCliCommandsDescriptions { - private int commandMaxLength; - private final Map commandsDescriptions; - - public SQLCliCommandsDescriptions() { - this.commandsDescriptions = new LinkedHashMap<>(); - this.commandMaxLength = -1; - } - - public SQLCliCommandsDescriptions commandDescription(String command, String description) { - Preconditions.checkState( - StringUtils.isNotBlank(command), "content of command must not be empty."); - Preconditions.checkState( - StringUtils.isNotBlank(description), - "content of command's description must not be empty."); - this.updateMaxCommandLength(command.length()); - this.commandsDescriptions.put(command, description); - return this; - } - - private void updateMaxCommandLength(int newLength) { - Preconditions.checkState(newLength > 0); - if (this.commandMaxLength < newLength) { - this.commandMaxLength = newLength; - } - } - - public AttributedString build() { - AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); - if (!this.commandsDescriptions.isEmpty()) { - this.commandsDescriptions.forEach( - (cmd, cmdDesc) -> { - attributedStringBuilder - .style(AttributedStyle.DEFAULT.bold()) - .append( - String.format( - String.format("%%-%ds", commandMaxLength), cmd)) - .append(CMD_DESC_DELIMITER) - .style(AttributedStyle.DEFAULT) - .append(cmdDesc) - .append('\n'); - }); - } - return attributedStringBuilder.toAttributedString(); - } - } - - private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS = - new SQLCliCommandsDescriptions() - .commandDescription("HELP", "Prints the available commands.") - .commandDescription( - "SET", - "Sets a session configuration property. Syntax: \"SET ''='';\". Use \"SET;\" for listing all properties.") - .commandDescription( - "RESET", - "Resets a session configuration property. Syntax: \"RESET '';\". Use \"RESET;\" for reset all session properties.") - .commandDescription( - "INSERT INTO", - "Inserts the results of a SQL SELECT query into a declared table sink.") - .commandDescription( - "INSERT OVERWRITE", - "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") - .commandDescription( - "SELECT", "Executes a SQL SELECT query on the Flink cluster.") - .commandDescription( - "EXPLAIN", - "Describes the execution plan of a query or table with the given name.") - .commandDescription( - "BEGIN STATEMENT SET", - "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") - .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") - // (TODO) zjffdu, ADD/REMOVE/SHOW JAR - .build(); - - // -------------------------------------------------------------------------------------------- - - public static final AttributedString MESSAGE_HELP = - new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append(SQL_CLI_COMMANDS_DESCRIPTIONS) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append( - ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") - // About Documentation Link. 
- .style(AttributedStyle.DEFAULT) - .append( - "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") - .toAttributedString(); - - private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; - - private FlinkSqlContext flinkSqlContext; - private TableEnvironment tbenv; - private ZeppelinContext z; - private Parser sqlParser; - private SqlSplitter sqlSplitter; - // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: - // 1. runAsOne= true - // 2. begin statement set; - // ... - // end; - private Map> statementOperationsMap = new HashMap<>(); - private boolean isBatch; - private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); - - - public Flink114SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { - this.flinkSqlContext = flinkSqlContext; - this.isBatch = isBatch; - if (isBatch) { - this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); - } else { - this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); - } - this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); - this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); - this.sqlSplitter = new SqlSplitter(); - JobListener jobListener = new JobListener() { - @Override - public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - LOGGER.info("UnLock JobSubmitLock"); - } - } - - @Override - public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { - - } - }; - - ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); - ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); - } - - public InterpreterResult runSqlList(String st, InterpreterContext context) { - try { - boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - if (runAsOne) { - statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); - } - - String jobName = context.getLocalProperties().get("jobName"); - if (StringUtils.isNotBlank(jobName)) { - tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); - } - - List sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); - for (String sql : sqls) { - List operations = null; - try { - operations = sqlParser.parse(sql); - } catch (SqlParserException e) { - context.out.write("%text Invalid Sql statement: " + sql + "\n"); - context.out.write(MESSAGE_HELP.toString()); - return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); - } - - try { - callOperation(sql, operations.get(0), context); - context.out.flush(); - } catch (Throwable e) { - LOGGER.error("Fail to run sql:" + sql, e); - try { - context.out.write("%text Fail to run sql command: " + - sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); - } catch (IOException ex) { - LOGGER.warn("Unexpected exception:", ex); - return new InterpreterResult(InterpreterResult.Code.ERROR, - ExceptionUtils.getStackTrace(e)); - } - return new InterpreterResult(InterpreterResult.Code.ERROR); - } - } - - if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { - try { - lock.lock(); - List modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), 
new ArrayList<>()); - if (!modifyOperations.isEmpty()) { - callInserts(modifyOperations, context); - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql as one job", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - statementOperationsMap.remove(context.getParagraphId()); - } - - return new InterpreterResult(InterpreterResult.Code.SUCCESS); - } - - private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { - if (operation instanceof HelpOperation) { - // HELP - callHelp(context); - } else if (operation instanceof SetOperation) { - // SET - callSet((SetOperation) operation, context); - } else if (operation instanceof CatalogSinkModifyOperation) { - // INSERT INTO/OVERWRITE - callInsert((CatalogSinkModifyOperation) operation, context); - } else if (operation instanceof QueryOperation) { - // SELECT - callSelect(sql, (QueryOperation) operation, context); - } else if (operation instanceof ExplainOperation) { - // EXPLAIN - callExplain((ExplainOperation) operation, context); - } else if (operation instanceof BeginStatementSetOperation) { - // BEGIN STATEMENT SET - callBeginStatementSet(context); - } else if (operation instanceof EndStatementSetOperation) { - // END - callEndStatementSet(context); - } else if (operation instanceof ShowCreateTableOperation) { - // SHOW CREATE TABLE - callShowCreateTable((ShowCreateTableOperation) operation, context); - } else if (operation instanceof ShowCatalogsOperation) { - callShowCatalogs(context); - } else if (operation instanceof ShowCurrentCatalogOperation) { - callShowCurrentCatalog(context); - } else if (operation instanceof UseCatalogOperation) { - callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); - } else if (operation instanceof CreateCatalogOperation) { - callDDL(sql, context, "Catalog has been created."); - } else if (operation instanceof DropCatalogOperation) { - callDDL(sql, context, "Catalog has been dropped."); - } else if (operation instanceof UseDatabaseOperation) { - UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; - callUseDatabase(useDBOperation.getDatabaseName(), context); - } else if (operation instanceof CreateDatabaseOperation) { - callDDL(sql, context, "Database has been created."); - } else if (operation instanceof DropDatabaseOperation) { - callDDL(sql, context, "Database has been removed."); - } else if (operation instanceof AlterDatabaseOperation) { - callDDL(sql, context, "Alter database succeeded!"); - } else if (operation instanceof ShowDatabasesOperation) { - callShowDatabases(context); - } else if (operation instanceof ShowCurrentDatabaseOperation) { - callShowCurrentDatabase(context); - } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { - callDDL(sql, context, "Table has been created."); - } else if (operation instanceof AlterTableOperation) { - callDDL(sql, context, "Alter table succeeded!"); - } else if (operation instanceof DropTableOperation) { - callDDL(sql, context, "Table has been dropped."); - } else if (operation instanceof DescribeTableOperation) { - DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; - 
callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); - } else if (operation instanceof ShowTablesOperation) { - callShowTables(context); - } else if (operation instanceof CreateViewOperation) { - callDDL(sql, context, "View has been created."); - } else if (operation instanceof DropViewOperation) { - callDDL(sql, context, "View has been dropped."); - } else if (operation instanceof AlterViewOperation) { - callDDL(sql, context, "Alter view succeeded!"); - } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { - callDDL(sql, context, "Function has been created."); - } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { - callDDL(sql, context, "Function has been removed."); - } else if (operation instanceof AlterCatalogFunctionOperation) { - callDDL(sql, context, "Alter function succeeded!"); - } else if (operation instanceof ShowFunctionsOperation) { - callShowFunctions(context); - } else if (operation instanceof ShowModulesOperation) { - callShowModules(context); - } else if (operation instanceof ShowPartitionsOperation) { - ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; - callShowPartitions(showPartitionsOperation.asSummaryString(), context); - } else { - throw new IOException(operation.getClass().getName() + " is not supported"); - } - } - - - private void callHelp(InterpreterContext context) throws IOException { - context.out.write(MESSAGE_HELP.toString() + "\n"); - } - - private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException { - if (statementOperationsMap.containsKey(context.getParagraphId())) { - List modifyOperations = statementOperationsMap.get(context.getParagraphId()); - modifyOperations.add(operation); - } else { - callInserts(Collections.singletonList(operation), context); - } - } - - private void callInserts(List operations, InterpreterContext context) throws IOException { - if (!isBatch) { - context.getLocalProperties().put("flink.streaming.insert_into", "true"); - } - TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); - checkState(tableResult.getJobClient().isPresent()); - try { - tableResult.await(); - JobClient jobClient = tableResult.getJobClient().get(); - if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { - context.out.write("Insertion successfully.\n"); - } else { - throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); - } - } catch (InterruptedException e) { - throw new IOException("Flink job is interrupted", e); - } catch (ExecutionException e) { - throw new IOException("Flink job is failed", e); - } - } - - private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException { - try { - lock.lock(); - TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation); - String explanation = - Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); - context.out.write(explanation + "\n"); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { - try { - lock.lock(); - TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); - String 
explanation = - Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); - context.out.write(explanation + "\n"); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { - try { - lock.lock(); - if (isBatch) { - callBatchInnerSelect(sql, context); - } else { - callStreamInnerSelect(sql, context); - } - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { - Table table = this.tbenv.sqlQuery(sql); - String result = z.showData(table); - context.out.write(result); - } - - public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { - flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); - } - - public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { - if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { - // set a property - String key = setOperation.getKey().get().trim(); - String value = setOperation.getValue().get().trim(); - this.tbenv.getConfig().getConfiguration().setString(key, value); - LOGGER.info("Set table config: {}={}", key, value); - } else { - // show all properties - final Map properties = this.tbenv.getConfig().getConfiguration().toMap(); - List prettyEntries = new ArrayList<>(); - for (String key : properties.keySet()) { - prettyEntries.add( - String.format( - "'%s' = '%s'", - EncodingUtils.escapeSingleQuotes(key), - EncodingUtils.escapeSingleQuotes(properties.get(key)))); - } - prettyEntries.sort(String::compareTo); - prettyEntries.forEach(entry -> { - try { - context.out.write(entry + "\n"); - } catch (IOException e) { - LOGGER.warn("Fail to write output", e); - } - }); - } - } - - private void callBeginStatementSet(InterpreterContext context) throws IOException { - statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); - } - - private void callEndStatementSet(InterpreterContext context) throws IOException { - List modifyOperations = statementOperationsMap.get(context.getParagraphId()); - if (modifyOperations != null && !modifyOperations.isEmpty()) { - callInserts(modifyOperations, context); - } else { - context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); - } - } - - private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { - tbenv.executeSql("USE CATALOG `" + catalog + "`"); - } - - private void callUseDatabase(String databaseName, - InterpreterContext context) throws IOException { - this.tbenv.executeSql("USE `" + databaseName + "`"); - } - - private void callShowCatalogs(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); - List catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); - } - - private void callShowCurrentCatalog(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); - String catalog = tableResult.collect().next().getField(0).toString(); - context.out.write("%text current catalog: " + catalog + "\n"); - } - - private void callShowDatabases(InterpreterContext context) throws IOException { - TableResult 
tableResult = this.tbenv.executeSql("SHOW Databases"); - List databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table database\n" + StringUtils.join(databases, "\n") + "\n"); - } - - private void callShowCurrentDatabase(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); - String database = tableResult.collect().next().getField(0).toString(); - context.out.write("%text current database: " + database + "\n"); - } - - private void callShowTables(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); - List tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .filter(tbl -> !tbl.startsWith("UnnamedTable")) - .collect(Collectors.toList()); - context.out.write( - "%table table\n" + StringUtils.join(tables, "\n") + "\n"); - } - - private void callShowFunctions(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); - List functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table function\n" + StringUtils.join(functions, "\n") + "\n"); - } - - private void callShowModules(InterpreterContext context) throws IOException { - String[] modules = this.tbenv.listModules(); - context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); - } - - private void callShowPartitions(String sql, InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql(sql); - List partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); - } - - private void callDDL(String sql, InterpreterContext context, String message) throws IOException { - try { - lock.lock(); - this.tbenv.executeSql(sql); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - context.out.write(message + "\n"); - } - - private void callDescribe(String name, InterpreterContext context) throws IOException { - TableResult tableResult = null; - try { - tableResult = tbenv.executeSql("DESCRIBE " + name); - } catch (Exception e) { - throw new IOException("Fail to describe table: " + name, e); - } - CloseableIterator result = tableResult.collect(); - StringBuilder builder = new StringBuilder(); - builder.append("Column\tType\n"); - while (result.hasNext()) { - Row row = result.next(); - builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); - } - context.out.write("%table\n" + builder.toString()); - } -} diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java deleted file mode 100644 index 7a224e1dd03..00000000000 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims114; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.UUID; - -/** - * Table sink for collecting the results locally using sockets. - */ -public class CollectStreamTableSink implements RetractStreamTableSink { - - private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); - - private final InetAddress targetAddress; - private final int targetPort; - private final TypeSerializer> serializer; - - private String[] fieldNames; - private TypeInformation[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, - int targetPort, - TypeSerializer> serializer) { - LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); - this.targetAddress = targetAddress; - this.targetPort = targetPort; - this.serializer = serializer; - } - - @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation[] getFieldTypes() { - return fieldTypes; - } - - @Override - public CollectStreamTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - final CollectStreamTableSink copy = - new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; - } - - @Override - public TypeInformation getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public DataStreamSink consumeDataStream(DataStream> stream) { - // add sink - return stream - .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) - .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) - .setParallelism(1); - } - - @Override - public TupleTypeInfo> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } -} diff --git a/flink/pom.xml b/flink/pom.xml index 428ab57c890..3ece07f5687 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -36,27 +36,26 @@ flink-scala-parent flink-shims - flink1.13-shims - flink1.14-shims flink1.15-shims flink1.16-shims flink1.17-shims - 1.13.2 - 1.14.0 1.15.1 1.16.0 1.17.1 - 2.11.12 - 2.11 + 2.12.7 + 2.12 flink-117 + + true + flink-scala-2.12 @@ -78,26 +77,6 @@ flink-scala-2.12 - - - 
flink-114 - - flink-scala-2.11 - flink-scala-2.12 - - - - - flink-113 - - true - - - flink-scala-2.11 - flink-scala-2.12 - - - diff --git a/testing/env_python_3_with_flink_113.yml b/testing/env_python_3_with_flink_113.yml deleted file mode 100644 index be1eec019db..00000000000 --- a/testing/env_python_3_with_flink_113.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: python_3_with_flink -channels: - - conda-forge - - defaults -dependencies: - - pycodestyle - - scipy - - numpy=1.19.5 - - grpcio - - protobuf - - pandasql - - ipython - - ipython_genutils - - ipykernel - - jupyter_client=5 - - hvplot - - holoviews=1.16 - - plotnine - - seaborn - - intake - - intake-parquet - - intake-xarray - - altair - - vega_datasets - - plotly - - jinja2=3.0.3 - - pip - - pip: - - apache-flink==1.13.1 - diff --git a/testing/env_python_3_with_flink_114.yml b/testing/env_python_3_with_flink_114.yml deleted file mode 100644 index b04a8319097..00000000000 --- a/testing/env_python_3_with_flink_114.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: python_3_with_flink -channels: - - conda-forge - - defaults -dependencies: - - pycodestyle - - scipy - - numpy=1.19.5 - - grpcio - - protobuf - - pandasql - - ipython - - ipython_genutils - - ipykernel - - jupyter_client=5 - - hvplot - - holoviews=1.16 - - plotnine - - seaborn - - intake - - intake-parquet - - intake-xarray - - altair - - vega_datasets - - plotly - - jinja2=3.0.3 - - pip - - pip: - - apache-flink==1.14.0 - diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 89aa2b0f1d4..68320512e62 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -69,7 +69,7 @@ public static void setUp() throws Exception { notebook = TestUtils.getInstance(Notebook.class); sparkHome = DownloadUtils.downloadSpark("3.4.1", "3"); - flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.12"); + flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12"); } @AfterAll diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java index bf07fcf28a8..8cbbdacd279 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java @@ -83,6 +83,25 @@ public static String downloadFlink(String flinkVersion, String scalaVersion) { runShellCommand(new String[]{"wget", "https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.7.5-1.8.1/flink-shaded-hadoop2-uber-2.7.5-1.8.1.jar", "-P", targetFlinkHomeFolder + "/lib"}); + runShellCommand(new String[]{"wget", + "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_" + scalaVersion + "/" + + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar", + "-P", targetFlinkHomeFolder + "/lib"}); + runShellCommand(new String[]{"wget", + "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_" + scalaVersion + "/" + + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar", + "-P", targetFlinkHomeFolder + "/lib"}); + 
runShellCommand(new String[]{"mv", + targetFlinkHomeFolder + "/opt/" + "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar", + targetFlinkHomeFolder + "/lib"}); + runShellCommand(new String[]{"mv", + targetFlinkHomeFolder + "/lib/" + "flink-table-planner-loader-" + flinkVersion + ".jar", + targetFlinkHomeFolder + "/opt"}); + if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) { + runShellCommand(new String[]{"mv", + targetFlinkHomeFolder + "/opt/" + "flink-sql-client-" + flinkVersion + ".jar", + targetFlinkHomeFolder + "/lib"}); + } } catch (Exception e) { throw new RuntimeException("Fail to download jar", e); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java new file mode 100644 index 00000000000..f9bd771005d --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.integration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide reading comparing capability of semantic version which is used widely in Apache projects + */ +public class SemanticVersion { + private static final Logger LOG = LoggerFactory.getLogger(SemanticVersion.class); + + public static SemanticVersion of(String versionString) { + return new SemanticVersion(versionString); + } + + private final String versionString; + private int version; + private int majorVersion; + private int minorVersion; + private int patchVersion; + + private SemanticVersion(String versionString) { + this.versionString = versionString; + + try { + int pos = versionString.indexOf('-'); + + String numberPart = versionString; + if (pos > 0) { + numberPart = versionString.substring(0, pos); + } + + String[] versions = numberPart.split("\\."); + this.majorVersion = Integer.parseInt(versions[0]); + this.minorVersion = Integer.parseInt(versions[1]); + this.patchVersion = Integer.parseInt(versions[2]); + // version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602) + version = Integer.parseInt(String.format("%d%02d%02d", majorVersion, minorVersion, patchVersion)); + } catch (Exception e) { + LOG.error("Can not recognize Spark version {}. 
Assume it's a future release", versionString, e); + // assume it is future release + version = 99999; + } + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } + + public int getPatchVersion() { + return patchVersion; + } + + @Override + public String toString() { + return versionString; + } + + @Override + public int hashCode() { + return version; + } + + @Override + public boolean equals(Object versionToCompare) { + return versionToCompare instanceof SemanticVersion + && version == ((SemanticVersion) versionToCompare).version; + } + + public boolean newerThan(SemanticVersion versionToCompare) { + return version > versionToCompare.version; + } + + public boolean equalsOrNewerThan(SemanticVersion versionToCompare) { + return version >= versionToCompare.version; + } + + public boolean olderThan(SemanticVersion versionToCompare) { + return version < versionToCompare.version; + } + + public boolean equalsOrOlderThan(SemanticVersion versionToCompare) { + return version <= versionToCompare.version; + } +}
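For context, here is a minimal usage sketch of the new SemanticVersion helper added above. The standalone SemanticVersionExample class and its main method are hypothetical and not part of this patch; the 1.16.0 comparison simply mirrors the gate that DownloadUtils now applies before moving the flink-sql-client jar into lib.

import org.apache.zeppelin.interpreter.integration.SemanticVersion;

public class SemanticVersionExample {
  public static void main(String[] args) {
    // "1.17.1" parses into major=1, minor=17, patch=1; internally compared as 11701.
    SemanticVersion flink = SemanticVersion.of("1.17.1");
    System.out.println(flink.getMajorVersion() + "." + flink.getMinorVersion()); // 1.17

    // Same check the patched DownloadUtils performs for Flink 1.16 and newer.
    if (flink.equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
      System.out.println("Flink 1.16+: ship the flink-sql-client jar in lib/");
    }

    // A "-suffix" is stripped before parsing; unparseable strings fall back to a
    // very high internal version so they are treated as a future release.
    System.out.println(SemanticVersion.of("1.15.1-rc1").olderThan(flink)); // true
  }
}

Packing major, minor, and patch into a single integer keeps all comparison methods down to plain integer comparisons.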
* Support multiple versions of Flink: you can run different versions of Flink in one Zeppelin instance.
* Support multiple versions of Scala: you can run different Scala versions of Flink in one Zeppelin instance.
* Support multiple languages: Scala, Python, and SQL are supported; beyond that, you can collaborate across languages, e.g. write a Scala UDF and use it in PyFlink.