Class | Bunny::Client09 |
In: |
lib/bunny/client09.rb
|
Parent: | Qrack::Client |
Sets up a Bunny::Client object ready for connection to a broker/server. Client.status is set to :not_connected.
# File lib/bunny/client09.rb, line 43 43: def initialize(opts = {}) 44: super 45: @spec = '0-9-1' 46: @port = opts[:port] || (opts[:ssl] ? Qrack::Protocol09::SSL_PORT : Qrack::Protocol09::PORT) 47: end
Checks response from AMQP methods and takes appropriate action
# File lib/bunny/client09.rb, line 57 57: def check_response(received_method, expected_method, err_msg, err_class = Bunny::ProtocolError) 58: case 59: when received_method.is_a?(Qrack::Protocol09::Connection::Close) 60: # Clean up the socket 61: close_socket 62: 63: raise Bunny::ForcedConnectionCloseError, 64: "Error Reply Code: #{received_method.reply_code}\nError Reply Text: #{received_method.reply_text}" 65: 66: when received_method.is_a?(Qrack::Protocol09::Channel::Close) 67: # Clean up the channel 68: channel.active = false 69: 70: raise Bunny::ForcedChannelCloseError, 71: "Error Reply Code: #{received_method.reply_code}\nError Reply Text: #{received_method.reply_text}" 72: 73: when !received_method.is_a?(expected_method) 74: raise err_class, err_msg 75: 76: else 77: :response_ok 78: end 79: end
# File lib/bunny/client09.rb, line 81 81: def close_connection 82: # Set client channel to zero 83: switch_channel(0) 84: 85: send_frame( 86: Qrack::Protocol09::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0) 87: ) 88: 89: method = next_method 90: 91: check_response(method, Qrack::Protocol09::Connection::CloseOk, "Error closing connection") 92: 93: end
# File lib/bunny/client09.rb, line 95 95: def create_channel 96: channels.each do |c| 97: return c if (!c.open? and c.number != 0) 98: end 99: # If no channel to re-use instantiate new one 100: Bunny::Channel09.new(self) 101: end
Declares an exchange to the broker/server. If the exchange does not exist, a new one is created using the arguments passed in. If the exchange already exists, a reference to it is created, provided that the arguments passed in do not conflict with the existing attributes of the exchange. If an error occurs a Bunny::ProtocolError is raised.
# File lib/bunny/client09.rb, line 130 130: def exchange(name, opts = {}) 131: exchanges[name] || Bunny::Exchange09.new(self, name, opts) 132: end
# File lib/bunny/client09.rb, line 134 134: def init_connection 135: write(Qrack::Protocol09::HEADER) 136: write([0, Qrack::Protocol09::VERSION_MAJOR, Qrack::Protocol09::VERSION_MINOR, Qrack::Protocol09::REVISION].pack('C4')) 137: 138: frame = next_frame 139: if frame.nil? or !frame.payload.is_a?(Qrack::Protocol09::Connection::Start) 140: raise Bunny::ProtocolError, 'Connection initiation failed' 141: end 142: end
# File lib/bunny/client09.rb, line 144 144: def next_frame(opts = {}) 145: frame = nil 146: 147: case 148: when channel.frame_buffer.size > 0 149: frame = channel.frame_buffer.shift 150: when opts.has_key?(:timeout) 151: Timeout::timeout(opts[:timeout], Qrack::ClientTimeout) do 152: frame = Qrack::Transport09::Frame.parse(buffer) 153: end 154: else 155: frame = Qrack::Transport09::Frame.parse(buffer) 156: end 157: 158: @logger.info("received") { frame } if @logging 159: 160: raise Bunny::ConnectionError, 'No connection to server' if (frame.nil? and !connecting?) 161: 162: # Monitor server activity and discard heartbeats 163: @message_in = true 164: 165: case 166: when frame.is_a?(Qrack::Transport09::Heartbeat) 167: next_frame(opts) 168: when frame.nil? 169: frame 170: when ((frame.channel != channel.number) and (frame.channel != 0)) 171: channel.frame_buffer << frame 172: next_frame(opts) 173: else 174: frame 175: end 176: 177: end
# File lib/bunny/client09.rb, line 179 179: def open_connection 180: send_frame( 181: Qrack::Protocol09::Connection::StartOk.new( 182: :client_properties => {:platform => 'Ruby', :product => 'Bunny', :information => 'http://github.com/celldee/bunny', :version => VERSION}, 183: :mechanism => 'PLAIN', 184: :response => "\0" + @user + "\0" + @pass, 185: :locale => 'en_US' 186: ) 187: ) 188: 189: frame = next_frame 190: raise Bunny::ProtocolError, "Connection failed - user: #{@user}" if frame.nil? 191: 192: method = frame.payload 193: 194: if method.is_a?(Qrack::Protocol09::Connection::Tune) 195: send_frame( 196: Qrack::Protocol09::Connection::TuneOk.new( :channel_max => @channel_max, :frame_max => @frame_max, :heartbeat => @heartbeat) 197: ) 198: end 199: 200: send_frame( 201: Qrack::Protocol09::Connection::Open.new(:virtual_host => @vhost, :reserved_1 => 0, :reserved_2 => false) 202: ) 203: 204: raise Bunny::ProtocolError, 'Cannot open connection' unless next_method.is_a?(Qrack::Protocol09::Connection::OpenOk) 205: end
Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a QoS method always depend on the content class semantics. Though the QoS method could in principle apply to both peers, it is currently meaningful only for the server.
messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning "no specific limit", although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.
of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.
true, they are applied to the entire connection.
:qos_ok if successful.
# File lib/bunny/client09.rb, line 239 239: def qos(opts = {}) 240: 241: send_frame( 242: Qrack::Protocol09::Basic::Qos.new({ :prefetch_size => 0, :prefetch_count => 1, :global => false }.merge(opts)) 243: ) 244: 245: method = next_method 246: 247: check_response(method, Qrack::Protocol09::Basic::QosOk, "Error specifying Quality of Service") 248: 249: # return confirmation 250: :qos_ok 251: end
Declares a queue to the broker/server. If the queue does not exist, a new one is created using the arguments passed in. If the queue already exists, a reference to it is created, provided that the arguments passed in do not conflict with the existing attributes of the queue. If an error occurs a Bunny::ProtocolError is raised.
# File lib/bunny/client09.rb, line 287 287: def queue(name = nil, opts = {}) 288: if name.is_a?(Hash) 289: opts = name 290: name = nil 291: end 292: 293: # Queue is responsible for placing itself in the list of queues 294: queues[name] || Bunny::Queue09.new(self, name, opts) 295: end
Asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.
redelivered to the original recipient. If set to true, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
# File lib/bunny/client09.rb, line 312 312: def recover(opts = {}) 313: 314: send_frame( 315: Qrack::Protocol09::Basic::Recover.new({ :requeue => false }.merge(opts)) 316: ) 317: 318: end
# File lib/bunny/client09.rb, line 320 320: def send_frame(*args) 321: args.each do |data| 322: data = data.to_frame(channel.number) unless data.is_a?(Qrack::Transport09::Frame) 323: data.channel = channel.number 324: 325: @logger.info("send") { data } if @logging 326: write(data.to_s) 327: 328: # Monitor client activity for heartbeat purposes 329: @message_out = true 330: end 331: 332: nil 333: end
# File lib/bunny/client09.rb, line 335 335: def send_heartbeat 336: # Create a new heartbeat frame 337: hb = Qrack::Transport09::Heartbeat.new('') 338: # Channel 0 must be used 339: switch_channel(0) if @channel.number > 0 340: # Send the heartbeat to server 341: send_frame(hb) 342: end
Opens a communication channel and starts a connection. If an error occurs, a Bunny::ProtocolError is raised. If successful, Client.status is set to :connected.
:connected if successful.
# File lib/bunny/client09.rb, line 357 357: def start_session 358: @connecting = true 359: 360: # Create/get socket 361: socket 362: 363: # Initiate connection 364: init_connection 365: 366: # Open connection 367: open_connection 368: 369: # Open another channel because channel zero is used for specific purposes 370: c = create_channel() 371: c.open 372: 373: @connecting = false 374: 375: # return status 376: @status = :connected 377: end
This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
:commit_ok if successful.
# File lib/bunny/client09.rb, line 394 394: def tx_commit 395: send_frame(Qrack::Protocol09::Tx::Commit.new()) 396: 397: method = next_method 398: 399: check_response(method, Qrack::Protocol09::Tx::CommitOk, "Error commiting transaction") 400: 401: # return confirmation 402: :commit_ok 403: end
This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
:rollback_ok if successful.
# File lib/bunny/client09.rb, line 418 418: def tx_rollback 419: send_frame(Qrack::Protocol09::Tx::Rollback.new()) 420: 421: method = next_method 422: 423: check_response(method, Qrack::Protocol09::Tx::RollbackOk, "Error rolling back transaction") 424: 425: # return confirmation 426: :rollback_ok 427: end
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
:select_ok if successful.
# File lib/bunny/client09.rb, line 442 442: def tx_select 443: send_frame(Qrack::Protocol09::Tx::Select.new()) 444: 445: method = next_method 446: 447: check_response(method, Qrack::Protocol::Tx::SelectOk, "Error initiating transactions for current channel") 448: 449: # return confirmation 450: :select_ok 451: end