Kettle是一款國外開源的etl工具,純java編寫,數據抽取高效穩定(數據遷移工具)。Kettle中有兩種腳本文件,transformation和job,transformation完成針對數據的基礎轉換,job則完成整個工作流的控制。
// 初始化Kettle環境,加載配置
KettleEnvironment.init();
//文件路徑及文件名
String filename=”/foo/bar/trans.ktr”;
//解析transformation文件
TransMeta transmeta = new TransMeta(filename);
//加載transformation
Trans trans = new Trans(transmeta);
//在獨立線程中執行transformation,“null”可以用參數集替代
trans.execute(null);
//等待transformation執行完畢
trans.waitUntilFinished();
//獲取執行結果
Result result=trans.getResult();
// 初始化Kettle環境,加載配置
KettleEnvironment.init();
//文件路徑及文件名
String filename=”/foo/bar/jobn.kjb”;
//解析job文件,不使用資源庫
JobMeta jobmeta=new JobMeta(filename, null,null);
//加載job
Job job=new Job(null, jobmeta);
//在獨立線程中執行job
job.start();
//等待job執行完畢
job.waitUntilFinished();
//獲取執行結果
Result result=job.getResult();
// 初始化Kettle環境,加載配置
KettleEnvironment.init();
//資源庫類型插件初始化
PluginRegistry.init();
//資源庫對象實例化
Repository repository=null;
RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
//讀取資源庫
repositoriesMeta.readData();
//遍歷資源庫
/*
for ( int i = 0; i < repositoriesMeta.nrRepositories(); i++ ) {
RepositoryMeta rinfo = repositoriesMeta.getRepository( i );
System.out.println( "#"+ ( i + 1 ) + " : " + rinfo.getName() + " [" + rinfo.getDescription() + "] id=" + rinfo.getId() );
}
*/
//根據資源庫名稱“1.0”查找資源庫
RepositoryMeta repositoryMeta = repositoriesMeta.findRepository( "1.0" );
// 獲取PluginRegistry實例
PluginRegistry registry = PluginRegistry.getInstance();
//加載資源庫
repository = registry.loadClass(RepositoryPluginType.class,repositoryMeta, Repository.class);
//資源庫初始化
repository.init(repositoryMeta);
//獲取資源庫路徑
RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree();
// JobMeta實例化
JobMeta jobmeta = new JobMeta();
//解析資源庫的job文件
jobmeta = repository.loadJob("job2", directory, null, null);
//job實例化
Job job = null;
//加載資源庫job
job = new Job(repository, jobmeta);
//在獨立線程中執行job
job.start();
//等待job執行完畢
job.waitUntilFinished();
//獲取執行結果
Result result=job.getResult();
日志輸出准備:
// FileLoggingEventListener實例化
FileLoggingEventListener fileLoggingEventListener=null;
//tran.log文件追加日志,true表示追加,false表示不追加
fileLoggingEventListener=new FileLoggingEventListener( "tran.log", true );
運行結果日志輸出:
//獲取結果集
Mapmap=result.getResultFiles();
//遍歷運行結果,並輸出日志文件
for(String key:map.keySet()){
//獲取ResultFile對象
ResultFile rf=map.get(key);
//創建日志通道
LogChannelInterface log = new LogChannel( "運行結果" );
//輸出日志到日志文件
log.logBasic(rf.getFile().getName().toString());