AMQP动态创建订阅队列

[英]AMQP creating subscribing to queues dynamically


I am trying to build a simple chat application using AMQP, Websockets and Ruby. I understand that this may not be the best use-case to understand AMQP but I would like to understand where i am going wrong.

我正在尝试使用AMQP,Websockets和Ruby构建一个简单的聊天应用程序。我理解这可能不是理解AMQP的最佳用例,但我想了解我哪里出错了。

The following is my amqp-server code

以下是我的amqp服务器代码

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => "room:harry_potter, nickname:siddharth, room:members"
  def self.parse(message)
    parsed_message = JSON.parse(message)

    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status]   = 'MESSAGE'
      response[:message]  = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end

    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')

    @db
  end  
end


@sockets = []
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)

  puts "Connected to AMQP broker. #{AMQP::VERSION} "

  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do 
      @sockets << socket_detail

    end

    ws.onmessage do |message|

      status  = MessageParser.parse(message)         
      exchange = channel.fanout(status[:roomname].split().join('_'))   

      if status[:status] == 'STATUS'               
         queue = channel.queue(status[:username], :durable => true)

        unless queue.subscribed? 
         puts "--------- SUBSCRIBED --------------"
         queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD :  #{payload}"
            ws.send(payload)
          end 
        else
          puts "----ALREADY SUBSCRIBED"
        end                  

        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)      
        #AMQP::Consumer.new(channel, queue)        

      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message)  
      end                  
    end

    ws.onclose do 
      @sockets.delete ws
    end
  end    
end

I use the status to indicate whether the incoming message is a message for ongoing chat or for a status message requiring me to handle chores like subscribing to the queue.

我使用状态来指示传入消息是用于正在进行的聊天的消息还是要求我处理诸如订阅队列之类的杂事的状态消息。

The problem i face is that when I send a message like socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

我面临的问题是,当我发送一条消息,如socket.send(JSON.stringify({status:'message',message:'test',roomname:'Harry Potter'}))

The exchange.publish' is called but it still doesn't get pushed via thews.send` to the browser.

调用exchange.publish'但它仍然不会通过thews.send`推送到浏览器。

Is there something fundamentally wrong with my understanding of EventMachine and AMQP?

我对EventMachine和AMQP的理解是否存在根本错误?

Here is the pastie for the same code http://pastie.org/private/xosgb8tw1w5vuroa4w7a

这是相同代码的馅饼http://pastie.org/private/xosgb8tw1w5vuroa4w7a

My code seems to work as desired when i remove the durable => true from queue = channel.queue(status[:username], :durable => true)

当我从queue = channel.queue中删除durable => true时,我的代码似乎正常工作(status [:username],:durable => true)

The following is a snippet of my Rails view which identifies the user's username and the roomname and sends it as part of message via Websockets.

以下是我的Rails视图的片段,它标识用户的用户名和房间名,并通过Websockets将其作为消息的一部分发送。

Though the code seems to work when i remove the durable => true I fail to understand why that affects the message being delivered. Please Ignore the mongo part of as it does not play any part yet.

虽然代码似乎工作时,我删除持久=> true我无法理解为什么这会影响正在传递的消息。请忽略mongo部分,因为它还没有播放任何部分。

I would also like to know if my approach to AMQP and its usage is correct

我还想知道我对AMQP的使用方法及其用法是否正确

<script>
    $(document).ready(function(){
        var username = '<%= @user.email %>';
        var roomname = 'Bazingaa';

        socket = new WebSocket('ws://127.0.0.1:8080/');

        socket.onopen = function(msg){
            console.log('connected');
            socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
        }

        socket.onmessage = function(msg){
            $('#chat-log').append(msg.data);

        }

    });

</script>
<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>

      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>

<style>
    #chat-log{
        color:#000;
        font-weight:bold;
        margin-top:1em;
        width:900px;
        overflow:auto;
        height:300px;
    }
    #chat-console{
        bottom:10px;
    }

    textarea{
        width:100%;
        height:60px;
    }
</style>

2 个解决方案

#1


1  

I think your problem might be the queue hangs around on the broker between invocations of ws.onmessage. When the client reconnects the queue and binding already exists so ws.send() doesn't get called.

我认为您的问题可能是在ws.onmessage调用之间的代理上挂起的队列。当客户端重新连接队列并且绑定已经存在时,ws.send()不会被调用。

By default when you create a queue, it and any bindings it has, hangs around until the broker restarts, or you explicitly tell the broker to delete it.

默认情况下,当您创建一个队列时,它及其拥有的任何绑定都会挂起,直到代理重新启动,或者您明确告诉代理将其删除。

There are two ways to change this:

有两种方法可以改变这种情况:

  • Adding the durable flag when you create the queue, which will cause the queue to stick around even if the broker restarts
  • 在创建队列时添加持久标志,即使代理重新启动,也会导致队列停留

  • Adding the auto_delete flag, which will cause the broker to automatically delete the entity after a short amount of time of not being having a consumer attached to it
  • 添加auto_delete标志,这将导致代理在没有附加消费者的短时间后自动删除实体

If you have control over the broker you are using the rabbitmq broker, an easy way to introspect what is happening on the broker is to install the management plugin, which provides a web interface to exchanges, bindings and queues on the broker.

如果您可以控制使用rabbitmq代理的代理,那么可以通过一种简单的方法来反省代理上发生的事情,即安装管理插件,该插件为代理上的交换,绑定和队列提供Web界面。

#2


0  

On the first look the AMQP bits seem to be OK, but I don't want to set up all the dependencies. If you provide a minimal example with just the AMQP part, I will check it.

首先看AMQP位似乎没问题,但我不想设置所有依赖项。如果你提供一个只有AMQP部分的最小例子,我会检查它。

智能推荐

注意!

本站翻译的文章,版权归属于本站,未经许可禁止转摘,转摘请注明本文地址:http://www.silva-art.net/blog/2011/07/19/535345accc9c2e7d046678839f7f6e03.html



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告