Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken committed Jan 17, 2024
1 parent c788708 commit 19e9199
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import com._4paradigm.openmldb.sdk.SqlException;
import com._4paradigm.openmldb.sdk.SqlExecutor;
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
import com._4paradigm.openmldb.spark.read.OpenmldbReadConfig;
import com._4paradigm.openmldb.spark.read.OpenmldbScanBuilder;
import com._4paradigm.openmldb.spark.write.OpenmldbWriteBuilder;
import com._4paradigm.openmldb.spark.write.OpenmldbWriteConfig;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package com._4paradigm.openmldb.spark.read;

import com._4paradigm.openmldb.spark.OpenmldbConfig;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

public class OpenmldbPartitionReaderFactory implements PartitionReaderFactory {
private final OpenmldbReadConfig config;
private final OpenmldbConfig config;

public OpenmldbPartitionReaderFactory(OpenmldbReadConfig config) {
public OpenmldbPartitionReaderFactory(OpenmldbConfig config) {
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

package com._4paradigm.openmldb.spark.read;

import com._4paradigm.openmldb.spark.OpenmldbConfig;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.types.StructType;

public class OpenmldbScan implements Scan, Batch {
private final OpenmldbReadConfig config;
private final OpenmldbConfig config;

public OpenmldbScan(OpenmldbReadConfig config) {
public OpenmldbScan(OpenmldbConfig config) {
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package com._4paradigm.openmldb.spark.read;

import com._4paradigm.openmldb.spark.OpenmldbConfig;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;

public class OpenmldbScanBuilder implements ScanBuilder {
private final OpenmldbReadConfig config;
private final OpenmldbConfig config;

public OpenmldbScanBuilder(OpenmldbReadConfig config) {
public OpenmldbScanBuilder(OpenmldbConfig config) {
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com._4paradigm.openmldb.spark.read

import com._4paradigm.openmldb.spark.OpenmldbConfig
import com._4paradigm.openmldb.sdk.{Schema, SdkOption}
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -8,15 +9,10 @@ import org.apache.spark.unsafe.types.UTF8String

import java.sql.Types

class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReader[InternalRow] {

val option = new SdkOption
option.setZkCluster(config.zkCluster)
option.setZkPath(config.zkPath)
option.setLight(true)
val executor = new SqlClusterExecutor(option)
val dbName: String = config.dbName
val tableName: String = config.tableName
class OpenmldbPartitionReader(config: OpenmldbConfig) extends PartitionReader[InternalRow] {
val executor = new SqlClusterExecutor(config.getSdkOption)
val dbName: String = config.getDB
val tableName: String = config.getTable

val schema: Schema = executor.getTableSchema(dbName, tableName)
executor.executeSQL(dbName, "SET @@execute_mode='online'")
Expand Down

0 comments on commit 19e9199

Please sign in to comment.