Class | Bunny::Queue |
In: |
lib/bunny/queue08.rb
|
Parent: | Qrack::Queue |
Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.
# File lib/bunny/queue08.rb, line 14 14: def initialize(client, name, opts = {}) 15: # check connection to server 16: raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected 17: 18: @client = client 19: @opts = opts 20: @delivery_tag = nil 21: @subscription = nil 22: 23: # Queues without a given name are named by the server and are generally 24: # bound to the process that created them. 25: if !name 26: opts = { 27: :passive => false, 28: :durable => false, 29: :exclusive => true, 30: :auto_delete => true 31: }.merge(opts) 32: end 33: 34: # ignore the :nowait option if passed, otherwise program will hang waiting for a 35: # response that will not be sent by the server 36: opts.delete(:nowait) 37: 38: client.send_frame( 39: Qrack::Protocol::Queue::Declare.new({ :queue => name || '', :nowait => false }.merge(opts)) 40: ) 41: 42: method = client.next_method 43: 44: client.check_response(method, Qrack::Protocol::Queue::DeclareOk, "Error declaring queue #{name}") 45: 46: @name = method.queue 47: client.queues[@name] = self 48: end
Acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
# File lib/bunny/queue08.rb, line 67 67: def ack(opts={}) 68: # Set delivery tag 69: if delivery_tag.nil? and opts[:delivery_tag].nil? 70: raise Bunny::AcknowledgementError, "No delivery tag received" 71: else 72: self.delivery_tag = opts[:delivery_tag] if delivery_tag.nil? 73: end 74: 75: client.send_frame( 76: Qrack::Protocol::Basic::Ack.new({:delivery_tag => delivery_tag, :multiple => false}.merge(opts)) 77: ) 78: 79: # reset delivery tag 80: self.delivery_tag = nil 81: end
Binds a queue to an exchange. Until a queue is bound it will not receive any messages. Queues are bound to the direct exchange ’’ by default. If error occurs, a Bunny::ProtocolError is raised.
:bind_ok if successful.
# File lib/bunny/queue08.rb, line 100 100: def bind(exchange, opts = {}) 101: exchange = exchange.respond_to?(:name) ? exchange.name : exchange 102: 103: # ignore the :nowait option if passed, otherwise program will hang waiting for a 104: # response that will not be sent by the server 105: opts.delete(:nowait) 106: 107: client.send_frame( 108: Qrack::Protocol::Queue::Bind.new({ :queue => name, 109: :exchange => exchange, 110: :routing_key => opts.delete(:key), 111: :nowait => false }.merge(opts)) 112: ) 113: 114: method = client.next_method 115: 116: client.check_response(method, Qrack::Protocol::Queue::BindOk, 117: "Error binding queue: #{name} to exchange: #{exchange}") 118: 119: # return message 120: :bind_ok 121: end
Requests that a queue is deleted from broker/server. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration. Removes reference from queues if successful. If an error occurs raises Bunny::ProtocolError.
:delete_ok if successful
# File lib/bunny/queue08.rb, line 146 146: def delete(opts = {}) 147: # ignore the :nowait option if passed, otherwise program will hang waiting for a 148: # response that will not be sent by the server 149: opts.delete(:nowait) 150: 151: client.send_frame( 152: Qrack::Protocol::Queue::Delete.new({ :queue => name, :nowait => false }.merge(opts)) 153: ) 154: 155: method = client.next_method 156: 157: client.check_response(method, Qrack::Protocol::Queue::DeleteOk, "Error deleting queue #{name}") 158: 159: client.queues.delete(name) 160: 161: # return confirmation 162: :delete_ok 163: end
Gets a message from a queue in a synchronous way. If error occurs, raises Bunny::ProtocolError.
Hash {:header, :payload, :delivery_details}. :delivery_details is a hash {:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}.
If the queue is empty the returned hash will contain the values -
:header => nil :payload => :queue_empty :delivery_details => nil
N.B. If a block is provided then the hash will be passed into the block and the return value will be nil.
# File lib/bunny/queue08.rb, line 194 194: def pop(opts = {}, &blk) 195: 196: # do we want to have to provide an acknowledgement? 197: ack = opts.delete(:ack) 198: 199: client.send_frame( 200: Qrack::Protocol::Basic::Get.new({ :queue => name, 201: :consumer_tag => name, 202: :no_ack => !ack, 203: :nowait => true }.merge(opts)) 204: ) 205: 206: method = client.next_method 207: 208: if method.is_a?(Qrack::Protocol::Basic::GetEmpty) then 209: queue_empty = true 210: elsif !method.is_a?(Qrack::Protocol::Basic::GetOk) 211: raise Bunny::ProtocolError, "Error getting message from queue #{name}" 212: end 213: 214: if !queue_empty 215: # get delivery tag to use for acknowledge 216: self.delivery_tag = method.delivery_tag if ack 217: 218: header = client.next_payload 219: 220: # If maximum frame size is smaller than message payload body then message 221: # will have a message header and several message bodies 222: msg = '' 223: while msg.length < header.size 224: msg += client.next_payload 225: end 226: 227: msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments} 228: 229: else 230: msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil} 231: end 232: 233: # Pass message hash to block or return message hash 234: blk ? blk.call(msg_hash) : msg_hash 235: 236: end
Removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal "undo" mechanism. If an error occurs raises Bunny::ProtocolError.
:purge_ok if successful
# File lib/bunny/queue08.rb, line 254 254: def purge(opts = {}) 255: # ignore the :nowait option if passed, otherwise program will hang waiting for a 256: # response that will not be sent by the server 257: opts.delete(:nowait) 258: 259: client.send_frame( 260: Qrack::Protocol::Queue::Purge.new({ :queue => name, :nowait => false }.merge(opts)) 261: ) 262: 263: method = client.next_method 264: 265: client.check_response(method, Qrack::Protocol::Queue::PurgeOk, "Error purging queue #{name}") 266: 267: # return confirmation 268: :purge_ok 269: 270: end
Returns hash {:message_count, :consumer_count}.
# File lib/bunny/queue08.rb, line 280 280: def status 281: client.send_frame( 282: Qrack::Protocol::Queue::Declare.new({ :queue => name, :passive => true }) 283: ) 284: method = client.next_method 285: {:message_count => method.message_count, :consumer_count => method.consumer_count} 286: end
# File lib/bunny/queue08.rb, line 289 289: def subscribe(opts = {}, &blk) 290: # Create subscription 291: s = Bunny::Subscription.new(client, self, opts) 292: s.start(&blk) 293: 294: # Reset when subscription finished 295: @subscription = nil 296: end
Removes a queue binding from an exchange. If error occurs, a Bunny::ProtocolError is raised.
:unbind_ok if successful.
# File lib/bunny/queue08.rb, line 358 358: def unbind(exchange, opts = {}) 359: exchange = exchange.respond_to?(:name) ? exchange.name : exchange 360: 361: # ignore the :nowait option if passed, otherwise program will hang waiting for a 362: # response that will not be sent by the server 363: opts.delete(:nowait) 364: 365: client.send_frame( 366: Qrack::Protocol::Queue::Unbind.new({ :queue => name, 367: :exchange => exchange, 368: :routing_key => opts.delete(:key), 369: :nowait => false }.merge(opts) 370: ) 371: ) 372: 373: method = client.next_method 374: 375: client.check_response(method, Qrack::Protocol::Queue::UnbindOk, "Error unbinding queue #{name}") 376: 377: # return message 378: :unbind_ok 379: end
Cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer.
:unsubscribe_ok if successful
# File lib/bunny/queue08.rb, line 316 316: def unsubscribe(opts = {}) 317: # Default consumer_tag from subscription if not passed in 318: consumer_tag = subscription ? subscription.consumer_tag : opts[:consumer_tag] 319: 320: # Must have consumer tag to tell server what to unsubscribe 321: raise Bunny::UnsubscribeError, 322: "No consumer tag received" if !consumer_tag 323: 324: # Cancel consumer 325: client.send_frame( Qrack::Protocol::Basic::Cancel.new(:consumer_tag => consumer_tag, 326: :nowait => false)) 327: 328: method = client.next_method 329: 330: client.check_response(method, Qrack::Protocol::Basic::CancelOk, 331: "Error unsubscribing from queue #{name}") 332: 333: # Reset subscription 334: @subscription = nil 335: 336: # Return confirmation 337: :unsubscribe_ok 338: 339: end