★楼主 |
![]() |
性别:男 邮箱:1329289117@qq.com 管理员 |
【主题】:java远程调用kettle集群服务器说明与代码 |
Kettle远程任务执行 说明 KETTLE 提供了名为carte的web server 程序,也叫slave server ,启动该程序可以把主机作为kettle的运行服务器,可以接收其他kettle客户端发送过来的ETL任务。 此功能为ETL任务分布式执行提供了方便。可以在多个服务器上运行carte web server,在调用不同的ETL任务时,指定不同的目标服务器执行。
启动方法 kettle提供了carte.bat和carte.sh(linux)批处理脚本来启动子服务器,这种启动方式分为两种 使用主机号和端口号 Carte 127.0.0.1 8080 Carte 192.168.1.221 8081
使用配置文件 Carte /foo/bar/carte-config.xml Carte http://www.example.com/carte-config.xml 如果cluster schema中定义了Dynamic cluster选项,则必须使用配置文件来进行启动,当这个子服务器启动时,它需要向配置文件中“masters”中列出的主服务器列表中汇报其运行状态(通过调用主服务器的registerSlave服务),已达到动态地设置子服务器的目的。配置文件格式 <slave_config> <masters> <slaveserver> <name>master1</name> <hostname>localhost</hostname> <port>8080</port> <username>cluster</username> <password>cluster</password> <master>Y</master> </slaveserver> </masters> <report_to_masters>Y</report_to_masters> <slaveserver> <name>slave4-8084</name> <hostname>localhost</hostname> <port>8084</port> <username>cluster</username> <password>cluster</password> <master>N</master> </slaveserver> </slave_config>
这个配置文件主要包括以下几个节点 masters: 这里列出来的服务器是当前子服务器需要向其汇报状态的主服务器。如果当前这个子服务器是主服务器,则它将连接其它的主服务器来获得这个集群中的所有子服务器。 report_to_masters : 如果为Y,则表示需要向定义的主服务器发送消息以表明该从属服务器存在 slaveserver : 这里定义的就是当前carte实例运行时需要的子服务器的配置情况 这里定义的username和password在向主服务器调用Register服务时连接主服务器时提供的安全设置。 在 <slaveserver> 部分,你可以使用<network_interface> 参数,这个参数的优先级高于<hostname>参数 ,如果你的机器中安装有多个网卡,这个设置可以起作用。 程序启动 Kettle提供了org.pentaho.di.www.Carte类,你可以通过该类提供的函数来启动或者停止子服务器。 启动子服务器 SlaveServerConfig config = new SlaveServerConfig(hostname, port, false); Carte. runCarte(config);
停止子服务器 carte.getWebServer().stopServer(); 子服务器内幕 我们前面提到过子服务器实际上就是一个web server,该web server是基于Jetty这个嵌入式的开源servlet容器。 这个web server主要是提供转换运行的环境,另外一个重要的功能通过提供servlet来在客户端、主服务器和从属服务器之间进行通讯和控制。主服务器和从属服务器之间是通过httpClient来进行通讯的,通讯时传递的数据是xml格式。通过提供的servlet,可以实现启动、停止、暂停转换或者作业、获得转换或者作业的状态、注册子服务器、获得子服务器的列表等等 Kettle主要提供了以下的几种基于servlet的服务 GetRootServlet:获得Carte的根目录 GetStatusServlet:获得在服务器上运行的所有的转换和作业的状态 GetTransStatusServlet:获得在服务器上运行的某个指定的转换的每个步骤的运行状态。 PrepareExecutionTransServlet:让服务器上的某个指定的转换做好运行的准备。 StartTransServlet:执行服务器上的某个指定的转换 PauseTransServlet:暂停或者重新运行某一个转换 StopTransServlet:停止正在运行的转换 CleanupTransServlet:清理运行转换时的环境 AddTransServlet:向子服务器中增加某个转换。如果服务器中有正在运行或者准备运行的相同名字的转换,则抛出异常。 AllocateServerSocketServlet:分配一个新的socket端口号。这个端口号是基于你在定义cluster schema中设置的端口号,依次加1 StartJobServlet:执行服务器上某个指定的作业 StopJobServlet:停止正在运行的作业 GetJobStatusServlet:获得某个指定作业的状态 AddJobServlet:向当前的子服务器中添加某个作业。 RegisterSlaveServlet:注册某个服务器的信息。服务器信息包括子服务器是否活动、最新活动的时间、最新不活动的时间。这个在dynamic cluster中需要用到,由从属服务器向主服务器汇报当前状态。 GetSlavesServlet:获得集群中子服务器的信息 AddExportServlet:以zip文件的形式向caret服务器传递作业或者转换信息,并将信息加入到服务器中。
运行任务 在spoon中运行 在kettle的集成设计环境spoon中,你可以选择转换中的“运行”菜单项,或者按F9快捷键,弹出以下的窗口
这里有三个选项来决定转换是以什么方式来执行 本地执行: 转换或者作业将在你现在使用的JVM中运行。 远程执行: 允许你指定一个想运行转换的远程服务器。这需要你在远程服务器上安装Pentaho Data Integration(Kettle)并且运行Carte子服务器。 集群方式执行: 允许你在集群环境下执行作业或者转换 程序运行
jobMeta = repository.loadJob(jobName, directory, null, null); JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setRemoteServer(remoteSlaveServer); jobExecutionConfiguration.setRepository(repository); 代码中需要定义slaveserver ,然后在任务执行配置中设置slaveserver,然后把任务发送给slaveserver
程序实现 首先,需要在项目中引入执行kettle所需要的jar包:
实际应该引用更多的jar包,包括 lib/中的所有包 Libext/commons中的所有包等。
调用的源码如下
其中在172.16.100.28服务器上运行carte.bat 172.16.100.28 8765命令来启动slaveserver 其中调用方法如下 public static void main(String[] args) { //设置资源库信息 OracleRepositoryInfo rposInfo = new OracleRepositoryInfo(); rposInfo.setDbHostname("172.16.100.107"); rposInfo.setDbName("ora1"); rposInfo.setDbPort("1521"); rposInfo.setDbType("Oracle"); rposInfo.setDbUsername("kettle"); rposInfo.setDbPassword("kettle"); rposInfo.setRepoId("admin"); rposInfo.setRepoName("admin"); rposInfo.setRepoPassword("admin"); rposInfo.setRepoUsername("admin"); //新建job对象,初始化资源库 CallJob ctf = new CallJob(rposInfo); //设置slaveserver信息 SlaveServerInfo ssi = new SlaveServerInfo(); ssi.setServerHost("172.16.100.28"); ssi.setServerPort("8765"); ssi.setServerName("test"); ssi.setServerUsername("cluster"); ssi.setServerPassword("cluster"); //为job设置slaveserve ctf.initSlaveServer(ssi);
try { //ctf.executeJobLocal("WF_DD_T_AP_DA_RTU_DAY_STATS_QUANTITY"); //调用远程执行job ctf.executeJobRemote("WF_DD_T_AP_DA_RTU_DAY_STATS_QUANTITY");
} catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
|
发表评论 发表时间:『2019-02-15 15:09:53』 |