Skip to content

Latest commit

 

History

History
131 lines (86 loc) · 4.57 KB

集群模式任务执行.md

File metadata and controls

131 lines (86 loc) · 4.57 KB

集群模式任务执行

、ageiport-test-ext-cluster-spring-cloud-eureka-server

某一个主任务会被切分为很多子任务并行执行。开启集群模式后每一个接入AGEIPort的应用会自动组建集群,且应用之间相互隔离,子任务同时在一个应用的多个机器上执行

  • 单机执行时,一个主任务的所有子任务在某一个机器的线程池里执行,所有子任务执行完成后在当前机器执行Reduce完成主任务
  • 多机执行时,一个主任务的所有子任务分布在当前应用的所有机器的所有线程池里执行,所有子任务执行完成后在主节点执行Reduce完成主任务。 (主节点:主任务执行时会在集群中随机的选择一个节点作为此任务的主节点)

第1步,修改任务为多机模式

  • 方式一:ImportSpecification注解中通过executeType属性指定,设置为"CLUSTER"即为多机执行
  • 方式二:通过实现接口taskRuntimeConfig返回ExecuteType的值为"CLUSTER",可动态设置执行方式,此方式优先级比方式一高

第2步,添加集群管理插件

SpringCloud体系

AGEIPort提供了一个使用SpringCloud DiscoveryClient进行集群管理的插件

参考代码:ageiport-test-ext-cluster-spring-cloud-eureka

Maven 依赖如下:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>ageiport-ext-cluster-spring-cloud</artifactId>
</dependency>

初始化AGEIPort时传入DiscoveryClient和ConfigurableApplicationContext


import com.alibaba.ageiport.common.utils.NetUtils;
import com.alibaba.ageiport.ext.cluster.SpringCloudClusterOptions;
import com.alibaba.ageiport.ext.cluster.SpringCloudNode;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.AgeiPortOptions;
import com.alibaba.ageiport.processor.core.client.http.HttpTaskServerClientOptions;
import com.netflix.discovery.EurekaClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import java.util.HashMap;
import java.util.UUID;

@Slf4j
@Configuration
public class AgeiPortConfiguration {

    @Value("${spring.application.name}")
    private String applicationName;

    @Value("${ageiport.taskServerClientOptions.port}")
    private Integer port;
    @Value("${ageiport.taskServerClientOptions.endpoint}")
    private String endpoint;

    @Autowired
    private ConfigurableApplicationContext applicationContext;

    @Autowired
    private DiscoveryClient discoveryClient;


    @Bean
    public AgeiPort getAgeiPort() {
        log.info("AgeiPort init start.");

        //1.初始化AgeiPort实例
        AgeiPortOptions options = AgeiPortOptions.debug();

        //以下信息最好统一从Eureka 或 其他地方获取
        SpringCloudNode localNode = new SpringCloudNode();
        localNode.setGroup(applicationName);
        localNode.setHost(NetUtils.getInstanceIp());
        localNode.setId(UUID.randomUUID().toString());
        localNode.setApp(applicationName);
        localNode.setLabels(new HashMap<>());
        localNode.setLabels(new HashMap<>());

        SpringCloudClusterOptions clusterOptions = new SpringCloudClusterOptions();
        clusterOptions.setDiscoveryClient(discoveryClient);
        clusterOptions.setApplicationContext(applicationContext);
        clusterOptions.setLocalNode(localNode);

        options.setClusterOptions(clusterOptions);
        options.setApp(applicationName);

        HttpTaskServerClientOptions taskServerClientOptions = new HttpTaskServerClientOptions();
        taskServerClientOptions.setPort(port);
        taskServerClientOptions.setEndpoint(endpoint);
        options.setTaskServerClientOptions(taskServerClientOptions);

        AgeiPort ageiPort = AgeiPort.ageiPort(options);

        log.info("AgeiPort init finished.");
        return ageiPort;
    }
}

其他体系

AGEIPort的任务管理是一个SPI接口:com.alibaba.ageiport.processor.core.spi.cluster.ClusterManagerFactory。

可以参考 ageiport-test-ext-cluster-spring-cloud-eureka中代码实现上述接口,实现ZK、Nacos等注册中心的集群管理。

第3步,创建任务实例并执行

完成上述配置后,任务就会在应用集群所有机器并行执行了。