-- A SQL Server Service Broker example
-----------------------------------
USE master
GO
--------------------------------------------------
-- Create demo database section
--------------------------------------------------
-- Drop a previous copy of the demo database only when one exists, so the
-- script also succeeds on its very first run, then recreate it.
IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
    DROP DATABASE [SsbDemoDb];

CREATE DATABASE [SsbDemoDb];
GO
-- All remaining objects are created inside the demo database.
USE [SsbDemoDb];
GO
--------------------------------------------------
-- Dialog pool section
--------------------------------------------------

--------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
-- OwnerSPID is the session id that currently holds the dialog; the value -1
-- marks a dialog that is free to be claimed.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
    DROP TABLE [DialogPool];
GO
CREATE TABLE [DialogPool] (
    FromService  SYSNAME          NOT NULL,
    ToService    SYSNAME          NOT NULL,
    OnContract   SYSNAME          NOT NULL,
    Handle       UNIQUEIDENTIFIER NOT NULL,  -- conversation handle of the pooled dialog
    OwnerSPID    INT              NOT NULL,  -- claiming session id, or -1 when free
    CreationTime DATETIME         NOT NULL,
    SendCount    BIGINT           NOT NULL,  -- messages sent on this dialog so far
    UNIQUE (Handle));
GO
--------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of message previously sent on dialog.
-- The returned dialog is marked as owned by the calling session (@@SPID).
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
    DROP PROC usp_get_dialog;
GO
CREATE PROCEDURE [usp_get_dialog] (
    @fromService SYSNAME,
    @toService SYSNAME,
    @onContract SYSNAME,
    @dialogHandle UNIQUEIDENTIFIER OUTPUT,
    @sendCount BIGINT OUTPUT)
AS
BEGIN
    SET NOCOUNT ON;

    -- Receives the handle and send count of the pool row claimed below.
    -- Only the two columns that are actually returned are captured, so the
    -- procedure keeps working if [DialogPool] gains columns later.
    DECLARE @dialog TABLE
    (
        Handle UNIQUEIDENTIFIER NOT NULL,
        SendCount BIGINT NOT NULL
    );

    BEGIN TRANSACTION;

    -- Try to claim an unused dialog in [DialogPool]
    -- READPAST option avoids blocking on locked dialogs.
    UPDATE TOP(1) [DialogPool] WITH(READPAST)
    SET OwnerSPID = @@SPID
    OUTPUT INSERTED.Handle, INSERTED.SendCount INTO @dialog (Handle, SendCount)
    WHERE FromService = @fromService
      AND ToService = @toService
      AND OnContract = @onContract
      AND OwnerSPID = -1;

    IF @@ROWCOUNT > 0
    BEGIN
        -- A free dialog was claimed: hand it back to the caller.
        SELECT @dialogHandle = Handle,
               @sendCount = SendCount
        FROM @dialog;
    END
    ELSE
    BEGIN
        -- No free dialogs: need to create a new one
        BEGIN DIALOG CONVERSATION @dialogHandle
            FROM SERVICE @fromService
            TO SERVICE @toService
            ON CONTRACT @onContract
            WITH ENCRYPTION = OFF;

        -- Register the new dialog in the pool, already owned by this session.
        INSERT INTO [DialogPool]
            (FromService, ToService, OnContract, Handle, OwnerSPID,
             CreationTime, SendCount)
        VALUES
            (@fromService, @toService, @onContract, @dialogHandle, @@SPID,
             GETDATE(), 0);

        SET @sendCount = 0;
    END

    COMMIT
END;
GO
--------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
-- Raises a logged severity-16 error when the handle does not match
-- exactly one row in the pool.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
    DROP PROC usp_free_dialog;
GO
CREATE PROCEDURE [usp_free_dialog] (
    @dialogHandle UNIQUEIDENTIFIER,
    @sendCount BIGINT)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @rowcount INT;
    DECLARE @string VARCHAR(50);

    BEGIN TRANSACTION;

    -- Release dialog by setting OwnerSPID to -1.
    UPDATE [DialogPool]
    SET OwnerSPID = -1,
        SendCount = @sendCount
    WHERE Handle = @dialogHandle;

    -- Capture the row count immediately; any later statement resets @@ROWCOUNT.
    SELECT @rowcount = @@ROWCOUNT;

    IF @rowcount <> 1
    BEGIN
        -- RAISERROR %s needs a character argument, so render the handle as text.
        SET @string = CAST(@dialogHandle AS VARCHAR(50));

        IF @rowcount = 0
            RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;
        ELSE
            RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;
    END

    COMMIT
END;
GO
--------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
-- Deleting a handle that is not in the pool is a silent no-op.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
    DROP PROC usp_delete_dialog;
GO
CREATE PROCEDURE [usp_delete_dialog] (
    @dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
    SET NOCOUNT ON;
    BEGIN TRANSACTION;
    -- Only the pool row is removed; the conversation itself stays open.
    DELETE [DialogPool] WHERE Handle = @dialogHandle;
    COMMIT
END;
GO
--------------------------------------------------
-- Application setup section.
--------------------------------------------------

--------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
--------------------------------------------------
-- This table stores the messages on the target side
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
    DROP TABLE MsgTable;
GO
CREATE TABLE MsgTable (
    message_type SYSNAME,
    message_body NVARCHAR(4000))
GO
-- Activated stored proc for the initiator to receive messages.
-- Receives at most one message (waiting up to 5 seconds) and ends the
-- initiator side of the conversation when the target has ended the dialog
-- or Service Broker has reported an error on it.
CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
    DECLARE @handle UNIQUEIDENTIFIER;
    DECLARE @message_type SYSNAME;

    BEGIN TRANSACTION;

    WAITFOR (
        RECEIVE TOP(1)
            @handle = [conversation_handle],
            @message_type = [message_type_name]
        FROM [SsbInitiatorQueue]), TIMEOUT 5000;

    IF @@ROWCOUNT = 1
    BEGIN
        -- Expect target response to EndOfStream message.
        -- An Error message also leaves the conversation unusable, so it is
        -- ended here as well instead of being left open.
        IF @message_type IN ('http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
                             'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        BEGIN
            END CONVERSATION @handle;
        END
    END

    COMMIT
END;
GO
-- Activated stored proc for the target to receive messages.
-- Loops receiving batches of messages (waiting up to 5 seconds per batch)
-- until the queue is empty. Each message is processed inside its own
-- TRY/CATCH; failures are recorded in target_processing_errors.
CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
    -- Variable table for received messages.
    -- Bodies are kept as NVARCHAR because the sender (usp_send) passes an
    -- NVARCHAR payload; casting those bytes to VARCHAR would garble the text.
    DECLARE @receive_table TABLE(
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name SYSNAME,
        message_body NVARCHAR(MAX));

    -- Cursor for received message table.
    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
        FOR SELECT
            conversation_handle,
            message_type_name,
            message_body
        FROM @receive_table ORDER BY queuing_order;

    DECLARE @conversation_handle UNIQUEIDENTIFIER;
    DECLARE @message_type SYSNAME;
    DECLARE @message_body NVARCHAR(4000);

    -- Error variables.
    DECLARE @error_number INT;
    DECLARE @error_message VARCHAR(4000);
    DECLARE @error_severity INT;
    DECLARE @error_state INT;
    DECLARE @error_procedure SYSNAME;
    DECLARE @error_line INT;
    DECLARE @error_dialog VARCHAR(50);

    BEGIN TRY
        WHILE (1 = 1)
        BEGIN
            BEGIN TRANSACTION;

            -- Receive all available messages into the table.
            -- Wait 5 seconds for messages.
            WAITFOR (
                RECEIVE
                    [queuing_order],
                    [conversation_handle],
                    [message_type_name],
                    CAST([message_body] AS NVARCHAR(4000))
                FROM [SsbTargetQueue]
                INTO @receive_table
            ), TIMEOUT 5000;

            IF @@ROWCOUNT = 0
            BEGIN
                -- Queue drained: close the empty transaction and exit.
                COMMIT;
                BREAK;
            END
            ELSE
            BEGIN
                OPEN message_cursor;
                WHILE (1=1)
                BEGIN
                    FETCH NEXT FROM message_cursor
                        INTO @conversation_handle,
                             @message_type,
                             @message_body;
                    IF (@@FETCH_STATUS != 0) BREAK;

                    -- Process a message.
                    -- If an exception occurs, catch and attempt to recover.
                    BEGIN TRY
                        IF @message_type = 'SsbMsgType'
                        BEGIN
                            -- process the msg. Here we will just insert it into a table
                            INSERT INTO MsgTable (message_type, message_body)
                            VALUES (@message_type, @message_body);
                        END
                        ELSE IF @message_type = 'EndOfStream'
                        BEGIN
                            -- initiator is signaling end of message stream: end the dialog
                            END CONVERSATION @conversation_handle;
                        END
                        ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                        BEGIN
                            -- If the message_type indicates that the message is an error,
                            -- end the conversation and raise the error.
                            WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                            SELECT
                                @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                                @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
                            SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));

                            -- END CONVERSATION must come first: RAISERROR with
                            -- severity 16 inside TRY jumps straight to CATCH,
                            -- so nothing placed after it would ever run.
                            END CONVERSATION @conversation_handle;
                            RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
                        END
                    END TRY
                    BEGIN CATCH
                        SET @error_number = ERROR_NUMBER();
                        SET @error_message = ERROR_MESSAGE();
                        SET @error_severity = ERROR_SEVERITY();
                        SET @error_state = ERROR_STATE();
                        SET @error_procedure = ERROR_PROCEDURE();
                        SET @error_line = ERROR_LINE();

                        IF XACT_STATE() = -1
                        BEGIN
                            -- The transaction is doomed. Only rollback possible.
                            -- This could disable the queue if done 5 times consecutively!
                            ROLLBACK TRANSACTION;

                            -- Record the error (in its own transaction, since the
                            -- original one has just been rolled back).
                            BEGIN TRANSACTION;
                            INSERT INTO target_processing_errors
                                (error_conversation, error_number, error_message,
                                 error_severity, error_state, error_procedure,
                                 error_line, doomed_transaction)
                            VALUES
                                (@conversation_handle, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure,
                                 @error_line, 1);
                            COMMIT;

                            -- For this level of error, it is best to exit the proc
                            -- and give the queue monitor control.
                            -- Breaking to the outer catch will accomplish this.
                            RAISERROR ('Message processing error', 16, 1);
                        END
                        ELSE IF XACT_STATE() = 1
                        BEGIN
                            -- Record error and continue processing messages.
                            -- Failing message could also be put aside for later processing here.
                            -- Otherwise it will be discarded.
                            INSERT INTO target_processing_errors
                                (error_conversation, error_number, error_message,
                                 error_severity, error_state, error_procedure,
                                 error_line, doomed_transaction)
                            VALUES
                                (@conversation_handle, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure,
                                 @error_line, 0);
                        END
                    END CATCH
                END
                CLOSE message_cursor;

                -- Empty the buffer so the next RECEIVE starts from a clean table.
                DELETE @receive_table;
            END
            COMMIT;
        END
    END TRY
    BEGIN CATCH
        -- Process the error and exit the proc to give the queue monitor control
        SET @error_number = ERROR_NUMBER();
        SET @error_message = ERROR_MESSAGE();
        SET @error_severity = ERROR_SEVERITY();
        SET @error_state = ERROR_STATE();
        SET @error_procedure = ERROR_PROCEDURE();
        SET @error_line = ERROR_LINE();

        IF XACT_STATE() = -1
        BEGIN
            -- The transaction is doomed. Only rollback possible.
            -- This could disable the queue if done 5 times consecutively!
            ROLLBACK TRANSACTION;

            -- Record the error.
            BEGIN TRANSACTION;
            INSERT INTO target_processing_errors
                (error_conversation, error_number, error_message,
                 error_severity, error_state, error_procedure,
                 error_line, doomed_transaction)
            VALUES
                (NULL, @error_number, @error_message,
                 @error_severity, @error_state, @error_procedure,
                 @error_line, 1);
            COMMIT;
        END
        ELSE IF XACT_STATE() = 1
        BEGIN
            -- Record error and commit transaction.
            -- Here you could also save anything else you want before exiting.
            INSERT INTO target_processing_errors
                (error_conversation, error_number, error_message,
                 error_severity, error_state, error_procedure,
                 error_line, doomed_transaction)
            VALUES
                (NULL, @error_number, @error_message,
                 @error_severity, @error_state, @error_procedure,
                 @error_line, 0);
            COMMIT;
        END
    END CATCH
END;
GO
-- Table to store processing errors.
-- Written by target_queue_activated_procedure; doomed_transaction is 1 when
-- the failing transaction had to be rolled back, 0 when it could continue.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
    DROP TABLE target_processing_errors;
GO
CREATE TABLE target_processing_errors (
    error_conversation UNIQUEIDENTIFIER,
    error_number       INT,
    error_message      VARCHAR(4000),
    error_severity     INT,
    error_state        INT,
    error_procedure    SYSNAME NULL,
    error_line         INT,
    doomed_transaction TINYINT)
GO
-- Create Initiator and Target side SSB entities

-- Message types: the payload message must be well-formed XML; EndOfStream
-- is the initiator's signal that the target should end the dialog.
CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE EndOfStream;

-- Both message types flow from initiator to target only.
CREATE CONTRACT SsbContract
(
    SsbMsgType SENT BY INITIATOR,
    EndOfStream SENT BY INITIATOR
);

-- Each queue activates its own reader procedure (one reader at a time).
CREATE QUEUE SsbInitiatorQueue
    WITH ACTIVATION (
        STATUS = ON,
        MAX_QUEUE_READERS = 1,
        PROCEDURE_NAME = [initiator_queue_activated_procedure],
        EXECUTE AS OWNER);

CREATE QUEUE SsbTargetQueue
    WITH ACTIVATION (
        STATUS = ON,
        MAX_QUEUE_READERS = 1,
        PROCEDURE_NAME = [target_queue_activated_procedure],
        EXECUTE AS OWNER);

-- Only the target service accepts conversations on SsbContract.
CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;
CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);
GO
-- SEND procedure. Uses a dialog from the dialog pool.
--
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
    DROP PROC usp_send;
GO
--------------------------------------------------
-- Send procedure.
-- Sends one message on a dialog claimed from the dialog pool. When the SEND
-- fails, the dialog is removed from the pool and the send is retried on
-- another dialog, for at most 10 attempts.
-- Inputs are from service, to service, contract, message type, and message
-- body (a NULL body sends a message without a body).
-- Returns 0 on success; raises a logged error and returns 1 when all 10
-- attempts failed.
--------------------------------------------------
CREATE PROCEDURE [usp_send] (
    @fromService SYSNAME,
    @toService SYSNAME,
    @onContract SYSNAME,
    @messageType SYSNAME,
    @messageBody NVARCHAR(MAX))
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @dialogHandle UNIQUEIDENTIFIER;
    DECLARE @sendCount BIGINT;
    DECLARE @counter INT;
    DECLARE @error INT;

    SELECT @counter = 1;

    BEGIN TRANSACTION;

    -- Will need a loop to retry in case the dialog is
    -- in a state that does not allow transmission
    --
    WHILE (1=1)
    BEGIN
        -- Claim a dialog from the dialog pool.
        -- A new one will be created if none are available.
        --
        EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;

        -- Attempt to SEND on the dialog.
        -- @@ERROR is captured right after each SEND because any later
        -- statement resets it.
        --
        IF (@messageBody IS NOT NULL)
        BEGIN
            -- If the @messageBody is not null it must be sent explicitly
            SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
            SELECT @error = @@ERROR;
        END
        ELSE
        BEGIN
            -- Messages with no body must *not* specify the body,
            -- cannot send a NULL value argument
            SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
            SELECT @error = @@ERROR;
        END

        IF @error = 0
        BEGIN
            -- Successful send, increment count and exit the loop
            --
            SET @sendCount = @sendCount + 1;
            BREAK;
        END

        -- The SEND failed: delete the associated dialog from the table so it
        -- is never handed out again.
        --
        EXEC usp_delete_dialog @dialogHandle;
        SELECT @dialogHandle = NULL;

        SELECT @counter = @counter + 1;
        IF @counter > 10
        BEGIN
            -- We failed 10 times in a row, something must be broken.
            -- Keep the pool clean-up, report the failure and stop here rather
            -- than falling through to the code below, which would release the
            -- failed dialog back into the pool as if the send had succeeded.
            --
            COMMIT;
            RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
            RETURN 1;
        END
    END

    -- "Criterion" for dialog pool removal is send count > 1000.
    -- Modify to suit application.
    -- When deleting also inform the target to end the dialog.
    IF @sendCount > 1000
    BEGIN
        EXEC usp_delete_dialog @dialogHandle;
        SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];
    END
    ELSE
    BEGIN
        -- Free the dialog.
        EXEC usp_free_dialog @dialogHandle, @sendCount;
    END

    COMMIT
END;
GO
--------------------------------------------------------
-- Run application section
--------------------------------------------------------
-- Send some messages
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'
GO
-- Show the dialog pool
SELECT * FROM [DialogPool]
GO
-- Show the dialogs used.
SELECT * FROM sys.conversation_endpoints;
GO
-- Check whether the TARGET side has processed the messages
SELECT * FROM MsgTable
SELECT * FROM [DialogPool]
SELECT * FROM dbo.target_processing_errors
--TRUNCATE TABLE MsgTable
GO