簡介
IBM InfoSphere Streams( 以下簡稱 Streams) 是 IBM 於 2009 年推出的具有高可靠性,高可擴展性,分布式流計算平 台,前瞻性地把支持每秒 6G 或者每小時 21600G(相當於互聯網上所有網頁數量之和)數據處理能力作為系統設計的指標 ,實現了流數據“永恆分析”的能力。它包含一個運行時環境 ( 或者稱為實例 ) 和編程模型來簡化需要對大批量連續流數 據進行提取、過濾、分析以及關聯的應用程序的開發,能夠廣泛的應用於制造、零售、交通運輸、金融證券以及監管各行各 業的解決方案之中,使得實時快速做出商業決策的理念得以實現。
Streams 應用程序的總體結構如下圖所示,是由 一系列運算符通過輸入 / 輸出端口相互連接而構成的。為方便描述,我們首先給出 Streams 應用程序中的一些術語:
流:代表任何來自於數據源的連續數據流,數據的表現形式是由一系列屬性構成的元組。
運算符:流數據處理的功能組件,接收一個或多個輸入流,對流數據對應的元組和屬性進行處理,最終產生一個或多個 輸出流。
輸入端口:用於接收其他運算符輸出流的端口。
輸出端口:用於產生輸出流的端口。
處理元素(PE):Streams 應用程序被物理上分割為的一系列處理單位,一般以動態連接庫的形式存在。
運算符融合:一種把多個運算符融合產生一個 PE 的優化技術,以減少數據在多個運算符之間的傳輸代價。
作業 (Job): Streams 應用程序在 Streams 運行時間上的表現形式。
圖 1. Streams 應用程序結構
Java 編程語言作為第三代高級語言,自 20 世紀 90 年代誕生以來,以其使用簡單、完全面象對象、平台可移植性、健壯的沙盒安全機制、動態性,以及大量可用的開 發包等一系列優勢,在互聯網分布式環境下得到了極其廣泛的應用,幾乎涵蓋了互聯網應用的方方面面。相比較 C/C++ 程 序員,Java 程序員只需關注於業務邏輯的開發,而無需糾結於與系統相關的資源分配與釋放等細節,這些瑣碎的工作統一 由 Java 虛擬機 (JVM) 處理,從而極大地提升了開發效率。由於運行在虛擬機之上,Java 程序與原生的 C/C++ 程序相比 ,性能上可能略遜一籌,但虛擬機經過幾十年的發展與優化,這種差距在逐步縮小。尤其在 CPU 性能大幅提高的今天,這 種性能上的差異表現的越來越不明顯。
基於上述考慮,Streams 平台提供了使用 Java 編程語言來構建 Streams 應 用程序的框架,具體包括 Java 運算符模型描述文件以及 Java 運算符 API 兩部分。下圖是實現數據庫存儲功能 DBPersist 運算符的一個具體的例子,圖中左半邊 xml 通過一系列屬性提供了該運算符的模型定義,右半邊對應於 Java 類的實現,主要是元組的處理邏輯。在這個具體的例子當中,運算符的 Java 實現除了依賴於自己的 Jar 包之外,還依賴 於一些第三方的 Jar 包,這些依賴關系都需要在運算符模型中指定清楚。
圖 2. Java 運算符模型以及實 現
Java 運算符模型描述文件
Streams 使用 XSD (XML Schema Definition) 來對運算符模型進行描述,涵蓋:語法表達式、所需參數、輸入端口定義 、輸出端口定義、所依賴的類庫,以及集成開發環境中用於顯示的圖標,等。
下面給出的是上述 DBPersist 示例運 算符的模型描述文件,以供參考。
清單 1 DBPersist 示例運算符模型描述文件
<operatorModel xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator" xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation= "http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd"> <javaOperatorModel> <context> <description>JDBC based persistence operator</description> <executionSettings> <className>test.DBPersist</className> <vmArgs> <vmArg>-Xms256m</vmArg> <vmArg>-Xmx1024m</vmArg> </vmArgs> </executionSettings> <libraryDependencies> <library> <cmn:description>Java operator class library</cmn:description> <cmn:managedLibrary> <cmn:libPath>../../impl/lib/test_util.jar</cmn:libPath> </cmn:managedLibrary> </library> <library> <cmn:description>db2 jdbc type 4 library</cmn:description> <cmn:managedLibrary> <cmn:libPath>../../impl/lib/db2jcc4.jar</cmn:libPath> </cmn:managedLibrary> </library> </libraryDependencies> </context> <parameters> <parameter> <name>connectionURL</name> <description>the db connection configuration</description> <optional>false</optional> <type>rstring</type> <cardinality>1</cardinality> </parameter> <parameter> <name>userName</name> <description>the user name of the db connection</description> <optional>false</optional> <type>rstring</type> <cardinality>1</cardinality> </parameter> <parameter> <name>password</name> <description>the password of the db user</description> <optional>false</optional> <type>rstring</type> <cardinality>1</cardinality> </parameter> </parameters> <inputPorts> <inputPortOpenSet> <windowingMode>NonWindowed</windowingMode> <windowPunctuationInputMode>Oblivious</windowPunctuationInputMode> </inputPortOpenSet> </inputPorts> <outputPorts/> </javaOperatorModel> </operatorModel>
從中可看到,Java 運算符模型涵蓋四個方面的內容:
上下文(Context):用於指定運算符的描述信息,執行設置 ( 包括對應的 Java 實現類,JVM 參數 ) 以及類庫的依賴 關系等。
參數(Parameters):用於限定該運算符支持的參數,包括名稱,類型,是否可選,以及基數(參數能接受數據的個數 )。Streams 運行時將使用該信息進行運算符調用時的語法檢查。
輸入端口:用於定義輸入流的配置,包括窗口模式,輸入窗口結束符模式等。一般通過 inputPortOpenSet 來定義一個 集合來描述共同行為,避免不必要的重復配置。
輸出端口:用於定義輸出流的配置,包括輸出窗口結束符模式,預留結束符的輸入端口等。一般也是通過 outputPortOpenSet 來定義一個集合來描述共同行為,避免不必要的重復配置。
Java 運算符 API
除了 Java 運算符模型描述文件之外,Streams 還提供了基於 Java 編程語言的一系列元組處 理類庫以簡化 Java 運算符的開發。這些類庫統稱為 Java 運算符 API,絕大多數接口與類都位於 com.ibm.streams.operator 包中,物理上位於 Streams 安裝目錄之下的 lib 目錄。除此之外,Java 運算符 API 還依賴 於 Apache Commons Math 2.1 類庫 (commons-math-2.1.jar),其物理上位於 Streams 安裝目錄之下的 ext/lib 目錄。
當使用 Java 運算符 API 開發 Java 運算符時,上述提到的兩個 Jar 包需要顯示地包含在 classpath 或者 Streams Studio 的 build 環境中,以確保相應的類能被 Java 編譯器找到;但對於 Streams 運行時間,它們是自動進行 加載的,因此並不需要顯示指定為 Java 運算符的依賴類庫。另外,在 Streams 安裝目錄之下 lib 目錄中 com.ibm.streams.operator.samples.jar 包含的大量 Java 運算符源代碼示例,對於開始學習 Java 運算符的開發將是大 有裨益的。
下圖描述的是 Java 運算符 API 中主要的類與接口的關系,其中紅色標注的是 Streams2.0 版本新提 供的類或接口:
圖 3. Java 運算符 API
Operator 接口:定義了運算度的接口。Streams 運行時將調用該接口的初始化方法,之後對每個收到的元組與結束符調 用相應的處理方法,直至收到關閉請求。
AbstractOperator 類:Streams 提供的抽象類,提供了很多實用的方法以簡化運算符的開發,絕大多數情況下應該繼承 該抽象類來實現自己的運算符。
OperatorContext 接口:提供了對運算符運行時上下文的訪問功能,包括數據流以及參數的描述信息。除此之外,還提 供了用於 Java 異步操作的 ScheduledExectorService 和 ThreadFactory 的訪問。當需要使用多線程的情況之下,應該使 用 OpeartorContext 提供的 ScheduledExectorService 或者 ThreadFactory 以保證 Streams 運行時在處理關閉請求時能 夠對未完成的任務進行必要的清理工作。
StreamSchema 接口:提供了端口的模式定義,包括 Java 以及 SPL 語言對應的屬性名稱以及類型描述信息。
StreamingInput 接口:提供了對輸入端口以及相關模式的定義。
StreamingOutput 接口:提供了輸出端口的描述以及獲取新的輸出元組並且提交元組的方法。
Tuple 接口:提供了不可變通用元組對象的表示。
OutputTuple 接口:提供了可變通用輸出元組的表示。
Logging 接口:提供了 ERROR,INFO,DEBUG,TRACE 四個日志級別,以及 log 和 logException 等方法。
ProcessingElement 接口:提供了處理元素相關的信息查詢功能,如:輸出目錄,數據目錄,作業標識符,PE 標識符, 輸入輸出端口個數,運行模式等。
PEMetrics 接口:定義了 PE 層面上的輸入 / 輸出端口 Metric 枚舉類型,以及對應的獲取輸入 / 輸出 Metric 的方 法。
OperatorMetrics 接口:定義了運算符層面上的輸入 / 輸出端口 Metric 枚舉類型,以及對應的獲取輸入 / 輸出 Metric 的方法。
StreamWindow 接口:定義了窗口構成策略,如基於時間間隔的,基於計數器,結束符,或者數據的增量變化等,使得運 算符輸入端口接收到元組以此形成一定的邏輯集合,進行數據聚合運算。
Java 運算符生命周期與線程安全
使用 Java 運算符 API 實現的運算符是 com.ibm.streams.operator.Operator 接口的實例,必須提供一個無參數的構造函數,以便運算符能夠以反射的方式被實例化。實例化的運算符實例通過調用 initialize(OperatorContext) 方法來開始其生命周期。在所有端口准備就緒之前,運算符是不能接收和發送元組和結束符 的。一旦端口准備就緒,運算符將調用元組和結束符的處理方法,並在恰當的時候提交元組到輸出端口。allPortsReady 方 法用於通知運算符所有端口准備就緒以接收並且提交元組。當包含運算符的處理元素 (PE) 被關閉時,運算符將收到 shutdown 方法的調用,但在此期間元組和結束符的處理方法可能正在執行中。因此 shutdown 方法應該保證所有異步操作 的完成以及相關資源的釋放。
運算符的 process 方法將接收所有發送到輸入端口的元組,元組在經過處理之後能夠 被提交到輸出端口。其參數 StreamingInput 描述了元組實例以及相應的接收端口。在 process 方法中,能夠使用 Tuple 接口提供的類似 JDBC ResultSet 的方法通過屬性下標或者屬性名稱來獲取屬性值,例如,getInt, getString, getDouble, getObject。由於元組對象本身是不可變的,這樣即使經過多層次的調用,仍然能夠安全的獲取其屬性值。 OutputTuple 接口代表了輸出元組,繼承自 Tuple 接口,提供了額外的方法來進行屬性值的設置。OutputTuple 實例可通 過 StreamingOutput<OutputTuple>.newTuple 方法來創建,然後使用 StreamingOutput<OutputTuple>.submit 方法來進行提交。同樣,屬性值的設置方法也與 JDBC 類似,如,setInt, setString, setDouble, setObject。此外,還可使用 OutputTuple 的 assign 方法對其他元組中名稱和類型相同的屬性進 行批量復制。
下圖總結了 Java 運算符的生命周期以及方法的順序或者並發執行特性:
圖 4. Java 運算符 生命周期與線程安全
與 Java 運算符生命周期相關的大多數方法 (除 initialize 之外)都可能被不同的線程並發執行:
對於 allPortsReady 方法:在所有端口准備就緒之前,可能有元組或者結束符到達,因此 process 和 processPunctuation 方法將可能被調用;
對於 process 和 processPunctuation 方法:它們負責所有輸入端口的處理,而多個輸入端口的元組、結束符可能並發 到達;而且即便對同一個端口,其元組和結束符也可能並發到達;
對於 shutdown 方法:由於用戶能夠以按需的方式來提出關閉請求,例如,請求停止 PE 或者取消作業,因此關閉請求 可能出現在任何時候,包括處理元組,處理結束符,甚至端口准備階段當中。
為了確保運算符方法能夠被並發調用,Java 運算符的實現必須要滿足線程安全的要求以保證數據的一致性以及狀態的可 見性。根據功能以及性能的要求,應從以下幾方面進行考慮:
對象引用層面的同步:通過把生命周期相關的方法聲明為 synchronized 來保證運算符只對單一線程可見。這種方法勢 必對性能造成較大的影響,主要適合於對數據進行順序處理的情況。
在提交元組時進行同步或者加鎖:由於對運算符來說,元組的提交往往要比處理花費多得多的時間,因此往往不是推薦 的做法。這種方法尤其對使用了運算符融合優化技術的應用程序影響明顯,此時元組的提交將導致下游運算符 process 方 法的直接調用,從而導致負載較重運算符處理的阻塞。
除了 Java 提供的語言層面上的 synchronized 關鍵字,JDK1.5 之後的版本還提供了很多用於多線程編程的包,如: java.util.concurrent, java.util.concurrent.atomic 和 java.util.concurrent.locks,進行更加細粒度的並發控制。 基本的思想是只有在需要的時候進行數據的同步和加鎖操作。
Java 運算符 Metrics
前面在 Java 運算符 API 部分提到的 Metrics,對於了解 Streams 應用程序的運行狀況以及系統性能瓶頸都很有幫助 。我們在此將做進一步的探討。
Metrics 是一些 Streams 運行時維護的統計項,能夠被外部程序讀取並對感興趣的統計項即進行監控。Streams 運行時 提供了兩類 Metrics:系統 Metrics 以及用戶自定義 Metrics。系統 Metrics 是 Streams 運行時預先定義維護的,而用 戶自定義 Metrics 是由運算符負責創建和維護的。不論是系統 Metrics 還是用戶自定義 Metrics 都是 com.ibm.streams.operator.metrics.Metric 的對象實例。此外,Metric.Kind 枚舉類型還提供了 Metric 統計項的歸類功 能,大體上分為三類:計數器(COUNTER), 側度(GAUGE)和時間 (TIME)。這些信息能夠為外部監控工具提供必要的信息以 便數據的展示。
Java 運算符 Metics 可通過 com.ibm.streams.operator.metrics.OperatorMetrics 接口進行訪問,其引用可通過 OperatorContext.getMetrics() 獲取。OperatorMetrics 接口提供了使用 InputPortMetric 和 OutputPortMetric 枚舉類 型獲取系統 Metrics 的方法;對於用戶自定義的 Metrics,則能通過調用 createCustomMetric() 方法直接創建或者通過 在運算符模型中以聲明的方式由 Streams 運行時間自動創建。對於這兩種方式創建的用戶自定義 Metrics 都能通過 getCustomMetric() 方法來獲取其信息。類似地,PE 級別的系統 Metrics 能通過 com.ibm.streams.operator.metrics.PEMetrics 接口進行訪問,其引用可通過 OperatorContext.getPE().getMetrics() 來獲取。
此外,Streams Studio 作為 Streams 應用程序的集成開發環境,還提供了 Metrics 可視化插件,可對 Streams 運行 時,作業,PE,端口進行多層次的 Metrics 監控,總共提供了三種類型的視圖以方便問題的定位,分別是:數據流視圖, 警報視圖以及細節視圖。
數據流視圖是適用於 Streams 實例和作業,它顯示的是包含所有端口的列表,具體包括端口當前收集期間、前一個收集 期間收到元組的數量以及兩個相鄰期間的差值;
圖 5. Metrics 數據流視圖
警報視圖也只適用於 Streams 實例和作業,它顯示的是當前所有警報的列表,提供了單一視圖來顯示當前所有的警報;
圖 6. Metrics 警報視圖
細節視圖適用於所有收到 Metrics 的元素,包括 Streams 實例,作業,PE 以及端口等,它提供的針對不同元素比較細 節的信息,具體信息如下圖所示
圖 7. Metrics 實例層面的細節視圖
圖 8. Metrics 作業層面的細節視圖
圖 9. Metrics PE 層面的細節視圖
圖 10. Metric 輸入端口層面的細節視圖
圖 11. Metric 輸出端口層面的細節視圖
Java 運算符調試
在完成 Java 運算符的開發工作之後,往往需要調試的方式來解決代碼中存在的一些問題。Java 運算符的調試是通過 Java 標准的 JPDA 機制來實現的,其原理是設置 JVM 以接受來自客戶端的調試請求。目前 Streams 平台只能運行在 Redhat 企業版 5.5 版本以及更高版本之上,並且依賴於 IBM JDK1.6。以下假設 IBM JDK1.6 安裝在 /opt/ibm/java- x86_64-60 目錄之下。按以下兩個步驟完成 Java 運算符的調試:
建立 JVM 調試服務器:
把 JVM 的平台類庫包含在環境變量 $LD_LIBRARY_PATH 之中,命令如下:
export LD_LIBRARY_PATH= /opt/ibm/java-x86_64-60/jre/lib/amd64:$/opt/ibm/java-x86_64-60/jre/lib/amd64
修改 Java 運算符的模型描述文件,在 vmArgs 元素之下添加以下子元素:
<vmArgs> <vmArg>-Xdebug</vmArg> <vmArg> -Xrunjdwp:transport=dt_socket,address=127.0.0.1:8765, server=y,suspend=y </vmArg> </vmArgs>
上述配置將使 JVM 處於掛起狀態,直至收到客戶端發出的調試請求,並且該配置只影響該運算符所在 JVM,對於運行在 不同 JVM 之上的其他運算符並無影響。要實現運行在多個 JVM 上的多個運算符的調試可通過設置不同的偵聽端口來實現。
以 Standalone 模式編譯包含 Java 運算符的 Streams 應用程序。目前這是唯一支持 Java 調試的模式
使用 Eclipse 調試 Java 運算符
通過 Eclipse 的 Run/Debug Configuration... 菜單在 Remote Java Application 項下創建一個新的啟動配置,如下 圖所示,設置 Connection Type 為 Standard(Socket Attach), Host 為 localhost, Port 為 8765;同時也需要在 Source 頁面下設置源代碼的搜索路徑。設置完成之後,按 Apply 按鈕保存設置。
圖 12. Eclipse 中創建調試啟動配 置
以 Standalone 模式啟動 Streams 應用程序,需要指定 logLevel 為 trace 級別,以便能顯示更為詳盡的控制信息
在 Java 運算符的源代碼中設置斷點
當注意到 Standalone 應用程序的 JVM 處於掛起狀態時,打開 Debug Configuration 對話框,並選擇第一步創建的啟 動配置,按下 Debug 按鈕。之後當斷點的條件滿足時,Eclipse 將停在所設的斷點處
結束語
Streams 作為 IBM 新一代流計算平台,由於其具有的高可靠性、高可擴展性、分布式、高效性等特點,特別適用於制造 、零售、交通運輸、金融證券以及監管等各行各業的解決方案之中。通過對大量異構流數據的實時動態分析,能夠及時地捕 捉到各種機會,從而使得實時快速商業決策的理念得以實現。基於 Java 編程語言完全面向對象,高效的開發效率等諸多優 勢,Streams 平台對其構建 Streams 應用程序提供了完整的框架,包括 Java 運算符模型描述文件以及 Java 運算符 API 兩部分。Java 運算符模型描述文件定義了 Java 運算符的元數據,包括參數,輸入端口,輸出端口等,Streams 編譯器將 通過其校驗 Java 運算符調用的合法性並設定相應的執行環境;Java 運算符 API 定義了使用 Java 語言來開發運算符一套 完整的編程接口,涉及運算符的生命周期以及線程安全,同時也提供了接口用於創建以及訪問運算符的 Metrics,這些統計 信息對於應用程序的運行狀況以及系統性能瓶頸分析都大有裨益。最後,提供了 Java 運算符的調試方法,以幫助快速定位 代碼中存在的問題。