kettle轉換步驟工作組件
???????這里有四個類構成了這個kettle?步驟/節點,每一個類都有其特定的目的及所扮演的角色。
TemplateStep:??步驟類實現了StepInteface接口,在轉換運行時,它的實例將是數據實際處理的位置。每一個執行線程都表示一個此類的實例。
?
TemplateStepData: ?數據類用來存儲數據,當插件執行時,對于每個執行的線程都是唯一的。執行時里面存儲的東西主要包括數據庫連接、文件句柄、緩存等等其他東西。
?
TemplateStepMeta:?元數據類實現了StepMetaInterface接口。它的職責是保存和序列化特定步驟實例的配置,在我們這個例子中,它負責保存用戶設置的步驟名稱和輸出字段的名稱。
?
TemplateStepDialog:對話框類實現了該步驟與用戶交互的界面,它顯示一對話框,通過對話框用戶可以自己的喜好設定步驟的操作。對話框類與元數據類關系非常緊密,元數據類可以追蹤用戶的設置。
?
除了上面的代碼,還有一個plugin.xml,它設置好了插件的元數據,定義了步驟在kettle圖形工作臺中的顯示效果。為了更好的讓大家理解,我將利用這個步驟設計一個轉換流程并執行它。對于插件的開發,我們將從plugin.xml配置文件開始講起,然后講講元數據和對話框類,最后再講講步驟類和數據類。
?
書寫你自己的plugin.xml:
???????下面plugin.xml是我們這個插件里面的內容,它的功能是告訴kettle插件的元數據類,插件的名稱及描敘,還有需要加載的jar包。想要了解細節,可以查看文章:plug-in loading
<?xml version="1.0" encoding="UTF-8"?>
<plugin
???id="TemplatePlugin"
???iconfile="icon.png"
???description="Template Plugin"
???tooltip="Only there for demonstration purposes"
???category="Demonstration"
???classname="plugin.template.TemplateStepMeta">
???<libraries>
??????<library?name="templatestep.jar"/>
???</libraries>
</plugin>
?
ID:在kettle插件中必須全局唯一,因為被kettle序列化了,所以不要隨便改變
Iconfile: kettle中插件顯示的圖片,必須是png圖片
Description:插件描敘,顯示在樹形菜單里面。
Tooltip:樹形菜單中,鼠標滑過的時候顯示的提示信息
Category:插件顯示的父目錄
Classname:元數據類
Library:指明了插件需要加載所依賴的jar包
?
?
插件主要包含類介紹
一、元數據類:
?????????下面顯示了元數據的幾個關鍵的方法,注意元數據類里面用私有成員變量outputField 存儲了下一個步驟的輸出字段。
?// keep track of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()
// serialize the step settings to and from xml
public String getXML()
public void loadXML(…)
// serialize the step settings to and from a kettle repository
public void readRep(…)
public void saveRep(…)
// provide information about how the step affects the field structure of processed rows
public void getFields(…)
// perform extended validation checks for the step
public void check(…)
// provide instances of the step, data and dialog classes to Kettle
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…)?
TemplateStepMeta元數據類其實還有很多方面,不過大多被他的父類BaseStepMeta給默認實現了,這些默認的實現足以使我們的元數據類工作良好。想要了解更多,大家可以查查關于StepMetaInteface和BaseStepMeta的kettle官方文檔。
?
二、對話框類:
???????? TemeplateStepDialog為步驟實現了對話框的設置,kettle的用戶界面部件是使用的eclipse的swt框架,如果要開發比較復雜的對話框,你還必須熟悉大部分swt代碼。 Swt文檔大家可以從eclipse上的幫助菜單點擊在線獲取。在開發過程中,一個對話框對象擁有一個元數據對象,它記錄了應該從哪里讀取配置?應該把設置好的配置保存在哪里?它僅僅設置了輸出字段的名稱在我們這個模板步驟里面。一個繼承自BaseStepDialog特定的對話框類必須提供open(…)方法,這個方法必須返回這個步驟的名稱(發生改變時)或NULL(對話框被取消時)
?
?
三、步驟類:
?????????步驟類是實際的處理和轉換工作的地方。因為大部分樣本代碼已經由父類BaseStep提供了,大多數插件僅僅關注下面幾個特定的方法就行。
?// initialization and teardown
public boolean?init(…)
public void?dispose(..)
// processing rows
public void?run()
public boolean?processRow(..)
Init()方法在轉換執行前被kettle調用,轉換必須在所有步驟初始化成功時才真正執行。我們這個模板步驟沒有做任何事情,這里僅僅是拿出來讓大家了解了解。
dispose()方法是在步驟執行完之后執行(非轉換執行完哈),它完成資源的關閉,像文件句柄、緩存等等。
run()方法在實際處理記錄集的時候調用。里面其實是個調用processRow()方法處理記錄的小循環,當此步驟再沒有數據處理或轉換被停止時退出循環。
processRow()方法在處理單條記錄的時候被調用。這個方法通常通過調用getRow()來獲取需要處理的單條記錄。 這個方法如果有需要將會被阻塞,例如當此步驟希望放慢腳步處理數據時。processRow()隨后的流程將執行轉換工作并調用putRow()方法將處理過的記錄放到它的下游步驟。
注意:你的步驟可能會變記錄的結構,為了安全起見,一定要多熟悉包org.pentaho.di.core.row,特別是類RowMetaInterface和RowDataUtil。
??基類BaseStep對處理的記錄提供了第一次訪問的標識,在某些代碼只執行一次的時候可能非常有用,例如某個費時的查找,其實這就是緩存。
?
四、數據類:
?????????大多數步驟都需要臨時的緩沖或者臨時的數據。數據類就是這些數據合適的存放位置。每一個執行線程將得到其擁有的數據類實例,所以它能在獨立的空間里面運行。TemplateStepData繼承自BaseStepData,作為一個經驗法則,不要將non-constant字段放置BaseStepData類里面,如果你必須,請將它最好放置TemplateStepData數據類里面.
?
我們的步驟僅僅使用了一個數據對象來存儲記錄集輸出的結構,沒有用到其他的存儲介質,例如文件等等。
開發插件實例
1.???????在kettle-steps.xml下添加如下節點
?
?
<step?id="MyTest"> ???????<description>MyTest</description> ???????<classname>mytest.MyTestMeta</classname> ???????<category>插件測試</category> ???????<tooltip>測試</tooltip> ???????<iconfile>ui/images/TIP.png</iconfile> ????</step> |
?
?
?
?
?
?
?
?
?
?
2.???????創建插件類
?
MyTest類代碼
?
package?mytest; ? import?org.pentaho.di.trans.Trans; import?org.pentaho.di.trans.TransMeta; import?org.pentaho.di.trans.step.BaseStep; import?org.pentaho.di.trans.step.StepDataInterface; import?org.pentaho.di.trans.step.StepInterface; import?org.pentaho.di.trans.step.StepMeta; ? ? publicclass?MyTest?extends?BaseStep?implements?StepInterface { ? public?MyTest(StepMeta stepMeta, StepDataInterface stepDataInterface, ????????int?copyNr, TransMeta transMeta, Trans trans) { ????super(stepMeta, stepDataInterface, copyNr, transMeta, trans); ????//?TODO?Auto-generated constructor stub } ? } ? |
?
MyTestData類代碼
?
?
package?mytest; ? import?org.pentaho.di.trans.step.BaseStepData; import?org.pentaho.di.trans.step.StepDataInterface; ? publicclass?MyTestData?extends?BaseStepData?implements?StepDataInterface?{ ? } ? |
MyTestMeta類代碼
?
package?mytest; ? import?java.util.List; import?java.util.Map; ? import?org.pentaho.di.core.CheckResultInterface; import?org.pentaho.di.core.Counter; import?org.pentaho.di.core.database.DatabaseMeta; import?org.pentaho.di.core.exception.KettleException; import?org.pentaho.di.core.exception.KettleXMLException; import?org.pentaho.di.core.row.RowMetaInterface; import?org.pentaho.di.repository.ObjectId; import?org.pentaho.di.repository.Repository; import?org.pentaho.di.trans.Trans; import?org.pentaho.di.trans.TransMeta; import?org.pentaho.di.trans.step.BaseStepMeta; import?org.pentaho.di.trans.step.StepDataInterface; import?org.pentaho.di.trans.step.StepInterface; import?org.pentaho.di.trans.step.StepMeta; import?org.pentaho.di.trans.step.StepMetaInterface; import?org.w3c.dom.Node; ? publicclass?MyTestMeta?extends?BaseStepMeta?implements?StepMetaInterface { ? ????public?MyTestMeta() ????{ ???????super(); ????} ????@Override ????publicvoid?setDefault() { ???????//?TODO?Auto-generated method stub ? ????} ? ????@Override ????publicvoid?loadXML(Node stepnode, List<DatabaseMeta> databases, ???????????Map<String, Counter> counters)?throws?KettleXMLException { ???????//?TODO?Auto-generated method stub ? ????} ? ????@Override ????publicvoid?saveRep(Repository rep, ObjectId id_transformation, ???????????ObjectId id_step)?throws?KettleException { ???????//?TODO?Auto-generated method stub ? ????} ? ????@Override ????publicvoid?readRep(Repository rep, ObjectId id_step, ???????????List<DatabaseMeta> databases, Map<String, Counter> counters) ???????????throws?KettleException { ???????//?TODO?Auto-generated method stub ? ????} ? ????@Override ????publicvoid?check(List<CheckResultInterface> remarks, TransMeta transMeta, ???????????StepMeta stepMeta, RowMetaInterface prev, String[] input, ???????????String[] output, RowMetaInterface info) { ???????//?TODO?Auto-generated method stub ? ????} ? ????@Override ????public?StepInterface getStep(StepMeta stepMeta, ???????????StepDataInterface stepDataInterface,?int?copyNr, ???????????TransMeta transMeta, Trans trans) { ???????//?TODO?Auto-generated method stub ???????returnnull; ????} ? ????@Override ????public?StepDataInterface getStepData() { ???????//?TODO?Auto-generated method stub ???????returnnull; ????} ? } ? ? ? |
MyTestDialog類代碼
?
package?mytest; ? import?org.eclipse.swt.SWT; import?org.eclipse.swt.widgets.Display; import?org.eclipse.swt.widgets.Shell; import?org.pentaho.di.trans.TransMeta; import?org.pentaho.di.trans.step.BaseStepMeta; import?org.pentaho.di.trans.step.StepDialogInterface; import?org.pentaho.di.ui.trans.step.BaseStepDialog; ? publicclass?MyTestDialog?extends?BaseStepDialog?implements?StepDialogInterface { ??? ????public?MyTestDialog(Shell parent, Object in, ???????????TransMeta transMeta, String stepname) { ???????super(parent, (BaseStepMeta)in, transMeta, stepname); ???????//?TODO?Auto-generated constructor stub ????} ? ????@Override ????public?String open() { ???????Shell parent = getParent(); ???????Display?display?= parent.getDisplay(); ? ???????shell?=?new?Shell(parent, SWT.DIALOG_TRIM?| SWT.RESIZE?| SWT.MAX?| SWT.MIN); ???????props.setLook(shell); ???????shell.open(); ???????shell.setSize(200, 200); ???????shell.setText("hello"); ???????returnnull; ????} ? } ? ? |
除TestDialog類外的3個類的代碼都是在加入繼承基類和接口后更加eclipse插件提示自動生成的,也是我們自己需要實現的方法,TestDialog類中的MyTestDialog方法參數是固定的,根據生成的需要修改下,open函數是要我們自己實現的,運行效果如下
?
雙擊MyTest后彈出一個空白的窗口
?
調用use define java class?插件
以kettle中自帶的samples\transformations\User Defined Java Class - Calculate the date of Easter.ktr為例
public boolean?processRow(StepMetaInterface smi, StepDataInterface sdi)?throws?KettleException { ? Object[] r=getRow();//從阻塞隊列中獲取一個數據對象(一行數據記錄) ??if?(r==null)//如果沒有可獲取的數據,代表以處理完成 ? { ????setOutputDone();//設置處理完成標志 ????return?false;//退出循環 ? } ? ??if?(first) {//第一次進入循環 ??? //初始化動作 ???? first=false;//設置為非第一次循環 ? } ?/* *處理函數 */ ??logBasic(r[0].toString());//打印 ??putRow(data.outputRowMeta, r);//輸出到阻塞隊列 ??return?true; } |
use define java class?插件其實就是一個空插件,然后我們自己來實現processRow函數體,kettle通過while循環來調用processRow函數一行一行的處理數據,流程如下所示