Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

F/jdbc custom interpreter #4856

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/zeppelin-site.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,14 @@
<description>The X-Frame-Options HTTP response header can be used to indicate whether or not a browser should be allowed to render a page in a frame/iframe/object.</description>
</property>

<!--
<property>
<name>zeppelin.server.csp.frame</name>
<value>frame-ancestors 'none'</value>
<description>The Content-Security-Policy HTTP response header can be used to mitigate the risk of content-injection attacks and can be used in conjunction with X-Frame-Options to provide better security for browsers that don't support ALLOWED-FROM.</description>
</property>
-->

<!--
<property>
<name>zeppelin.server.strict.transport</name>
Expand Down
19 changes: 18 additions & 1 deletion jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,24 @@

<dependencies>

<dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>


<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
Expand Down
129 changes: 127 additions & 2 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.zeppelin.jdbc.hive.HiveUtils;
import org.apache.zeppelin.tabledata.TableDataUtils;
import org.apache.zeppelin.util.PropertiesUtil;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,6 +77,12 @@
import org.apache.zeppelin.user.UserCredentials;
import org.apache.zeppelin.user.UsernamePassword;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
* GreenplumDB, MariaDB, MySQL, Postgres and Redshift.
Expand Down Expand Up @@ -143,6 +151,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
"zeppelin.jdbc.concurrent.max_connection";
private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows";
private static final String FAIL_FAST_VALIDATE_URL = "http://spark-event-listener.prd.meesho.int/api/validate";

private static final Set<String> PRESTO_PROPERTIES = new HashSet<>(Arrays.asList(
"user", "password",
Expand Down Expand Up @@ -350,6 +359,51 @@ public void close() {
}
}

/**
 * Posts the given request to the fail-fast validation service and returns the
 * parsed response. Propagates any I/O or parsing failure to the caller.
 *
 * @param request the query/user payload to validate
 * @return the service's parsed validation verdict
 * @throws Exception if the connection, write, or read fails
 */
public static ValidationResponse sendValidationRequest(ValidationRequest request) throws Exception {
  HttpURLConnection conn = createConnection();
  sendRequest(conn, request);
  ValidationResponse result = readResponse(conn);
  return result;
}

/**
 * Opens a POST connection to the fail-fast validation endpoint with a JSON
 * content type and bounded timeouts.
 *
 * @return a configured, not-yet-connected {@link HttpURLConnection}
 * @throws Exception if the URL is malformed or the connection cannot be opened
 */
private static HttpURLConnection createConnection() throws Exception {
  URL url = new URL(FAIL_FAST_VALIDATE_URL);
  HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  connection.setRequestMethod("POST");
  connection.setRequestProperty("Content-Type", "application/json");
  // Without timeouts a hung validation service would block query execution
  // indefinitely; bound both the connect and the read phases.
  connection.setConnectTimeout(5000);
  connection.setReadTimeout(10000);
  connection.setDoOutput(true); // enable sending the JSON request body
  return connection;
}

/**
 * Serializes the request to JSON and writes it as the UTF-8 request body.
 *
 * @param connection an open connection with {@code doOutput} enabled
 * @param request the payload to serialize via {@link ValidationRequest#toJson()}
 * @throws Exception if writing the body fails
 */
private static void sendRequest(HttpURLConnection connection, ValidationRequest request) throws Exception {
  // try-with-resources closes the stream even if the write fails.
  try (OutputStream os = connection.getOutputStream()) {
    // StandardCharsets.UTF_8 is always present; avoids the charset-name lookup
    // (and its UnsupportedEncodingException path) that getBytes("utf-8") uses.
    byte[] payload = request.toJson().getBytes(StandardCharsets.UTF_8);
    os.write(payload, 0, payload.length);
  }
}

/**
 * Reads the HTTP response body (success or error stream) and parses it into a
 * {@link ValidationResponse}. Lines are trimmed and concatenated without
 * separators, preserving the original parsing behavior.
 *
 * @param connection a connection whose request has already been sent
 * @return the parsed validation response
 * @throws Exception if reading the response fails
 */
private static ValidationResponse readResponse(HttpURLConnection connection) throws Exception {
  int statusCode = connection.getResponseCode();
  InputStream body = (statusCode == HttpURLConnection.HTTP_OK)
      ? connection.getInputStream()
      : connection.getErrorStream();

  // getErrorStream() may legitimately return null (no response body); the
  // original code NPE'd here. Treat it as an empty JSON object instead.
  if (body == null) {
    connection.disconnect();
    return ValidationResponse.fromJson("{}");
  }

  StringBuilder responseBuilder = new StringBuilder();
  try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
    // try-with-resources closes the reader even if readLine throws; the
    // original leaked the stream on a mid-read failure.
    String line;
    while ((line = reader.readLine()) != null) {
      responseBuilder.append(line.trim());
    }
  } finally {
    connection.disconnect();
  }

  return ValidationResponse.fromJson(responseBuilder.toString());
}

/* Get user of this sql.
* 1. If shiro is enabled, use the login user
* 2. Otherwise try to get it from interpreter setting, e.g. default.user
Expand Down Expand Up @@ -785,6 +839,12 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
LOGGER.info("Execute sql: " + sqlToExecute);
statement = connection.createStatement();

String interpreterName = getInterpreterGroup().getId();

if (interpreterName != null && interpreterName.startsWith("spark_rca_")) {
statement.setQueryTimeout(10800); // 10800 seconds = 3 hours
}

// fetch n+1 rows in order to indicate there's more rows available (for large selects)
statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult()));
statement.setMaxRows(context.getIntLocalProperty("limit", maxRows));
Expand All @@ -809,6 +869,65 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
HiveUtils.startHiveMonitorThread(statement, context,
Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
}

String userName = getUser(context);
String sqlToValidate = sqlToExecute
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ");
ValidationRequest request = new ValidationRequest(sqlToValidate, userName);
try {
ValidationResponse response = sendValidationRequest(request);
if (response.isPreSubmitFail()) {
String outputMessage = response.getMessage();
StringBuilder finalOutput = new StringBuilder();

if (response.isFailFast()) {
context.out.write("Query Error: Partition Filters Missing\n" +
"Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance.\n");
JSONObject jsonObject = new JSONObject(outputMessage);
finalOutput.append("The following table(s) are missing partition filters:\n");

JSONArray tableNames = jsonObject.names();
if (tableNames != null) {
for (int i = 0; i < tableNames.length(); i++) {
String table = tableNames.getString(i);
JSONArray partitions = jsonObject.getJSONArray(table);
finalOutput.append("Table: ").append(table).append(", Partition filter's: ");

for (int j = 0; j < partitions.length(); j++) {
finalOutput.append(partitions.getString(j));
if (j < partitions.length() - 1) {
finalOutput.append(", ");
}
}
finalOutput.append("\n");
}
}
} else if (response.isFailedByDeprecatedTable()) {
context.out.write("Query Error: Restricted Table Used\n");
JSONObject jsonObject = new JSONObject(outputMessage);
finalOutput.append("It seems you're trying to use a restricted table:\n");

JSONArray tableNames = jsonObject.names();
if (tableNames != null) {
for (int i = 0; i < tableNames.length(); i++) {
String table = tableNames.getString(i);
finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n");
}
}
}
context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString());
cancel(context);
return new InterpreterResult(Code.ERROR);
}
} catch (Exception e) {
String error = "Error occurred while sending request " + e.getMessage();
String mess = e.getLocalizedMessage();
context.out.write(error);
context.out.write(mess);
}

boolean isResultSetAvailable = statement.execute(sqlToExecute);
getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
if (isResultSetAvailable) {
Expand Down Expand Up @@ -939,8 +1058,14 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex
LOGGER.debug("Run SQL command '{}'", cmd);
String dbPrefix = getDBPrefix(context);
LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd);
String interpreterName = getInterpreterGroup().getId();
if (interpreterName!=null && interpreterName.startsWith("spark_rca_")) {
cmd = "set STATEMENT_TIMEOUT=10800;\n"+cmd;
}
LOGGER.debug("InterpreterName: {}, SQL command: '{}'", interpreterName, cmd);
String finalCmd = cmd;
if (!isRefreshMode(context)) {
return executeSql(dbPrefix, cmd, context);
return executeSql(dbPrefix, finalCmd, context);
} else {
int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval"));
paragraphCancelMap.put(context.getParagraphId(), false);
Expand All @@ -951,7 +1076,7 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex
refreshExecutor.scheduleAtFixedRate(() -> {
context.out.clear(false);
try {
InterpreterResult result = executeSql(dbPrefix, cmd, context);
InterpreterResult result = executeSql(dbPrefix, finalCmd, context);
context.out.flush();
interpreterResultRef.set(result);
if (result.code() != Code.SUCCESS) {
Expand Down
16 changes: 16 additions & 0 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.zeppelin.jdbc;

public class ValidationRequest {
private String queryText;
private String user;

public ValidationRequest(String queryText, String user) {
this.queryText = queryText;
this.user = user;
}

public String toJson() {
return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\"}";
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.apache.zeppelin.jdbc;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

/**
 * Parsed response from the fail-fast validation service.
 *
 * <p>Flags: {@code preSubmitFail} — the query should not be submitted;
 * {@code failFast} — partition filters are missing; {@code failedByDeprecatedTable}
 * — a restricted table was referenced. {@code message} carries a JSON payload
 * with per-table details (see JDBCInterpreter's handling of each flag).
 */
public class ValidationResponse {
  private boolean preSubmitFail;
  private boolean failFast;
  private boolean failedByDeprecatedTable;
  private String message;

  // Getters and Setters
  public boolean isPreSubmitFail() {
    return preSubmitFail;
  }

  public void setPreSubmitFail(boolean preSubmitFail) {
    this.preSubmitFail = preSubmitFail;
  }

  public boolean isFailFast() {
    return failFast;
  }

  public void setFailFast(boolean failFast) {
    this.failFast = failFast;
  }

  public boolean isFailedByDeprecatedTable() {
    return failedByDeprecatedTable;
  }

  public void setFailedByDeprecatedTable(boolean failedByDeprecatedTable) {
    this.failedByDeprecatedTable = failedByDeprecatedTable;
  }

  public String getMessage() {
    return message;
  }

  public void setMessage(String message) {
    this.message = message;
  }

  /**
   * Parses a service response. Missing, null, or non-object payloads yield a
   * "no failure" response (all flags false, empty message) rather than throwing.
   *
   * @param jsonResponse raw response body; may be empty or malformed JSON
   * @return a populated response, never null
   */
  public static ValidationResponse fromJson(String jsonResponse) {
    Gson gson = new Gson();
    ValidationResponse response = new ValidationResponse();

    JsonElement jsonElement = gson.fromJson(jsonResponse, JsonElement.class);

    // Gson returns null for empty/blank input and JsonNull for "null"; the
    // original dereferenced the element unconditionally and NPE'd on empty bodies.
    if (jsonElement != null && jsonElement.isJsonObject()) {
      JsonObject jsonObject = jsonElement.getAsJsonObject();

      if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) {
        response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean());
      }
      if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) {
        response.setFailFast(jsonObject.get("fail_fast").getAsBoolean());
      }
      if (jsonObject.has("failed_by_deprecated_table")
          && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) {
        response.setFailedByDeprecatedTable(
            jsonObject.get("failed_by_deprecated_table").getAsBoolean());
      }
      if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) {
        response.setMessage(jsonObject.get("message").getAsString());
      }
    } else {
      // Defaults for non-object payloads: no failure, empty message.
      response.setPreSubmitFail(false);
      response.setFailFast(false);
      response.setFailedByDeprecatedTable(false);
      response.setMessage("");
    }
    return response;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,10 @@ public boolean isAuthorizationHeaderClear() {
public String getXFrameOptions() {
return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS);
}

public String getCSPFrame() {
return getString(ConfVars.ZEPPELIN_SERVER_CSP_FRAME);
}

public String getXxssProtection() {
return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION);
Expand Down Expand Up @@ -1038,6 +1042,7 @@ public enum ConfVars {
ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS("zeppelin.websocket.paragraph_status_progress.enable", true),
ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false),
ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
ZEPPELIN_SERVER_CSP_FRAME("zeppelin.server.csp.frame", "frame-ancestors 'none'"),
ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", " "),
ZEPPELIN_SERVER_SEND_JETTY_NAME("zeppelin.server.send.jetty.name", true),
ZEPPELIN_SERVER_JETTY_THREAD_POOL_MAX("zeppelin.server.jetty.thread.pool.max", 400),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private void addCorsHeaders(HttpServletResponse response, String origin) {

ZeppelinConfiguration zeppelinConfiguration = ZeppelinConfiguration.create();
response.setHeader("X-FRAME-OPTIONS", zeppelinConfiguration.getXFrameOptions());
response.setHeader("Content-Security-Policy", zeppelinConfiguration.getCSPFrame());
if (zeppelinConfiguration.useSsl()) {
response.setHeader("Strict-Transport-Security", zeppelinConfiguration.getStrictTransport());
}
Expand Down