Class | Bunny::Subscription |
In: | lib/bunny/subscription08.rb |
Parent: | Qrack::Subscription |
Asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them with an unsubscribe. Every time a message reaches the queue, it is passed to the block (blk) for processing. If an error occurs, Bunny::ProtocolError is raised.
Passes a hash of message information to the block, if one has been supplied. The hash contains :header, :payload and :delivery_details, structured as follows:
:header has the instance variables @klass, @size, @weight and @properties. @properties is a hash containing :content_type, :delivery_mode and :priority.
:payload contains the message contents.
:delivery_details is a hash containing :consumer_tag, :delivery_tag, :redelivered, :exchange and :routing_key.
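As a rough illustration of the hash layout described above, the sketch below builds a sample message by hand and reads it the way a subscribe block might. SketchHeader and describe_delivery are hypothetical names that are not part of Bunny, the sample values are made up, and the sketch assumes the header object exposes reader methods for its instance variables.

```ruby
# Hypothetical stand-in for the header object; the real one is supplied by Bunny.
# Assumption: the header exposes readers for @klass, @size, @weight and @properties.
SketchHeader = Struct.new(:klass, :size, :weight, :properties)

# Summarise one delivery using only the keys documented above.
def describe_delivery(msg)
  props   = msg[:header].properties
  details = msg[:delivery_details]
  format("%s via %s/%s (type=%s, redelivered=%s)",
         msg[:payload], details[:exchange], details[:routing_key],
         props[:content_type], details[:redelivered])
end

# Hand-built sample with invented values, shaped like the documented hash.
sample = {
  :header => SketchHeader.new(nil, 5, 0,
                              { :content_type => "text/plain",
                                :delivery_mode => 1,
                                :priority => 0 }),
  :payload => "hello",
  :delivery_details => { :consumer_tag => "tag-1",
                         :delivery_tag => 1,
                         :redelivered => false,
                         :exchange => "my_exchange",
                         :routing_key => "my.key" }
}

puts describe_delivery(sample)
```

In a real subscription the same method body could be called from the block passed to subscribe, with msg being the hash that Bunny supplies.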
If the :timeout option is specified, Qrack::ClientTimeout is raised if the method times out while waiting to receive the next message from the queue.
my_queue.subscribe(:timeout => 5) {|msg| puts msg[:payload]}
my_queue.subscribe(:message_max => 10, :ack => true) {|msg| puts msg[:payload]}
# File lib/bunny/subscription08.rb, line 65
def setup_consumer
  client.send_frame(
    Qrack::Protocol::Basic::Consume.new({ :queue => queue.name,
                                          :consumer_tag => consumer_tag,
                                          :no_ack => !ack,
                                          :exclusive => exclusive,
                                          :nowait => false }.merge(@opts))
  )

  method = client.next_method

  client.check_response(method, Qrack::Protocol::Basic::ConsumeOk,
    "Error subscribing to queue #{queue.name}")

  @consumer_tag = method.consumer_tag

end
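The argument hash in setup_consumer is built from defaults and then merged with @opts, so any key present in the options takes precedence, and :no_ack is simply the inverse of the ack setting. The following is a minimal sketch of that merge in isolation. consume_arguments is a hypothetical helper, not a Bunny method, and it only mirrors the hash construction shown in the listing.

```ruby
# Hypothetical helper mirroring the hash built inside setup_consumer.
# It does not talk to a broker; it only shows how the defaults and
# the caller's options combine.
def consume_arguments(queue_name, consumer_tag, ack, exclusive, opts = {})
  {
    :queue        => queue_name,
    :consumer_tag => consumer_tag,
    :no_ack       => !ack,        # acknowledgements requested => no_ack is false
    :exclusive    => exclusive,
    :nowait       => false
  }.merge(opts)                   # keys in opts override the defaults above
end

args = consume_arguments("my_queue", "tag-1", true, false, { :exclusive => true })
puts args.inspect
```

Because Hash#merge gives priority to the argument's values, the :exclusive entry passed in the options replaces the default computed from the exclusive parameter.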