加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 站长学院 > MsSql教程 > 正文

SqlServer2008 点对点模式实例

发布时间:2020-12-12 14:10:48 所属栏目:MsSql教程 来源:网络整理
导读:------(一)、启用数据库的Service Broker活动 -- Enabling Databases for Service Broker Activity USE master GO IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = 'SSB_Test') CREATE DATABASE SSB_Test GO ALTER DATABASE SSB_Test SET ENA


------ (1) Enable Service Broker activity for the database
-- Creates the SSB_Test database when it does not exist yet, then enables
-- Service Broker on it and marks it TRUSTWORTHY.
USE master
GO
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = 'SSB_Test')
    CREATE DATABASE SSB_Test
GO

ALTER DATABASE SSB_Test SET ENABLE_BROKER
GO
-- NOTE(review): TRUSTWORTHY ON widens what code inside this database may do
-- at instance level; confirm it is really required for this sample.
ALTER DATABASE SSB_Test SET TRUSTWORTHY ON
GO

----- (2) Create the database master key
-- Creating the Database Master Key for Encryption
USE SSB_Test
GO
-- NOTE(review): the password below is a hard-coded sample value; replace it
-- before using this script anywhere but a throw-away test database.
CREATE MASTER KEY
ENCRYPTION BY PASSWORD = 'I5Q7w1d3'
GO

------- (3) Manage message types
-- Managing Message Types
USE SSB_Test
GO
-- Message type used by the contract for messages SENT BY INITIATOR.
-- VALIDATION = NONE: the message body is not validated.
CREATE MESSAGE TYPE [//SSB_Test/SendMessages]
VALIDATION = NONE
GO

-- Message type used by the contract for messages SENT BY TARGET.
CREATE MESSAGE TYPE [//SSB_Test/ReceivedMessages]
VALIDATION = NONE
GO


------ (4) Create the contract
-- Creating Contracts
-- The contract lists which message type each side of a dialog may send:
-- the initiator sends SendMessages, the target sends ReceivedMessages.
USE SSB_Test
GO
CREATE CONTRACT [//SSB_Test/SSBContract]
(
    [//SSB_Test/SendMessages]     SENT BY INITIATOR,
    [//SSB_Test/ReceivedMessages] SENT BY TARGET
)
GO


----------- (5) Create the queues
USE SSB_Test
GO
-- Queue that the services section of this script attaches to
-- [//SSB_Test/FromService] (the sending side).
CREATE QUEUE SSBSendQueue
WITH STATUS=ON
GO

USE SSB_Test
GO
-- Queue that the services section of this script attaches to
-- [//SSB_Test/ToService]; the Receive_Messages procedure reads from it.
CREATE QUEUE SSBReceivedQueue
WITH STATUS=ON
GO

------- (6) Create the services
-- Creating Services
USE SSB_Test
GO
-- Sending-side service: stored on dbo.SSBSendQueue and exposed through the
-- [//SSB_Test/SSBContract] contract.
CREATE SERVICE
[//SSB_Test/FromService]
ON QUEUE dbo.SSBSendQueue
([//SSB_Test/SSBContract])
GO


USE SSB_Test
GO
-- Receiving-side service: stored on dbo.SSBReceivedQueue and exposed through
-- the same contract.
CREATE SERVICE [//SSB_Test/ToService]
ON QUEUE dbo.SSBReceivedQueue
([//SSB_Test/SSBContract])
GO


USE SSB_Test
GO
-- Table that stores every received message: an identity key, the
-- conversation handle and group the message arrived on, and the raw body.
CREATE TABLE dbo.SSBReceived
(ReceivedID int IDENTITY (1,1) NOT NULL,
conversation_handle uniqueidentifier NOT NULL,
conversation_group_id uniqueidentifier NOT NULL,
message_body VarBinary(max) NOT NULL)
GO


---- 1: Stored procedure that sends one message from FromService to ToService
USE SSB_Test
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- dbo.Send_Messages
--   @sendMsgContext  binary payload, sent as a [//SSB_Test/SendMessages] message
--   @result          OUTPUT; set to 1 once the message has been sent
--                    (the same value is also the procedure's RETURN value)
-- Opens a dialog from [//SSB_Test/FromService] to '//SSB_Test/ToService',
-- sends the payload and ends the initiator side, all in one transaction.
CREATE PROCEDURE [dbo].[Send_Messages]
@sendMsgContext VarBinary(max),
@result int OUTPUT
AS
DECLARE @Conv_Handler uniqueidentifier

-- With XACT_ABORT ON a run-time error rolls back the whole transaction.
SET XACT_ABORT ON
BEGIN TRANSACTION

-- Step 1: open the dialog.
BEGIN DIALOG CONVERSATION @Conv_Handler
FROM SERVICE [//SSB_Test/FromService]
TO SERVICE '//SSB_Test/ToService'
ON CONTRACT [//SSB_Test/SSBContract];
---WITH LIFETIME = 200;

-- Step 2: send the payload on the dialog.
SEND ON CONVERSATION @Conv_Handler
MESSAGE TYPE [//SSB_Test/SendMessages]
(@sendMsgContext);

-- Step 3: end the initiator side normally. WITH CLEANUP is deliberately not
-- used: it drops the endpoint without notifying the target, so the target
-- would never receive the EndDialog message that Receive_Messages relies on
-- to end its own side of the conversation.
END CONVERSATION @Conv_Handler;

SET @result = 1

COMMIT TRANSACTION

RETURN @result;
GO


USE SSB_Test
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- dbo.Receive_Messages
-- Takes at most one message off dbo.SSBReceivedQueue per call:
--   * a [//SSB_Test/SendMessages] message is stored in dbo.SSBReceived;
--   * an EndDialog or Error system message ends this side of the conversation.
-- When the queue is empty the variables stay NULL and nothing is done.
CREATE PROCEDURE [dbo].[Receive_Messages]
AS
DECLARE @Conv_Handler uniqueidentifier
DECLARE @Conv_Group uniqueidentifier
DECLARE @Receive_Msg VarBinary(max);
DECLARE @Message_Type_Name nvarchar(256);

-- With XACT_ABORT ON a run-time error rolls back the whole transaction, so a
-- message that could not be stored goes back onto the queue.
SET XACT_ABORT ON
BEGIN TRAN;

-- Non-blocking receive. Wrapping this in WAITFOR( ... ), TIMEOUT <ms> would
-- make the call wait for a message instead of returning immediately.
RECEIVE TOP(1) @Receive_Msg = message_body,
@Conv_Handler = conversation_handle,
@Conv_Group = conversation_group_id,
@Message_Type_Name = message_type_name
FROM dbo.SSBReceivedQueue;

IF @Message_Type_Name = '//SSB_Test/SendMessages'
BEGIN
    INSERT dbo.SSBReceived
    (conversation_handle,conversation_group_id,message_body)
    VALUES
    (@Conv_Handler,@Conv_Group,@Receive_Msg)
END
ELSE IF @Message_Type_Name IN
    ('http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
     'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
BEGIN
    -- The peer ended the dialog or the dialog failed: end our side too,
    -- otherwise the conversation endpoint is never released.
    END CONVERSATION @Conv_Handler;
END

COMMIT TRAN

GO

二:程序调用发送存储过程

package com.dingli.json.util;

import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

public class TestMutilServiceBroker {
?/**
? * @param args
? */
?public static void main(String[] args) {
??ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(50);
??exec.scheduleAtFixedRate(new ServiceBroker(),1000,
????TimeUnit.MILLISECONDS);
?}
}

class ServiceBroker implements Runnable {
?protected final Logger logger = Logger.getLogger(ServiceBroker.class);
?final static String driver = "net.sourceforge.jtds.jdbc.Driver";
?private static Connection conn = null;
?private CallableStatement cStmt = null;

?@Override
?public void run() {
??final ServiceBroker test = new ServiceBroker();
??final Connection con = test.getConnection();
??final long starttime = System.currentTimeMillis();
??Send_Message(con);
??// Mutil_Send_Message(con);
??long endtime = System.currentTimeMillis();
??logger.info("每次发送消息响应时间毫秒=" + (endtime - starttime));
?}

?private Connection getConnection() {
??try {
???Class.forName(driver);
???final String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
???// final String url =
???// "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=BookStore";
???final String username = "sa";
???final String password = "Fleet2011@DB.";
???conn = DriverManager.getConnection(url,username,password);
??} catch (final ClassNotFoundException e) {
???e.printStackTrace();
??} catch (final SQLException e) {
???e.printStackTrace();
??}
??return conn;
?}

?private void Send_Message(Connection con) {

??try {
???cStmt = con.prepareCall("{call [dbo].[Send_Messages](?,?)}");
???final StringBuffer sb = new StringBuffer();
???ByteBuffer buffer = ByteBuffer.allocate(5000);
???sb.append("<order id="3439" customer="22" orderdate="2/15/2011"><LineItem ItemNumber="1" ISBN="1-59059-592-0" Quantity="1" /></order>");
???buffer.put(sb.toString().getBytes());
???cStmt.setBytes("@sendMsgContext",buffer.array());
???// cStmt.setString("@sendMsgContext",buffer);
???cStmt.setInt("@result",0);
???cStmt.execute();
???sb.delete(0,sb.length());

??} catch (final SQLException e) {
???e.printStackTrace();
??} finally {
???try {
????cStmt.close();
????con.close();
???} catch (final Exception e) {
????e.printStackTrace();
???}
??}
?}

?private void Mutil_Send_Message(Connection con) {
??try {
???cStmt = con
?????.prepareCall("{call [dbo].[sp_PublishOrSubscriptionMessages](?,?,?)}");
???final StringBuffer sb = new StringBuffer();
???// sb.append("<order id="3439" customer="22" orderdate="2/15/2011"><LineItem ItemNumber="1" ISBN="1-59059-592-0" Quantity="1" /></order>");
???// cStmt.setString("@sendMsgContext",sb.toString());
???// cStmt.setInt("@result",0);

???sb.append("<?xml version="1.0"?><Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe"><Subject>Subject1</Subject></Publish>");
???cStmt.setString("@FromServerName","[AuthorService]");
???cStmt.setString("@ToServerName","PublisherService");
???cStmt.setString("@ContractName",
?????"[http://ssb.csharp.at/SSB_Book/c10/PublishContract]");
???cStmt.setString("@MessageType",
?????"[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]");
???cStmt.setString("@MessageBody",sb.toString());
???cStmt.setInt("@result",sb.length());

??} catch (final SQLException e) {
???e.printStackTrace();
??} finally {
???try {
????cStmt.close();
????con.close();
???} catch (final Exception e) {
????e.printStackTrace();
???}
??}
?}
}

?

三:程序接收消息调用存储过程

package com.dingli.json.util;

import java.io.Serializable;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.log4j.Logger;

public class TestServiceBrokerTwo implements Serializable {
?protected static Logger logger = Logger
???.getLogger(TestServiceBrokerTwo.class);
?/**
? *
? */
?private static final long serialVersionUID = -8531232416149283642L;

?final String driver = "net.sourceforge.jtds.jdbc.Driver";
?Connection conn = null;
?CallableStatement cStmt = null;

?/**
? * @param args
? */
?public static void main(String[] args) {
??TestServiceBrokerTwo test = new TestServiceBrokerTwo();
??Connection con = null;
??while (true) {
???con = test.getConnection();
???long starttime = System.currentTimeMillis();
???test.Receive_Messages(con);
???long endtime = System.currentTimeMillis();
???logger.info("每次接收消息响应时间毫秒=" + (endtime - starttime));
??}
?}

?public Connection getConnection() {
??try {
???Class.forName(driver);
???String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
???String username = "sa";
???String password = "Fleet2011@DB.";
???conn = DriverManager.getConnection(url,password);
??} catch (ClassNotFoundException e) {
???e.printStackTrace();
??} catch (SQLException e) {
???e.printStackTrace();
??}
??return conn;

?}

?public void Receive_Messages(Connection con) {
??try {
???cStmt = con
?????.prepareCall("{call [dbo].[Receive_Messages]}");
???cStmt.executeUpdate();
??} catch (SQLException ex) {
???ex.printStackTrace();
??} finally {
???ClearConnection(cStmt,con);
??}
?}

?public void ClearConnection(CallableStatement cStmt,Connection con) {
??try {
???cStmt.close();
???con.close();

??} catch (Exception e) {
???e.printStackTrace();
??}
?}
}

?

四:检验数据

-- Messages still waiting on the receiving queue.
SELECT * FROM dbo.SSBReceivedQueue;

-- Messages waiting on the sending queue.
SELECT * FROM dbo.SSBSendQueue;

-- Messages that Receive_Messages has stored in the table.
SELECT * FROM dbo.SSBReceived;

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读