Class: Qpid::Proton::Container

Inherits:
Object
Defined in:
lib/core/container.rb

Overview

An AMQP container manages a set of Listeners and Connections, which contain Sender and Receiver links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.

One or more threads can call #run; events generated by all the listeners and connections are dispatched in the #run threads.

Direct Known Subclasses

Reactor::Container

Defined Under Namespace

Classes: ConnectionTask, ListenTask, SelectWaker, StoppedError

Constructor Details

#initialize(id = nil) ⇒ Container
#initialize(handler = nil, id = nil) ⇒ Container

Create a new Container

Overloads:

  • #initialize(id = nil) ⇒ Container

    Parameters:

    • id (String, Symbol) (defaults to: nil)

      A unique ID for this container. If nil, a random UUID is used.

  • #initialize(handler = nil, id = nil) ⇒ Container

    Parameters:

    • id (String, Symbol) (defaults to: nil)

      A unique ID for this container. If nil, a random UUID is used.

    • handler (MessagingHandler) (defaults to: nil)

      Optional default handler for connections that do not have their own handler (see #connect and #listen)

      Note: For multi-threaded code, it is recommended to use a separate handler instance for each connection, as a shared handler may be called concurrently.



# File 'lib/core/container.rb', line 55

def initialize(*args)
  @handler, @id = nil
  case args.size
  when 2 then @handler, @id = args
  when 1 then
    @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
    @handler = args[0] unless @id
  when 0 then
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # Use an empty messaging adapter to give default behaviour if there's no global handler.
  @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
  @id = (@id || SecureRandom.uuid).freeze

  # Threading and implementation notes: see comment on #run_one
  @work = Queue.new
  @work << :start
  @work << :select
  @wake = SelectWaker.new   # Wakes #run thread in IO.select
  @auto_stop = true         # Stop when @active drops to 0
  @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context

  # Following instance variables protected by lock
  @lock = Mutex.new
  @active = 0               # All active tasks, in @selectable, @work or being processed
  @selectable = Set.new     # Tasks ready to block in IO.select
  @running = 0              # Count of #run threads
  @stopped = false          # #stop called
  @stop_err = nil           # Optional error to pass to tasks, from #stop
  @panic = nil              # Exception caught in a run thread, to be raised by all run threads
end
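
The way the two overloads are told apart can be modelled without the library. The following is a minimal standalone sketch, not the library's own code path: `resolve_container_args` and `FakeHandler` are hypothetical names introduced for illustration, and the logic simply mirrors the `case` statement in the listing above.

```ruby
# Standalone model of the argument handling in Container#initialize.
# resolve_container_args is a hypothetical helper, not part of qpid_proton.
def resolve_container_args(*args)
  handler = id = nil
  case args.size
  when 2 then handler, id = args
  when 1
    # A String (or anything with #to_str) or a Symbol is treated as the id...
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    # ...and any other single argument is treated as the handler.
    handler = args[0] unless id
  when 0 then nil
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, id]
end

FakeHandler = Class.new   # hypothetical stand-in for a MessagingHandler subclass
handler = FakeHandler.new

p resolve_container_args("broker-1")                    # => [nil, "broker-1"]
p resolve_container_args(:broker_1)                     # => [nil, "broker_1"]
p resolve_container_args(handler).first.equal?(handler) # => true
```

Note that with a single argument the type decides the meaning, so a handler that responds to `to_str` would be taken as an id; passing both arguments avoids the ambiguity.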

Instance Attribute Details

#auto_stop ⇒ Bool

Auto-stop flag.

True (the default) means that the container will stop automatically, as if #stop had been called, when the last listener or connection closes.

False means #run will not return unless #stop is called.

Returns:

  • (Bool)

    auto-stop state



# File 'lib/core/container.rb', line 104

def auto_stop
  @auto_stop
end

#handler ⇒ MessagingHandler (readonly)

Returns the container-wide handler.

Returns:

  • (MessagingHandler)

    the container-wide handler



# File 'lib/core/container.rb', line 88

def handler
  @handler
end

#id ⇒ String (readonly)

Returns unique identifier for this container

Returns:

  • (String)

    unique identifier for this container



# File 'lib/core/container.rb', line 91

def id
  @id
end

#stopped ⇒ Bool

True if the container has been stopped and can no longer be used.

Returns:

  • (Bool)

    stopped state



# File 'lib/core/container.rb', line 108

def stopped
  @stopped
end

Instance Method Details

#connect(url, opts = nil) ⇒ Connection

Open an AMQP connection.

url.scheme must be “amqp” or “amqps”; url.scheme.nil? is treated as “amqp”. url.user and url.password are used as defaults if opts[:user] and opts[:password] are nil.

Parameters:

  • url (String, URI)

    Open a TCPSocket to url.host, url.port.

Options Hash (opts):

  • :handler (MessagingHandler)

    handler for events related to this connection.

  • :user (String)

    User name for authentication

  • :password (String)

    Authentication secret

  • :virtual_host (String)

    Virtual host name

  • :container_id (String) — default: provided by {Container}

    override advertised container-id

  • :properties (Hash<Symbol=>Object>)

    Application-defined properties

  • :offered_capabilities (Array<Symbol>)

    Extensions the endpoint supports

  • :desired_capabilities (Array<Symbol>)

    Extensions the endpoint can use

  • :idle_timeout (Numeric)

    Seconds before closing an idle connection

  • :max_sessions (Integer)

    Limit the number of active sessions

  • :max_frame_size (Integer)

    Limit the size of AMQP frames

  • :sasl_enabled (Boolean) — default: false

    Enable or disable SASL.

  • :sasl_allow_insecure_mechs (Boolean) — default: false

    Allow mechanisms that send secrets in cleartext

  • :sasl_allowed_mechs (String)

    Specify the SASL mechanisms allowed for this connection. The value is a space-separated list of mechanism names. The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option. Clients must set the allowed mechanisms before the outgoing connection is attempted. Servers must set them before the listening connection is set up.

  • :ssl_domain (SSLDomain)

    SSL configuration domain.

Returns:

  • (Connection)



# File 'lib/core/container.rb', line 121

def connect(url, opts=nil)
  not_stopped
  url = Qpid::Proton::uri url
  opts ||= {}
  if url.user ||  url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
  connect_io(TCPSocket.new(url.host, url.port), opts)
end
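
The credential defaulting in the listing above can be tried in isolation. This is a minimal sketch under stated assumptions: `connect_options` is a hypothetical helper, the host name is made up, and the standard library's `URI` stands in for `Qpid::Proton::uri`, whose exact behaviour is not shown here. Unlike the listing, the sketch copies `opts` instead of modifying the caller's hash.

```ruby
require 'uri'

# Hypothetical helper that mirrors how #connect fills in :user and :password
# from the URL when the caller has not supplied them.
def connect_options(url, opts = nil)
  url = URI(url)             # assumption: stdlib URI replaces Qpid::Proton::uri
  opts = (opts || {}).dup    # copy so the caller's hash is left untouched
  if url.user || url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts
end

url = "amqp://alice:secret@broker.example.com:5672"

# Credentials come from the URL when opts does not set them.
p connect_options(url)
# An explicit option takes precedence over the URL.
p connect_options(url, { user: "bob" })
```

Because `||=` is used, only options that are nil or missing are filled in, so an explicit `:user` is kept while `:password` still falls back to the URL.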

#connect_io(io, opts = nil)

Open an AMQP protocol connection on an existing IO object

Parameters:

  • io (IO)

    An existing IO object, e.g. a TCPSocket

Options Hash (opts):

  • :handler (MessagingHandler)

    handler for events related to this connection.

  • :user (String)

    User name for authentication

  • :password (String)

    Authentication secret

  • :virtual_host (String)

    Virtual host name

  • :container_id (String) — default: provided by {Container}

    override advertised container-id

  • :properties (Hash<Symbol=>Object>)

    Application-defined properties

  • :offered_capabilities (Array<Symbol>)

    Extensions the endpoint supports

  • :desired_capabilities (Array<Symbol>)

    Extensions the endpoint can use

  • :idle_timeout (Numeric)

    Seconds before closing an idle connection

  • :max_sessions (Integer)

    Limit the number of active sessions

  • :max_frame_size (Integer)

    Limit the size of AMQP frames

  • :sasl_enabled (Boolean) — default: false

    Enable or disable SASL.

  • :sasl_allow_insecure_mechs (Boolean) — default: false

    Allow mechanisms that send secrets in cleartext

  • :sasl_allowed_mechs (String)

    Specify the SASL mechanisms allowed for this connection. The value is a space-separated list of mechanism names. The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option. Clients must set the allowed mechanisms before the outgoing connection is attempted. Servers must set them before the listening connection is set up.

  • :ssl_domain (SSLDomain)

    SSL configuration domain.



# File 'lib/core/container.rb', line 136

def connect_io(io, opts=nil)
  not_stopped
  cd = connection_driver(io, opts)
  cd.connection.open()
  add(cd)
  cd.connection
end

#inspect



# File 'lib/core/container.rb', line 94

def inspect() to_s; end

#listen(url, handler = Listener::Handler.new) ⇒ Listener

Listen for incoming AMQP connections

Parameters:

  • url (String, URI)

    Listen on host:port of the AMQP URL

  • handler (Listener::Handler) (defaults to: Listener::Handler.new)

    A Listener::Handler object that will be called with events for this listener and can generate a new set of options for each one.

Returns:

  • (Listener)



# File 'lib/core/container.rb', line 151

def listen(url, handler=Listener::Handler.new)
  not_stopped
  url = Qpid::Proton::uri url
  # TODO aconway 2017-11-01: amqps, SSL
  listen_io(TCPServer.new(url.host, url.port), handler)
end

#listen_io(io, handler = Listener::Handler.new)

Listen for incoming AMQP connections on an existing server socket.

Parameters:

  • io

    A server socket, for example a TCPServer

  • handler (Listener::Handler) (defaults to: Listener::Handler.new)

    Handler for events from this listener



# File 'lib/core/container.rb', line 162

def listen_io(io, handler=Listener::Handler.new)
  not_stopped
  l = ListenTask.new(io, handler, self)
  add(l)
  l.listener
end

#run

This method returns an undefined value.

Run the container: wait for IO activity, dispatch events to handlers.

Multi-threading: More than one thread can call #run concurrently; the container will use all #run threads as a thread pool. Calls to MessagingHandler or Listener::Handler methods are serialized for each connection or listener. See WorkQueue for coordinating with other threads.

Exceptions: If any handler method raises an exception it will stop the container, and the exception will be raised by all calls to #run. For single threaded code this is often desirable. Multi-threaded server applications should normally rescue exceptions in the handler and deal with them in another way: logging, closing the connection with an error condition, signalling another thread etc.

Raises:

  • (StoppedError)

    if the container has already been stopped



# File 'lib/core/container.rb', line 191

def run
  @lock.synchronize do
    @running += 1        # Note: ensure clause below will decrement @running
    raise StoppedError if @stopped
  end
  while task = @work.pop
    run_one(task, Time.now)
  end
  @lock.synchronize { raise @panic if @panic }
ensure
  @lock.synchronize do
    if (@running -= 1) > 0
      work_wake nil         # Signal the next thread
    else
      # This is the last thread, no need to do maybe_panic around this final handler call.
      @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
    end
  end
end
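
The thread-pool shutdown in the listing above relies on a `nil` task ending the `while task = @work.pop` loop, with each exiting thread signalling the next. The following standalone sketch models that hand-off with a plain `Queue`. It assumes that `work_wake nil` pushes a `nil` onto the work queue, which the listing does not show, and it simplifies by letting every thread pass the marker on (the listing skips this for the last thread). All names here are illustrative.

```ruby
work = Queue.new      # plays the role of @work
results = Queue.new   # collects the output of the "tasks"

5.times { |i| work << i }
work << nil           # a single stop marker, queued after all tasks

threads = 3.times.map do
  Thread.new do
    # Each thread pulls tasks until it sees the nil stop marker.
    while (task = work.pop)
      results << task * 2
    end
    # Pass the marker on so the next blocked thread also wakes up and exits.
    work << nil
  end
end
threads.each(&:join)

out = []
out << results.pop until results.empty?
p out.sort   # => [0, 2, 4, 6, 8]
```

A single marker is enough to stop any number of threads, because each thread re-queues it on the way out.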

#running ⇒ Integer

Number of threads in #run

Returns:

  • (Integer)

    #run thread count



# File 'lib/core/container.rb', line 112

def running() @lock.synchronize { @running }; end

#stop(error = nil, panic = nil)

Stop the container.

Close all listeners and abort all connections without doing AMQP protocol close.

#stop returns immediately; calls to #run will return when all activity is finished.

The container can no longer be used once stopped: using a stopped container raises StoppedError. Create a new container if you want to resume activity.

Parameters:

  • error (Condition) (defaults to: nil)

    Optional error condition passed to MessagingHandler#on_transport_error for each connection and Listener::Handler#on_error for each listener.

  • panic (Exception) (defaults to: nil)

    Optional exception to raise from all calls to #run



# File 'lib/core/container.rb', line 227

def stop(error=nil, panic=nil)
  @lock.synchronize do
    return if @stopped
    @stop_err = Condition.convert(error)
    @panic = panic
    @stopped = true
    check_stop_lh
    # NOTE: @stopped =>
    # - no new run threads can join
    # - no more select calls after next wakeup
    # - once @active == 0, all threads will be stopped with nil
  end
  wake
end

#to_s



# File 'lib/core/container.rb', line 93

def to_s() "#<#{self.class} id=#{id.inspect}>"; end

#work_queue

Get the WorkQueue that can be used to schedule code to be run by the container.

Note: to run code that affects a Qpid::Proton::Connection or its associated objects, use Qpid::Proton::Connection#work_queue



# File 'lib/core/container.rb', line 246

def work_queue() @work_queue; end