【主题】:java远程调用kettle集群服务器说明与代码

Kettle远程任务执行
远程服务器

说明

KETTLE 提供了名为carteweb server 程序,也叫slave server ,启动该程序可以把主机作为kettle的运行服务器,可以接收其他kettle客户端发送过来的ETL任务。

此功能为ETL任务分布式执行提供了方便。可以在多个服务器上运行carte web server,在调用不同的ETL任务时,指定不同的目标服务器执行。

 

启动方法

kettle提供了carte.batcarte.shlinux)批处理脚本来启动子服务器,这种启动方式分为两种

使用主机号和端口号

     Carte 127.0.0.1 8080

     Carte 192.168.1.221 8081

 

使用配置文件

Carte  /foo/bar/carte-config.xml

Carte http://www.example.com/carte-config.xml

如果cluster schema中定义了Dynamic cluster选项,则必须使用配置文件来进行启动,当这个子服务器启动时,它需要向配置文件中“masters”中列出的主服务器列表中汇报其运行状态(通过调用主服务器的registerSlave服务),已达到动态地设置子服务器的目的。配置文件格式

  <slave_config>

  <masters>

    <slaveserver>

      <name>master1</name>

      <hostname>localhost</hostname>

      <port>8080</port>

      <username>cluster</username>

      <password>cluster</password>

      <master>Y</master>

    </slaveserver>

  </masters>

  <report_to_masters>Y</report_to_masters>

  <slaveserver>

    <name>slave4-8084</name>

    <hostname>localhost</hostname>

    <port>8084</port>

    <username>cluster</username>

    <password>cluster</password>

    <master>N</master>

  </slaveserver>

</slave_config>

 

这个配置文件主要包括以下几个节点

masters: 这里列出来的服务器是当前子服务器需要向其汇报状态的主服务器。如果当前这个子服务器是主服务器,则它将连接其它的主服务器来获得这个集群中的所有子服务器。

report_to_masters : 如果为Y,则表示需要向定义的主服务器发送消息以表明该从属服务器存在

slaveserver : 这里定义的就是当前carte实例运行时需要的子服务器的配置情况

这里定义的usernamepassword在向主服务器调用Register服务时连接主服务器时提供的安全设置。 <slaveserver> 部分,你可以使用<network_interface> 参数,这个参数的优先级高于<hostname>参数 ,如果你的机器中安装有多个网卡,这个设置可以起作用。

程序启动

   Kettle提供了org.pentaho.di.www.Carte类,你可以通过该类提供的函数来启动或者停止子服务器。

启动子服务器

SlaveServerConfig config = new SlaveServerConfig(hostname, port, false);

Carte. runCarte(config);

 

停止子服务器

carte.getWebServer().stopServer();

子服务器内幕

    我们前面提到过子服务器实际上就是一个web server,web server是基于Jetty这个嵌入式的开源servlet容器。

这个web server主要是提供转换运行的环境,另外一个重要的功能通过提供servlet来在客户端、主服务器和从属服务器之间进行通讯和控制。主服务器和从属服务器之间是通过httpClient来进行通讯的,通讯时传递的数据是xml格式。通过提供的servlet,可以实现启动、停止、暂停转换或者作业、获得转换或者作业的状态、注册子服务器、获得子服务器的列表等等

Kettle主要提供了以下的几种基于servlet的服务

GetRootServlet:获得Carte的根目录

GetStatusServlet:获得在服务器上运行的所有的转换和作业的状态

GetTransStatusServlet:获得在服务器上运行的某个指定的转换的每个步骤的运行状态。

PrepareExecutionTransServlet:让服务器上的某个指定的转换做好运行的准备。

StartTransServlet:执行服务器上的某个指定的转换

PauseTransServlet:暂停或者重新运行某一个转换

StopTransServlet:停止正在运行的转换

CleanupTransServlet:清理运行转换时的环境

AddTransServlet:向子服务器中增加某个转换。如果服务器中有正在运行或者准备运行的相同名字的转换,则抛出异常。

AllocateServerSocketServlet:分配一个新的socket端口号。这个端口号是基于你在定义cluster schema中设置的端口号,依次加1

StartJobServlet:执行服务器上某个指定的作业

StopJobServlet:停止正在运行的作业

GetJobStatusServlet:获得某个指定作业的状态

AddJobServlet:向当前的子服务器中添加某个作业。

RegisterSlaveServlet:注册某个服务器的信息。服务器信息包括子服务器是否活动、最新活动的时间、最新不活动的时间。这个在dynamic cluster中需要用到,由从属服务器向主服务器汇报当前状态。

GetSlavesServlet:获得集群中子服务器的信息

AddExportServlet:以zip文件的形式向caret服务器传递作业或者转换信息,并将信息加入到服务器中。

 

运行任务

spoon中运行

 kettle的集成设计环境spoon中,你可以选择转换中的“运行”菜单项,或者按F9快捷键,弹出以下的窗口

这里有三个选项来决定转换是以什么方式来执行

本地执行: 转换或者作业将在你现在使用的JVM中运行。

远程执行: 允许你指定一个想运行转换的远程服务器。这需要你在远程服务器上安装Pentaho Data IntegrationKettle)并且运行Carte子服务器。

集群方式执行: 允许你在集群环境下执行作业或者转换

程序运行

 

       jobMeta = repository.loadJob(jobName, directory, null, null);                                                            

       JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();

 

       jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);

       jobExecutionConfiguration.setRepository(repository);

代码中需要定义slaveserver ,然后在任务执行配置中设置slaveserver,然后把任务发送给slaveserver

 

 

程序实现

首先,需要在项目中引入执行kettle所需要的jar包:

实际应该引用更多的jar包,包括

lib/中的所有包

Libext/commons中的所有包等。

 

调用的源码如下

 

其中在172.16.100.28服务器上运行carte.bat 172.16.100.28 8765命令来启动slaveserver

其中调用方法如下

    public static void main(String[] args) {

//设置资源库信息

       OracleRepositoryInfo rposInfo = new OracleRepositoryInfo();

       rposInfo.setDbHostname("172.16.100.107");

       rposInfo.setDbName("ora1");

       rposInfo.setDbPort("1521");

       rposInfo.setDbType("Oracle");

       rposInfo.setDbUsername("kettle");

       rposInfo.setDbPassword("kettle");

       rposInfo.setRepoId("admin");

       rposInfo.setRepoName("admin");

       rposInfo.setRepoPassword("admin");

       rposInfo.setRepoUsername("admin");

//新建job对象,初始化资源库

       CallJob ctf = new CallJob(rposInfo);

//设置slaveserver信息

       SlaveServerInfo ssi = new SlaveServerInfo();

       ssi.setServerHost("172.16.100.28");

       ssi.setServerPort("8765");

       ssi.setServerName("test");

       ssi.setServerUsername("cluster");

       ssi.setServerPassword("cluster");

//job设置slaveserve

       ctf.initSlaveServer(ssi);

 

       try {

           //ctf.executeJobLocal("WF_DD_T_AP_DA_RTU_DAY_STATS_QUANTITY");

    //调用远程执行job      ctf.executeJobRemote("WF_DD_T_AP_DA_RTU_DAY_STATS_QUANTITY");

 

       } catch (Exception e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

       }

    }

 

 

 发表评论     发表时间:『2019-02-15 15:09:53』


扫描二维码关注网站最新动态