Skip to content

Commit

Permalink
Merge branch 'apache:dev' into 4163
Browse files Browse the repository at this point in the history
  • Loading branch information
Mrart authored Jan 13, 2025
2 parents 10399cb + 422c276 commit 1991d27
Show file tree
Hide file tree
Showing 24 changed files with 117 additions and 192 deletions.
4 changes: 2 additions & 2 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache StreamPark (incubating)
Copyright 2022-2024 The Apache Software Foundation
Apache StreamPark
Copyright 2022-2025 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
4 changes: 2 additions & 2 deletions dist-material/release-docs/NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache StreamPark (incubating)
Copyright 2022-2024 The Apache Software Foundation
Apache StreamPark
Copyright 2022-2025 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampark.console.core.mapper.ApplicationLogMapper;
import org.apache.streampark.console.core.service.application.ApplicationLogService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
Expand All @@ -41,18 +40,17 @@ public class ApplicationLogServiceImpl extends ServiceImpl<ApplicationLogMapper,

@Override
public IPage<ApplicationLog> getPage(ApplicationLog applicationLog, RestRequest request) {
request.setSortField("option_time");
Page<ApplicationLog> page = MybatisPager.getPage(request);
LambdaQueryWrapper<ApplicationLog> queryWrapper = new LambdaQueryWrapper<ApplicationLog>()
.eq(ApplicationLog::getAppId, applicationLog.getAppId());
return this.page(page, queryWrapper);
return this.lambdaQuery()
.eq(ApplicationLog::getAppId, applicationLog.getAppId())
.orderByDesc(ApplicationLog::getOptionTime).page(page);
}

@Override
public void removeByAppId(Long appId) {
LambdaQueryWrapper<ApplicationLog> queryWrapper = new LambdaQueryWrapper<ApplicationLog>()
.eq(ApplicationLog::getAppId, appId);
this.remove(queryWrapper);
this.lambdaUpdate()
.eq(ApplicationLog::getAppId, appId)
.remove();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
Expand Down Expand Up @@ -147,9 +146,7 @@ public void revoke(SparkApplication appParam) {
@Override
public void remove(SparkApplication appParam) {
try {
baseMapper.delete(
new LambdaQueryWrapper<SparkApplicationBackup>()
.eq(SparkApplicationBackup::getAppId, appParam.getId()));
this.lambdaUpdate().eq(SparkApplicationBackup::getAppId, appParam.getId()).remove();
appParam
.getFsOperator()
.delete(
Expand All @@ -165,10 +162,10 @@ public void remove(SparkApplication appParam) {

@Override
public void rollbackSparkSql(SparkApplication appParam, SparkSql sparkSqlParam) {
LambdaQueryWrapper<SparkApplicationBackup> queryWrapper = new LambdaQueryWrapper<SparkApplicationBackup>()
SparkApplicationBackup backUp = this.lambdaQuery()
.eq(SparkApplicationBackup::getAppId, appParam.getId())
.eq(SparkApplicationBackup::getSqlId, sparkSqlParam.getId());
SparkApplicationBackup backUp = baseMapper.selectOne(queryWrapper);
.eq(SparkApplicationBackup::getSqlId, sparkSqlParam.getId())
.one();
ApiAlertException.throwIfNull(
backUp, "Application backup can't be null. Rollback spark sql failed.");
// rollback config and sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -500,8 +499,7 @@ public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> appId

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<ApplicationBuildPipeline>().eq(ApplicationBuildPipeline::getAppId, appId));
this.lambdaUpdate().eq(ApplicationBuildPipeline::getAppId, appId).remove();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.streampark.console.core.service.SparkEffectiveService;
import org.apache.streampark.console.core.service.application.SparkApplicationConfigService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
Expand Down Expand Up @@ -257,8 +256,7 @@ public synchronized String readTemplate() {

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<SparkApplicationConfig>().eq(SparkApplicationConfig::getAppId, appId));
this.lambdaUpdate().eq(SparkApplicationConfig::getAppId, appId).remove();
}

private void fillEffectiveField(Long id, List<SparkApplicationConfig> configList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -184,20 +183,17 @@ public boolean checkAlter(SparkApplication appParam) {

@Override
public boolean existsByTeamId(Long teamId) {
return baseMapper.exists(
new LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getTeamId, teamId));
return this.lambdaQuery().eq(SparkApplication::getTeamId, teamId).exists();
}

@Override
public boolean existsByUserId(Long userId) {
return baseMapper.exists(
new LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getUserId, userId));
return this.lambdaQuery().eq(SparkApplication::getUserId, userId).exists();
}

@Override
public boolean existsBySparkEnvId(Long sparkEnvId) {
return baseMapper.exists(
new LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getVersionId, sparkEnvId));
return this.lambdaQuery().eq(SparkApplication::getVersionId, sparkEnvId).exists();
}

@Override
Expand Down Expand Up @@ -304,8 +300,7 @@ public AppExistsStateEnum checkExists(SparkApplication appParam) {
}

private boolean existsByAppName(String jobName) {
return baseMapper.exists(
new LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getAppName, jobName));
return this.lambdaQuery().eq(SparkApplication::getAppName, jobName).exists();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@

import org.apache.commons.lang3.StringUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
Expand Down Expand Up @@ -307,8 +306,7 @@ public boolean create(SparkApplication appParam) {
}

private boolean existsByAppName(String jobName) {
return baseMapper.exists(
new LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getAppName, jobName));
return this.lambdaQuery().eq(SparkApplication::getAppName, jobName).exists();
}

@SuppressWarnings("checkstyle:WhitespaceAround")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.streampark.console.core.service.ExternalLinkService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -51,7 +50,7 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext

@Override
public void create(ExternalLink externalLink) {
if (!this.check(externalLink)) {
if (this.check(externalLink)) {
return;
}
externalLink.setId(null);
Expand All @@ -60,7 +59,7 @@ public void create(ExternalLink externalLink) {

@Override
public void update(ExternalLink externalLink) {
if (!this.check(externalLink)) {
if (this.check(externalLink)) {
return;
}
baseMapper.updateById(externalLink);
Expand Down Expand Up @@ -96,24 +95,21 @@ private void renderLinkUrl(ExternalLink link, FlinkApplication app) {
}

private boolean check(ExternalLink params) {
LambdaQueryWrapper<ExternalLink> queryWrapper = new LambdaQueryWrapper<ExternalLink>();
// badgeName and LinkUrl cannot be duplicated
queryWrapper.nested(
ExternalLink result = this.lambdaQuery().nested(
qw -> qw.eq(ExternalLink::getBadgeName, params.getBadgeName())
.or()
.eq(ExternalLink::getLinkUrl, params.getLinkUrl()));
if (params.getId() != null) {
queryWrapper.and(qw -> qw.ne(ExternalLink::getId, params.getId()));
}
ExternalLink result = this.getOne(queryWrapper);
.eq(ExternalLink::getLinkUrl, params.getLinkUrl()))
.and(params.getId() != null, qw -> qw.ne(ExternalLink::getId, params.getId())).one();

if (result == null) {
return true;
return false;
}
ApiAlertException.throwIfTrue(result.getBadgeName().equals(params.getBadgeName()),
String.format("The name: %s is already existing.", result.getBadgeName()));
ApiAlertException.throwIfTrue(result.getLinkUrl().equals(params.getLinkUrl()),
String.format("The linkUrl: %s is already existing.", result.getLinkUrl()));

return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
Expand Down Expand Up @@ -96,14 +95,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli

@Override
public List<FlinkCluster> listAvailableCluster() {
LambdaQueryWrapper<FlinkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(FlinkCluster::getClusterState, ClusterState.RUNNING);
return this.list(lambdaQueryWrapper);
return this.lambdaQuery().eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState()).list();
}

@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
public ResponseResult<Void> check(FlinkCluster cluster) {
ResponseResult<Void> result = new ResponseResult<>();
result.setStatus(0);

// 1) Check name if already exists
Expand Down Expand Up @@ -290,9 +287,7 @@ public Boolean existsByClusterName(String clusterName, Long id) {

@Override
public Boolean existsByFlinkEnvId(Long flinkEnvId) {
LambdaQueryWrapper<FlinkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<FlinkCluster>()
.eq(FlinkCluster::getVersionId, flinkEnvId);
return getBaseMapper().exists(lambdaQueryWrapper);
return this.lambdaQuery().eq(FlinkCluster::getVersionId, flinkEnvId).exists();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.streampark.console.core.mapper.FlinkEffectiveMapper;
import org.apache.streampark.console.core.service.FlinkEffectiveService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -41,26 +40,24 @@ public class FlinkEffectiveServiceImpl extends ServiceImpl<FlinkEffectiveMapper,

@Override
public void remove(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
LambdaQueryWrapper<FlinkEffective> queryWrapper = new LambdaQueryWrapper<FlinkEffective>()
.eq(FlinkEffective::getAppId, appId)
.eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
baseMapper.delete(queryWrapper);
this.lambdaUpdate().eq(FlinkEffective::getAppId, appId)
.eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType())
.remove();
}

@Override
public FlinkEffective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
LambdaQueryWrapper<FlinkEffective> queryWrapper = new LambdaQueryWrapper<FlinkEffective>()
.eq(FlinkEffective::getAppId, appId)
.eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
return this.getOne(queryWrapper);
return this.lambdaQuery().eq(FlinkEffective::getAppId, appId)
.eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType())
.one();
}

@Override
public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) {
LambdaQueryWrapper<FlinkEffective> queryWrapper = new LambdaQueryWrapper<FlinkEffective>()
long count = this.lambdaQuery()
.eq(FlinkEffective::getAppId, appId)
.eq(FlinkEffective::getTargetType, type.getType());
long count = count(queryWrapper);
.eq(FlinkEffective::getTargetType, type.getType())
.count();
if (count == 0) {
FlinkEffective effective = new FlinkEffective();
effective.setAppId(appId);
Expand All @@ -79,8 +76,6 @@ public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) {

@Override
public void removeByAppId(Long appId) {
LambdaQueryWrapper<FlinkEffective> queryWrapper =
new LambdaQueryWrapper<FlinkEffective>().eq(FlinkEffective::getAppId, appId);
this.remove(queryWrapper);
this.lambdaUpdate().eq(FlinkEffective::getAppId, appId).remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
Expand Down Expand Up @@ -61,12 +60,10 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
@Override
public FlinkEnvCheckEnum check(FlinkEnv version) {
// 1) check name
LambdaQueryWrapper<FlinkEnv> queryWrapper = new LambdaQueryWrapper<FlinkEnv>().eq(FlinkEnv::getFlinkName,
version.getFlinkName());
if (version.getId() != null) {
queryWrapper.ne(FlinkEnv::getId, version.getId());
}
if (this.count(queryWrapper) > 0) {
boolean exists = this.lambdaQuery().eq(FlinkEnv::getFlinkName,
version.getFlinkName())
.ne(version.getId() != null, FlinkEnv::getId, version.getId()).exists();
if (exists) {
return FlinkEnvCheckEnum.NAME_REPEATED;
}

Expand Down
Loading

0 comments on commit 1991d27

Please sign in to comment.