From b1661ae4785ddfd7db5ebd8cf10f334cf4d9579b Mon Sep 17 00:00:00 2001 From: Raghwendra Singh Date: Sun, 31 Jul 2022 02:15:29 +0530 Subject: [PATCH 01/23] DI-679 added CSP HTTP response header --- conf/zeppelin-site.xml.template | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 4ee200b9956..5ab01b5ceb1 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -644,6 +644,14 @@ The X-Frame-Options HTTP response header can be used to indicate whether or not a browser should be allowed to render a page in a frame/iframe/object. + + + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 350e72b35be..b67e92c68b3 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -908,7 +908,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } finalOutput.append(userName); - context.out.write(finalOutput.toString()); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); @@ -916,16 +915,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } catch (Exception e) { context.out.write("Error occurred while sending request"); - System.err.println("Error occurred while sending request: " + e.getMessage()); - e.printStackTrace(); } -// if (sqlToExecute.contains("fail_fast_kill")) { -// context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); -// cancel(context); -// } - - boolean isResultSetAvailable = statement.execute(sqlToExecute); getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix); if (isResultSetAvailable) { diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 7dea7f564b0..94de605e3d3 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -3,12 +3,10 @@ public class ValidationRequest { private String queryText; - // Constructor public ValidationRequest(String queryText) { this.queryText = queryText; } - // Getter and Setter public String getQueryText() { return queryText; } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 801a5aa82c8..027f91b7300 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -1,5 +1,8 @@ package org.apache.zeppelin.jdbc; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + public class ValidationResponse { private boolean preSubmitFail; private boolean failFast; @@ -40,18 +43,20 @@ public void setMessage(String message) { } public static ValidationResponse fromJson(String jsonResponse) { + Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); - // Use simple JSON parsing (can replace with a library like Jackson or Gson) - response.setPreSubmitFail(jsonResponse.contains("\"pre_submit_fail\":true")); - response.setFailFast(jsonResponse.contains("\"fail_fast\":true")); - response.setFailedByDeprecatedTable(jsonResponse.contains("\"failed_by_deprecated_table\":true")); - - int messageIndex = jsonResponse.indexOf("\"message\":\""); - if (messageIndex != -1) { - int messageEnd = jsonResponse.indexOf("\"", messageIndex + 10); - String message = jsonResponse.substring(messageIndex + 10, messageEnd); - response.setMessage(message); + + JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); + + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + + // Extract the "message" field + if (jsonObject.has("message")) { + response.setMessage(jsonObject.get("message").getAsString()); } + return response; } } From 39f0a7bab0dd5732c0f1fcd6e3b017ecbb2f837e Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 16:58:58 +0530 Subject: [PATCH 11/23] added loggers --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 9 ++++++--- .../org/apache/zeppelin/jdbc/ValidationResponse.java | 2 -- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index b67e92c68b3..a3eaa99ddc2 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -867,11 +867,14 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ValidationRequest request = new ValidationRequest(sqlToExecute); try { + context.out.write("Sending request for validation"); ValidationResponse response = sendValidationRequest(request); + context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { + context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); - String userName = getUser(context); - context.out.write(userName); +// String userName = getUser(context); +// context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { @@ -907,7 +910,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } - finalOutput.append(userName); +// finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 027f91b7300..0f0a30cedf4 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -51,8 +51,6 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - - // Extract the "message" field if (jsonObject.has("message")) { response.setMessage(jsonObject.get("message").getAsString()); } From b6225df40b710667e997fd4afde71fe07d1bddbf Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 18:00:15 +0530 Subject: [PATCH 12/23] added stract trace --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a3eaa99ddc2..6e7c90afdc1 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -867,14 +867,14 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ValidationRequest request = new ValidationRequest(sqlToExecute); try { - context.out.write("Sending request for validation"); + context.out.write("Sending request for validation\n"); ValidationResponse response = sendValidationRequest(request); context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); -// String userName = getUser(context); -// context.out.write(userName); + String userName = getUser(context); + context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { @@ -910,14 +910,19 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } -// finalOutput.append(userName); + finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); } } catch (Exception e) { - context.out.write("Error occurred while sending request"); + String error = "Error occurred while sending request" + e.getMessage(); + String stackTrace = e.getStackTrace().toString(); + String mess = e.getLocalizedMessage(); + context.out.write(error); + context.out.write(stackTrace); + context.out.write(mess); } boolean isResultSetAvailable = statement.execute(sqlToExecute); From b06c1619f786226ee67d45a43e570a05fd8c694a Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 20:13:45 +0530 Subject: [PATCH 13/23] removed test loggers --- jdbc/pom.xml | 2 +- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 14 +++----------- .../apache/zeppelin/jdbc/ValidationRequest.java | 14 ++++---------- 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/jdbc/pom.xml b/jdbc/pom.xml index bfa0a600cc9..56467b4d074 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -59,7 +59,7 @@ com.google.code.gson gson - 2.8.9 + 2.8.9 diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 6e7c90afdc1..21c29b21121 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -865,20 +865,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - ValidationRequest request = new ValidationRequest(sqlToExecute); + String userName = getUser(context); + ValidationRequest request = new ValidationRequest(sqlToExecute, userName); try { - context.out.write("Sending request for validation\n"); ValidationResponse response = sendValidationRequest(request); - context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { - context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); - String userName = getUser(context); - context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { - context.out.write("Fail Fast custom error"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); @@ -910,18 +905,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } - finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); - return new InterpreterResult(Code.ERROR, finalOutput.toString()); + return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } } catch (Exception e) { String error = "Error occurred while sending request" + e.getMessage(); - String stackTrace = e.getStackTrace().toString(); String mess = e.getLocalizedMessage(); context.out.write(error); - context.out.write(stackTrace); context.out.write(mess); } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 94de605e3d3..94e0a6edf40 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -2,21 +2,15 @@ public class ValidationRequest { private String queryText; + private String user; - public ValidationRequest(String queryText) { - this.queryText = queryText; - } - - public String getQueryText() { - return queryText; - } - - public void setQueryText(String queryText) { + public ValidationRequest(String queryText, String user) { this.queryText = queryText; + this.user = user; } public String toJson() { - return "{\"query_text\":\"" + queryText + "\"}"; + return "{\"queryText\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; } } From 08cf063f0223123fe1188aae4cb7438a28599289 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 22:26:12 +0530 Subject: [PATCH 14/23] updated request payload --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 -- .../main/java/org/apache/zeppelin/jdbc/ValidationRequest.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 21c29b21121..0058f9f45dd 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -376,7 +376,6 @@ private static HttpURLConnection createConnection() throws Exception { private static void sendRequest(HttpURLConnection connection, ValidationRequest request) throws Exception { try (OutputStream os = connection.getOutputStream()) { - // Manually convert the request object to a JSON string String jsonRequest = request.toJson(); byte[] input = jsonRequest.getBytes("utf-8"); os.write(input, 0, input.length); @@ -909,7 +908,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, cancel(context); return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } - } catch (Exception e) { String error = "Error occurred while sending request" + e.getMessage(); String mess = e.getLocalizedMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 94e0a6edf40..71d8ad17418 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -10,7 +10,7 @@ public ValidationRequest(String queryText, String user) { } public String toJson() { - return "{\"queryText\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; + return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; } } From 682c3dcb11bf62b6aaea7cfe4c412a9a1e0547cd Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 13:06:42 +0530 Subject: [PATCH 15/23] updated sql --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 ++++++++-- .../apache/zeppelin/jdbc/ValidationResponse.java | 15 ++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 0058f9f45dd..099083b1c8d 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -865,7 +865,13 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } String userName = getUser(context); - ValidationRequest request = new ValidationRequest(sqlToExecute, userName); + String sqlToValidate = sqlToExecute + .replace("\n", "\\n") // Newlines + .replace("\r", "\\r") // Carriage return + .replace("\t", "\\t") // Tabs + .replace("\"", "\\\"") // Double quotes + .replace("\\", "\\\\"); // Backslashes + ValidationRequest request = new ValidationRequest(sqlToValidate, userName); try { ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { @@ -909,7 +915,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } } catch (Exception e) { - String error = "Error occurred while sending request" + e.getMessage(); + String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); context.out.write(error); context.out.write(mess); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 0f0a30cedf4..2128dfb86a1 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -48,13 +48,18 @@ public static ValidationResponse fromJson(String jsonResponse) { JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); - response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); - response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); - response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - if (jsonObject.has("message")) { + if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + } + if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + } + if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + } + if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { response.setMessage(jsonObject.get("message").getAsString()); } - return response; } } From 0e70d81bf62f2aabca3b141abd89683aabfc4936 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 15:48:19 +0530 Subject: [PATCH 16/23] updated escapae character --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 099083b1c8d..8c1f6057012 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -866,11 +866,9 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String userName = getUser(context); String sqlToValidate = sqlToExecute - .replace("\n", "\\n") // Newlines - .replace("\r", "\\r") // Carriage return - .replace("\t", "\\t") // Tabs - .replace("\"", "\\\"") // Double quotes - .replace("\\", "\\\\"); // Backslashes + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); ValidationRequest request = new ValidationRequest(sqlToValidate, userName); try { ValidationResponse response = sendValidationRequest(request); From e0a936dab8417559ba227ad7242cd07ba03e2dd1 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 17:28:16 +0530 Subject: [PATCH 17/23] updated url --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 8c1f6057012..37414f95dee 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -151,7 +151,7 @@ public class JDBCInterpreter extends KerberosInterpreter { "zeppelin.jdbc.concurrent.max_connection"; private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows"; - private static final String FAIL_FAST_VALIDATE_URL = "http://localhost:8080/api/validate"; + private static final String FAIL_FAST_VALIDATE_URL = "http://spark-event-listener.prd.meesho.int/api/validate"; private static final Set PRESTO_PROPERTIES = new HashSet<>(Arrays.asList( "user", "password", @@ -877,6 +877,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { + context.out.write("Query failed because partitions were not used in the query. Please ensure that partition filters are applied.\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); @@ -897,6 +898,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } else if (response.isFailedByDeprecatedTable()) { + context.out.write("Query failed as Restricted table(s) are used\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are restricted:\n"); @@ -910,7 +912,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); - return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); + return new InterpreterResult(Code.ERROR); } } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); From 76db3df10248abcf6914e856ec9b28188159ac12 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 1 Oct 2024 16:48:46 +0530 Subject: [PATCH 18/23] added timeout for rca cluster --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 37414f95dee..f9fe9dd4f87 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -839,6 +839,12 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); + String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + + if (interpreterName != null && interpreterName.startsWith("spark_")) { + statement.setQueryTimeout(60); // 10800 seconds = 3 hours + } + // fetch n+1 rows in order to indicate there's more rows available (for large selects) statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult())); statement.setMaxRows(context.getIntLocalProperty("limit", maxRows)); From f0d6248c44250271bb5ed3060f06470fc2ccf05a Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 2 Oct 2024 20:30:30 +0530 Subject: [PATCH 19/23] added logging --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index f9fe9dd4f87..3b8dd8b6c05 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -840,9 +840,11 @@ private InterpreterResult executeSql(String dbPrefix, String sql, statement = connection.createStatement(); String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + context.out.write("Interpreter Name: " + interpreterName); if (interpreterName != null && interpreterName.startsWith("spark_")) { - statement.setQueryTimeout(60); // 10800 seconds = 3 hours + statement.setQueryTimeout(5); // 10800 seconds = 3 hours + context.out.write("Query Timeout: 5 seconds"); } // fetch n+1 rows in order to indicate there's more rows available (for large selects) From 761455984b258b1e629ac51d8b9bcc23e096d348 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 8 Oct 2024 14:41:40 +0530 Subject: [PATCH 20/23] updated get interpreterName --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 3b8dd8b6c05..b34862723be 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -839,9 +839,13 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); - String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + String interpreterName = getInterpreterGroup().getId(); context.out.write("Interpreter Name: " + interpreterName); + String className = getClassName(); + context.out.write("Class Name: " + className); + + if (interpreterName != null && interpreterName.startsWith("spark_")) { statement.setQueryTimeout(5); // 10800 seconds = 3 hours context.out.write("Query Timeout: 5 seconds"); @@ -885,16 +889,17 @@ private InterpreterResult executeSql(String dbPrefix, String sql, StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { - context.out.write("Query failed because partitions were not used in the query. Please ensure that partition filters are applied.\n"); + context.out.write("Query Error: Partition Filters Missing\n" + + "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance."); JSONObject jsonObject = new JSONObject(outputMessage); - finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); + finalOutput.append("The following table(s) are missing partition filters:\n"); JSONArray tableNames = jsonObject.names(); if (tableNames != null) { for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); JSONArray partitions = jsonObject.getJSONArray(table); - finalOutput.append(table).append(" -> "); + finalOutput.append("Table: ").append(table).append(" Partition filters: "); for (int j = 0; j < partitions.length(); j++) { finalOutput.append(partitions.getString(j)); @@ -906,15 +911,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } else if (response.isFailedByDeprecatedTable()) { - context.out.write("Query failed as Restricted table(s) are used\n"); + context.out.write("Query Error: Restricted Table Used\n"); JSONObject jsonObject = new JSONObject(outputMessage); - finalOutput.append("The following TABLE(s) used in the query are restricted:\n"); + finalOutput.append("It seems you're trying to use a restricted table:\n"); JSONArray tableNames = jsonObject.names(); if (tableNames != null) { for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); - finalOutput.append(table).append(" -> ").append(jsonObject.getString(table)).append("\n"); + finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); } } } From b4e1d0d12a27a787465dce904d1f74757056c6e1 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 8 Oct 2024 17:16:52 +0530 Subject: [PATCH 21/23] removed logger --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index b34862723be..48592b21914 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -840,15 +840,9 @@ private InterpreterResult executeSql(String dbPrefix, String sql, statement = connection.createStatement(); String interpreterName = getInterpreterGroup().getId(); - context.out.write("Interpreter Name: " + interpreterName); - String className = getClassName(); - context.out.write("Class Name: " + className); - - - if (interpreterName != null && interpreterName.startsWith("spark_")) { - statement.setQueryTimeout(5); // 10800 seconds = 3 hours - context.out.write("Query Timeout: 5 seconds"); + if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { + statement.setQueryTimeout(10800); // 10800 seconds = 3 hours } // fetch n+1 rows in order to indicate there's more rows available (for large selects) @@ -890,7 +884,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, if (response.isFailFast()) { context.out.write("Query Error: Partition Filters Missing\n" + - "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance."); + "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance.\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following table(s) are missing partition filters:\n"); @@ -899,7 +893,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); JSONArray partitions = jsonObject.getJSONArray(table); - finalOutput.append("Table: ").append(table).append(" Partition filters: "); + finalOutput.append("Table: ").append(table).append(", Partition filter's: "); for (int j = 0; j < partitions.length(); j++) { finalOutput.append(partitions.getString(j)); From 12dfddcfc9c1f2d2088405535828fc0df0537c43 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Thu, 10 Oct 2024 13:53:11 +0530 Subject: [PATCH 22/23] updated Gson object --- .../zeppelin/jdbc/ValidationResponse.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 2128dfb86a1..05716cc2edb 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -1,6 +1,7 @@ package org.apache.zeppelin.jdbc; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; public class ValidationResponse { @@ -46,19 +47,28 @@ public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); - JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); + JsonElement jsonElement = gson.fromJson(jsonResponse, JsonElement.class); - if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { - response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); - } - if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { - response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); - } - if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { - response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - } - if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { - response.setMessage(jsonObject.get("message").getAsString()); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + + if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + } + if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + } + if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + } + if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { + response.setMessage(jsonObject.get("message").getAsString()); + } + } else { + response.setPreSubmitFail(false); + response.setFailFast(false); + response.setFailedByDeprecatedTable(false); + response.setMessage(""); // Default message } return response; } From e54907b5e3ea0520048b47f59b7eea1bae883e9f Mon Sep 17 00:00:00 2001 From: shagil-meesho Date: Thu, 17 Oct 2024 23:53:53 +0530 Subject: [PATCH 23/23] feat: suffixing the STATEMENT_TIMEOUT = 10800 before rca interpreter fired queries --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 48592b21914..930e42e0a27 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -1058,8 +1058,14 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex LOGGER.debug("Run SQL command '{}'", cmd); String dbPrefix = getDBPrefix(context); LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd); + String interpreterName = getInterpreterGroup().getId(); + if (interpreterName!=null && interpreterName.startsWith("spark_rca_")) { + cmd = "set STATEMENT_TIMEOUT=10800;\n"+cmd; + } + LOGGER.debug("InterpreterName: {}, SQL command: '{}'", interpreterName, cmd); + String finalCmd = cmd; if (!isRefreshMode(context)) { - return executeSql(dbPrefix, cmd, context); + return executeSql(dbPrefix, finalCmd, context); } else { int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval")); paragraphCancelMap.put(context.getParagraphId(), false); @@ -1070,7 +1076,7 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false); try { - InterpreterResult result = executeSql(dbPrefix, cmd, context); + InterpreterResult result = executeSql(dbPrefix, finalCmd, context); context.out.flush(); interpreterResultRef.set(result); if (result.code() != Code.SUCCESS) {