本文共 4603 字,大约阅读时间需要 15 分钟。
package com.Nxin.BigData; import java.io.File; import java.util.HashMap; import java.util.Map; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.filerep.KettleFileRepository; import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.job.JobMeta; import org.pentaho.di.job.Job; import org.pentaho.di.trans.Trans; public class Kettle { public static void runTran(String tranname,Map<String,String> Paters) { try { TransMeta transMeta = new TransMeta(tranname); Trans trans = new Trans(transMeta); if (Paters!=null){ //设置参数 for(Map.Entry<String, String> entry : Paters.entrySet()){ System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue()); transMeta.setParameterValue(entry.getKey(),entry.getValue()); } } trans.prepareExecution(null); trans.startThreads(); trans.waitUntilFinished(); if (trans.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } catch (KettleException e) {e.printStackTrace();} } public static void runJob(String jobname,Map<String,String> Paters) { try { JobMeta jobMeta = new JobMeta(jobname, null); Job job = new Job(null, jobMeta); if (Paters!=null){//设置参数 for(Map.Entry<String, String> entry : Paters.entrySet()){ System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue()); jobMeta.setParameterValue(entry.getKey(),entry.getValue()); } } job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } catch (KettleException e) { e.printStackTrace(); } } public static void runTran(String tranName,KettleFileRepository rep,Map<String,String> Paters) { try { Trans trans = null; if (tranName != null && !"".equals(tranName)) { TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(tranName, null), null); // 转换对象 trans = new Trans(transMeta); // 转换 if (Paters!=null){ //设置参数 for(Map.Entry<String, String> entry : Paters.entrySet()){ System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue()); transMeta.setParameterValue(entry.getKey(),entry.getValue()); } } trans.execute(null); // 执行转换 trans.waitUntilFinished(); // 等待转换执行结束 if (trans.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } } catch (Exception e) { e.printStackTrace(); } } public static void runJob(String jobName,KettleFileRepository rep,Map<String,String> Paters) { try { Job job = null; RepositoryDirectoryInterface directory = rep.loadRepositoryDirectoryTree(); if (jobName != null && !"".equals(job)) { JobMeta jobMeta = rep.loadJob(jobName, directory, null, null); jobMeta.activateParameters(); job = new Job(rep,jobMeta); //必须加rep否则无法 if (Paters!=null){//设置参数 for(Map.Entry<String, String> entry : Paters.entrySet()){ System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue()); jobMeta.setParameterValue(entry.getKey(),entry.getValue()); } } job.start(); // 执行作业 job.waitUntilFinished(); // 等待作业执行结束 if (job.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } } catch (Exception e) { e.printStackTrace(); } } public static void runJob(String jobName,KettleFileRepository rep) { runJob(jobName,rep,new HashMap<String,String>()); } public static void runTran(String tranName,KettleFileRepository rep) { runTran(tranName,rep,new HashMap<String,String>()); } public static void runJob(String jobName) { runJob(jobName,new HashMap<String,String>()); } public static void runTran(String tranName) { runTran(tranName,new HashMap<String,String>()); } public static KettleFileRepository FileRepository(String RepName,String path) { KettleFileRepositoryMeta repMeta = new KettleFileRepositoryMeta("", "", RepName, path); // 资源库元对象 KettleFileRepository rep = new KettleFileRepository(); // 文件形式的资源库 rep.init(repMeta); return rep; } public static void main(String[] args) throws KettleException { String path=new File("D:\\ETL\\Kettle").toURI().toString(),repName="Kettle"; KettleEnvironment.init(); // 初始化 KettleFileRepository rep = FileRepository(repName, path); Map<String,String> Paters = new HashMap<String,String>(); Paters.put("client", "clientA"); //runTran("D:\\test.ktr"); //执行转换 //runTran("D:\\test.ktr",Paters); //执行转换 带参数 //runJob("D:\\test.kjb"); //执行作业 //runJob("D:\\test.kjb",Paters); //执行作业 带参数 //runTran("test0103",rep); //执行转换 基于文件资源库 //runTran("test0103",rep,Paters); //执行转换 基于文件资源库 带参数 //runJob("test0103_2",rep); //执行作业 基于文件资源库 //runJob("test0103_2",rep,Paters); //执行作业 基于文件资源库 带参数 rep.disconnect(); } }转载地址:http://nefmi.baihongyu.com/