Class: Qpid::Proton::ConnectionDriver
- Inherits: Object
- Defined in: lib/core/connection_driver.rb
Overview
Associate an AMQP Connection and Transport with an IO:
- #read reads AMQP binary data from the IO and generates events
- #tick generates timing-related events
- #event gets events to be dispatched to Handler::MessagingHandlers
- #write writes AMQP binary data to the IO
Thread safety: The ConnectionDriver is not thread safe but separate ConnectionDriver instances can be processed concurrently. The Container handles multiple connections concurrently in multiple threads.
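The four operations listed in the overview can be combined into a single-connection loop. The following is a minimal sketch, not the Container's actual implementation. It assumes `driver` is any object that responds to the methods documented on this page; `run_driver`, `dispatch` and `max_wait` are hypothetical names introduced for illustration.

```ruby
# Sketch only: `driver` is any object responding to the ConnectionDriver
# methods documented on this page. `dispatch` is a hypothetical callable
# that receives each event; `max_wait` is a hypothetical upper bound (in
# seconds) on how long a single select call may block.
def run_driver(driver, dispatch, max_wait: 1.0)
  until driver.finished?
    # Only ask select about the directions the driver can currently use.
    readers = driver.can_read?  ? [driver] : []
    writers = driver.can_write? ? [driver] : []

    # Wake up no later than the deadline returned by the last #tick.
    wait = max_wait
    if (deadline = driver.next_tick)
      wait = [[deadline - Time.now, 0].max, max_wait].min
    end

    # IO.select accepts the driver itself because it responds to #to_io.
    ready_r, ready_w = IO.select(readers, writers, nil, wait)
    driver.read  if ready_r&.any?
    driver.write if ready_w&.any?

    # Let the driver do timer work, then hand every pending event on.
    driver.tick(Time.now)
    driver.each_event { |event| dispatch.call(event) }
  end
end
```

Because one driver is not thread safe, a loop like this should own its driver exclusively; separate drivers can each run their own loop in separate threads.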
Direct Known Subclasses
Instance Attribute Summary
- #next_tick
  Time returned by the last call to #tick.
Instance Method Summary
- #can_read? ⇒ Bool
  True if the driver can read more data.
- #can_write? ⇒ Bool
  True if the driver has data to write.
- #close(error = nil)
  Disconnect both sides of the transport without sending or waiting for AMQP close frames.
- #close_read(error = nil)
  Disconnect the read side of the transport, without waiting for an AMQP close frame.
- #close_write(error = nil)
  Disconnect the write side of the transport, without sending an AMQP close frame.
- #connection ⇒ Connection
- #each_event
  Iterator for all available events.
- #event
  Get the next event to dispatch, nil if no events available.
- #event? ⇒ Boolean
  True if #event will return non-nil.
- #finished? ⇒ Boolean
  True if the ConnectionDriver has nothing left to do: both sides of the transport are closed and there are no events to dispatch.
- #initialize(io) ⇒ ConnectionDriver (constructor)
  Create a Connection and Transport associated with io.
- #read
- #read_closed? ⇒ Boolean
  Is the read side of the driver closed?
- #tick(now = Time.now) ⇒ Time
  Handle time-related work, for example idle-timeout events.
- #to_io ⇒ IO
  Allows ConnectionDriver to be passed directly to IO#select.
- #transport ⇒ Transport
- #write
  Non-blocking write to #io. IO errors are returned as transport errors by #event, not raised.
- #write_closed? ⇒ Boolean
  Is the write side of the driver closed?
Constructor Details
#initialize(io) ⇒ ConnectionDriver
Create a Qpid::Proton::Connection and Transport associated with io.
# File 'lib/core/connection_driver.rb', line 37
def initialize(io)
  @impl = Cproton.pni_connection_driver or raise NoMemoryError
  @io = io
  @rbuf = "" # String for re-usable read buffer
end
Instance Attribute Details
#next_tick
Time returned by the last call to #tick.
# File 'lib/core/connection_driver.rb', line 130
def next_tick
  @next_tick
end
Instance Method Details
#can_read? ⇒ Bool
Returns true if the driver can read more data.
# File 'lib/core/connection_driver.rb', line 57
def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end
#can_write? ⇒ Bool
Returns true if the driver has data to write.
# File 'lib/core/connection_driver.rb', line 60
def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end
#close(error = nil)
Disconnect both sides of the transport without sending or waiting for AMQP close frames. See comments on #close_write.
# File 'lib/core/connection_driver.rb', line 158
def close error=nil
  close_write error
  close_read
end
#close_read(error = nil)
Disconnect the read side of the transport, without waiting for an AMQP close frame. See comments on #close_write.
# File 'lib/core/connection_driver.rb', line 150
def close_read error=nil
  set_error error
  Cproton.pn_connection_driver_read_close(@impl)
  @io.close_read rescue nil # Allow double-close
end
#close_write(error = nil)
Disconnect the write side of the transport, without sending an AMQP close frame. To close politely, use Qpid::Proton::Connection#close instead; the transport will close itself once the protocol close is complete.
# File 'lib/core/connection_driver.rb', line 136
def close_write error=nil
  set_error error
  Cproton.pn_connection_driver_write_close(@impl)
  @io.close_write rescue nil # Allow double-close
end
#connection ⇒ Connection
# File 'lib/core/connection_driver.rb', line 44
def connection()
  @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
end
#each_event
Iterator for all available events.
# File 'lib/core/connection_driver.rb', line 76
def each_event()
  while e = event
    yield e
  end
end
#event
Get the next event to dispatch, nil if no events available.
# File 'lib/core/connection_driver.rb', line 67
def event()
  e = Cproton.pn_connection_driver_next_event(@impl)
  Event.new(e) if e
end
#event? ⇒ Boolean
True if #event will return non-nil.
# File 'lib/core/connection_driver.rb', line 73
def event?() Cproton.pn_connection_driver_has_event(@impl); end
#finished? ⇒ Boolean
True if the ConnectionDriver has nothing left to do: both sides of the transport are closed and there are no events to dispatch.
# File 'lib/core/connection_driver.rb', line 64
def finished?() Cproton.pn_connection_driver_finished(@impl); end
#read
# File 'lib/core/connection_driver.rb', line 84
def read
  size = Cproton.pni_connection_driver_read_size(@impl)
  return if size <= 0
  @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
  Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  # Try again later.
rescue EOFError # EOF is not an error
  close_read
rescue IOError, SystemCallError => e
  close e
end
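The read path above relies on `IO#read_nonblock` filling a re-usable buffer and on the order of the rescue clauses. As a rough illustration that does not depend on Proton, the same pattern can be tried on a plain pipe. `read_once` and its return symbols are hypothetical names, not part of the library.

```ruby
# Sketch only: one non-blocking read attempt into a caller-supplied buffer.
# `read_once` and the symbols it returns are hypothetical names.
def read_once(io, size, buf)
  io.read_nonblock(size, buf) # fills buf in place instead of allocating
  :data
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  :again                      # nothing available yet; retry after select
rescue EOFError
  :eof                        # the peer closed its side; not an error
rescue IOError, SystemCallError => e
  e                           # a real I/O failure, returned rather than raised
end

reader, writer = IO.pipe
buffer = String.new
first  = read_once(reader, 8, buffer)  # pipe is still empty
writer.write("abc")
second = read_once(reader, 8, buffer)  # buffer now holds the bytes written
writer.close
third  = read_once(reader, 8, buffer)  # end of stream
```

EOFError is rescued before IOError because it is a subclass of IOError; reversing the order would treat a normal end of stream as a failure.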
#read_closed? ⇒ Boolean
Is the read side of the driver closed?
# File 'lib/core/connection_driver.rb', line 143
def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
#tick(now = Time.now) ⇒ Time
Handle time-related work, for example idle-timeout events. May generate events for #event and change #can_read?, #can_write?.
Returns the time of the next scheduled event, or nil if there are no scheduled events. If non-nil you must call #tick again no later than this time.
# File 'lib/core/connection_driver.rb', line 118
def tick(now=Time.now)
  transport = Cproton.pni_connection_driver_transport(@impl)
  ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
  @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000)
  unless @next_tick
    idle = Cproton.pn_transport_get_idle_timeout(transport)
    @next_tick = now + (idle.to_r / 1000) unless idle.zero?
  end
  @next_tick
end
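The tick code converts between Ruby Time values and integer milliseconds in both directions. The sketch below isolates that arithmetic and shows one way a caller might turn the returned deadline into a wait in seconds. The helper names `time_to_ms`, `ms_to_time` and `seconds_until` are hypothetical and do not exist in the library.

```ruby
# Sketch only: the helper names here are hypothetical.

# Time -> whole milliseconds since the epoch (fractions are truncated).
def time_to_ms(time)
  (time.to_r * 1000).to_i
end

# Whole milliseconds since the epoch -> Time, using exact Rational math.
def ms_to_time(ms)
  Time.at(ms.to_r / 1000)
end

# Seconds a caller may wait before #tick must be called again;
# nil means no deadline is scheduled.
def seconds_until(next_tick, now = Time.now)
  return nil unless next_tick
  [next_tick - now, 0].max
end

stamp = Time.at(1_700_000_000, 123_456, :usec)
millis = time_to_ms(stamp)   # sub-millisecond digits are dropped
back = ms_to_time(millis)    # same instant, at millisecond precision
```

Using Rational rather than Float keeps the conversion exact, so a deadline does not drift by a rounding error when it is passed back and forth.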
#to_io ⇒ IO
Returns the underlying IO, which allows ConnectionDriver to be passed directly to IO#select.
# File 'lib/core/connection_driver.rb', line 54
def to_io() @io; end
#transport ⇒ Transport
# File 'lib/core/connection_driver.rb', line 49
def transport()
  @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
end
#write
Non-blocking write to #io. IO errors are returned as transport errors by #event, not raised.
# File 'lib/core/connection_driver.rb', line 99
def write
  data = Cproton.pn_connection_driver_write_buffer(@impl)
  return unless data && data.size > 0
  n = @io.write_nonblock(data)
  Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  # Try again later.
rescue IOError, SystemCallError => e
  close e
end
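`IO#write_nonblock` may accept fewer bytes than it was given, which is why the code above reports the count `n` back to the driver instead of assuming the whole buffer was sent. The following sketch shows the same idea on a plain pipe, with the caller tracking the remainder itself. `write_once` is a hypothetical helper, not part of the library.

```ruby
# Sketch only: one non-blocking write attempt. Returns the bytes that are
# still unwritten. `write_once` is a hypothetical helper name.
def write_once(io, pending)
  return pending if pending.empty?
  n = io.write_nonblock(pending)  # may be a partial write
  pending.byteslice(n..-1)        # keep whatever was not accepted
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  pending                         # nothing written; retry after select
end

reader, writer = IO.pipe
leftover = write_once(writer, "hello")  # a short message fits in one call
received = reader.read_nonblock(16)
```

In the driver the remainder stays inside the C buffer, so the Ruby side only needs to report how many bytes were consumed.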
#write_closed? ⇒ Boolean
Is the write side of the driver closed?
# File 'lib/core/connection_driver.rb', line 146
def write_closed?() Cproton.pn_connection_driver_write_closed(@impl); end