Class: Qpid::Proton::ConnectionDriver

Inherits:
Object
Defined in:
lib/core/connection_driver.rb

Overview

Associates an AMQP Connection and Transport with an IO.

Thread safety: a single ConnectionDriver is not thread safe, but separate ConnectionDriver instances can be processed concurrently. The Container handles multiple connections concurrently in multiple threads.

Direct Known Subclasses

HandlerDriver

Constructor Details

#initialize(io) ⇒ ConnectionDriver

Create a Qpid::Proton::Connection and Transport associated with io

Parameters:

  • io (IO)

    An IO or IO-like object that responds to IO#read_nonblock and IO#write_nonblock



# File 'lib/core/connection_driver.rb', line 37

def initialize(io)
  @impl = Cproton.pni_connection_driver or raise NoMemoryError
  @io = io
  @rbuf = ""              # String for re-usable read buffer
end
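
The requirement on io is duck-typed: any object offering the two non-blocking methods will do. A minimal sketch of checking that before constructing a driver, assuming a helper named driver_compatible? that is hypothetical and not part of Qpid::Proton:

```ruby
# Hypothetical helper, not part of Qpid::Proton: true if +io+ responds to the
# two non-blocking methods the constructor documents as required.
def driver_compatible?(io)
  io.respond_to?(:read_nonblock) && io.respond_to?(:write_nonblock)
end

r, w = IO.pipe
puts driver_compatible?(r)           # a real IO offers both methods
puts driver_compatible?(Object.new)  # a plain object offers neither
r.close
w.close
```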

Instance Attribute Details

#next_tick

Time returned by the last call to #tick



# File 'lib/core/connection_driver.rb', line 130

def next_tick
  @next_tick
end

Instance Method Details

#can_read? ⇒ Bool

Returns True if the driver can read more data.

Returns:

  • (Bool)

    True if the driver can read more data



# File 'lib/core/connection_driver.rb', line 57

def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end

#can_write? ⇒ Bool

Returns True if the driver has data to write.

Returns:

  • (Bool)

    True if the driver has data to write



# File 'lib/core/connection_driver.rb', line 60

def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end

#close(error = nil)

Disconnect both sides of the transport by calling #close_write and then #close_read, so no AMQP close frame is sent or awaited. See comments on #close_write.



# File 'lib/core/connection_driver.rb', line 158

def close error=nil
  close_write error
  close_read
end

#close_read(error = nil)

Disconnect the read side of the transport, without waiting for an AMQP close frame. See comments on #close_write.



# File 'lib/core/connection_driver.rb', line 150

def close_read error=nil
  set_error error
  Cproton.pn_connection_driver_read_close(@impl)
  @io.close_read rescue nil # Allow double-close
end

#close_write(error = nil)

Disconnect the write side of the transport, without sending an AMQP close frame. To close politely, use Qpid::Proton::Connection#close instead; the transport will close itself once the protocol close is complete.



# File 'lib/core/connection_driver.rb', line 136

def close_write error=nil
  set_error error
  Cproton.pn_connection_driver_write_close(@impl)
  @io.close_write rescue nil # Allow double-close
end
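
The half-close and the "rescue nil" double-close tolerance can be tried out with plain sockets. This is only a sketch of the IO-level behaviour, assuming a Unix platform where UNIXSocket.pair is available; half_close_demo is a hypothetical name and no Proton calls are made:

```ruby
require 'socket'

# Hypothetical demo: half-close one end of a socket pair and return what the
# peer reads before it sees end of stream.
def half_close_demo
  a, b = UNIXSocket.pair
  a.write("bye")
  a.close_write              # shut down the write side only
  a.close_write rescue nil   # a repeated close is swallowed, as in #close_write
  data = b.read              # returns once the peer's write side is closed
  a.close
  b.close
  data
end

puts half_close_demo
```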

#connection ⇒ Connection

Returns:

  • (Connection)



# File 'lib/core/connection_driver.rb', line 44

def connection()
  @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
end

#each_event

Iterator for all available events



# File 'lib/core/connection_driver.rb', line 76

def each_event()
  while e = event
    yield e
  end
end
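
The relationship between #event, #event? and #each_event can be illustrated without Proton. In this sketch QueueDriver is a hypothetical stand-in that holds pre-made events in an Array; the real driver obtains them from the C library:

```ruby
# QueueDriver is a hypothetical stand-in, not the real ConnectionDriver.
class QueueDriver
  def initialize(events)
    @events = events.dup
  end

  # Next event, or nil once the queue is empty.
  def event
    @events.shift
  end

  # True if #event will return non-nil.
  def event?
    !@events.empty?
  end

  # Same loop shape as ConnectionDriver#each_event.
  def each_event
    while (e = event)
      yield e
    end
  end
end

driver = QueueDriver.new([:connection_init, :transport_closed])
driver.each_event { |e| puts e }
puts driver.event?   # the queue has been drained
```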

#event

Get the next event to dispatch, nil if no events available



# File 'lib/core/connection_driver.rb', line 67

def event()
  e = Cproton.pn_connection_driver_next_event(@impl)
  Event.new(e) if e
end

#event? ⇒ Boolean

True if #event will return non-nil

Returns:

  • (Boolean)


# File 'lib/core/connection_driver.rb', line 73

def event?() Cproton.pn_connection_driver_has_event(@impl); end

#finished? ⇒ Boolean

True if the ConnectionDriver has nothing left to do: both sides of the transport are closed and there are no events to dispatch.

Returns:

  • (Boolean)


# File 'lib/core/connection_driver.rb', line 64

def finished?() Cproton.pn_connection_driver_finished(@impl); end
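
#finished? is a natural exit condition for a loop that services a driver. The sketch below shows one possible loop shape and does not claim to be how the Container does it. EchoDriver and run are hypothetical names; EchoDriver only borrows the method names of ConnectionDriver and records bytes instead of speaking AMQP:

```ruby
# EchoDriver is a hypothetical stand-in, not the real ConnectionDriver.
class EchoDriver
  attr_reader :received

  def initialize(io)
    @io = io
    @received = String.new
    @eof = false
  end

  def to_io() @io; end
  def can_read?() !@eof; end
  def can_write?() false; end   # this stand-in never has output
  def finished?() @eof; end
  def write() end               # nothing to write

  def read
    @received << @io.read_nonblock(64)
  rescue IO::WaitReadable
    # Nothing available yet; try again later.
  rescue EOFError
    @eof = true
  end
end

# Hypothetical loop: select on the driver and service it until finished.
def run(driver)
  until driver.finished?
    rd = driver.can_read? ? [driver] : []
    wr = driver.can_write? ? [driver] : []
    r, w = IO.select(rd, wr, nil, 1)   # returns nil on timeout
    driver.read if r && !r.empty?
    driver.write if w && !w.empty?
  end
  driver.received
end

r, w = IO.pipe
w.write("ping")
w.close
puts run(EchoDriver.new(r))
```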

#read

Non-blocking read from #io, generating events for #event. IO errors are returned as transport errors by #event, not raised.



# File 'lib/core/connection_driver.rb', line 84

def read
  size = Cproton.pni_connection_driver_read_size(@impl)
  return if size <= 0
  @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
  Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  # Try again later.
rescue EOFError         # EOF is not an error
  close_read
rescue IOError, SystemCallError => e
  close e
end
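
The rescue ordering above matters: EOFError is a subclass of IOError, so it has to be handled first. A stand-alone sketch of the same pattern on a pipe, where read_once is a hypothetical helper that reports what happened as a symbol instead of calling into Proton:

```ruby
# Hypothetical helper: one non-blocking read into a reusable buffer.
def read_once(io, size, buf)
  io.read_nonblock(size, buf)   # fills buf in place
  :data
rescue IO::WaitReadable, Errno::EINTR
  :retry                        # nothing available yet, try again later
rescue EOFError
  :eof                          # peer closed: a normal end of stream
rescue IOError, SystemCallError
  :error                        # anything else is a genuine IO failure
end

r, w = IO.pipe
buf = String.new
p read_once(r, 16, buf)   # the pipe is still empty
w.write("AMQP")
p read_once(r, 16, buf)   # buf now holds the bytes just written
w.close
p read_once(r, 16, buf)   # the writer has gone away
```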

#read_closed? ⇒ Boolean

Is the read side of the driver closed?

Returns:

  • (Boolean)


# File 'lib/core/connection_driver.rb', line 143

def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end

#tick(now = Time.now) ⇒ Time

Handle time-related work, for example idle-timeout events. May generate events for #event and change the results of #can_read? and #can_write?.

Parameters:

  • now (Time) (defaults to: Time.now)

    the current time, defaults to Time.now.

Returns:

  • (Time)

    time of the next scheduled event, or nil if there are no scheduled events. If non-nil you must call #tick again no later than this time.



# File 'lib/core/connection_driver.rb', line 118

def tick(now=Time.now)
  transport = Cproton.pni_connection_driver_transport(@impl)
  # The transport works in milliseconds; a result of 0 means no deadline
  ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
  @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000)
  unless @next_tick
    # No deadline from the transport: fall back to the idle timeout, if set
    idle = Cproton.pn_transport_get_idle_timeout(transport)
    @next_tick = now + (idle.to_r / 1000) unless idle.zero?
  end
  @next_tick
end
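
The time arithmetic in #tick converts between Ruby Time objects and whole milliseconds. A small worked example of those two conversions, using hypothetical helper names to_ms and from_ms and a made-up idle timeout of 30_000 ms:

```ruby
# Hypothetical helpers mirroring the two conversions used by #tick.
def to_ms(time)
  (time.to_r * 1000).to_i      # Time -> whole milliseconds since the epoch
end

def from_ms(ms)
  Time.at(ms.to_r / 1000)      # milliseconds since the epoch -> Time
end

now = Time.at(1_500_000_000, 250_000)   # 250_000 microseconds past the second
ms = to_ms(now)
puts ms
puts from_ms(ms) == now                 # millisecond-aligned times round-trip

# Fallback branch: next tick is "now plus the idle timeout".
idle = 30_000
next_tick = now + (idle.to_r / 1000)
puts next_tick - now                    # difference in seconds
```

Using Rational rather than Float for the division keeps the round trip exact for millisecond-aligned times.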

#to_io ⇒ IO

Returns the underlying IO, which allows a ConnectionDriver to be passed directly to IO.select.

Returns:

  • (IO)

    The underlying IO; allows a ConnectionDriver to be passed directly to IO.select



# File 'lib/core/connection_driver.rb', line 54

def to_io() @io; end
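
IO.select accepts any object that responds to to_io and hands the same object back when it is ready. A sketch of that behaviour, where FakeDriver and ready_for_read are hypothetical names and FakeDriver mimics nothing but #to_io:

```ruby
# FakeDriver is a hypothetical stand-in that only implements #to_io.
class FakeDriver
  def initialize(io)
    @io = io
  end

  def to_io() @io; end
end

# Hypothetical helper: the drivers whose IO is readable within +timeout+ seconds.
def ready_for_read(drivers, timeout)
  readable, = IO.select(drivers, nil, nil, timeout)
  readable || []   # IO.select returns nil on timeout
end

r, w = IO.pipe
driver = FakeDriver.new(r)
puts ready_for_read([driver], 0).empty?          # nothing written yet
w.write("x")
puts ready_for_read([driver], 1).first.equal?(driver)
```

Because the wrapper object itself comes back from IO.select, a loop can call #read or #write on it without mapping from IO to driver.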

#transport ⇒ Transport

Returns:

  • (Transport)



# File 'lib/core/connection_driver.rb', line 49

def transport()
  @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
end

#write

Non-blocking write to #io. IO errors are returned as transport errors by #event, not raised.



# File 'lib/core/connection_driver.rb', line 99

def write
  data = Cproton.pn_connection_driver_write_buffer(@impl)
  return unless data && data.size > 0
  n = @io.write_nonblock(data)
  Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  # Try again later.
rescue IOError, SystemCallError => e
  close e
end
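
write_nonblock may accept fewer bytes than offered, which is why the driver is told how many bytes were actually written. A stand-alone sketch of that bookkeeping on a pipe, where write_once is a hypothetical helper that returns the unsent remainder instead of calling into Proton:

```ruby
# Hypothetical helper: attempt one non-blocking write of +pending+ and
# return whatever still remains to be sent.
def write_once(io, pending)
  return pending if pending.empty?
  n = io.write_nonblock(pending)                # number of bytes accepted
  pending.byteslice(n, pending.bytesize - n)    # drop the bytes already sent
rescue IO::WaitWritable, Errno::EINTR
  pending                                       # nothing sent; try again later
end

r, w = IO.pipe
rest = write_once(w, "hello")
puts rest.empty?            # a small write fits in the pipe buffer
puts r.read_nonblock(16)
```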

#write_closed? ⇒ Boolean

Is the write side of the driver closed?

Returns:

  • (Boolean)


# File 'lib/core/connection_driver.rb', line 146

def write_closed?() Cproton.pn_connection_driver_write_closed(@impl); end