-- (1) Enable Service Broker activity for the SSB_Test database.
USE master;
GO

-- Create the demo database only when it does not exist yet.
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = 'SSB_Test')
    CREATE DATABASE SSB_Test;
GO

-- Switch on Service Broker message delivery for the database.
-- NOTE(review): ENABLE_BROKER needs exclusive access to the database; run it
-- while no other session is connected to SSB_Test.
ALTER DATABASE SSB_Test SET ENABLE_BROKER;
GO

-- Set the TRUSTWORTHY database property.
-- NOTE(review): TRUSTWORTHY ON widens what code in this database may do on the
-- instance; confirm it is really required outside of a test environment.
ALTER DATABASE SSB_Test SET TRUSTWORTHY ON;
GO
?
-- (2) Create the database master key used for encryption.
USE SSB_Test;
GO

-- NOTE(review): the password is hard-coded in the script; replace it with a
-- secret supplied at deployment time before using this outside of a demo.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'I5Q7w1d3';
GO
?
-- (3) Manage message types.
USE SSB_Test;
GO

-- Message type sent by the initiator of a conversation (no body validation).
CREATE MESSAGE TYPE [//SSB_Test/SendMessages]
    VALIDATION = NONE;
GO

-- Message type sent back by the target of a conversation (no body validation).
CREATE MESSAGE TYPE [//SSB_Test/ReceivedMessages]
    VALIDATION = NONE;
GO
?
-- (4) Create the contract that states which side may send which message type.
USE SSB_Test;
GO

CREATE CONTRACT [//SSB_Test/SSBContract]
(
    [//SSB_Test/SendMessages]     SENT BY INITIATOR,
    [//SSB_Test/ReceivedMessages] SENT BY TARGET
);
GO
-- (5) Create the queues.
USE SSB_Test;
GO

-- Queue that the initiating service [//SSB_Test/FromService] is created on.
CREATE QUEUE SSBSendQueue
    WITH STATUS = ON;
GO

USE SSB_Test;
GO

-- Queue that the target service [//SSB_Test/ToService] is created on;
-- dbo.Receive_Messages reads from this queue.
CREATE QUEUE SSBReceivedQueue
    WITH STATUS = ON;
GO
?
-- (6) Create the services.
USE SSB_Test;
GO

-- Initiating service: bound to dbo.SSBSendQueue and to the contract.
CREATE SERVICE [//SSB_Test/FromService]
    ON QUEUE dbo.SSBSendQueue
    ([//SSB_Test/SSBContract]);
GO

USE SSB_Test;
GO

-- Target service: bound to dbo.SSBReceivedQueue and to the contract.
CREATE SERVICE [//SSB_Test/ToService]
    ON QUEUE dbo.SSBReceivedQueue
    ([//SSB_Test/SSBContract]);
GO
?
?
USE SSB_Test;
GO

-- Table that stores the messages taken off dbo.SSBReceivedQueue by
-- dbo.Receive_Messages: one row per received message.
CREATE TABLE dbo.SSBReceived
(
    ReceivedID            int IDENTITY (1,1) NOT NULL,
    conversation_handle   uniqueidentifier   NOT NULL,
    conversation_group_id uniqueidentifier   NOT NULL,
    message_body          VarBinary(max)     NOT NULL
);
GO
?
?
?
---- 1: stored procedure that sends one message.
USE SSB_Test;
GO
SET ANSI_NULLS ON;
GO
SET QUOTED_IDENTIFIER ON;
GO

-- dbo.Send_Messages
-- Opens a dialog from [//SSB_Test/FromService] to '//SSB_Test/ToService' on
-- [//SSB_Test/SSBContract], sends one [//SSB_Test/SendMessages] message and
-- ends the initiator side of the dialog, all inside a single transaction.
--
-- Parameters:
--   @sendMsgContext  binary message body to send.
--   @result          OUTPUT; set to 1 once the message has been sent.
-- Returns: @result (1 on success).
CREATE PROCEDURE [dbo].[Send_Messages]
    @sendMsgContext VarBinary(max),
    @result         int OUTPUT
AS
    DECLARE @Conv_Handler uniqueidentifier;

    -- Any run-time error aborts and rolls back the whole transaction.
    SET XACT_ABORT ON;
    BEGIN TRANSACTION;

    -- Step 1: open the dialog. No LIFETIME is set, so it does not time out.
    BEGIN DIALOG CONVERSATION @Conv_Handler
        FROM SERVICE [//SSB_Test/FromService]
        TO SERVICE '//SSB_Test/ToService'
        ON CONTRACT [//SSB_Test/SSBContract];

    -- Step 2: send the message on the new dialog.
    SEND ON CONVERSATION @Conv_Handler
        MESSAGE TYPE [//SSB_Test/SendMessages] (@sendMsgContext);

    -- End the dialog normally. WITH CLEANUP is intentionally not used: it drops
    -- this endpoint (and any of its still-undelivered messages) without
    -- notifying the other side, so the target would never receive the
    -- EndDialog message that dbo.Receive_Messages relies on to close its end.
    END CONVERSATION @Conv_Handler;

    SET @result = 1;

    COMMIT TRANSACTION;

    RETURN @result;
GO
?
USE SSB_Test;
GO
SET ANSI_NULLS ON;
GO
SET QUOTED_IDENTIFIER ON;
GO

-- dbo.Receive_Messages
-- Takes at most one message off dbo.SSBReceivedQueue per call, inside a single
-- transaction:
--   * a [//SSB_Test/SendMessages] message is stored in dbo.SSBReceived;
--   * an EndDialog or Error system message ends this side of the conversation.
-- The RECEIVE does not wait: when the queue is empty the variables stay NULL,
-- no branch matches, and the procedure simply commits.
CREATE PROCEDURE [dbo].[Receive_Messages]
AS
    DECLARE @Conv_Handler      uniqueidentifier;
    DECLARE @Conv_Group        uniqueidentifier;
    DECLARE @Receive_Msg       VarBinary(max);
    DECLARE @Message_Type_Name nvarchar(256);

    -- Any run-time error aborts and rolls back the whole transaction, which
    -- puts the received message back on the queue.
    SET XACT_ABORT ON;
    BEGIN TRAN;

    RECEIVE TOP(1)
        @Receive_Msg       = message_body,
        @Conv_Handler      = conversation_handle,
        @Conv_Group        = conversation_group_id,
        @Message_Type_Name = message_type_name
    FROM dbo.SSBReceivedQueue;

    IF @Message_Type_Name = '//SSB_Test/SendMessages'
    BEGIN
        INSERT dbo.SSBReceived (conversation_handle, conversation_group_id, message_body)
        VALUES (@Conv_Handler, @Conv_Group, @Receive_Msg);
    END
    ELSE IF @Message_Type_Name IN (
                'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
                'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    BEGIN
        -- The other side ended the dialog or reported an error: close this
        -- endpoint so it does not linger in sys.conversation_endpoints.
        END CONVERSATION @Conv_Handler;
    END

    COMMIT TRAN;
GO
?
二:程序调用发送存储过程
package com.dingli.json.util;
import java.nio.ByteBuffer; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
public class TestMutilServiceBroker { ?/** ? * @param args ? */ ?public static void main(String[] args) { ??ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(50); ??exec.scheduleAtFixedRate(new ServiceBroker(),1000, ????TimeUnit.MILLISECONDS); ?} }
class ServiceBroker implements Runnable { ?protected final Logger logger = Logger.getLogger(ServiceBroker.class); ?final static String driver = "net.sourceforge.jtds.jdbc.Driver"; ?private static Connection conn = null; ?private CallableStatement cStmt = null;
?@Override ?public void run() { ??final ServiceBroker test = new ServiceBroker(); ??final Connection con = test.getConnection(); ??final long starttime = System.currentTimeMillis(); ??Send_Message(con); ??// Mutil_Send_Message(con); ??long endtime = System.currentTimeMillis(); ??logger.info("每次发送消息响应时间毫秒=" + (endtime - starttime)); ?}
?private Connection getConnection() { ??try { ???Class.forName(driver); ???final String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test"; ???// final String url = ???// "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=BookStore"; ???final String username = "sa"; ???final String password = "Fleet2011@DB."; ???conn = DriverManager.getConnection(url,username,password); ??} catch (final ClassNotFoundException e) { ???e.printStackTrace(); ??} catch (final SQLException e) { ???e.printStackTrace(); ??} ??return conn; ?}
?private void Send_Message(Connection con) {
??try { ???cStmt = con.prepareCall("{call [dbo].[Send_Messages](?,?)}"); ???final StringBuffer sb = new StringBuffer(); ???ByteBuffer buffer = ByteBuffer.allocate(5000); ???sb.append("<order id="3439" customer="22" orderdate="2/15/2011"><LineItem ItemNumber="1" ISBN="1-59059-592-0" Quantity="1" /></order>"); ???buffer.put(sb.toString().getBytes()); ???cStmt.setBytes("@sendMsgContext",buffer.array()); ???// cStmt.setString("@sendMsgContext",buffer); ???cStmt.setInt("@result",0); ???cStmt.execute(); ???sb.delete(0,sb.length());
??} catch (final SQLException e) { ???e.printStackTrace(); ??} finally { ???try { ????cStmt.close(); ????con.close(); ???} catch (final Exception e) { ????e.printStackTrace(); ???} ??} ?}
?private void Mutil_Send_Message(Connection con) { ??try { ???cStmt = con ?????.prepareCall("{call [dbo].[sp_PublishOrSubscriptionMessages](?,?,?)}"); ???final StringBuffer sb = new StringBuffer(); ???// sb.append("<order id="3439" customer="22" orderdate="2/15/2011"><LineItem ItemNumber="1" ISBN="1-59059-592-0" Quantity="1" /></order>"); ???// cStmt.setString("@sendMsgContext",sb.toString()); ???// cStmt.setInt("@result",0);
???sb.append("<?xml version="1.0"?><Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe"><Subject>Subject1</Subject></Publish>"); ???cStmt.setString("@FromServerName","[AuthorService]"); ???cStmt.setString("@ToServerName","PublisherService"); ???cStmt.setString("@ContractName", ?????"[http://ssb.csharp.at/SSB_Book/c10/PublishContract]"); ???cStmt.setString("@MessageType", ?????"[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]"); ???cStmt.setString("@MessageBody",sb.toString()); ???cStmt.setInt("@result",sb.length());
??} catch (final SQLException e) { ???e.printStackTrace(); ??} finally { ???try { ????cStmt.close(); ????con.close(); ???} catch (final Exception e) { ????e.printStackTrace(); ???} ??} ?} }
?
三:程序接收消息调用存储过程
package com.dingli.json.util;
import java.io.Serializable; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException;
import org.apache.log4j.Logger;
public class TestServiceBrokerTwo implements Serializable { ?protected static Logger logger = Logger ???.getLogger(TestServiceBrokerTwo.class); ?/** ? * ? */ ?private static final long serialVersionUID = -8531232416149283642L;
?final String driver = "net.sourceforge.jtds.jdbc.Driver"; ?Connection conn = null; ?CallableStatement cStmt = null;
?/** ? * @param args ? */ ?public static void main(String[] args) { ??TestServiceBrokerTwo test = new TestServiceBrokerTwo(); ??Connection con = null; ??while (true) { ???con = test.getConnection(); ???long starttime = System.currentTimeMillis(); ???test.Receive_Messages(con); ???long endtime = System.currentTimeMillis(); ???logger.info("每次接收消息响应时间毫秒=" + (endtime - starttime)); ??} ?}
?public Connection getConnection() { ??try { ???Class.forName(driver); ???String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test"; ???String username = "sa"; ???String password = "Fleet2011@DB."; ???conn = DriverManager.getConnection(url,password); ??} catch (ClassNotFoundException e) { ???e.printStackTrace(); ??} catch (SQLException e) { ???e.printStackTrace(); ??} ??return conn;
?}
?public void Receive_Messages(Connection con) { ??try { ???cStmt = con ?????.prepareCall("{call [dbo].[Receive_Messages]}"); ???cStmt.executeUpdate(); ??} catch (SQLException ex) { ???ex.printStackTrace(); ??} finally { ???ClearConnection(cStmt,con); ??} ?}
?public void ClearConnection(CallableStatement cStmt,Connection con) { ??try { ???cStmt.close(); ???con.close();
??} catch (Exception e) { ???e.printStackTrace(); ??} ?} }
?
四:检验数据
-- Ad-hoc checks: look at what is currently sitting in each queue and at the
-- rows stored in the received-messages table.
SELECT *
FROM dbo.SSBReceivedQueue;

SELECT *
FROM dbo.SSBSendQueue;

SELECT *
FROM dbo.SSBReceived;
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|