SOA架構中的事件驅動(dòng)服務(wù)
使用Mule框架設計事件驅動(dòng)和面向服務(wù)的平臺
作者:Jeff Hanson
譯者:steven_guo版權聲明:任何獲得Matrix授權的網(wǎng)站,轉載時(shí)請務(wù)必以超鏈接形式標明文章原始出處和作者信息及本聲明作者:Jeff Hanson;
steven_guo原文地址:
http://www.javaworld.com/javaworld/jw-01-2005/jw-0131-soa.html中文地址:
http://www.matrix.org.cn/resource/article/43/43929_SOA_Event_driven.html關(guān)鍵詞: SOA,Event-driven
摘要及時(shí)響應實(shí)時(shí)的變化和事件成為了企業(yè)級架構的最重要需求。這篇文章討論面向服務(wù)框架的技術(shù)和機制,這些技術(shù)使得該框架高效發(fā)送、接受那些跨越層級結構的同步和異步事件,而不需要知道產(chǎn)生這些事件的系統方面的細節
Internet事務(wù),B2B系統,P2P程序,和實(shí)時(shí)工作流,這些系統有著(zhù)非常高的動(dòng)態(tài)性,復雜的系統處理,用傳統的面向過(guò)程的處理方法不能有效地實(shí)現。
一個(gè)面向服務(wù)的框架代表了一個(gè)動(dòng)態(tài)的運行時(shí)環(huán)境,在那里服務(wù)提供者和服務(wù)消費者松散耦合、更靈活的組件交互。建立一個(gè)具備所有這些優(yōu)勢的交互模型,成為軟件開(kāi)發(fā)中最優(yōu)先考慮的。一個(gè)事件驅動(dòng)的交互模型,比通常的請求/響應機制對實(shí)時(shí)變化和激勵有著(zhù)更好的應答效率。
面向服務(wù)的架構和事件驅動(dòng)的架構天生就有著(zhù)對分布式系統的適應性,這些架構都有著(zhù)模塊性、松散耦合,和適應性等特性。
在這篇文章里,討論使用Mule實(shí)現一個(gè)高效的事件驅動(dòng)和面向服務(wù)的平臺,一個(gè)輕量級的事件-消息架構,企業(yè)信息總線(xiàn)(ESB)模式。組件和程序可以使用Mule通過(guò)公共的JMS或其他的消息處理技術(shù)去實(shí)現通信。
面向服務(wù)架構概述“面向服務(wù)”這個(gè)術(shù)語(yǔ)已經(jīng)演變成一個(gè)架構,在那里服務(wù)作為一個(gè)軟件組件嵌入在企業(yè)業(yè)務(wù)邏輯和特新的核心中,特性如下:
· 松散耦合:服務(wù)部與其它組件有著(zhù)根深蒂固的關(guān)系
· 協(xié)議獨立:多種協(xié)議透明訪(fǎng)問(wèn)
· 位置不可知:一個(gè)服務(wù)執行一組業(yè)務(wù)邏輯,針對這次調用返回一個(gè)結果
· 粗粒度:不論在什么位置均可訪(fǎng)問(wèn)該服務(wù)。
· 維護無(wú)用戶(hù)狀態(tài)
服務(wù)是典型地專(zhuān)注于解決業(yè)務(wù)領(lǐng)域的問(wèn)題。
通常,服務(wù)使用端根據配置數據,注冊項和軟件工廠(chǎng)去決定給服務(wù)的位置,協(xié)議和公共接口。
應用程序通常被表述成他們有什么功能,而不強調這個(gè)應用程序是什么東西,包含什么?;谶@個(gè)院應,更多直接描述一個(gè)應用程序通過(guò)使用動(dòng)詞(服務(wù))而不是用名詞(應用主體)。因為,一個(gè)名詞(應用主體)是定義了了一個(gè)事務(wù),而不是動(dòng)作,當強制把一個(gè)組件有什么功能作為一個(gè)組件是什么來(lái)定義,那就會(huì )出現誤解。在SOA領(lǐng)域,一個(gè)應用程序能很自然的被描述,因為每個(gè)應用程序的業(yè)務(wù)邏輯操作能被描述成為一個(gè)服務(wù)的執行選擇。因此,SOA解決了這種誤解,它允許應用程序和組件去訪(fǎng)問(wèn)一個(gè)服務(wù)所能實(shí)現的功能,例如,他們執行什么動(dòng)作。依次,應用程序開(kāi)發(fā)者能更容易匹配他們的需要與適當的服務(wù),因為服務(wù)接口的描述更完整地說(shuō)清了他們要解決的問(wèn)題。
事件驅動(dòng)架構概述一個(gè)事件驅動(dòng)框架(EDA)定義了一個(gè)設計和實(shí)現一個(gè)應用系統得方法學(xué),在這個(gè)系統里事件可傳輸于松散耦合的軟件組件和服務(wù)之間。一個(gè)事件驅動(dòng)系統典型地由事件消費者和事件產(chǎn)生者組成。事件消費者向事件管理器訂閱事件,事件產(chǎn)生者向事件管理器發(fā)布事件。當事件管理器從事件產(chǎn)生者那接收到一個(gè)事件時(shí),事件管理把這個(gè)事件轉送給相應的事件消費者。如果這個(gè)事件消費者是不可用的,事件管理這將保留這個(gè)事件,一段間隔之后再次轉送該事件消費者。這種事件傳送方法在基于消息的系統里就是:儲存(store)和轉送(forward)。
構建一個(gè)包含事件驅動(dòng)構架的應用程序和系統,這樣就使得這些應用程序和系統響應更靈敏,因為事件驅動(dòng)的系統更適合應用在不可預知的和異步的環(huán)境里。
事件驅動(dòng)設計和開(kāi)發(fā)的優(yōu)勢:
事件驅動(dòng)設計和開(kāi)發(fā)所提供的優(yōu)勢如下:
· 可以更容易開(kāi)發(fā)和維護大規模分布式應用程序和不可預知的服務(wù)或異步服務(wù)
· 可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服務(wù)
· 促進(jìn)遠程組件和服務(wù)的再使用,擁有一個(gè)更靈敏、沒(méi)有Bug的開(kāi)發(fā)環(huán)境
· 短期利益:更容易定制。因為設計對動(dòng)態(tài)處理又更好的響應。
· 長(cháng)期利益:系統和組織的狀態(tài)變得更精準,對實(shí)時(shí)變化的響應接近于同步。
EDA 和 SOA 整合不象請求/響應系統,要求請求者必須明確發(fā)送請求信息,而一個(gè)事件驅動(dòng) 架構提供一個(gè)機制去動(dòng)態(tài)響應事件。在一個(gè)EDA系統里,事件產(chǎn)生者發(fā)布事件,事件消費者接受事件。
業(yè)務(wù)系統可以從SOA和EDA中受益匪淺,因為當事件發(fā)生時(shí)EDA能觸發(fā)事件消費者,SOA服務(wù)可以快速地從相同的消費者中訪(fǎng)問(wèn)、查詢(xún)。
系統要有最高的響應性,當事件觸發(fā)時(shí)這個(gè)系統必須能快速決定必須的動(dòng)作。到事件結束,事件應該被發(fā)布和消費,而且事件要穿越SOA所有的邊界,包括整個(gè)體系結構和物理層。
圖1演示了事件被激發(fā)和穿越體系結構的所有層
圖1:事件穿越體系結構的層級
在圖1的環(huán)境中,一個(gè)事件能被定義為任何系統的,平臺的,組件的,業(yè)務(wù)的或英勇進(jìn)程的變化。事件可能是高層的業(yè)務(wù)事件或底層的系統事件。因為事件能被傳送和接收,訂閱事件的英勇程序和服務(wù)能對這些變化做出響應。
事件分類(lèi)和因果關(guān)系理解一個(gè)事件的秘訣是知道這個(gè)事件發(fā)生的原因,這個(gè)就是通常說(shuō)的因果關(guān)系。事件的因果關(guān)系典型地分為兩類(lèi):
· 平行關(guān)系:時(shí)間源和觸發(fā)在體系結構的同一層。
· 垂直關(guān)系:時(shí)間源和觸發(fā)在體系結構的不同層。
垂直關(guān)系意味著(zhù)一個(gè)事件的分類(lèi)方法,這些事件保留了一些不變的東西而穿越一個(gè)系統不同的層,事件分類(lèi)如下:
· 生命周期事件:一個(gè)實(shí)體生命周期的變化,例如一個(gè)進(jìn)程的停止或啟動(dòng)
· 執行事件:運行時(shí)事件,例如服務(wù)或組件的調用
· 管理事件:當一個(gè)狀態(tài)超過(guò)了預先的定義或一定范圍時(shí)
平行關(guān)系意味著(zhù)一個(gè)事件的分類(lèi)方法,這些事件保留了一些不變的東西而穿越一個(gè)系統不同的層,事件分類(lèi)如下:
· 系統層事件:系統級動(dòng)作,例如創(chuàng )建一個(gè)文件或關(guān)閉一個(gè)端口
· 平臺層事件:平臺級動(dòng)作,例如修改一個(gè)數據源或增加一個(gè)新的服務(wù)
· 組件層事件:組件級動(dòng)作,例如視圖對象的轉換或狀態(tài)機變化
· 業(yè)務(wù)層事件:業(yè)務(wù)級動(dòng)作,例如創(chuàng )建用戶(hù)或刪除賬號
· 應用層事件:應用級動(dòng)作,例如增加保險金或報價(jià)提交
許多ESB框架和平臺意識到在SOA中包含基于事件驅動(dòng)的通信有很多優(yōu)勢。在Java開(kāi)發(fā)領(lǐng)域,Mule就是這些最有前景的平臺之一。
介紹MuleMule是一個(gè)開(kāi)源消息ESB框架,一個(gè)消息代理,一個(gè)分級事件驅動(dòng)的框架(SEDA)。SEDA定義了一個(gè)依照分級隊列、高度并行的企業(yè)級平臺。Mule使用SED的概念增加事件處理的性能。
Mule支持同步、異步和請求響應事件,事件處理和傳輸實(shí)用不同的技術(shù)例如JMS,HTTP,電子郵件和基于XML的RPC。Mule能很容易地嵌入到任何應用框架中,明確支持Spring框架。Mule也支持動(dòng)態(tài)的,預定義的,基于內容的和基于規則的消息路由。Mule使得預定義的和計劃性的事務(wù)更容易,包括XA事務(wù)支持。Mule提供一個(gè)有代表性的狀態(tài)調用(REST)API提供給與Web的事件訪(fǎng)問(wèn)。
Mule ESB模式驅動(dòng)系統中所有服務(wù),這個(gè)系統有著(zhù)一個(gè)分離的消息通訊中樞。服務(wù)注冊在總線(xiàn)上,但不知道其他任何被注冊的消息;因此,每個(gè)服務(wù)只關(guān)心處理它收到的事件。Mule也把容器,傳輸,轉換細節從服務(wù)中分離出來(lái),允許任何對象作為服務(wù)注冊到總線(xiàn)的。
我使用Mule框架去演示這篇文章所討論的概念和思想
Mule框架Mule框架主要包含下列內容:
通用消息對象(UMO)APIUMO API第一了所有被Mule 管理的服務(wù)和對象交互
UMO組件在Mule系統中,UMO組件可以使任何在系統中接收、處理和發(fā)送事件消息的組件
Mule服務(wù)器Mule服務(wù)器組件是一個(gè)在Mule應用環(huán)境中自動(dòng)加載的服務(wù)器應用程序
描述器描述器組件描述一個(gè)Mule UMO屬性。新的Mule MUO對象能被它們所關(guān)聯(lián)的描述器初始化。一個(gè)描述器包含:
· UMO組件名
· UMO組件版本
· UMO組件實(shí)現類(lèi)
· 異常策略
· 入站和出站提供者
· 入站和出站路由器
· 攔截器
· 接收和發(fā)送切入點(diǎn)
· 入站和出站轉換器
· 各種各樣的特性
連接器連接器是一些組件,它們可以連接到外部系統或其他協(xié)議、管理那些系統或協(xié)議的狀態(tài)。一個(gè)連接器負責發(fā)送消息到外部消息接收器、管理消息接收器的注冊和注銷(xiāo)。
提供者提供者是一些組件,管理把事件數據發(fā)送到外部系統、從外部系統接受事件數據和轉換事件數據等事項。在Mule框架里,他們能連接到外部系統或其他組件。一個(gè)提供者就像一個(gè)從外部系統進(jìn)入Mule或從Mule內部訪(fǎng)問(wèn)外部系統的橋接器。實(shí)際上,提供者有一組對象組成,可以與下層系統連接并與之通信。提供者的組成部件是:
· 連接器:負責連接到下層系統
· 消息接收器:從系統接收事件
· 連接調度者:傳送系統到系統
· 轉換器:轉換從系統接收到的或要發(fā)送到系統的數據
· 終端:所建立連接的通道地址
· 事務(wù)配制:定義連接的事務(wù)屬性
終端調解者當UMO組件接收到一個(gè)事件時(shí),終端調解者決定去調用它的什么方法
轉換器轉換器組件負責雙向轉換消息或事件的有效載荷。當一個(gè)事件到達接收的對象之前,轉換器可以鏈接到一起去執行一系列的裝換操作。
消息適配器消息適配器提供一中公共的方式去讀外部系統的異構數據。
消息接收器消息接收器是一些列終端監聽(tīng)線(xiàn)程,負責從外部系統接收數據。
消息調度者消息調度者發(fā)送(同步)或派遣(異步)時(shí)間到下層系統。
消息路由器消息路由器是一系列組件,可以使被配制的UMO組件依據消息或其他配制圖路有一個(gè)消息到不同的提供者。
代理代理是一些幫定到外部服務(wù)的組建,例如JME服務(wù)器。
Mule模型一個(gè)Mule模型封裝和管理一個(gè)Mule服務(wù)器實(shí)例的運行時(shí)行為。一個(gè)模型包含:
· 描述器
· UMO組件
· 一個(gè)終端調解者
· 一個(gè)生命周期適配器工廠(chǎng)
· 一個(gè)組件調解者
· 一個(gè)池化工廠(chǎng)
· 一個(gè)異常策略
Mule管理器Mule管理器維護和提供以下服務(wù) :
· 代理
· 提供者
· 連接器
· 終端
· 轉換器
· 攔截器堆棧
· 一個(gè)Mule模型
· 一個(gè)Mule服務(wù)器
· 事務(wù)管理器
· 應用程序屬性
· Mule配制
圖2演示了Mule框架上層消息流視圖
圖2:Mule上層架構
Mule事件對象Mule事件對象對象包含事件數據和被組件所感知和操控的屬性。屬性是任意的,在事件創(chuàng )建之后任何時(shí)間可被設置。
org.mule.umo.UMOEvent類(lèi)代表了一個(gè)在Mule環(huán)境中出現的時(shí)間。所有在組件之間發(fā)送或接收的數據都是org.mule.umo.UMOEvent的一個(gè)實(shí)體??梢栽L(fǎng)問(wèn)一個(gè)原始的或被轉換的Mule事件對象中的數據能。一個(gè)Mule事件對象使用一個(gè)與提供者管理的提供者轉換數據,提供者收到數據后把事件中的有效載荷轉換成當前組件所識別的格式。
一個(gè)Mule事件對象的有效有效載荷能通過(guò)org.mule.umo.UMOMessage接口訪(fǎng)問(wèn),一個(gè)org.mule.umo.UMOMessage實(shí)例由有效載荷和它的屬性組成。這個(gè)接口是不同技術(shù)實(shí)現的消息對象的一個(gè)抽象。
org.mule.extras.client.MuleClient類(lèi)定義了一個(gè)簡(jiǎn)單的借口,允許Mule客戶(hù)端從Mule服務(wù)器接收和發(fā)送事件數據。在大多數Mule應用程序里,時(shí)間是被一些外部的并發(fā)行為所觸發(fā),例如一個(gè)主題上接收到消息或在目錄里一個(gè)文件被刪除。
下面演示了如何去發(fā)送一個(gè)同步事件到另外的Mule組件:
String componentName = "MyReceiver"; // The name of the receiving component.
String transformers = null; // A comma-separated list of transformers
// to apply to the result message.
String payload = "A test event"; // The payload of the event.
java.util.Map messageProperties = null; // Any properties to be associated
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message = client.sendDirect(componentName,
transformers,
payload,
messageProperties);
System.out.println("Event result: " + message.getPayloadAsString());
MuleClient類(lèi)需要一個(gè)服務(wù)器URL區定義它所連接的遠程Mule服務(wù)器的終端。URL定義了傳輸協(xié)議、接收消息的地址,提供者在派遣一個(gè)事件時(shí)可以隨時(shí)使用這些信息。終端例示如下:
· vm://com.jeffhanson.receivers.Default: 使用虛擬機的提供者派遣到一個(gè)com.jeffhanson.receivers.Default
· jms://jmsProvider/accounts.topic:使用全局注冊的jmsProvider派遣一個(gè)JMS消息到ccounts.topic.
· jms://accounts.topic: 使用第一個(gè)(默認)的JMS提供者派遣JMS消息
Mule事件處理Mule可以在三種不同的方式發(fā)送和節后艘事件:
1.異步方式:一個(gè)組件可通過(guò)不同的線(xiàn)程同時(shí)處理多個(gè)事件的發(fā)送和接收
2.同步方式:在一個(gè)組件重新工作之前,一個(gè)單一的事件必須被處理完。換言之,一個(gè)創(chuàng )建了事件的組建發(fā)送事件時(shí)將被阻斷,直到發(fā)送任務(wù)完成,因此,一次只允許處理一個(gè)事件
3.請求-應答方式:一個(gè)組建專(zhuān)門(mén)請求一個(gè)事件,然后等待一個(gè)特定的時(shí)間去接收回應。
org.mule.impl.MuleComponent實(shí)現類(lèi)提供了一個(gè)具體的組建類(lèi),它包括又有創(chuàng )建,發(fā)送和接收事件的功能。
執行同步動(dòng)作的對象應該實(shí)現org.mule.umo.lifecycle.Callable接口,這個(gè)定義了一個(gè)簡(jiǎn)單的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件調用的UMO對象。雖然不是強制的,但這個(gè)接口提供了一個(gè)生命周期控制的方法,當實(shí)現這個(gè)接口的組建接收到一個(gè)消息時(shí)執行這個(gè)方法。下面展示了這個(gè)接口的簡(jiǎn)單實(shí)現。
import org.mule.umo.lifecycle.Callable;
public class EchoComponent
implements Callable
{
public Object onCall(UMOEventContext context) throws Exception
{
String msg = context.getMessageAsString();
// Print message to System.out
System.out.println("Received synchronous message: " + msg);
// Echo transformed message back to sender
return context.getTransformedMessage();
}
}
從onCall()方法可返回任何對象。當組件的UMOLifecycleAdapter接收這個(gè)對象時(shí),它首先看看這個(gè)對象是否是一個(gè)UMOMessage;如果這個(gè)對象既不是UMOMessage也不是Null,那么以這個(gè)對象作為有效載荷去創(chuàng )建一個(gè)新的消息。這個(gè)新事件經(jīng)由所配制的出站路有器發(fā)布,如果UMO對象已經(jīng)配制了一個(gè)出站路由器,那么在UMOEventContext實(shí)例中不能調用setStopFurtherProcessing(true)方法。
Mule使用的一個(gè)簡(jiǎn)單的事件框架讓我們把這幾段Mule的代碼放到一起去構建一個(gè)簡(jiǎn)單的事件框架。這個(gè)框架包含一個(gè)負責注冊和注銷(xiāo)事件的管理器,可以接收事件,和負責路有同步和異步消息到他們相應的服務(wù)。
Mule的虛擬機協(xié)議要求有一個(gè)放置事件管理器工作目錄META-INF/services/org/mule/providers/vm路徑下的可配制文件,配制文件為協(xié)議定義了大量的組件,例如連接器和調度工廠(chǎng)。配制文件的內容如下:
connector=org.mule.providers.vm.VMConnector
dispatcher.factory=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver=org.mule.providers.vm.VMMessageReceiver
message.adapter=org.mule.providers.vm.VMMessageAdapter
endpoint.builder=org.mule.impl.endpoint.ResourceNameEndpointBuilder
一個(gè)簡(jiǎn)單的借口定義了事件管理器的公有結構:
package com.jeffhanson.mule;
import org.mule.umo.FutureMessageResult;
public interface EventManager
{
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName,
Object payload)
throws EventException;
/**
* Sends an event message asynchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any.
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName,
Object payload)
throws EventException;
/**
* Starts this event manager.
*/
public void start();
/**
* Stops this event manager.
*/
public void stop();
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol();
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name.
*/
public void registerService(String serviceName,
String implementation)
throws EventException;
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName)
throws EventException;
}
事件管理器類(lèi)是被封裝在一個(gè)工廠(chǎng)類(lèi)里,因此,可以依據需要去改變它的實(shí)現而不會(huì )影響到它的客戶(hù)端。事件管理器實(shí)現如下:
package com.jeffhanson.mule;
import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;
import java.util.HashMap;
import java.util.Map;
public class EventManagerFactory
{
private static HashMap instances = new HashMap();
/**
* Retrieves the event manager instance for a given protocol.
*
* @param protocol The protocol to use.
* @return EventManager The event manager instance.
*/
public static EventManager getInstance(String protocol)
{
EventManager instance = (EventManager)instances.get(protocol);
if (instance == null)
{
instance = new EventManagerImpl(protocol);
instances.put(protocol, instance);
}
return instance;
}
/**
* A concrete implementation for a simple event manager.
*/
private static class EventManagerImpl
implements EventManager
{
private UMOManager manager = null;
private QuickConfigurationBuilder builder = null;
private MuleClient eventClient = null;
private String protocol = null;
private MuleEndpoint receiveEndpoint = null;
private MuleEndpoint sendEndpoint = null;
private EventManagerImpl(String protocol)
{
this.protocol = protocol;
}
/**
* Starts this event manager.
*/
public void start()
{
try
{
builder = new QuickConfigurationBuilder();
manager = builder.createStartedManager(true,
protocol + "tmp/events");
eventClient = new MuleClient();
receiveEndpoint = new MuleEndpoint(protocol
+ "tmp/events/receive");
sendEndpoint = new MuleEndpoint(protocol + "tmp/events/send");
}
catch (UMOException e)
{
System.err.println(e);
}
}
/**
* Stops this event manager.
*/
public void stop()
{
try
{
manager.stop();
}
catch (UMOException e)
{
System.err.println(e);
}
}
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol()
{
return protocol;
}
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name
* to use as the component implementation.
*/
public void registerService(String serviceName,
String implementation)
throws EventException
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
try
{
builder.registerComponent(implementation,
serviceName,
receiveEndpoint,
sendEndpoint);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
}
}
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName)
throws EventException
{
try
{
builder.unregisterComponent(serviceName);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
}
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName,
Object payload)
throws EventException
{
try
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
throw new EventException("Service: " + serviceName
+ " is not registered.");
}
String transformers = null;
Map messageProperties = null;
UMOMessage result = eventClient.sendDirect(serviceName,
transformers,
payload,
messageProperties);
if (result == null)
{
return null;
}
return result.getPayload();
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
catch (Exception e)
{
throw new EventException(e.toString());
}
}
/**
* Sends an event message asynchronously.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName,
Object payload)
throws EventException
{
FutureMessageResult result = null;
try
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
throw new EventException("Service: " + serviceName
+ " is not registered.");
}
String transformers = null;
Map messageProperties = null;
result = eventClient.sendDirectAsync(serviceName,
transformers,
payload,
messageProperties);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
return result;
}
}
}
Mule框架依據消息有效載荷的類(lèi)型來(lái)派遣消息。事件框架使用基于有效載荷的派遣機制,這種派遣機制把注冊到事件管理器中一般定義的事件方法作為事件接收器。下面的類(lèi)定義了一個(gè)包含三個(gè)重載的receiveEvent()方法的服務(wù):
package com.jeffhanson.mule;
import java.util.Date;
public class TestService
{
public void receiveEvent(String eventMessage)
{
System.out.println("\n\nTestService.receiveEvent(String) received "
+ "event message: " + eventMessage + "\n\n");
}
public void receiveEvent(Integer eventMessage)
{
System.out.println("\n\nTestService.receiveEvent(Integer) received "
+"event message: " + eventMessage + "\n\n");
}
public void receiveEvent(Date eventMessage)
{
System.out.println("\n\nTestService.receiveEvent(Date) received "
+ "event message: " + eventMessage + "\n\n");
}
}
事件管理器客戶(hù)端應用程序發(fā)送三個(gè)事件到測試服務(wù)中,去測試每一個(gè)receiveEvent()方法??蛻?hù)端應用程序如下:
package com.jeffhanson.mule;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;
import java.util.Date;
public class EventClient
{
static Logger logger = Logger.getLogger(EventClient.class);
public static void main(String[] args)
{
// Set up a simple configuration that logs on the console.
BasicConfigurator.configure();
logger.setLevel(Level.ALL);
try
{
EventManager eventManager =
EventManagerFactory.getInstance("vm://");
eventManager.start();
String serviceName = TestService.class.getName();
String implementation = serviceName;
eventManager.registerService(serviceName, implementation);
Object result =
eventManager.sendSynchronousEvent(serviceName, "A test message");
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
result =
eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
result =
eventManager.sendSynchronousEvent(serviceName, new Date());
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
eventManager.stop();
}
catch (EventException e)
{
System.err.println(e.toString());
}
}
}
Mule平臺簡(jiǎn)化和抽象了前面所敘述框架的事件方面的處理,使得你發(fā)送和接收穿越一個(gè)層級結構的同步和異步消息時(shí),不需要知道下層系統的細節。工廠(chǎng)模式和SOA準則的應用,則使得這個(gè)框架有了一個(gè)松散耦合和可擴展的設計。
總結當服務(wù)和進(jìn)程需要穿越多層結構,使用多種協(xié)議去交互時(shí),設計一個(gè)有效地事件驅動(dòng)的軟件系統可能變得復雜了??墒?,一個(gè)使用標準模式包含適當事件管理層的面向服務(wù)架構能減少,甚至消除這些問(wèn)題。
Mule 平臺提供API,組件和抽象對象,這些都可以用于去建立一個(gè)強大,健壯,事件驅動(dòng)的有著(zhù)良好的伸縮性和可維護性的系統。
關(guān)于作者Jeff Hanson 有著(zhù)18年的軟件行業(yè)從業(yè)經(jīng)驗,曾經(jīng)作為高級軟件工程師工作于Windows OpenDoc項目,作為主管架構師在Novell的Route66框架工作?,F在,作為eReinsure.com的首席架構師,正在構建Web服務(wù)框架和基于J2EE再保險平臺。Hanson已經(jīng)寫(xiě)作了大量的文章和書(shū)籍,包括:《Pro JMX: Java Management Extensions》(Apress出版社,2003年11月; ISBN: 1590591011)和《Web Services Business Strategies and Architectures》(Wrox 出版社,2002年8月; ISBN: 1904284132)