Class | Bunny::Exchange |
In: | lib/bunny/exchange08.rb |
Parent: | Object |
Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined: direct, fanout, topic and headers.
AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with ‘amq.’, for example amq.direct and amq.fanout.
If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.
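To make the routing role of an exchange concrete, here is a toy in-memory model of the direct and fanout types. It is only a sketch: `ToyExchange`, `bind` and `route` are hypothetical names invented for this illustration, queues are plain arrays, and nothing here reflects Bunny's API or how a broker is actually implemented.

```ruby
# Toy model only: ToyExchange is a hypothetical class, not part of Bunny.
class ToyExchange
  attr_reader :name, :type

  def initialize(name, type)
    @name = name
    @type = type
    @bindings = [] # pairs of [queue, binding_key]
  end

  # Attach a queue (a plain Array here) with an optional binding key.
  def bind(queue, key = nil)
    @bindings << [queue, key]
  end

  # Copy the message into every bound queue that matches for this type.
  def route(message, routing_key = nil)
    @bindings.each do |queue, binding_key|
      case type
      when :fanout then queue << message
      when :direct then queue << message if binding_key == routing_key
      end
    end
    nil
  end
end

errors     = []
everything = []

direct = ToyExchange.new('toy.direct', :direct)
direct.bind(errors, 'error')
direct.route('disk full', 'error')
direct.route('started', 'info') # no binding for 'info', so nothing receives it

fanout = ToyExchange.new('toy.fanout', :fanout)
fanout.bind(errors)
fanout.bind(everything)
fanout.route('broadcast')       # the routing key is ignored by a fanout exchange
```

In this model a direct exchange delivers only where the binding key equals the routing key, while a fanout exchange delivers to every bound queue.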
client | [R] |
key | [R] |
name | [R] |
opts | [R] |
type | [R] |
    # File lib/bunny/exchange08.rb, line 36
    def initialize(client, name, opts = {})
      # check connection to server
      raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

      @client, @name, @opts = client, name, opts

      # set up the exchange type catering for default names
      if name.match(/^amq\./)
        new_type = name.sub(/amq\./, '')
        # handle 'amq.match' default
        new_type = 'headers' if new_type == 'match'
        @type = new_type.to_sym
      else
        @type = opts[:type] || :direct
      end

      @key = opts[:key]
      @client.exchanges[@name] ||= self

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      unless name == "amq.#{type}" or name == ''
        client.send_frame(
          Qrack::Protocol::Exchange::Declare.new(
            { :exchange => name, :type => type, :nowait => false }.merge(opts)
          )
        )

        method = client.next_method

        client.check_response(method, Qrack::Protocol::Exchange::DeclareOk,
          "Error declaring exchange #{name}: type = #{type}")

      end
    end
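The type-resolution branch and the declare guard in the constructor can be tried in isolation. The sketch below copies that logic into two standalone helpers; `resolve_exchange_type` and `declare_needed?` are hypothetical names used only for this illustration and are not part of Bunny.

```ruby
# Hypothetical helpers mirroring the logic of Bunny::Exchange#initialize.

# Derive the exchange type from a default 'amq.' name, else from opts.
def resolve_exchange_type(name, opts = {})
  if name.match(/^amq\./)
    new_type = name.sub(/amq\./, '')
    new_type = 'headers' if new_type == 'match' # 'amq.match' is the headers default
    new_type.to_sym
  else
    opts[:type] || :direct
  end
end

# A Declare frame is skipped for the default exchange ('') and for names
# that are exactly "amq.<type>".
def declare_needed?(name, type)
  !(name == "amq.#{type}" || name == '')
end

fanout_type  = resolve_exchange_type('amq.fanout')
headers_type = resolve_exchange_type('amq.match')
topic_type   = resolve_exchange_type('orders', :type => :topic)
default_type = resolve_exchange_type('orders')
```

One consequence visible in this sketch: 'amq.match' resolves to :headers, so its name no longer equals "amq.#{type}" and the guard does not skip the declaration for it.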
Requests that the exchange be deleted from the broker/server. Removes the reference from the client's exchanges if successful. Raises Bunny::ProtocolError if an error occurs.
Returns :delete_ok if successful.
    # File lib/bunny/exchange08.rb, line 93
    def delete(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol::Exchange::Delete.new({ :exchange => name, :nowait => false }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol::Exchange::DeleteOk,
        "Error deleting exchange #{name}")

      client.exchanges.delete(name)

      # return confirmation
      :delete_ok
    end
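The way :nowait is discarded before the options are merged into the frame arguments can be seen with plain hashes. This is a minimal sketch: `delete_arguments` is a hypothetical helper, and :if_unused is used here only as an example of an extra option a caller might pass.

```ruby
# Hypothetical helper reproducing the option handling in #delete.
def delete_arguments(name, opts = {})
  opts.delete(:nowait) # a caller-supplied :nowait is always dropped
  { :exchange => name, :nowait => false }.merge(opts)
end

caller_opts = { :nowait => true, :if_unused => true }
args = delete_arguments('orders', caller_opts)
```

Because `opts.delete` acts on the hash that was passed in, the caller's hash loses its :nowait entry as a side effect, unlike in publish, which works on a copy.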
Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
Returns nil.
    # File lib/bunny/exchange08.rb, line 143
    def publish(data, opts = {})
      opts = opts.dup
      out = []

      # Set up options
      routing_key = opts.delete(:key) || key
      mandatory = opts.delete(:mandatory)
      immediate = opts.delete(:immediate)
      delivery_mode = opts.delete(:persistent) ? 2 : 1

      out << Qrack::Protocol::Basic::Publish.new(
        { :exchange => name,
          :routing_key => routing_key,
          :mandatory => mandatory,
          :immediate => immediate }
      )
      data = data.to_s
      out << Qrack::Protocol::Header.new(
        Qrack::Protocol::Basic,
        data.length, {
          :content_type  => 'application/octet-stream',
          :delivery_mode => delivery_mode,
          :priority      => 0
        }.merge(opts)
      )
      out << Qrack::Transport::Body.new(data)

      client.send_frame(*out)
    end
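How publish divides its options between the Basic::Publish frame and the content header can be sketched without a broker. `split_publish_options` below is a hypothetical helper that repeats the same hash operations; it builds plain hashes rather than Qrack frames, and the sample option values are made up.

```ruby
# Hypothetical helper reproducing the option handling in #publish.
def split_publish_options(opts, default_key = nil)
  opts = opts.dup # work on a copy so the caller's hash is untouched

  routing_key   = opts.delete(:key) || default_key
  mandatory     = opts.delete(:mandatory)
  immediate     = opts.delete(:immediate)
  delivery_mode = opts.delete(:persistent) ? 2 : 1 # 2 = persistent, 1 = transient

  publish_fields = {
    :routing_key => routing_key,
    :mandatory   => mandatory,
    :immediate   => immediate
  }

  # Whatever is left in opts overrides the header defaults.
  header_properties = {
    :content_type  => 'application/octet-stream',
    :delivery_mode => delivery_mode,
    :priority      => 0
  }.merge(opts)

  [publish_fields, header_properties]
end

caller_opts = { :key => 'orders.new', :persistent => true, :content_type => 'text/plain' }
fields, props = split_publish_options(caller_opts, 'fallback')
```

The :key, :mandatory, :immediate and :persistent options are consumed first; anything else a caller passes, such as :content_type here, ends up in the header properties and replaces the default.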