博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java kettle v2
阅读量:4214 次
发布时间:2019-05-26

本文共 4603 字,大约阅读时间需要 15 分钟。

package com.Nxin.BigData;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.Job;
import org.pentaho.di.trans.Trans;
public class Kettle {
public static void runTran(String tranname,Map<String,String> Paters) {
try {
TransMeta transMeta = new TransMeta(tranname);
Trans trans = new Trans(transMeta);
 
if (Paters!=null){ //设置参数
 
for(Map.Entry<String, String> entry : Paters.entrySet()){
 
System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
 
transMeta.setParameterValue(entry.getKey(),entry.getValue());
}
 
}
trans.prepareExecution(null);  
       trans.startThreads();  
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
throw new RuntimeException("传输过程中发生异常");
}
} catch (KettleException e) {e.printStackTrace();}
}
public static void runJob(String jobname,Map<String,String> Paters) {
try {
JobMeta jobMeta = new JobMeta(jobname, null);
Job job = new Job(null, jobMeta);
 
if (Paters!=null){//设置参数
 
for(Map.Entry<String, String> entry : Paters.entrySet()){
 
System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
 
jobMeta.setParameterValue(entry.getKey(),entry.getValue());
 
}
 
}
job.start();  
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new RuntimeException("传输过程中发生异常");
}
} catch (KettleException e) { e.printStackTrace(); }
}  
  
    public static void runTran(String tranName,KettleFileRepository rep,Map<String,String> Paters) {
   
   
try { 
   
Trans trans = null; 
   
if (tranName != null && !"".equals(tranName)) { 
   
TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(tranName, null), null);  // 转换对象 
   
trans = new Trans(transMeta); // 转换 
 
if (Paters!=null){ //设置参数
 
for(Map.Entry<String, String> entry : Paters.entrySet()){
 
System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
 
transMeta.setParameterValue(entry.getKey(),entry.getValue());
}
 
}
   
trans.execute(null);   // 执行转换 
   
trans.waitUntilFinished();  // 等待转换执行结束 
   
if (trans.getErrors() > 0) { 
   
throw new RuntimeException("传输过程中发生异常"); 
   
   
   
} catch (Exception e) { e.printStackTrace(); } 
    }
   
public static void runJob(String jobName,KettleFileRepository rep,Map<String,String> Paters) {
 
 
 
 try { 
 
 Job job = null; 
 
 RepositoryDirectoryInterface directory = rep.loadRepositoryDirectoryTree();  
 
 if (jobName != null && !"".equals(job)) { 
 
 JobMeta jobMeta = rep.loadJob(jobName, directory, null, null); 
 
 jobMeta.activateParameters(); 
 
 job = new Job(rep,jobMeta); //必须加rep否则无法
 
 if (Paters!=null){//设置参数
 
 for(Map.Entry<String, String> entry : Paters.entrySet()){
 
  System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
 
  jobMeta.setParameterValue(entry.getKey(),entry.getValue());
 
 }
 
 }
 
 job.start(); // 执行作业 
 
 job.waitUntilFinished();  // 等待作业执行结束   
 
 if (job.getErrors() > 0) { 
 
   throw new RuntimeException("传输过程中发生异常"); 
 
 } 
 
 } 
 
 } catch (Exception e) { e.printStackTrace(); } 
 }
  
public static void runJob(String jobName,KettleFileRepository rep) {
runJob(jobName,rep,new HashMap<String,String>());
 }
public static void runTran(String tranName,KettleFileRepository rep) {
runTran(tranName,rep,new HashMap<String,String>());
 }
public static void runJob(String jobName) {
runJob(jobName,new HashMap<String,String>());
 }
public static void runTran(String tranName) {
runTran(tranName,new HashMap<String,String>());
 }
    public static KettleFileRepository FileRepository(String RepName,String path) {    
   
KettleFileRepositoryMeta repMeta = new KettleFileRepositoryMeta("", "", RepName, path); // 资源库元对象     
   
KettleFileRepository rep = new KettleFileRepository(); // 文件形式的资源库 
   
rep.init(repMeta); 
   
return rep;
    }
   
public static void main(String[] args) throws KettleException {
String path=new File("D:\\ETL\\Kettle").toURI().toString(),repName="Kettle";
   KettleEnvironment.init(); // 初始化 
   KettleFileRepository rep = FileRepository(repName, path); 
   
   Map<String,String> Paters = new HashMap<String,String>();
   Paters.put("client", "clientA");
//runTran("D:\\test.ktr"); //执行转换
//runTran("D:\\test.ktr",Paters); //执行转换  带参数
   
//runJob("D:\\test.kjb"); //执行作业
//runJob("D:\\test.kjb",Paters); //执行作业 带参数
   
   //runTran("test0103",rep); //执行转换 基于文件资源库
   //runTran("test0103",rep,Paters); //执行转换 基于文件资源库 带参数
   
   //runJob("test0103_2",rep); //执行作业 基于文件资源库
   //runJob("test0103_2",rep,Paters); //执行作业 基于文件资源库 带参数
   
   rep.disconnect();
}
  
 
}

转载地址:http://nefmi.baihongyu.com/

你可能感兴趣的文章
smp的负载均衡
查看>>
docker 性能debug
查看>>
页面锁
查看>>
页面的反向映射
查看>>
页面回收线程
查看>>
cpu子系统的组调度
查看>>
物理内存映射
查看>>
kernel中ksm特性
查看>>
dpdk中的timer
查看>>
cfs调度中进程最小运行时间是0.75ms
查看>>
常用调优命令
查看>>
optimistic_spin_queue
查看>>
dpdk中的timer子系统的使用
查看>>
lru的加入与删除
查看>>
dpdk 中的中断
查看>>
查询一个进程中有多少个线程
查看>>
vcpu和physical cpu 绑定
查看>>
osq对mutex的优化
查看>>
读写锁总结
查看>>
dpdk中的大页初始化
查看>>