Class: Qpid::Proton::Container
- Inherits: Object
  - Object
    - Qpid::Proton::Container
- Defined in: lib/core/container.rb
Overview
An AMQP container manages a set of Listeners and Connections which contain #Sender and #Receiver links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.
One or more threads can call #run; events generated by all the listeners and connections are dispatched in the #run threads.
Direct Known Subclasses
Defined Under Namespace
Classes: ConnectionTask, ListenTask, SelectWaker, StoppedError
Instance Attribute Summary
- #auto_stop ⇒ Bool
  Auto-stop flag.
- #handler ⇒ MessagingHandler (readonly)
  The container-wide handler.
- #id ⇒ String (readonly)
  Unique identifier for this container.
- #stopped ⇒ Bool
  True if the container has been stopped and can no longer be used.
Instance Method Summary
- #connect(url, opts = nil) ⇒ Connection
  Open an AMQP connection.
- #connect_io(io, opts = nil)
  Open an AMQP protocol connection on an existing IO object.
- #initialize(*args) ⇒ Container (constructor)
  Create a new Container.
- #inspect
- #listen(url, handler = Listener::Handler.new) ⇒ Listener
  Listen for incoming AMQP connections.
- #listen_io(io, handler = Listener::Handler.new)
  Listen for incoming AMQP connections on an existing server socket.
- #run
  Run the container: wait for IO activity, dispatch events to handlers.
- #running ⇒ Integer
  Number of threads in #run.
- #stop(error = nil, panic = nil)
  Stop the container.
- #to_s
- #work_queue
  Get the WorkQueue that can be used to schedule code to be run by the container.
Constructor Details
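#initialize(id = nil) ⇒ Container
#initialize(handler = nil, id = nil) ⇒ Container
Create a new Container. A single argument is taken as the id if it is a String or a Symbol, and as the handler otherwise. If no id is given, a random UUID is used.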
    # File 'lib/core/container.rb', line 55
    def initialize(*args)
      @handler, @id = nil
      case args.size
      when 2 then @handler, @id = args
      when 1 then
        @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
        @handler = args[0] unless @id
      when 0 then
      else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2"
      end
      # Use an empty messaging adapter to give default behaviour if there's no global handler.
      @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
      @id = (@id || SecureRandom.uuid).freeze

      # Threading and implementation notes: see comment on #run_one
      @work = Queue.new
      @work << :start
      @work << :select
      @wake = SelectWaker.new   # Wakes #run thread in IO.select
      @auto_stop = true         # Stop when @active drops to 0
      @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context

      # Following instance variables protected by lock
      @lock = Mutex.new
      @active = 0               # All active tasks, in @selectable, @work or being processed
      @selectable = Set.new     # Tasks ready to block in IO.select
      @running = 0              # Count of #run threads
      @stopped = false          # #stop called
      @stop_err = nil           # Optional error to pass to tasks, from #stop
      @panic = nil              # Exception caught in a run thread, to be raised by all run threads
    end
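The argument dispatch in the constructor can be tried out in isolation. The following is a minimal sketch using only the standard library; `parse_container_args` is a hypothetical helper written for illustration, not part of the library, and it returns the resolved pair instead of building a container.

```ruby
require 'securerandom'

# Hypothetical helper mirroring the argument dispatch in Container#initialize.
# Returns [handler, id]; it does not create a real container.
def parse_container_args(*args)
  handler = id = nil
  case args.size
  when 2 then handler, id = args
  when 1
    # A String-like value or a Symbol is taken as the id...
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    # ...anything else is taken as the handler.
    handler = args[0] unless id
  when 0 then nil
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # With no id given, a random UUID is generated; the id is frozen either way.
  [handler, (id || SecureRandom.uuid).freeze]
end

handler = Object.new                       # stand-in for a MessagingHandler
p parse_container_args("my-id")            # id only
p parse_container_args(:my_id)             # Symbol id, converted to a String
p parse_container_args(handler)            # handler only, random UUID as id
p parse_container_args(handler, "my-id")   # handler and id
```

Note that with two arguments the values are taken positionally as handler then id, with no type check.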
Instance Attribute Details
#auto_stop ⇒ Bool
    # File 'lib/core/container.rb', line 104
    def auto_stop
      @auto_stop
    end
#handler ⇒ MessagingHandler (readonly)
Returns the container-wide handler.
    # File 'lib/core/container.rb', line 88
    def handler
      @handler
    end
#id ⇒ String (readonly)
Returns the unique identifier for this container.
    # File 'lib/core/container.rb', line 91
    def id
      @id
    end
#stopped ⇒ Bool
True if the container has been stopped and can no longer be used.
    # File 'lib/core/container.rb', line 108
    def stopped
      @stopped
    end
Instance Method Details
#connect(url, opts = nil) ⇒ Connection
    # File 'lib/core/container.rb', line 121
    def connect(url, opts=nil)
      not_stopped
      url = Qpid::Proton::uri url
      opts ||= {}
      if url.user || url.password
        opts[:user] ||= url.user
        opts[:password] ||= url.password
      end
      opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
      connect_io(TCPSocket.new(url.host, url.port), opts)
    end
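The way #connect derives options from the URL can be illustrated without the library. This is a sketch under stated assumptions: `connect_options` is a hypothetical helper, the standard library's `URI.parse` stands in for `Qpid::Proton::uri`, and the symbol `:client_ssl_domain` stands in for `SSLDomain.new(SSLDomain::MODE_CLIENT)`.

```ruby
require 'uri'

# Hypothetical helper: derives connection options from a URL in the same
# order as Container#connect. It opens no socket.
def connect_options(url, opts = nil)
  url = URI.parse(url)              # stand-in for Qpid::Proton::uri
  opts = (opts || {}).dup           # copy, so the caller's hash is left untouched
  if url.user || url.password
    opts[:user] ||= url.user        # explicit options win over URL credentials
    opts[:password] ||= url.password
  end
  # Stand-in for SSLDomain.new(SSLDomain::MODE_CLIENT)
  opts[:ssl_domain] ||= :client_ssl_domain if url.scheme == "amqps"
  opts
end

p connect_options("amqp://alice:secret@broker.example.com:5672")
p connect_options("amqps://alice:secret@broker.example.com:5671", { user: "bob" })
```

Unlike this sketch, the listing above assigns into the hash passed by the caller, so a caller that reuses an options hash across calls may prefer to pass a copy.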
#connect_io(io, opts = nil)
Open an AMQP protocol connection on an existing IO object.
    # File 'lib/core/container.rb', line 136
    def connect_io(io, opts=nil)
      not_stopped
      cd = connection_driver(io, opts)
      cd.connection.open()
      add(cd)
      cd.connection
    end
#inspect
    # File 'lib/core/container.rb', line 94
    def inspect() to_s; end
#listen(url, handler = Listener::Handler.new) ⇒ Listener
Listen for incoming AMQP connections.
The handler is called with events for this listener and can generate a new set of options for each incoming connection.
    # File 'lib/core/container.rb', line 151
    def listen(url, handler=Listener::Handler.new)
      not_stopped
      url = Qpid::Proton::uri url
      # TODO aconway 2017-11-01: amqps, SSL
      listen_io(TCPServer.new(url.host, url.port), handler)
    end
#listen_io(io, handler = Listener::Handler.new)
Listen for incoming AMQP connections on an existing server socket.
    # File 'lib/core/container.rb', line 162
    def listen_io(io, handler=Listener::Handler.new)
      not_stopped
      l = ListenTask.new(io, handler, self)
      add(l)
      l.listener
    end
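One plausible reason to pass an existing server socket is to let the operating system choose a free port, for example in tests. The sketch below uses only the standard library to create such a socket; the hand-off to #listen_io appears only in a comment because it needs the qpid_proton gem, and the `container` and `handler` names there are assumptions.

```ruby
require 'socket'

# Bind to port 0 so the operating system picks a free port.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]               # the port that was actually assigned
puts "server socket bound to port #{port}"

# With the qpid_proton gem loaded, the socket could then be handed over
# (container and handler are assumed to exist):
#   listener = container.listen_io(server, handler)

server.close
```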
#run
This method returns an undefined value.
Run the container: wait for IO activity, dispatch events to handlers.
Multi-threading: More than one thread can call #run concurrently; the container will use all #run threads as a thread pool. Calls to MessagingHandler or Listener::Handler methods are serialized for each connection or listener. See WorkQueue for coordinating with other threads.
Exceptions: If any handler method raises an exception, it will stop the container, and the exception will be raised by all calls to #run. For single-threaded code this is often desirable. Multi-threaded server applications should normally rescue exceptions in the handler and deal with them in another way: logging, closing the connection with an error condition, signalling another thread, etc.
    # File 'lib/core/container.rb', line 191
    def run
      @lock.synchronize do
        @running += 1        # Note: ensure clause below will decrement @running
        raise StoppedError if @stopped
      end
      while task = @work.pop
        run_one(task, Time.now)
      end
      @lock.synchronize { raise @panic if @panic }
    ensure
      @lock.synchronize do
        if (@running -= 1) > 0
          work_wake nil         # Signal the next thread
        else
          # This is the last thread, no need to do maybe_panic around this final handler call.
          @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
        end
      end
    end
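The thread-pool and exception behaviour described for #run can be modelled with a small toy class. This is a simplified sketch, not the library's implementation: `MiniContainer` and its methods are hypothetical, tasks are plain blocks instead of IO events, and per-connection serialization is not modelled. It shows only that several threads share one work queue and that one failing task makes every #run call raise the same exception.

```ruby
# Toy model of the #run behaviour; NOT the library's implementation.
class MiniContainer
  def initialize
    @work = Queue.new   # callables to run; nil is the stop signal
    @lock = Mutex.new
    @panic = nil        # first exception raised by a task, if any
  end

  # Schedule a block to be run by one of the #run threads.
  def add(&task)
    @work << task
    self
  end

  # Record the first error (if any) and wake one #run thread with nil.
  def stop(panic = nil)
    @lock.synchronize { @panic ||= panic }
    @work << nil
    nil
  end

  # Any number of threads may call this; together they act as a pool.
  def run
    while (task = @work.pop)
      begin
        task.call
      rescue StandardError => e
        stop(e)         # a failing task stops the whole container
      end
    end
    @work << nil        # pass the stop signal on to the next #run thread
    @lock.synchronize { raise @panic if @panic }
    nil
  end
end

c = MiniContainer.new
c.add { raise "handler failed" }
results = 3.times.map do
  Thread.new do
    begin
      c.run
      :ok
    rescue StandardError => e
      e.message
    end
  end
end.map(&:value)
p results   # every #run call reports the same failure
```

Re-queuing the nil signal plays the same role as `work_wake nil` in the listing above: each exiting thread wakes the next one.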
#running ⇒ Integer
Number of threads in #run.
    # File 'lib/core/container.rb', line 112
    def running() @lock.synchronize { @running }; end
#stop(error = nil, panic = nil)
Stop the container.
Close all listeners and abort all connections without doing AMQP protocol close.
#stop returns immediately; calls to #run will return when all activity is finished.
The container can no longer be used; using a stopped container raises StoppedError. Create a new container if you want to resume activity.
    # File 'lib/core/container.rb', line 227
    def stop(error=nil, panic=nil)
      @lock.synchronize do
        return if @stopped
        @stop_err = Condition.convert(error)
        @panic = panic
        @stopped = true
        check_stop_lh
        # NOTE: @stopped =>
        # - no new run threads can join
        # - no more select calls after next wakeup
        # - once @active == 0, all threads will be stopped with nil
      end
      wake
    end
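Two properties of #stop are visible in the listing above: a second call returns without effect, so the first error is the one recorded, and the stopped state is permanent. The toy class below sketches just those two properties; `ToyContainer`, its `StoppedError` and its `connect` method are hypothetical stand-ins that do no networking.

```ruby
# Toy model of the stop semantics; NOT the library's implementation.
class ToyContainer
  class StoppedError < RuntimeError; end

  attr_reader :stop_err

  def initialize
    @lock = Mutex.new
    @stopped = false
    @stop_err = nil
  end

  def stopped
    @lock.synchronize { @stopped }
  end

  # Idempotent: only the first call records its error.
  def stop(error = nil)
    @lock.synchronize do
      return if @stopped
      @stop_err = error
      @stopped = true
    end
    nil
  end

  # Stand-in for #connect or #listen: any use after #stop raises.
  def connect(url)
    raise StoppedError, "container has been stopped" if stopped
    "connected to #{url}"
  end
end

c = ToyContainer.new
puts c.connect("amqp://example.com")
c.stop("first error")
c.stop("second error")              # ignored, the container is already stopped
puts c.stop_err
begin
  c.connect("amqp://example.com")
rescue ToyContainer::StoppedError => e
  puts "#{e.class}: #{e.message}"
end
```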
#to_s
    # File 'lib/core/container.rb', line 93
    def to_s() "#<#{self.class} id=#{id.inspect}>"; end
#work_queue
Get the WorkQueue that can be used to schedule code to be run by the container.
Note: to run code that affects a Qpid::Proton::Connection or its associated objects, use Qpid::Proton::Connection#work_queue.
    # File 'lib/core/container.rb', line 246
    def work_queue() @work_queue; end