在xml的汪洋中遨游之mule篇
mule号称开源ESB的最好实现,研究mule也有段时间了, 在“浩如烟海”的xml配置中,保持清醒的头脑确实不容易。 需求:给指定的email地址发送邮件. 一:基本实现: 1: 实现命令行输入发送email:
1
<
stdio:connector
name
="SystemStreamConnector"
promptMessage
="Pleaseenteremailcontent(emailaddress,contents):"
messageDelayTime
="1000"
/>
2:配置一个UMO,把输入的内容放入一个队列:
servicename="contentUMO">
<!-- anynumberofendpointscanbeaddedtoaninboundrouter --> < inbound > < stdio:inbound-endpoint system ="IN" /> </ inbound > < outbound > < pass-through-router > < vm:outbound-endpoint path ="content" /> </ pass-through-router > </ outbound > </ service > outbound节点的配置, 把输入的内容(String) 路由到一个叫“content”的queue中, 此queue为jvm中的内存队列。 3:配置一个UMO,实现发送email:
< service name ="EmailBridge" > < inbound > < vm:inbound-endpoint path ="content" /> </ inbound > < outbound > < pass-through-router > < smtps:outbound-endpoint user ="lcllcl987" password ="yourpassword" host ="smtp.gmail.com" transformer-refs ="ContentToEmailStringToMimeMessage" connector-ref ="emailConnector" from ="hujintao@mycomp.com.cn" subject ="testformuleemailbridge!" /> </ pass-through-router > </ outbound > </ service >
BTW:为了在mule中使用smtp, 需要在xml的namespace中声明: xmlns:smtps="http://www.mulesource.org/schema/mule/smtps/2.1" mule有很多对于具体协议的transport实现,每一个transport的实现作为一个jar包存在(比如mule-transport-email-2.1.2.jar), 在jar中的META-INF/spring.schemas文件中, 写明了xsd文件的对应关系, META-INF/sping.handers配置了相关命名空间的handle class,可以据此在mule的配置文件中声明命名空间.
<?
xmlversion="1.0"encoding="UTF-8"
?>
< mule xmlns ="http://www.mulesource.org/schema/mule/core/2.1" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:spring ="http://www.springframework.org/schema/beans" xmlns:stdio ="http://www.mulesource.org/schema/mule/stdio/2.1" xmlns:vm ="http://www.mulesource.org/schema/mule/vm/2.1" xmlns:smtps ="http://www.mulesource.org/schema/mule/smtps/2.1" xsi:schemaLocation =" http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.mulesource.org/schema/mule/core/2.1http://www.mulesource.org/schema/mule/core/2.1/mule.xsd http://www.mulesource.org/schema/mule/stdio/2.1http://www.mulesource.org/schema/mule/stdio/2.1/mule-stdio.xsd http://www.mulesource.org/schema/mule/vm/2.1http://www.mulesource.org/schema/mule/vm/2.1/mule-vm.xsd http://www.mulesource.org/schema/mule/smtps/2.1http://www.mulesource.org/schema/mule/smtps/2.1/mule-smtps.xsd" > < description > Thisisasimplecomponentexamplethatdemostrateshowtosend ae-mail </ description > < stdio:connector name ="SystemStreamConnector" promptMessage ="Pleaseenteremailcontent(emailaddress,contents):" messageDelayTime ="1000" /> <!-- Thisconfiguresanextrasettingifyou'reusingGMail'sSMTP --> < custom-connector name ="emailConnector" class ="co.mule.mail.SmtpConnector" /> < custom-transformer name ="ContentToEmail" class ="co.mule.mail.ContentToEmailTransformer" /> < custom-transformer name ="StringToMimeMessage" class ="org.mule.transport.email.transformers.StringToEmailMessage" /> <!-- TheMulemodelinitialisesandmanagesyourUMOcomponents --> < model name ="myEmail" > <!-- AMuleservicedefinesallthenecessaryinformationabouthowyourcomponentswill interactwiththeframework,othercomponentsinthesystemandexternalsources. PleaserefertotheConfigurationGuideforafulldescriptionofalltheparameters. --> < service name ="contentUMO" > <!-- anynumberofendpointscanbeaddedtoaninboundrouter --> < inbound > < stdio:inbound-endpoint system ="IN" /> </ inbound > < outbound > < pass-through-router > < vm:outbound-endpoint path ="content" /> </ pass-through-router > </ outbound > </ service > < service name ="EmailBridge" > < inbound > < vm:inbound-endpoint path ="content" /> </ inbound > < outbound > < pass-through-router > < smtps:outbound-endpoint user ="lcllcl987" password ="yourpassword" host ="smtp.gmail.com" transformer-refs ="ContentToEmailStringToMimeMessage" connector-ref ="emailConnector" from ="hujintao@mycomp.com.cn" subject ="testformuleemailbridge!" /> </ pass-through-router > </ outbound > </ service > </ model > </ mule >
相关class如下:
public
class
ContentToEmailTransformer
extends
AbstractTransformer
{ @Override protected ObjectdoTransform(Objectsrc,Stringencoding) throws TransformerException { Stringbody = (String)src; String[]msg = body.split( " , " ); Stringemail = msg[ 0 ]; Stringcontent = msg[ 1 ]; RequestContext.getEventContext().getMessage().setProperty( MailProperties.TO_ADDRESSES_PROPERTY,email); System.out.println( " Sentemailto " + email + " ,content: " + content); return content; } }
自定义smtp连接器(smtp connector):
public
class
SmtpConnector
extends
org.mule.transport.email.SmtpsConnector
{ @Override protected void extendPropertiesForSession(Propertiesglobal,Propertieslocal,URLNameurl){ super .extendPropertiesForSession(global,local,url); local.setProperty( " mail.smtp.starttls.enable " , " true " ); local.setProperty( " mail.smtp.auth " , " true " ); local.setProperty( " mail.smtps.starttls.enable " , " true " ); local.setProperty( " mail.smtps.auth " , " true " ); } }
运行此程序, 根据提示, 在命令行输入:
Pleaseenteremailcontent(emailaddress,contents):
lichunlei@mycompt.com.cn,IcomefromWuhancity !
二: 升级:增加一个component. 修改UMO:EmailBridge配置, 增加一个component:
<
servicename
=
"
EmailBridge
"
>
< inbound > < vm:inbound - endpointpath = " content " /> </ inbound > < component class = " co.mule.mail.EmailComponent " /> < outbound > < pass - through - router > < smtps:outbound - endpointuser = " lcllcl987 " password = " yourpassword " host = " smtp.gmail.com " transformer - refs = " emailModelToStringStringToMimeMessage " connector - ref = " emailConnector " from = " hujintao@mycomp.com.cn " subject = " testformuleemailbridge! " /> </ pass - through - router > </ outbound > </ service >
注意到增加了一个component, 接受命令行的输入(String),产生一个EmailModel的对象.之后,这个EmailModel对象进入outbound,并经过
<
custom
-
transformername
=
"
emailModelToString
"
class = " co.mule.mail.EmailModelToString " />
相关class如下:
package
co.mule.mail;
public class EmailModel { private Stringaddress; private Stringcontent; public EmailModel(Stringaddress,Stringcontent) { this .address = address; this .content = content; } public StringgetAddress() { return address; } public void setAddress(Stringaddress) { this .address = address; } public StringgetContent() { return content; } public void setContent(Stringcontent) { this .content = content; } @Override public StringtoString() { // TODOAuto-generatedmethodstub return " address= " + address + " ,content= " + content; } } EmailComponent.java
<
component
class
="co.mule.mail.EmailComponent"
>
< method-entry-point-resolver > < include-entry-point method ="foo" /> </ method-entry-point-resolver > </ component >
package
co.mule.mail;
import org.mule.RequestContext; import org.mule.transport.email.MailProperties; public class EmailComponent { public Objectfoo(Stringinput) { String[]msg = input.split( " , " ); Stringaddress = msg[ 0 ]; Stringcontent = msg[ 1 ]; EmailModelemail = new EmailModel(address,content); System.out.println( " createemailmodel: " + email); RequestContext.getEventContext().getMessage().setProperty( MailProperties.TO_ADDRESSES_PROPERTY,email.getAddress()); return new EmailModel(address,content); } }
package
co.mule.mail;
import org.mule.api.transformer.TransformerException; import org.mule.transformer.AbstractTransformer; public class EmailModelToString extends AbstractTransformer { public EmailModelToString() { super (); this .registerSourceType(EmailModel. class ); this .setReturnClass(String. class ); } @Override protected ObjectdoTransform(Objectsrc,Stringencoding) throws TransformerException{ EmailModelemailModel = (EmailModel)src; return emailModel.toString(); } }
三:继续升级:不满足于在命令行输入, 需要在浏览器输入, 也就是发布一个http接口。
<servicename="contentUMO">
<!--anynumberofendpointscanbeaddedtoaninboundrouter--> <inbound> <!--IncomingHTTPrequests--> <inbound-endpointaddress="http://localhost:9999" transformer-refs="HttpRequestToString" synchronous="true"/> </inbound> <outbound> <pass-through-router> <vm:outbound-endpointpath="content"/> </pass-through-router> </outbound> </service> 过http请求得到输入参数, 经过HttpRequestToString的转换, 放入“content” queue,为了和content中的数据格式匹配,在浏览器中按如下方式输入:
packageco.mule.mail;
importorg.mule.api.transformer.TransformerException; importorg.mule.transformer.AbstractTransformer; importorg.mule.util.IOUtils; importjava.io.InputStream; importjava.io.UnsupportedEncodingException; importjava.net.URLDecoder; publicclassHttpRequestToStringextendsAbstractTransformer { privatestaticfinalStringEMAIL_REQUEST_PARAMETER="email="; publicHttpRequestToString() { super(); this.registerSourceType(String.class); this.setReturnClass(String.class); } publicObjectdoTransform(Objectsrc,Stringencoding)throwsTransformerException { returnextractEmailValue(extractRequestQuery(convertRequestToString(src,encoding))); } privateStringconvertRequestToString(Objectsrc,Stringencoding) { returnsrc.toString(); } privateStringextractRequestQuery(Stringrequest) { StringrequestQuery=null; if(request!=null&&request.length()>0&&request.indexOf('?')!=-1) { requestQuery=request.substring(request.indexOf('?')+1).trim(); } returnrequestQuery; } privateStringextractEmailValue(StringrequestQuery)throwsTransformerException { StringemailValue=null; if(requestQuery!=null&&requestQuery.length()>0) { intnameParameterPos=requestQuery.indexOf(EMAIL_REQUEST_PARAMETER); if(nameParameterPos!=-1) { intnextParameterValuePos=requestQuery.indexOf('&'); if(nextParameterValuePos==-1||nextParameterValuePos<nameParameterPos) { nextParameterValuePos=requestQuery.length(); } emailValue=requestQuery.substring(nameParameterPos+EMAIL_REQUEST_PARAMETER.length(),nextParameterValuePos); } if(emailValue!=null&&emailValue.length()>0) { try { emailValue=URLDecoder.decode(emailValue,"UTF-8"); } catch(UnsupportedEncodingExceptionuee) { logger.error(uee.getMessage()); } } } if(emailValue==null) { emailValue=""; } returnemailValue; } }
继续在mule的xml汪洋中遨游。
<?xmlversion="1.0"encoding="UTF-8"?>
<mulexmlns="http://www.mulesource.org/schema/mule/core/2.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.mulesource.com/schema/mule/jdbc/2.1" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.1" xsi:schemaLocation=" http://www.mulesource.com/schema/mule/jdbc/2.1http://www.mulesource.com/schema/mule/jdbc/2.1/mule-jdbc-ee.xsd http://www.mulesource.org/schema/mule/core/2.1http://www.mulesource.org/schema/mule/core/2.1/mule.xsd http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.mulesource.org/schema/mule/vm/2.1http://www.mulesource.org/schema/mule/vm/2.1/mule-vm.xsd"> <spring:beanid="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <spring:propertyname="driverClassName" value="com.mysql.jdbc.Driver"/> <spring:propertyname="url" value="jdbc:mysql://192.168.10.120/sand_res"/> <spring:propertyname="username"value="username"/> <spring:propertyname="password"value="888"/> <spring:propertyname="maxActive"value="30"/> <spring:propertyname="maxIdle"value="10"/> <spring:propertyname="maxWait"value="1000"/> <spring:propertyname="defaultAutoCommit"value="true"/> </spring:bean> <jdbc:connectorname="jdbcConnector"dataSource-ref="dataSource"> <jdbc:querykey="selectUser" value="SELECTfirst_name,last_nameFROMapp_userwherefirst_name=#[map-payload:firstName]"/> <jdbc:querykey="insertUser" value="insertintoapp_user (id,first_name,last_name)values(#[map-payload:id],#[map-payload:firstName],#[map-payload:lastName])"/> </jdbc:connector> <!-- TheMulemodelinitialisesandmanagesyourUMOcomponents --> <modelname="databaseModel"> <servicename="insertUMO"> <!--anynumberofendpointscanbeaddedtoaninboundrouter--> <inbound> <vm:inbound-endpointpath="query"/> </inbound> <!-- Anoutboundroutercanhaveoneormorerouterconfigurationsthatcanbe invokeddependingonbusinessrules,messagecontents,headersoranyother criteria.Thepass-through-routerisarouterthatautomaticallypasses oneverymessageitreceives --> <outbound> <pass-through-router> <jdbc:outbound-endpointqueryKey="selectUser"synchronous="true"/> </pass-through-router> </outbound> </service> </model> </mule>
注意: 如果mule采用2.1, jdbc transport的namespase后缀为com, 而不是org, 如果写错,IDE不会提示,程序异常也很奇怪,让我折腾了一个下午:(
publicclassMyMuleClientTest
{ publicstaticvoidmain(String[]args)throwsMuleException { //createmule MuleContextmuleContext; Stringconfig="my-mule-jdbc-config.xml"; muleContext=newDefaultMuleContextFactory().createMuleContext(config); muleContext.start(); //creatmuleclient MuleClientclient=newMuleClient(); Mapmap=newHashMap(); map.put("firstName","feng"); MuleMessageresponse=client.send("vm://query",map,null); System.out.println("response="+response.getPayload()); } }
执行的sql为:
SELECTfirst_name,last_nameFROMapp_userwherefirst_name="feng"
insert的执行类似,只需修改如下:
<outbound>
<pass-through-router> <jdbc:outbound-endpointqueryKey="insertUser"synchronous="true"/> </pass-through-router> </outbound>
mule的jdbc transport功能过于简单,今天的需求是把ibatis集成进来, 作为一个service的component,以增强持久层功能.
<spring:beans>
<spring:importresource="applicationContext.xml"/> <spring:importresource="applicationContext-ibatis.xml"/> </spring:beans>
作为一个演示,我需要往一个vm:queue中写入消息,component(由spring bean充当)
<modelname="databaseModel">
<servicename="databaseUMO"> <!--anynumberofendpointscanbeaddedtoaninboundrouter--> <inbound> <vm:inbound-endpointpath="query"/> </inbound> <component> <method-entry-point-resolver> <include-entry-pointmethod="getUser"/> </method-entry-point-resolver> <spring-objectbean="userDao"></spring-object> </component> </service>
mule中关于component的xsd很不友好, component的子项居然是一个序列, 次序不能颠倒.
publicinterfaceDao{
publicObjectsave(StringsqlId,ObjectparameterObject); publicintdelete(StringsqlId,ObjectparameterObject); publicintupdate(StringsqlId,ObjectparameterObject); publicListquery(StringsqlId,ObjectparameterObject); publicObjectqueryObject(StringsqlId,ObjectparameterObject); publicConnectiongetConn();
publicinterfaceUserDaoextendsDao{
publicListgetUsers(); publicUsergetUser(LonguserId); publicvoidsaveUser(Useruser); publicvoidremoveUser(LonguserId); }
publicclassUserDaoiBatisextendsBaseDaoimplementsUserDao{
privateDataFieldMaxValueIncrementerincrementer; publicvoidsetIncrementer(DataFieldMaxValueIncrementerincrementer){ this.incrementer=incrementer; } publicListgetUsers(){ returngetSqlMapClientTemplate().queryForList("getUsers",null); } publicUsergetUser(Longid){ Useruser= (User)getSqlMapClientTemplate().queryForObject("getUser",id); if(user==null){ thrownewObjectRetrievalFailureException(User.class,id); } returnuser; } publicvoidsaveUser(Useruser){ if(user.getId()==null){ Longid=newLong(incrementer.nextLongValue()); user.setId(id); //TouseiBatis's<selectKey>feature,whichisdb-specific,comment //outtheabovetwolinesandusethelinebelowinstead //Longid=(Long)getSqlMapClientTemplate().insert("addUser",user); getSqlMapClientTemplate().insert("addUser",user); logger.info("newUseridsetto:"+id); }else{ getSqlMapClientTemplate().update("updateUser",user); } } publicvoidremoveUser(Longid){ getSqlMapClientTemplate().update("deleteUser",id); } } 在spring配置文件中, 装配userDao:
<?xmlversion="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <beanid="dataSource"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close"> <propertyname="driverClassName"value="${jdbc.driverClassName}"/> <propertyname="url"value="${jdbc.url}"/> <propertyname="username"value="${jdbc.username}"/> <propertyname="password"value="${jdbc.password}"/> <propertyname="maxActive"value="30"/> <propertyname="maxIdle"value="10"/> <propertyname="maxWait"value="1000"/> <propertyname="defaultAutoCommit"value="true"/> </bean> <!--TransactionmanagerforiBATISDAOs--> <beanid="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <propertyname="dataSource"ref="dataSource"/> </bean> <!--SqlMapsetupforiBATISDatabaseLayer--> <beanid="sqlMapClient"class="org.springframework.orm.ibatis.SqlMapClientFactoryBean"> <propertyname="dataSource"ref="dataSource"/> <propertyname="configLocation"value="classpath:/co/iplatform/dao/sql-map-config.xml"/> </bean> <beanid="userIncrementer"class="org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer"> <propertyname="dataSource"ref="dataSource"/> <propertyname="incrementerName"value="user_sequence"/> <propertyname="columnName"value="value"/> </bean> <beanid="userDao"class="co.iplatform.dao.UserDaoiBatis"> <propertyname="incrementer"ref="userIncrementer"/> <propertyname="sqlMapClient"ref="sqlMapClient"/> </bean> <!--AddadditionalDAOdefinitionshere--> </beans>
ibatis的配置文件长大很标准,就不贴了。
publicabstractclassBaseDaoTestCaseextendsAbstractTransactionalDataSourceSpringContextTests{
protectedfinalLoglog=logger; privateApplicationContextctx; protectedString[]getConfigLocations(){ setAutowireMode(AUTOWIRE_BY_NAME); String[]paths={"classpath*:applicationContext*.xml"}; returnpaths; } } publicclassUserDaoTestextendsBaseDaoTestCase{ privateUseruser=null; privateUserDaodao=null; publicvoidsetUserDao(UserDaouserDao){ this.dao=userDao; } publicvoidtestGetUsers(){ user=newUser(); user.setFirstName("li"); user.setLastName("chunlei"); dao.saveUser(user); System.out.println("size--"+dao.getUsers().size()); assertTrue(dao.getUsers().size()>=1); } publicvoidtestSaveUser()throwsException{ user=newUser(); user.setFirstName("li"); user.setLastName("chunlei"); dao.saveUser(user); assertTrue("primarykeyassigned",user.getId()!=null); assertNotNull(user.getFirstName()); } publicvoidtestAddAndRemoveUser()throwsException{ user=newUser(); user.setFirstName("feng"); user.setLastName("Joy"); dao.saveUser(user); assertNotNull(user.getId()); assertTrue(user.getFirstName().equals("feng")); log.debug("removinguser"); dao.removeUser(user.getId()); endTransaction(); try{ user=dao.getUser(user.getId()); fail("Userfoundindatabase"); }catch(DataAccessExceptiondae){ log.debug("Expectedexception:"+dae.getMessage()); assertNotNull(dae); } } }
public
class
MyMuleClientTest
{ public static void main(String[]args) throws MuleException { // createmule MuleContextmuleContext; Stringconfig = " mule-database-config.xml " ; muleContext = new DefaultMuleContextFactory().createMuleContext(config); muleContext.start(); // creatmuleclient MuleClientclient = new MuleClient(); MuleMessageresponse = client.send( " vm://query " , new Long( 11 ), null ); System.out.println( " response= " + response.getPayload()); } }
Mule的消息路由 异步方式
异步方式是一种单向调用,调用者不需要获得响应。 如果只想将消息以“即发即弃(fire andforget)”的方式发送给一个服务,(并不需要给调用者返回响应),那么可使用异步消息类型。如果将入站端点的synchronous属性设置为false,它就不会给调用者返回响应。
<
model
name
="Asynchronous_Message_Pattern"
>
< service name ="AsynchronousService" > < inbound > < jms:inbound-endpoint queue ="test.in" synchronous ="false" /> </ inbound > < component class ="org.myorg.WidgetHandler" /> < outbound > < pass-through-router > < jms:outbound-endpoint queue ="test.out" > </ pass-through-router > </ outbound > </ service > </ model >
请求-响应方式
请求-响应方式即请求方调用服务后,服务立即处理并返回响应结果,不需将消息再次传递。 在简单的Request-Response场景中,服务在一个同步的入口端点上接收请求,并处理该请求,然后将它作为回复发送给调用者。例如,如果用户在HTML表单中输入一个值,想转换该值并将其结果显示在同一个页面上,那么可以在该服务上简单地配置一个同步入站端点,由该服务完成数据转换。这种场景并不需要使用出站端点。这就是request-response消息类型。
<
model
name
="Request-Response_Message_Pattern"
>
< service name ="SynchronousService" > <!-- 为了返回response将synchronous的值设置为“true” --> < inbound > < http:inbound-endpoint host ="localhost" port ="8080" path ="/mule/services" synchronous ="true" /> </ inbound > <!-- 指定处理该请求的组件 --> < component class ="org.myorg.WidgetHandler" /> </ service > </ model >
同步如果为了进一步处理消息,需要将消息传递给第二个服务,那么需要在第一个服务上配置一个出站路由器将该消息传递给第二个服务。在第二个服务处理完消息后,第一个服务将它作为回复发送给调用者。值得注意的是将第一个服务设置为同步入口端点就意味着之后的所有服务都会以同步的方式处理该消息,所以无需在第二个服务上设置synchronous属性的值。这就是同步消息类型。
<
model
name
="Synchronous_Message_Pattern"
>
< service name ="SynchronousService" > < inbound > <!-- 为了返回response将synchronous的值设置为“true” --> < jms:inbound-endpoint queue ="test.in" synchronous ="true" /> </ inbound > < component class ="org.myorg.WidgetHandler" /> < outbound > <!-- 使用pass-through路由器时,如果想返回response必须将synchronous的值设置为“true” --> < pass-through-router > <!-- 设置出站端点 --> < jms:outbound-endpoint queue ="test.out" synchronous ="true" /> </ pass-through-router > </ outbound > </ service > <!-- 配置第二个服务,并将它的入站端点设置为上一个服务的出站端点的路径。值得注意的是无需设置synchronous的值,因为在第一个服务中已经将消息设置为synchronous了。 --> < service > < inbound > < jms:inbound-endpoint queue ="test.out" /> </ inbound > < component class ="org.myorg.WidgetProcesser" /> </ service > </ model >
异步请求-响应方式 异步请求-响应方式即请求方调用服务后不需要立即获得返回结果,component将请求发送给其他外围系统处理(可能有多个),全部处理完毕后通过指定的异步应答Router返回给请求方。 在大多数复杂的场景中,可以使用request-response消息,并使用后端(back-end)流程调用其它的服务,并基于多个服务调用的结果异步地返回一个回复。你可以将入站端点的synchronous属性设置为false,因为异步回复路由器会处理该回复,除非你想给调用者发送响应。这就是异步request-response消息类型。
在下面的例子中,HTTP端点接收一个请求,并使用Multicast路由器将该请求广播到两个端点,再将这些结果以异步的方式发送到一个JMS端点。
<
model
name
="Async_Request-Response_Message_Pattern"
>
< service name ="AsyncRequestResponseService" > < inbound > <!-- 将synchronous设置为“false”,因为response将由异步回复路由器处理 --> < http:inbound-endpoint host ="localhost" port ="8080" path ="/mule/services" synchronoussynchronous ="false" /> </ inbound > < component class ="org.myorg.WidgetHandler" /> <!-- 配置异步回复的设置。这个例子使用了收集异步回复路由器,在发送回复信息之前,它将所有的响应信息收集在一起。 --> < async-reply timeout ="5000> <collection-async-reply-router/> <jms:inbound-endpointqueue=" reply.queue" /> </ async-reply > <!-- 设置负责接收和处理消息的端点以及回复消息的端点 --> < outbound > < multicasting-router > < reply-to address ="jms://reply.queue" /> < jms:outbound-endpoint queue ="service1" synchronous ="false" /> < jms:outbound-endpoint queue ="service2" synchronous ="false" /> </ multicasting-router > </ outbound > </ service > </ model >
将消息传递到另一个端点pass-through路由器是为简化端点间的消息传递而设计的。比如,它对分发消息给一个队列非常有用。 也可以使用pass-through路由器将协议桥接到其它的出站端点。例如:
<
service
name
="HttpProxyService"
>
< inbound > < inbound-endpoint address ="http://localhost:8888" synchronous ="true" /> </ inbound > < outbound > < pass-through-router > < outbound-endpoint address ="http://www.webservicex.net#[header:http.request]" synchronous ="true" /> </ pass-through-router > </ outbound > </ service > 当使用pass-through路由器时,如果想返回一个响应,必须将出站端点的synchronous属性设置为true。其它的路由器,比如chaining路由器并不需将出站端点的synchronous属性设置为true,该路由器总会在同步的场景中返回一个响应。因此,如果将消费发送给多个服务,可能会用chaining路由器代替pass-through路由器,因为chaining路由器中不需要将每个端点的synchronous设置为true。
过滤消息使用过滤器可以控制服务处理哪些消息。选择性消费者路由器(Selective ConsumerRouter)用于入站端点,它可以控制服务处理哪些消息。过滤路由器(FilteringRouter)用于出站端点,可以控制哪些消息发送到下一个服务上。可以组合使用这些过滤器来控制消息流。 例如,如果只想处理不包含错误的消息,那么可以使用选择性消费者以确保只处理结果代码为success的消息。并使用Catch-all策略将其它的消息转发到另外端点上作为错误处理:
<
inbound
>
< selective-consumer-router > < mulexml:jxpath-filter expression ="msg/header/resultcode='success'" /> </ selective-consumer-router > < forwarding-catch-all-strategy > < jms:endpoint topic ="error.topic" /> </ forwarding-catch-all-strategy > </ inbound > 服务处理消息时,如果想通过指定的标准决定将消息发送到哪个端点,那么可以在出站端点上使用过滤路由器。在下面的示例中,将包含异常信息的消息发送到系统管理员的email邮箱,将包含特定字符串的消息发送到名为string.queue的队列,并使用forwardingcatch-all路由器接收余下的所有消息,并将它们发送到名为error.queue的死信队列:
<
outbound
>
< filtering-router > < smtp:outbound-endpoint to ="ross@muleumo.org" /> < payload-type-filter expectedTypeexpectedType ="java.lang.Exception" /> </ filtering-router > < filtering-router > < jms:outbound-endpoint to ="string.queue" /> < and-filter > < payload-type-filter expectedType ="java.lang.String" /> < regex-filter pattern ="thequickbrown(.*)" /> </ and-filter > </ filtering-router > < forwarding-catch-all-strategy > < jms:outbound-endpoint queue ="error.queue" /> </ forwarding-catch-all-strategy > </ outbound > 与过滤路由器(filtering router)相似的路由器有转发路由器(forwardingrouter),它可以处理一些消息并可以选择性地将消息转发到其它路由器,还有wiretaprouter,这种路由器可以处理所有的消息,并将它们发送到端点上,同时也将消息的副本发送到另外一个端点。
将多个出站端点链接在一起假设我们有一个验证服务,当消息没有通过验证时,想将该消息以及验证异常转发到另一个服务,并将消息和验证异常返回给调用者。那么可以使用链接路由器(chainingrouter),它是一个高速的、轻量级的可配置路由器,可用于将消息发送到端点,然后将该端点的输出结果发送到另一个端点。例如:
<
chaining-router
>
<!-- 首先,将消息发送到这个端点,用于验证。 --> < vm:outbound-endpoint path ="ValidationService" synchronous ="true" /> <!-- 接着将包含表达式的消息发送到这个端点上 --> < vm:outbound-endpoint path ="ValidationError" synchronous ="true" > < exception-type-filter expectedType ="java.lang.Exception" /> </ vm:outbound-endpoint > </ chaining-router >
消息分解消息分解器(message splitter)可用于将输出消息(outgoingmessage)分解成多个部分,再将他们分发到配置在路由器(router)上的不同端点。例如,在订单处理应用中,如果想将经消息分解后的不同部分分发给不同的服务去处理,那么可以使用下面的路由器: 列表消息分解器(List MessageSplitter):接收一个对象列表,这些对象将被路由到不同的端点。例如:
<
outbound
>
< list-message-splitter-router " > <!-- 将order路由到队列order.queue --> < jms:outbound-endpoint queue ="order.queue" > < payload-type-filter expectedType ="com.foo.Order" /> </ jms:outbound-endpoint > <!-- 将items路由到队列item.queue --> < jms:outbound-endpoint queue ="item.queue" > < payload-type-filter expectedType ="com.foo.Item" /> </ jms:outbound-endpoint > </ list-message-splitter-router > </ outbound > 表达式分解路由器(Expression SplitterRouter):它与列表消息分解器相似,只是它是基于表达式分解消息,将消息分解成一个或者多个部分。例如:
<
outbound
>
< expression-splitter-router evaluator ="xpath" expression ="/mule:mule/mule:model/mule:service" disableRoundRobin ="true" failIfNoMatch ="false" > < outbound-endpoint ref ="service1" > < expression-filter evaluator ="xpath" expression ="/mule:service/@name='servicesplitter'" /> </ outbound-endpoint > < outbound-endpoint ref ="service2" > < expression-filter evaluator ="xpath" expression ="/mule:service/@name='roundrobindeterministic'" /> </ outbound-endpoint > </ expression-splitter-router > </ outbound >
为了提高性能也可以将消息分解成多个部分。轮叫(RoundRobin)消息分解器将消息分解成多个部分,并以轮叫(round-robin)的方式将它们发送到端点。Message ChunkingRouter将消息按固定长度分解成多个部分,并将它们路由到同一个端点。 消息分解之后,可以使用Message ChunkingAggregator重新将消息块聚合在一起。该聚合器(aggregator)通过关联ID(correlationID)来识别哪些消息块属于同一个消息,关联ID(correlation ID)在出站路由器(outbound router)上设置。
<
inbound
>
< message-chunking-aggregator-router > < expression-message-info-mapping correlationIdExpression ="#[header:correlation]" /> < payload-type-filter expectedType ="org.foo.some.Object" /> </ message-chunking-aggregator-router > </ inbound >
处理消息仅有一次幂等接收器(IdempotentReceiver)通过核对输入消息的唯一消息ID来保证只有拥有唯一ID的消息才能被服务所接收。消息ID可以通过使用一个表达式从消息中产生,该表达式在idExpression属性中定义。#[message:id]是默认的表达式,也就是说如果要实现该功能,端点必须支持唯一性消息ID。在下面的例子中,唯一性ID是由消息ID和消息标头中标签的内容组合而成。所有的消息ID都被记录到一个简单的文本文件中,用于追踪哪些消息已经处理过。
<
inbound
>
< idempotent-receiver-router idExpression ="#[message:id]-#[header:label]" > < simple-text-file-store directory ="./idempotent" /> </ idempotent-receiver-router > </ inbound >
通过组件绑定调用外部服务除了使用消息路由器控制服务间的消息流之外,也可以通过组件绑定(Component Bindings)调用处理消息的外部服务(ExternalService)。
在这个方法中,可以将Mule的端点绑定到Java接口方法。该方法的优势在于,在组件仍在处理消息时,你可以使用外部服务,而无需使用Mule的API或者修改组件的代码。相反,只需要在XML配置文件中配置组件绑定,从而指定外部服务的端点。例如,在下面的绑定例子中,当sayHello方法被调用时,HelloInterface中的sayHello方法会调用外部的HelloWeb服务。
<
component
class
="org.mule.examples.bindings.InvokerComponent"
>
< binding interface ="org.mule.examples.bindings.HelloInterface" method ="sayHello" > < cxf:outbound-endpoint address ="http://myhost.com:81/services/HelloWeb?method=helloMethod" synchronous ="true" /> </ binding > </ component > (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |