Class: Qpid::Proton::Handler::ReactorMessagingAdapter

Inherits:
Adapter
  • Object
show all
Defined in:
lib/handler/reactor_messaging_adapter.rb

Overview

Adapter to convert raw proton events for the old MessagingHandler used by the Reactor.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Adapter

adapt, adapter, #forward, #proton_adapter_class

Constructor Details

#initialize(handler) ⇒ ReactorMessagingAdapter

Returns a new instance of ReactorMessagingAdapter.



26
27
28
29
30
31
32
33
34
# File 'lib/handler/reactor_messaging_adapter.rb', line 26

# Build the adapter and fill in a default for every option left unset.
#
# Options come from +handler.options+ when the handler responds to
# +options+; otherwise a new empty hash is used. The options object is
# updated in place, so a hash supplied by the handler receives the defaults.
#
# Defaults: +:prefetch+ 10 (also applied when the stored value is nil or
# false), +:peer_close_is_error+ false, and true for +:auto_accept+,
# +:auto_settle+, +:auto_open+ and +:auto_close+.
#
# @param handler the handler object; also passed to the parent initializer
def initialize(handler)
  super
  supplied = handler.options if handler.respond_to?(:options)
  @opts = supplied || {}
  @opts[:prefetch] ||= 10

  # Only keys that are absent get a default; explicit false/nil values stay.
  defaults = {
    peer_close_is_error: false,
    auto_accept: true,
    auto_settle: true,
    auto_open: true,
    auto_close: true
  }
  defaults.each do |name, value|
    @opts[name] = value unless @opts.include?(name)
  end
end

Class Method Details

.open_close(endpoint)

Define the repetitive on_xxx_local_open/remote_open/local_close/remote_close methods for each endpoint type



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/handler/reactor_messaging_adapter.rb', line 55

# Build an anonymous module holding the four raw open/close event methods
# for one endpoint type:
#
#   on_<endpoint>_local_open, on_<endpoint>_remote_open,
#   on_<endpoint>_local_close, on_<endpoint>_remote_close
#
# Each generated method inspects +event.context+ and forwards to the
# on_<endpoint>_opening / opened / closing / closed / error callbacks through
# +delegate+ or +delegate_error+. The including object must provide those two
# methods and an +@opts+ hash.
#
# @param endpoint name fragment interpolated into the method names
# @return [Module] the module that defines the four methods
def self.open_close(endpoint)
  callback = ->(stage) { :"on_#{endpoint}_#{stage}" }
  opening, opened, closing, closed, error =
    %w[opening opened closing closed error].map(&callback)

  Module.new do
    # Local open completes the handshake only if the peer has already opened.
    define_method(callback.call("local_open")) do |event|
      delegate(opened, event) if event.context.remote_open?
    end

    # Remote open: either both ends are now open, or the peer opened first
    # and the local end is still uninitialized.
    define_method(callback.call("remote_open")) do |event|
      next delegate(opened, event) if event.context.local_open?
      next unless event.context.local_uninit?

      delegate(opening, event)
      event.context.open if @opts[:auto_open]
    end

    # Local close completes the shutdown only if the peer has already closed.
    define_method(callback.call("local_close")) do |event|
      delegate(closed, event) if event.context.remote_closed?
    end

    # Remote close: report an error, a completed close, or a peer-initiated
    # close, then close the local end when :auto_close is set.
    define_method(callback.call("remote_close")) do |event|
      if event.context.remote_condition
        delegate_error(error, event)
      elsif event.context.local_closed?
        delegate(closed, event)
      elsif !@opts[:peer_close_is_error]
        delegate(closing, event)
      else
        # Peer closed without a condition; record one before reporting it.
        Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
        delegate_error(error, event)
      end
      event.context.close if @opts[:auto_close]
    end
  end
end

Instance Method Details

#add_credit(event)



149
150
151
152
153
154
155
# File 'lib/handler/reactor_messaging_adapter.rb', line 149

# Top up the receiver's credit to the configured +:prefetch+ level.
#
# Nothing is issued unless the event has a receiver that is open and whose
# +drained+ count is 0, a prefetch value is configured, and that value
# exceeds the receiver's current credit.
#
# @param event event whose +receiver+ (possibly nil) should be topped up
# @return the result of +receiver.flow+, or nil when no credit was issued
def add_credit(event)
  receiver = event.receiver
  window = @opts[:prefetch]
  return unless receiver && receiver.open? && (receiver.drained == 0)
  return unless window && (window > receiver.credit)

  # Issue only the shortfall so total credit equals the prefetch window.
  receiver.flow(window - receiver.credit)
end

#delegate(method, event)



38
39
40
41
# File 'lib/handler/reactor_messaging_adapter.rb', line 38

# Re-target +event+ at +method+ and dispatch it to the wrapped handler.
#
# If dispatching to the handler returns a falsy value, the event is passed
# to this object's own +dispatch+ as +:on_unhandled+ instead.
#
# @param method [Symbol] callback name to store on the event
# @param event the event being forwarded; its +method+ attribute is overwritten
# @return the handler dispatch result if truthy, else the fallback result
def delegate(method, event)
  event.method = method # the event carries the callback name it is dispatched as
  event.dispatch(@handler) || dispatch(:on_unhandled, event)
end

#delegate_error(method, event)



43
44
45
46
47
48
49
# File 'lib/handler/reactor_messaging_adapter.rb', line 43

# Re-target +event+ at the error callback +method+ and dispatch it, falling
# back in stages when nothing handles it.
#
# Order tried: the wrapped handler with +method+, then this object's
# +dispatch+ with +:on_error+. If both return falsy, +:on_unhandled+ is
# dispatched and, when +:auto_close+ is set, the event's connection is closed
# with the condition taken from +event.context+.
#
# @param method [Symbol] error callback name to store on the event
# @param event the event being forwarded; its +method+ attribute is overwritten
# @return nil when the error was handled or auto-close is off, otherwise the
#   result of closing the connection
def delegate_error(method, event)
  event.method = method
  return if event.dispatch(@handler) || dispatch(:on_error, event)

  dispatch(:on_unhandled, event)
  return unless @opts[:auto_close]

  event.connection.close(event.context.condition)
end

#on_container_start(container)



51
# File 'lib/handler/reactor_messaging_adapter.rb', line 51

# Report container start to the handler as +:on_start+, using a new Event
# built with nil for its first two arguments and +container+ as the third.
#
# @param container the container that is starting
def on_container_start(container)
  start_event = Event.new(nil, nil, container)
  delegate(:on_start, start_event)
end

#on_container_stop(container)



52
# File 'lib/handler/reactor_messaging_adapter.rb', line 52

# Report container stop to the handler as +:on_stop+, using a new Event
# built with nil for its first two arguments and +container+ as the third.
#
# @param container the container that is stopping
def on_container_stop(container)
  stop_event = Event.new(nil, nil, container)
  delegate(:on_stop, stop_event)
end

#on_delivery(event)



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/handler/reactor_messaging_adapter.rb', line 106

# Translate a raw delivery event into messaging callbacks.
#
# Incoming (the event's link is a receiver):
# * aborted delivery  -> +:on_aborted+, then the delivery is settled
# * complete delivery -> released without a callback if the link is locally
#   closed and +:auto_accept+ is set; otherwise +:on_message+, followed by an
#   accept when +:auto_accept+ is set and the delivery is still unsettled.
#   Qpid::Proton::Reject / Qpid::Proton::Release raised during that step
#   reject / release the delivery instead.
# * afterwards +:on_settled+ fires if the delivery is settled, and credit is
#   topped up through +add_credit+.
#
# Outgoing (any other link): when the tracker reports an update, the callback
# matching its state fires (+:on_accepted+, +:on_rejected+, +:on_released+,
# +:on_modified+), then +:on_settled+ if it is settled, and the tracker is
# settled locally when +:auto_settle+ is set.
#
# @param event the delivery event
def on_delivery(event)
  if event.link.receiver? # Incoming message
    delivery = event.delivery
    if delivery.aborted?
      delegate(:on_aborted, event)
      delivery.settle
    elsif delivery.complete?
      if delivery.link.local_closed? && @opts[:auto_accept]
        # The link is already closed locally: hand the message back.
        delivery.release
      else
        begin
          delegate(:on_message, event)
          if @opts[:auto_accept]
            delivery.accept unless delivery.settled?
          end
        rescue Qpid::Proton::Reject
          delivery.reject
        rescue Qpid::Proton::Release
          delivery.release(true)
        end
      end
    end
    delegate(:on_settled, event) if delivery.settled?
    return add_credit(event)
  end

  # Outgoing message
  tracker = event.tracker
  return unless tracker.updated?

  outcome =
    case tracker.state
    when Qpid::Proton::Delivery::ACCEPTED then :on_accepted
    when Qpid::Proton::Delivery::REJECTED then :on_rejected
    when Qpid::Proton::Delivery::RELEASED then :on_released
    when Qpid::Proton::Delivery::MODIFIED then :on_modified
    end
  delegate(outcome, event) if outcome
  delegate(:on_settled, event) if tracker.settled?
  tracker.settle if @opts[:auto_settle]
end


143
144
145
146
147
# File 'lib/handler/reactor_messaging_adapter.rb', line 143

# Handle a link flow event: top up receiver credit through +add_credit+,
# then fire +:on_sendable+ when the event's link is an open sender with
# credit greater than zero.
#
# @param event the flow event
# @return the +delegate+ result, or nil when +:on_sendable+ was not fired
def on_link_flow(event)
  add_credit(event)
  link = event.link
  return unless link.sender? && link.open?
  return unless link.credit > 0

  delegate(:on_sendable, event)
end

Add flow control for link opening events



102
# File 'lib/handler/reactor_messaging_adapter.rb', line 102

# Run the inherited local-open handling for the link, then top up receiver
# credit through +add_credit+.
#
# @param event the link local-open event
# @return the result of +add_credit+
def on_link_local_open(event)
  super
  add_credit(event)
end


103
# File 'lib/handler/reactor_messaging_adapter.rb', line 103

# Run the inherited remote-open handling for the link, then top up receiver
# credit through +add_credit+.
#
# @param event the link remote-open event
# @return the result of +add_credit+
def on_link_remote_open(event)
  super
  add_credit(event)
end

#on_transport_closed(event)



99
# File 'lib/handler/reactor_messaging_adapter.rb', line 99

# Forward a transport-closed event to the handler under the same callback
# name, +:on_transport_closed+.
#
# @param event the transport-closed event
def on_transport_closed(event)
  delegate(:on_transport_closed, event)
end

#on_transport_error(event)



98
# File 'lib/handler/reactor_messaging_adapter.rb', line 98

# Forward a transport error through the error path (+delegate_error+) under
# the callback name +:on_transport_error+.
#
# @param event the transport-error event
def on_transport_error(event)
  delegate_error(:on_transport_error, event)
end