Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: session pool supports creating through spaceFromParam #330

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions ngbatis-demo/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ nebula:
check-fixed-rate: 300000
# space name needs to be informed through annotations(@Space) or xml(space="test")
# default false(false: Session pool map will not be initialized)
use-session-pool: false
hosts: 127.0.0.1:19669
use-session-pool: true
hosts: 139.9.187.207:9669
username: root
password: bmVidWxh
password: U3RhclNoYWRvd18wOTE5
space: test
pool-config:
min-conns-size: 0
max-conns-size: 10
timeout: 6000
idle-time: 0
interval-idle: -1
wait-time: 6000
wait-time: 0
min-cluster-health-rate: 1.0
enable-ssl: false

# 开启 nGQL 输出
#logging:
# level:
# org.nebula.contrib: DEBUG
logging:
level:
org.nebula.contrib: DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void spaceFromParam() {
try {
repository.spaceFromParam(spaceName);
} catch (Exception e) {
Assert.isTrue(e instanceof QueryException && e.getMessage().contains("SpaceNotFound"));
assertSpaceFailed(e);
}
}

Expand All @@ -258,10 +258,17 @@ public void dynamicSpaceWithPage() {
page.setPageNo(1);
repository.dynamicSpaceWithPage(page, spaceName);
} catch (Exception e) {
System.out.println(e.getMessage());
Assert.isTrue(e instanceof QueryException && e.getMessage().contains("SpaceNotFound"));
assertSpaceFailed(e);
}
}

void assertSpaceFailed(Exception e) {
e.printStackTrace();
String message = e.getMessage();
Assert.isTrue(e instanceof QueryException &&
(message.contains("SpaceNotFound") || (message.contains("create session failed.")))
);
}

@Test
public void insertWithTimestamp() {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/nebula/contrib/ngbatis/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ public SessionDispatcher getDispatcher() {
* @return SessionPool
*/
public SessionPool getSessionPool(String spaceName) {
return mapperContext.getNebulaSessionPoolMap().get(spaceName);
SessionPool sessionPool = mapperContext.getNebulaSessionPoolMap().get(spaceName);
if (sessionPool == null) {
sessionPool = dispatcher.initSessionPool(spaceName);
mapperContext.getNebulaSessionPoolMap().put(spaceName, sessionPool);
}
return sessionPool;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
import static org.nebula.contrib.ngbatis.proxy.NebulaDaoBasicExt.entityTypeAndIdType;
import static org.nebula.contrib.ngbatis.proxy.NebulaDaoBasicExt.vertexName;

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.SessionPool;
import com.vesoft.nebula.client.graph.SessionPoolConfig;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Resource;
import org.nebula.contrib.ngbatis.config.NebulaJdbcProperties;
import org.nebula.contrib.ngbatis.config.NgbatisConfig;
import org.nebula.contrib.ngbatis.config.ParseCfgProps;
import org.nebula.contrib.ngbatis.io.DaoResourceLoader;
import org.nebula.contrib.ngbatis.models.ClassModel;
Expand Down Expand Up @@ -86,8 +83,6 @@ public MapperContext mapperContext(NebulaPool nebulaPool) {
context.setNebulaPoolConfig(nebulaJdbcProperties.getPoolConfig());
figureTagTypeMapping(interfaces.values(), context.getTagTypeMapping());

setNebulaSessionPool(context);

registerBean(context);
return context;
}
Expand Down Expand Up @@ -201,63 +196,25 @@ public NebulaPool nebulaPool() {
/**
* create and init Nebula SessionPool
*/
public void setNebulaSessionPool(MapperContext context) {
NgbatisConfig ngbatisConfig = nebulaJdbcProperties.getNgbatis();
if (ngbatisConfig.getUseSessionPool() == null || !ngbatisConfig.getUseSessionPool()) {
return;
}

context.getSpaceNameSet().add(nebulaJdbcProperties.getSpace());
Map<String, SessionPool> nebulaSessionPoolMap = context.getNebulaSessionPoolMap();
for (String spaceName : context.getSpaceNameSet()) {
SessionPool sessionPool = initSessionPool(spaceName);
if (sessionPool == null) {
log.error("{} session pool init failed.", spaceName);
continue;
}
nebulaSessionPoolMap.put(spaceName, sessionPool);
}
@Deprecated
public void setNebulaSessionPool(MapperContext context) throws Exception {
throw new Exception(
"Deprecated method, "
+ "please use IntervalCheckSessionDispatcher.setNebulaSessionPool() instead."
);
}

/**
* session pool create and init
* @param spaceName nebula space name
* @return inited SessionPool
*/
public SessionPool initSessionPool(String spaceName) {
final NgbatisConfig ngbatisConfig = nebulaJdbcProperties.getNgbatis();
NebulaPoolConfig poolConfig = nebulaJdbcProperties.getPoolConfig();

SessionPoolConfig sessionPoolConfig = new SessionPoolConfig(
nebulaJdbcProperties.getHostAddresses(),
spaceName,
nebulaJdbcProperties.getUsername(),
nebulaJdbcProperties.getPassword()
).setUseHttp2(poolConfig.isUseHttp2())
.setEnableSsl(poolConfig.isEnableSsl())
.setSslParam(poolConfig.getSslParam())
.setCustomHeaders(poolConfig.getCustomHeaders())
.setWaitTime(poolConfig.getWaitTime())
.setTimeout(poolConfig.getTimeout());

if (poolConfig.getMinConnSize() <= 0) {
sessionPoolConfig.setMinSessionSize(1);
} else {
sessionPoolConfig.setMinSessionSize(poolConfig.getMinConnSize());
}
sessionPoolConfig.setMaxSessionSize(poolConfig.getMaxConnSize());
sessionPoolConfig.setTimeout(poolConfig.getTimeout());
sessionPoolConfig.setWaitTime(poolConfig.getWaitTime());
if (null != ngbatisConfig.getSessionLifeLength()) {
int cleanTime = (int) (ngbatisConfig.getSessionLifeLength() / 1000);
sessionPoolConfig.setCleanTime(cleanTime);
}
if (null != ngbatisConfig.getCheckFixedRate()) {
int healthCheckTime = (int) (ngbatisConfig.getCheckFixedRate() / 1000);
sessionPoolConfig.setHealthCheckTime(healthCheckTime);
}

return new SessionPool(sessionPoolConfig);
@Deprecated
public SessionPool initSessionPool(String spaceName) throws Exception {
throw new Exception(
"Deprecated method, "
+ "please use SessionDispatcher.initSessionPool() instead."
);
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/nebula/contrib/ngbatis/SessionDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import com.vesoft.nebula.client.graph.SessionPool;
import com.vesoft.nebula.client.graph.data.ResultSet;
import java.util.Map;
import org.nebula.contrib.ngbatis.config.NgbatisConfig;
import org.nebula.contrib.ngbatis.models.MapperContext;
import org.nebula.contrib.ngbatis.session.LocalSession;
Expand Down Expand Up @@ -68,4 +71,25 @@ static boolean useSessionPool() {
NgbatisConfig ngbatisConfig = MapperContext.newInstance().getNgbatisConfig();
return ngbatisConfig != null && ngbatisConfig.getUseSessionPool();
}

/**
* 按 spaceName 初始化 sessionPool
*
* @param spaceName 可以是启动时不存在的空间名
* @return 初始化后的 sessionPool
*/
SessionPool initSessionPool(String spaceName);

/**
* 处理会话
* @param localSession 本地会话
* @param result 结果集,主要获取成功与否
*/
void handleSession(LocalSession localSession, ResultSet result);

ResultSet executeWithParameter(
String gql,
Map<String, Object> params,
String space,
Map<String, Object> extraReturn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class EnvConfig {
@Autowired(required = false)
private PkGenerator pkGenerator;

@Autowired
private SessionDispatcher sessionDispatcher;

/**
Expand All @@ -80,8 +81,6 @@ public class EnvConfig {
*/
@Bean
public Env getEnv() {
properties.setPoolConfig(MapperContext.newInstance().getNebulaPoolConfig());
sessionDispatcher = new IntervalCheckSessionDispatcher(properties);
return new Env(
textResolver,
resultResolver,
Expand Down
83 changes: 19 additions & 64 deletions src/main/java/org/nebula/contrib/ngbatis/proxy/MapperProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,8 @@ public static Object invoke(ClassModel classModel, MethodModel methodModel, Obje

Map<String,Object> parasForDb = argsResolver.resolve(methodModel, args);
final long step1 = System.currentTimeMillis();
NgbatisConfig ngbatisConfig = MapperContext.newInstance().getNgbatisConfig();
if (ngbatisConfig == null || !ngbatisConfig.getUseSessionPool()) {
query = executeWithParameter(classModel, methodModel, gql, parasForDb, argMap);
} else {
query = executeBySessionPool(classModel, methodModel, gql, parasForDb, argMap);
}

query = executeWithParameter(classModel, methodModel, gql, parasForDb, argMap);

final long step2 = System.currentTimeMillis();
if (!query.isSucceeded()) {
Expand Down Expand Up @@ -206,31 +202,29 @@ private static Object pageSupport(ClassModel classModel, Method method, Object[]
* @param params 待执行脚本的参数所需的参数
* @return nebula-graph 的未被 orm 操作的原始结果集
*/
public static ResultSet executeWithParameter(ClassModel cm, MethodModel mm, String gql,
Map<String, Object> params, Map<String, Object> paramsForTemplate) {
LocalSession localSession = null;
Session session = null;
public static ResultSet executeWithParameter(
ClassModel cm, MethodModel mm, String gql,
Map<String, Object> params,
Map<String, Object> paramsForTemplate) {

ResultSet result = null;
String proxyClass = null;
String proxyMethod = null;
String localSessionSpace = null;
String autoSwitch = null;

SessionDispatcher dispatcher = ENV.getDispatcher();
Map<String, Object> extraReturn = new HashMap<>();

try {
localSession = dispatcher.poll();
if (log.isDebugEnabled()) {
proxyClass = cm.getNamespace().getName();
proxyMethod = mm.getId();
localSessionSpace = localSession.getCurrentSpace();
}

String currentSpace = getSpace(cm, mm, paramsForTemplate);
String[] qlAndSpace = qlWithSpace(localSession, gql, currentSpace);
gql = qlAndSpace[1];
autoSwitch = qlAndSpace[0] == null ? "" : qlAndSpace[0];
session = localSession.getSession();
result = session.executeWithParameter(gql, params);
localSession.setCurrentSpace(getSpace(result));
result = dispatcher.executeWithParameter(
gql, params, currentSpace, extraReturn
);

if (result.isSucceeded()) {
return result;
} else {
Expand All @@ -243,15 +237,18 @@ public static ResultSet executeWithParameter(ClassModel cm, MethodModel mm, Stri
throw new QueryException("数据查询失败:" + e.getMessage(), e);
} finally {
if (log.isDebugEnabled()) {
Object autoSwitch = extraReturn.get("autoSwitch");
Object localSessionSpace = extraReturn.get("localSessionSpace");
boolean noNeedSwitch = isEmpty(autoSwitch);
autoSwitch = (isEmpty(autoSwitch) ? "" : autoSwitch);
log.debug("\n\t- proxyMethod: {}#{}"
+ "\n\t- session space: {}"
+ (isEmpty(autoSwitch) ? "\n\t- {}" : "\n\t- auto switch to: {}")
+ (noNeedSwitch ? "\n\t- {}" : "\n\t- auto switch to: {}")
+ "\n\t- nGql:{}"
+ "\n\t- params: {}"
+ "\n\t- result:{}",
proxyClass, proxyMethod, localSessionSpace, autoSwitch, gql, paramsForTemplate, result);
}
handleSession(dispatcher, localSession, result);
}
}

Expand Down Expand Up @@ -304,38 +301,6 @@ public static ResultSet executeBySessionPool(ClassModel cm, MethodModel mm, Stri
}
}

private static void handleSession(SessionDispatcher dispatcher,
LocalSession localSession, ResultSet result) {
if (localSession != null) {
boolean sessionError = ResultSetUtil.isSessionError(result);
if (sessionError || dispatcher.timeToRelease(localSession)) {
dispatcher.release(localSession);
} else {
dispatcher.offer(localSession);
}
}
}

private static String[] qlWithSpace(LocalSession localSession, String gql, String currentSpace)
throws IOErrorException, BindSpaceFailedException {
String[] qlAndSpace = new String[2];
gql = gql.trim();
String sessionSpace = localSession.getCurrentSpace();
boolean sameSpace = Objects.equals(sessionSpace, currentSpace);
if (!sameSpace && currentSpace != null) {
qlAndSpace[0] = currentSpace;
Session session = localSession.getSession();
ResultSet execute = session.execute(String.format("USE `%s`", currentSpace));
if (!execute.isSucceeded()) {
throw new BindSpaceFailedException(
String.format(" %s \"%s\"", execute.getErrorMessage(), currentSpace)
);
}
}
qlAndSpace[1] = String.format("\n\t\t%s", gql);
return qlAndSpace;
}

/**
* 获取当前语句所执行的目标space。
* @param cm 当前接口的类模型
Expand Down Expand Up @@ -370,16 +335,6 @@ public static String getSpace(
return space;
}

/**
* 从结果集中获取当前的 space
* @param result 脚本执行之后的结果集
* @return 结果集所对应的 space
*/
private static String getSpace(ResultSet result) {
String spaceName = result.getSpaceName();
return isBlank(spaceName) ? null : spaceName;
}

public static Logger getLog() {
return log;
}
Expand Down
Loading
Loading