| Revision History | |
|---|---|
| Revision 1.0 | 2008/09/27 |
Abstract
這篇文章主要是介紹了JBoss ESB的基礎架構,以及通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)認識JBoss ESB.JBossESB是JBoss推出的ESB的實(shí)現,也是JBoss的SOA產(chǎn)品的基礎.首先大家對于ESB的定義有很多的不同,我個(gè)人更喜歡把ESB看作是系統集成的一個(gè)平臺. JBossESB是一個(gè)基于消息的中間件(Message Oriented). 在這篇文章中,我們只是看待ESB中的一個(gè)很基礎部份,也就是怎么從Endpoint A發(fā)送信息給ESB的服務(wù)S1,然后再有S1發(fā)送信息到Endpoint B去調用服務(wù). 至于其他的Router(路由)或者Data Transformation(數據轉換),這里贊不介紹.
我們就假設一個(gè)簡(jiǎn)單的系統集成場(chǎng)景來(lái)開(kāi)始闡述JBossESB的設計和概念.
A系統(Endpoint A) – Message -> ESB -> – Message --> B系統 (Endpoint B)
所以,如果簡(jiǎn)單的對于JBossESB定義的話(huà),我們可以定義以下三個(gè)概念:
JBossESB 是一個(gè)面向服務(wù)(Service Oriented)的架構,所以在ESB內部的要么是一個(gè)Service, 要么是一個(gè)Message. 這里的Service就是指具有實(shí)現業(yè)務(wù)邏輯的服務(wù),也可以是一個(gè)實(shí)現路由(Router),或者數據轉化(Transformation)的服務(wù). 就拿上面的這個(gè)例子,系統A發(fā)送一個(gè)Message給 ESB的一個(gè)服務(wù),我們假設叫做S1, 那么S1收到Message后,做一些處理,轉到S2的服務(wù),S2再把處理后的結果發(fā)送給系統B. 這樣就實(shí)現了A和B之間通過(guò)ESB的通信.
System A -> message -> S1 -> S2 ->.... -> message -> System B
那么在ESB內部是怎么去表達一個(gè)服務(wù)呢?這里引入了EndpointReference的概念,簡(jiǎn)稱(chēng)EPR. 有了服務(wù)之后,服務(wù)之間是通過(guò)什么樣的傳輸層(比如JMS, FTP, HTTP)來(lái)通信呢? 所以ESB的內部也引入了Courier的API, 來(lái)統一抽象傳輸層. 剛我們也看到了,ESB的內部無(wú)非就是一系列的服務(wù), 但是我們怎么來(lái)保存/注冊這些服務(wù)的呢? JBossESB是使用jUDDI來(lái)注冊和保存這些服務(wù)元數據的.
在要了解和運行JBossESB之前,我們最好了解下JBossESB中比較重要的幾個(gè)概念
ESB內部所交流/傳遞的都是消息,所以可見(jiàn)這個(gè)消息格式的重要性. 在JBossESB中, 定義了一個(gè)Message的對象,它是有以下幾個(gè)部分構成的.
- Header (用來(lái)存放From, To, Reply-to等Addressing的信息).
- Body (存放信息主體)
- Attachment (用來(lái)存放附件等)
- Properties
- Context (主要是存放一些類(lèi)似事務(wù)的信息等)
目前在Body里面一般來(lái)說(shuō)存放兩種格式的數據,一個(gè)是串行化數據(Serialized Object ),另外一個(gè)是XML文件,比如常見(jiàn) 的SOAP的payload. 在ESB中,還有兩個(gè)定義,一個(gè)叫ESB-aware Message, 我們上面所講的Message就是ESB-aware Message, 正如名字說(shuō)講的,它是屬于ESB內部的Message對象. 還有個(gè)叫 ESB unaware Message,也就是說(shuō)他同樣也是一個(gè)message,比如SOAP Message,但是如果把soap message直接讓ESB來(lái)處理,是處理不了的,所以呢? 經(jīng)常的在Listener 監聽(tīng)的端口會(huì )有個(gè)Adapter (在JBossESB里叫做Gateway)來(lái)負責把ESB-unaware message 轉成 ESB-aware message.
ESB的內部服務(wù)是用EPR來(lái)映射的. ESB的內部服務(wù)可以是任何的一個(gè)服務(wù),比如說(shuō)一個(gè)FTP的服務(wù),一個(gè)基于文件系統的服務(wù)等等, 那么這個(gè)時(shí)候我們就需要用EPR來(lái)對這個(gè)服務(wù)進(jìn)行描述.在EPR這個(gè)類(lèi)里,主要是描述了這個(gè)服務(wù)的URI,以及所必須的一些元數據. 目前在JBossESB中提供的EPR有: FileEPR,EmailEPR,FTPEPR, HibernateEPR等等. 我們在注冊服務(wù)的時(shí)候,是將EPR的信息注冊到UDDI的容器里, 但不僅僅是EPR, 還有一些輔助信息,比如定義服務(wù)的category-name, service-name. 這些將在后面繼續介紹.
Listener的作用是負責監聽(tīng)端口,一般來(lái)說(shuō),客戶(hù)端是發(fā)送消息到Listener,然后有Listener把消息傳遞給ESB, 我們可以把Listener看做是inbound router. 在JBossESB中,我們是叫GatewayListener, 它一般來(lái)說(shuō)做兩件事情.
目前ESB支持的Gateway有: JMSGatewayListener, JBossRemotingGatewayListener, FileGatewayListener等等. 在MessageComposer這個(gè)類(lèi)里的compose/decompose方法來(lái)負責ESB-unaware信息和ESB-aware信息的轉化.
public interface MessageComposer<T> {
/**
* Set the composer's configuration
*/
public void setConfiguration(ConfigTree config) throws ConfigurationException;
/**
* Compose an ESB "aware" message from the supplied message payload.
* Implementations need to construct and populate an ESB Message from the
* messagePayload instance.
*/
public Message compose(T messagePayload) throws MessageDeliverException;
/**
* Decompose an ESB "aware" message, extracting and returning the message payload.
*/
public Object decompose(Message message, T originalInputMessagePayload) throws MessageDeliverException;
}
Courier的作用就是負責傳輸,正如以下接口所顯示:
public interface Courier extends DeliverOnlyCourier{public boolean deliver(Message message) throwsCourierException,MalformedEPRException;}目前實(shí)現的Transport有:JmsCourier, InVMCourier, HibernateCourier, FileCourier等傳輸層,在ESB內部是通過(guò)EPR來(lái)跟Courier進(jìn)行關(guān)聯(lián)的.
在JBossESB中,我們可以把ActionProcessingPipeline類(lèi)看作是Message Filter, 每個(gè)Message都會(huì )經(jīng)過(guò) ActionProcessingPipeline的處理. 里面有這么個(gè)方法:
public boolean process(final Message message)
而actionProcessingPipeline又是由Action (ActionPipelineProcessor)來(lái)組成的. 我們可以把Action看成是Interceptor, 由它來(lái)實(shí)現具體的業(yè)務(wù)邏輯,可以是路由,又或者數據轉化功能等等. 如果你用JBossESB的話(huà),那么Action是一個(gè)非常重要的部分,我們來(lái)看下它所定義的接口.
public interface ActionPipelineProcessor extends ActionLifecycle{public Message process(final Message message) throws ActionProcessingException ;public void processException(final Message message, final Throwable th) ;public void processSuccess(final Message message) ;}一般來(lái)說(shuō)自定義的Action把具體的業(yè)務(wù)邏輯放在Process的方法里,當然了,你也可以定義相對應的Exception處理方法,通過(guò)實(shí)現processException.在ESB代碼中,自定義的Action可以用繼承AbstractActionPipelineProcessor 或者 AbstractActionLifecycle.
在有些情況下,你需要一些全局的Interceptor,我們之前說(shuō)的Action,可以理解成是每個(gè)service的interceptor,但是如果我需要使用log來(lái)記錄一個(gè)消息在各個(gè)service之間傳輸的日志, 又或者想記錄消息進(jìn)入某個(gè)service的時(shí)間和退出的時(shí)間. 那么在JBoss ESB中就有Filter的概念. 如果你要實(shí)現自己的Filter,需要繼承InputOputFilter類(lèi).
public class InputOutputFilter
{
/**
* Called as the message flows towards the transport.
*/
public Message onOutput (Message msg, Map<String, Object> params) throws CourierException
{
return msg;
}
/**
* Called immediately after the message is received from the transport.
*/
public Message onInput (Message msg, Map<String, Object> params) throws CourierException
{
return msg;
}
}
寫(xiě)完自己的Filter后,你需要在$JBossESB/server/config (e.g. default)/deploy/jbossesb.sar/jbossesb-properties.xml里面增加filter. 需要注意的是,在這里配置的filter是對所有的esb包都起作用,是個(gè)全局的變量.
onInput方法總是在從傳輸層獲取到Message后,第一步所做的工作;類(lèi)似的, onOutput是給傳輸層傳遞前所做的最后一步工作. 你可以在TwoWayCourierImpl中看到這段代碼的調用.
對于客戶(hù)端調用來(lái)說(shuō),EPR, Courier等都太底層了.所以如果對此進(jìn)行了封裝. 我們對每個(gè)service加以service-category和service-name的屬性. 所以如果你想發(fā)送一個(gè)ESB的內部Message,你只需要知道目標service的service-category和service-name,然后就可以調用ServiceInvoker來(lái)調用服務(wù). 不需要去使用Courier等底層的API, 另外用ServiceInvoker還可以支持fail-over等特性.
public class ServiceInvoker {
public ServiceInvoker(String serviceCategory, String serviceName) throws MessageDeliverException {
this(new Service(serviceCategory, serviceName));
}
public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException, FaultMessageException
public void deliverAsync(Message message) throws MessageDeliverException
}
為了更好的來(lái)解釋JBossESB, 最好的一個(gè)方法就是試下JBossESB自帶的例子,這里我們先以helloworld_action的例子來(lái)講解.
到這里,你已經(jīng)成功的運行了helloworld_action的例子.
ant deploy ant runtest INFO [STDOUT] [Hello World Action]. INFO [STDOUT] &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& INFO [STDOUT] Body: Hello World Action INFO [STDOUT] &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& INFO [STDOUT] ConsoleNotifier 2008/09/26 06:35:39.643< BEFORE** Hello World Action AFTER** >
下面我們來(lái)具體看下helloworld_action這個(gè)例子. 在JBoss ESB 4.4的例子中,我們默認以JBoss Messaging來(lái)作為JMS的實(shí)現.
在看jboss-esb.xml的配置時(shí)候,我們應該分成兩個(gè)部份. providers和services.
首先是<providers>,它是有一系列的<provider>組成, 目前有jms-provider, fs-provider, ftp-provider等等. 然后我們在provider里面定義這個(gè).esb文件里面service所定義的listener所需要的bus, Bus可以簡(jiǎn)單理解成消息傳送所需要的傳輸層. 正如以下所顯示的,我們定義了兩個(gè)Bus,一個(gè)是給Gateway的Listener用,另外一個(gè)是給ESB-aware Message傳輸所需要的傳輸層.
<providers>
<jms-provider name="JBossMQ" connection-factory="ConnectionFactory">
<jms-bus busid="quickstartGwChannel">
<jms-message-filter
dest-type="QUEUE"
dest-name="queue/quickstart_helloworld_action_Request"
/>
</jms-bus>
<jms-bus busid="quickstartEsbChannel">
<jms-message-filter
dest-type="QUEUE"
dest-name="queue/B"
/>
</jms-bus>
</jms-provider>
</providers>
雖然在這邊寫(xiě)的是JBossMQ, 但是對于JBoss ESB server來(lái)說(shuō),是默認使用JBoss Messaging的; 如果是把JBoss ESB安裝在JBoss AS 4.x的服務(wù)器, 那么就是用JBoss MQ, 因為JBoss AS 4.x默認是使用JBoss MQ.
第二部份就是定義services的部份, 在這里定義了當前這個(gè)esb包所提供的services. 每個(gè)service又是由 <listener> 和 <actions>組成的.
<services>
<service category="HelloWorld_ActionESB"
name="SimpleListener"
description="Hello World" >
<listeners>
<jms-listener name="JMS-Gateway"
busidref="quickstartGwChannel"
is-gateway="true"
/>
<jms-listener name="JMS-ESBListener"
busidref="quickstartEsbChannel"
/>
</listeners>
<actions mep="OneWay">
<action name="action2"
class="org.jboss.soa.esb.actions.SystemPrintln"
/>
<action name="displayAction"
class="org.jboss.soa.esb.samples.quickstart.helloworldaction.MyJMSListenerAction"
process="displayMessage">
<property name="exceptionMethod" value="exceptionHandler"/>
</action>
<action name="playAction"
class="org.jboss.soa.esb.samples.quickstart.helloworldaction.MyJMSListenerAction"
process="playWithMessage">
<property name="exceptionMethod" value="exceptionHandler"/>
</action>
</actions>
</service>
</services>
在listener里,我們通過(guò) busidref來(lái)關(guān)聯(lián)到我們定義在provider里面的bus. 在這里,我們定義了兩個(gè)listener. 其中一個(gè)是做為Gateway,只負責從外界獲取到JMS的消息,然后轉成ESB內部所需要的Message. 而另外一個(gè)listener是用來(lái)這個(gè)Message在services內部之間通訊的通道. 所以對于每個(gè)service來(lái)說(shuō),一定要至少定義一個(gè)listener來(lái)作為內部Message傳輸用.
這里的action是對消息(Message)處理的地方.
正如我們在上面看到的,我們在jboss-esb.xml中定義了action,我們看下MyJMSListenerAction.
public class MyJMSListenerAction extends AbstractActionLifecycle
{
protected ConfigTree _config;
public MyJMSListenerAction(ConfigTree config) { _config = config; }
public Message playWithMessage(Message message) throws Exception {
Body msgBody = message.getBody();
String contents = msgBody.get().toString();
StringBuffer sb = new StringBuffer();
sb.append("\nBEFORE**\n");
sb.append(contents);
sb.append("\nAFTER**\n");
msgBody.add(sb.toString());
return message;
}
}
我們只是截取其中的一部分來(lái)說(shuō)明,一般來(lái)說(shuō)每個(gè)Action都要繼承AbstractActionLifecycle類(lèi),然后輸入/輸出參數都必須是ESB的Message. 方法名可以隨便定義. 你只需要在jboss-esb.xml的action的process屬性中寫(xiě)相對應的方法名就可以. 如果不寫(xiě),默認是process方法.
這里的ConfigTree是個(gè)很重要的屬性,我們很經(jīng)常的會(huì )在Action配置其他的信息,那么 所有的信息都可以通過(guò)ConfigTree來(lái)獲取到.比如說(shuō)在某個(gè)Action中配置靜態(tài)路由信息等等.也正是由于A(yíng)ction中你可以隨意的配置你自己的信息,增加了很多的靈活性和擴展性.
我們先看下部署在server下的.esb包的文件目錄,一般是包括以下些東西.
- /META-INF/jboss-esb.xml
- /META-INF/deployment.xml 在這里定義對其他包或者服務(wù)的依賴(lài),或者配置classloader.
- jbm-queue-service.xml (optional) 這里是定義啟動(dòng)所需要的Queue
- lib (optional) 放些所需要的第三方包
- 所需要的些classes文件
目前在JBossESB中,一般有兩種方式來(lái)調用service. 一種是通過(guò)Gateway listener, 另外一種是直接通過(guò)ServiceInvoker的API來(lái)調用.
回到我們的例子,我們通過(guò)JMS Gateway來(lái)訪(fǎng)問(wèn)ESB的服務(wù).
public class SendJMSMessage {
public void setupConnection() throws JMSException, NamingException
{
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
conn = qcf.createQueueConnection();
que = (Queue) iniCtx.lookup("queue/quickstart_helloworld_action_Request");
session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
conn.start();
System.out.println("Connection Started");
}
public void sendAMessage(String msg) throws JMSException {
QueueSender send = session.createSender(que);
ObjectMessage tm = session.createObjectMessage(msg);
tm.setStringProperty(StoreMessageToFile.PROPERTY_JBESB_FILENAME, "HelloWorldActionTest.log");
send.send(tm);
send.close();
}
public static void main(String args[]) throws Exception
{
SendJMSMessage sm = new SendJMSMessage();
sm.setupConnection();
sm.sendAMessage(args[0]);
sm.stop();
}
}
應該說(shuō),這是一個(gè)很普通發(fā)送JMS消息送到一個(gè)指定的Queue. 注意,我這里并沒(méi)有全部拷貝這個(gè)SendJMSMessage類(lèi). 只是拷貝出重要的部分.(如果想要看完整的,請參考helloworld_action例子下面的代碼)
在helloworld_action例子中,沒(méi)有直接SendESBMessage的客戶(hù)端來(lái)調用,但是我們可以看下helloworld的sample下面的,因為是一樣的.
public class SendEsbMessage
{
public static void main(String args[]) throws Exception
{
// Setting the ConnectionFactory such that it will use scout
System.setProperty("javax.xml.registry.ConnectionFactoryClass","org.apache.ws.scout.registry.ConnectionFactoryImpl");
if (args.length < 3)
{
System.out.println("Usage SendEsbMessage <category> <name> <text to send>");
}
Message esbMessage = MessageFactory.getInstance().getMessage();
esbMessage.getBody().add(args[2]);
new ServiceInvoker(args[0], args[1]).deliverAsync(esbMessage);
}
}
正如我們之前所說(shuō)的,客戶(hù)端用ServiceInvokerAPI大大簡(jiǎn)化了調用服務(wù)的過(guò)程. 我們在jboss-esb.xml中看到每個(gè)service都會(huì )有service-category和service-name的屬性. 在ServiceInvoker中,用戶(hù)只需要提供這兩個(gè)屬性,就能調用到ESB的服務(wù),當然了,還需要juddi.properties文件. 這也是為什么我們的 sample下面一般會(huì )有這個(gè)文件.
ServiceInvoker是個(gè)相當重要的API,應該說(shuō)在ESB service之間服務(wù)的互相調用,就是用ServiceInvoker來(lái)完成的. 因為ServiceInvoker對Courier等進(jìn)行了一層的抽象封裝. 所以用ServiceInvoker來(lái)調用服務(wù),是可以支持fail-over等高級特性的.
我們結合之前的概念,來(lái)看下這個(gè)例子的調用過(guò)程. 這里我們假設是通過(guò)JMS Gateway來(lái)調用ESB服務(wù)的.
我們這里講述了一個(gè)簡(jiǎn)單的調用oneway服務(wù)的一個(gè)過(guò)程.
通過(guò)前兩個(gè)部分的介紹,應該說(shuō)JBoss ESB簡(jiǎn)單的內部框架應該比較清楚了. 但是我們還沒(méi)有涉及到其他部署和集成的一些模塊. 正因為通過(guò)比如對Smooks, JBoss Rules, jBPM等的集成,使得JBoss ESB的功能更加強大,好用.
因為JBoss ESB 4.系列是基于JBoss AS的,所以有個(gè)擴展類(lèi)JBoss4ESBDeployer類(lèi)是來(lái)負責對JBoss ESB包的監聽(tīng)和解析. 我們可以看到在$jbossesb.sar/META-INF/jboss-service.xml里面定義了JBoss4ESBDeployer的服務(wù).
<mbean code="org.jboss.soa.esb.listeners.config.JBoss4ESBDeployer"
name="jboss.esb:service=ESBDeployer">
<depends>jboss.esb:service=ESBRegistry</depends>
<depends>jboss.esb:service=JuddiRMI</depends>
</mbean>
所以,通過(guò)部署了JBoss4ESBDeployer,凡是以.esb結尾的包,都是由JBoss4ESBDeployer來(lái)解析, 如果你對這塊比較感興趣,還可以看下以下的幾個(gè)類(lèi).
org.jboss.soa.esb.listeners.config.ConfigurationControllerorg.jboss.soa.esb.listeners.config.JBoss4ESBDeploymentorg.jboss.soa.esb.listeners.config.Generator
正如我們之前所看到的,因為JBossESB內部本身的架構比較靈活,所以說(shuō)你只要寫(xiě)個(gè)自己的Action,可以很方面的做和其他library的集成. 直到JBoss ESB 4.4.GA版本,有以下一系列的集成.
應該說(shuō)到目前,我們只是討論了JBoss ESB一個(gè)小的簡(jiǎn)單的模塊,JBoss ESB本身還有更多的功能,比如之前所說(shuō)的集成模塊等等. 所以,如果你想比較完整的了解JBoss ESB,應該是去JBoss ESB的站點(diǎn),然后去讀文檔,這里特別推薦<JBoss Programmers Guide >. 除此之外, JBoss ESB本身還帶有大量的例子.可以通過(guò)運行這些例子,更容易的了解JBoss ESB. 如果在用JBoss ESB遇到問(wèn)題是,可以上JBoss ESB論壇提問(wèn).
聯(lián)系客服