How to create REQ/REP on rabbit.js

Question

I have been working with RabbitMQ on .NET for a while and haven't had much trouble with it. Now I'm moving to rabbit.js with node.js, and I'm not very familiar with it. rabbit.js has limited documentation. All I know is the basic PUSH/PULL and PUB/SUB. Now I want to do REQ/REP and I don't know how to do it. Could anybody share a snippet, please?

Thank you very much for your reply.

Best,

Recommended answer

This is perhaps more than you asked for, but I have a snippet (even though it's quite long) for doing RPC using node-amqp instead of REQ/REP with rabbit.js. What I have done is similar to what you can find in the RabbitMQ tutorial about RPC.

For the moment, the content of the message should be an object (hash), which the amqp module will serialize to JSON.
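As a rough illustration of what "an object that gets serialized to JSON" means on the wire, the sketch below turns a plain object into a JSON buffer and parses it back. The helper names encodeBody and decodeBody are made up for this example and are not part of node-amqp; as far as I understand, the library does equivalent work internally for application/json content.

```javascript
// Hypothetical helpers for illustration only; not part of node-amqp.

// Serialize a plain object to the bytes that would travel in the message body.
function encodeBody(content) {
  return Buffer.from(JSON.stringify(content), 'utf8');
}

// Parse a received JSON body back into a plain object.
function decodeBody(buffer) {
  return JSON.parse(buffer.toString('utf8'));
}

var wire = encodeBody({foo: 'bar', index: 1});
console.log(wire.toString('utf8'));
console.log(decodeBody(wire));
```

Anything that JSON cannot represent (functions, undefined values, circular references) would not survive this round trip, which is one reason to keep the request content a simple hash.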

The AmqpRpc class takes an amqp connection when initialized; after that it should only be a matter of calling makeRequest and waiting for the response in the callback. The callback has the form function(err, response), where err might be a timeout error.
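The core of that callback contract is a table of pending requests keyed by correlation id, each with its own timeout. The sketch below isolates that idea without any broker; PendingRequests and its add/resolve methods are hypothetical names for this illustration and do not appear in the gist.

```javascript
var crypto = require('crypto');

// Hypothetical helper: tracks callbacks waiting for a reply, keyed by correlation id.
function PendingRequests(timeoutMs) {
  this.timeoutMs = timeoutMs;
  this.entries = Object.create(null); // no inherited keys to collide with ids
}

// Register a callback; returns the correlation id to send along with the request.
PendingRequests.prototype.add = function (callback) {
  var self = this;
  var id = crypto.randomBytes(16).toString('hex');
  var timer = setTimeout(function () {
    // no reply arrived in time: forget the request and report the error
    delete self.entries[id];
    callback(new Error('timeout ' + id));
  }, self.timeoutMs);
  self.entries[id] = {callback: callback, timer: timer};
  return id;
};

// Deliver a reply; returns false when the id is unknown (late or foreign reply).
PendingRequests.prototype.resolve = function (id, response) {
  var entry = this.entries[id];
  if (!entry) return false;
  clearTimeout(entry.timer);
  delete this.entries[id];
  entry.callback(null, response);
  return true;
};

var pending = new PendingRequests(2000);
var id = pending.add(function (err, response) {
  console.log(err ? err.message : response);
});
pending.resolve(id, {response: 'OK'});
```

Because each entry is removed as soon as it is resolved or timed out, a reply that arrives after the timeout is simply ignored rather than invoking the callback a second time.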

I'm sorry it's not exactly what you asked for, but it's maybe close enough. I also posted the code as a gist on GitHub: https://gist.github.com/2720846

The samples have been changed to support multiple outstanding requests.

amqprpc.js

var amqp = require('amqp')
  , crypto = require('crypto');

var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';

exports = module.exports = AmqpRpc;

function AmqpRpc(connection){
  //use the given connection, or create a default one
  this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
  this.requests = {}; //hash to store requests waiting for a response
  this.response_queue = false; //placeholder for the future queue name
}

AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
  var self = this;
  //generate a unique correlation id for this call
  var correlationId = crypto.randomBytes(16).toString('hex');

  //create a timeout for what should happen if we don't get a response
  var tId = setTimeout(function(corr_id){
    //if this ever gets called we didn't get a response in a
    //timely fashion
    callback(new Error("timeout " + corr_id));
    //delete the entry from hash
    delete self.requests[corr_id];
  }, TIMEOUT, correlationId);

  //create a request entry to store in a hash
  var entry = {
    callback:callback,
    timeout: tId //the id for the timeout so we can clear it
  };

  //put the entry in the hash so we can match the response later
  self.requests[correlationId]=entry;

  //make sure we have a response queue
  self.setupResponseQueue(function(){
    //put the request on a queue
    self.connection.publish(queue_name, content, {
      correlationId:correlationId,
      contentType:CONTENT_TYPE,
      replyTo:self.response_queue});
  });
}


AmqpRpc.prototype.setupResponseQueue = function(next){
  //don't mess around if we have a queue
  if(this.response_queue) return next();

  var self = this;
  //create the queue
  self.connection.queue('', {exclusive:true}, function(q){
    //store the name
    self.response_queue = q.name;
    //subscribe to messages
    q.subscribe(function(message, headers, deliveryInfo, m){
      //get the correlationId
      var correlationId = m.correlationId;
      //is it a response to a pending request
      if(correlationId in self.requests){
        //retrieve the request entry
        var entry = self.requests[correlationId];
        //make sure we don't timeout by clearing it
        clearTimeout(entry.timeout);
        //delete the entry from hash
        delete self.requests[correlationId];
        //callback, no err
        entry.callback(null, message);
      }
    });
    return next();
  });
}
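One thing to be aware of: response_queue is only set once the broker has answered, so if several makeRequest calls are issued back to back (as in the client loop), each of them appears to declare its own exclusive queue. A possible way around this is to run the setup once and park the other callers until it completes. The sketch below shows that pattern in isolation; once and fakeDeclareQueue are hypothetical names, and fakeDeclareQueue merely stands in for the connection.queue call.

```javascript
// Hypothetical helper: wraps an asynchronous setup function so that it runs
// only once; every caller's callback fires after the setup has completed.
function once(setup) {
  var state = 'idle'; // 'idle' -> 'pending' -> 'done'
  var result;
  var waiting = [];
  return function (next) {
    if (state === 'done') return next(result);
    waiting.push(next);
    if (state === 'pending') return; // setup already in flight, just wait
    state = 'pending';
    setup(function (value) {
      result = value;
      state = 'done';
      var callbacks = waiting;
      waiting = [];
      callbacks.forEach(function (cb) { cb(result); });
    });
  };
}

// Stand-in for declaring an exclusive reply queue; counts how often it runs.
var declared = 0;
function fakeDeclareQueue(done) {
  declared += 1;
  var name = 'reply-queue-' + declared;
  setImmediate(function () { done(name); });
}

var setupResponseQueue = once(fakeDeclareQueue);
for (var i = 0; i < 10; i += 1) {
  setupResponseQueue(function (queueName) {
    console.log('publishing with replyTo', queueName);
  });
}
console.log('queues declared:', declared);
```

All ten callers end up with the same queue name, and the declaration counter stays at one regardless of how many requests were started before the queue was ready.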

A small example of how to use it can be found below. Save both code parts and just run with...

node client.js

If you don't have a server to provide the reply, the request will time out.

client.js

//example of how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});

var rpc = new (require('./amqprpc'))(connection);

connection.on("ready", function(){
  console.log("ready");
  var outstanding=0; //counter of outstanding requests

  //do a number of requests
  for(var i=1; i<=10 ;i+=1){
    //we are about to make a request, increase counter
    outstanding += 1;
    rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
      if(err)
        console.error(err);
      else
        console.log("response", response);
      //reduce for each timeout or response
      outstanding-=1;
      isAllDone();
    });
  }

  function isAllDone() {
    //if no more outstanding then close connection
    if(outstanding === 0){
      connection.end();
    }
  }

});

I'll even throw in a sample server for good measure.

server.js

//super simple rpc server example
var amqp = require('amqp');

var cnn = amqp.createConnection({host:'127.0.0.1'});

cnn.on('ready', function(){
  console.log("listening on msg_queue");
  cnn.queue('msg_queue', function(q){
      q.subscribe(function(message, headers, deliveryInfo, m){
        //log the routing key and the received message
        console.log(deliveryInfo.routingKey, message);
        //return index sent
        cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
            contentType:'application/json',
            contentEncoding:'utf-8',
            correlationId:m.correlationId
          });
      });
  });
});
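The part of the server that makes the RPC pattern work is small: the reply goes to the queue named in replyTo and carries the same correlationId as the request. A minimal sketch of just that mapping, as a pure function that can be checked without a broker, could look like this; buildReply is a hypothetical name and is not part of node-amqp or the gist.

```javascript
// Hypothetical helper: given a request body and its AMQP properties,
// describe the reply that the server should publish.
function buildReply(message, properties) {
  return {
    routingKey: properties.replyTo, // the client's private reply queue
    body: {response: 'OK', index: message.index},
    options: {
      contentType: 'application/json',
      contentEncoding: 'utf-8',
      correlationId: properties.correlationId // echoed so the client can match it
    }
  };
}

var reply = buildReply(
  {foo: 'bar', index: 3},
  {replyTo: 'reply-queue', correlationId: 'abc123'}
);
console.log(reply.routingKey, reply.body, reply.options.correlationId);
```

In the server above, the three parts would be passed straight to cnn.publish as the routing key, the body, and the options.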

