Class: Qpid::Proton::Handler::ReactorMessagingAdapter
- Inherits:
-
Adapter
- Object
- Adapter
- Qpid::Proton::Handler::ReactorMessagingAdapter
show all
- Defined in:
- lib/handler/reactor_messaging_adapter.rb
Overview
Adapter to convert raw proton events for the old MessagingHandler used by the Reactor.
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Adapter
adapt, adapter, #forward, #proton_adapter_class
Constructor Details
Returns a new instance of ReactorMessagingAdapter.
26
27
28
29
30
31
32
33
34
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 26
# Wrap +handler+ and settle the option set read by the other callbacks.
#
# Options are taken from handler.options when the handler responds to
# #options and returns a truthy value; otherwise an empty Hash is used.
# Missing entries are then filled in on that same object:
# :prefetch becomes 10 when absent or falsy, :peer_close_is_error defaults
# to false, and the four :auto_* flags default to true.
def initialize(handler)
  super
  @opts = (handler.respond_to?(:options) && handler.options) || {}
  @opts[:prefetch] ||= 10
  fallbacks = {
    :peer_close_is_error => false,
    :auto_accept => true,
    :auto_settle => true,
    :auto_open => true,
    :auto_close => true
  }
  # Only fill in keys that are absent; an explicit false/nil value is kept.
  fallbacks.each do |key, value|
    @opts[key] = value unless @opts.include?(key)
  end
end
|
Class Method Details
.open_close(endpoint)
Define repetitive on_xxx_open/close methods for each endpoint type
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 55
# Build an anonymous Module that defines the four raw callbacks
# on_<endpoint>_local_open, on_<endpoint>_remote_open,
# on_<endpoint>_local_close and on_<endpoint>_remote_close.
#
# Each callback inspects event.context and forwards to one of the
# on_<endpoint>_opening / _opened / _closing / _closed / _error callbacks
# through #delegate or #delegate_error, which the including object must
# provide along with an @opts hash.
def self.open_close(endpoint)
  opening, opened, closing, closed, error =
    %w[opening opened closing closed error].map { |suffix| :"on_#{endpoint}_#{suffix}" }

  Module.new do
    # Local open completes the handshake only if the peer already opened.
    define_method(:"on_#{endpoint}_local_open") do |event|
      delegate(opened, event) if event.context.remote_open?
    end

    # Remote open: either both ends are now open, or the peer initiated
    # and the local end has not been touched yet.
    define_method(:"on_#{endpoint}_remote_open") do |event|
      next delegate(opened, event) if event.context.local_open?
      next unless event.context.local_uninit?

      delegate(opening, event)
      event.context.open if @opts[:auto_open]
    end

    # Local close completes the shutdown only if the peer already closed.
    define_method(:"on_#{endpoint}_local_close") do |event|
      delegate(closed, event) if event.context.remote_closed?
    end

    # Remote close: pick exactly one notification, then optionally close
    # the local end as well.
    define_method(:"on_#{endpoint}_remote_close") do |event|
      case
      when event.context.remote_condition
        delegate_error(error, event)
      when event.context.local_closed?
        delegate(closed, event)
      when @opts[:peer_close_is_error]
        # Peer closed first without a condition: attach one and report it.
        Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
        delegate_error(error, event)
      else
        delegate(closing, event)
      end
      event.context.close if @opts[:auto_close]
    end
  end
end
|
Instance Method Details
#add_credit(event)
149
150
151
152
153
154
155
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 149
# Top up the credit of the event's receiver to the configured :prefetch.
#
# Nothing happens unless the event has a receiver that is open and reports
# drained == 0, and a :prefetch option is set that exceeds the receiver's
# current credit. In that case the difference is issued via receiver.flow.
def add_credit(event)
  receiver = event.receiver
  window = @opts[:prefetch]
  return unless receiver && receiver.open? && receiver.drained == 0
  return unless window && window > receiver.credit

  receiver.flow(window - receiver.credit)
end
|
#delegate(method, event)
38
39
40
41
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 38
# Re-target +event+ at the callback named +method+ and dispatch it.
#
# The event is first offered to @handler; if that dispatch returns a falsy
# value the adapter's own #dispatch is invoked with :on_unhandled instead.
#
# @param method [Symbol] callback name to store on the event
# @param event [Object] event responding to #method= and #dispatch
# @return the truthy result of event.dispatch, otherwise whatever the
#   :on_unhandled dispatch returns
def delegate(method, event)
  # Update the event with the new method before dispatching. These must be
  # two separate statements: fused on one line, Ruby parses the right-hand
  # side as a call to Object#method with the dispatch result as argument.
  event.method = method
  event.dispatch(@handler) || dispatch(:on_unhandled, event)
end
|
#delegate_error(method, event)
43
44
45
46
47
48
49
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 43
# Dispatch an error callback with a chain of fallbacks.
#
# The event is re-targeted at +method+ and offered to @handler. If that
# returns a falsy value, :on_error is tried through #dispatch. If that is
# falsy too, :on_unhandled is dispatched and, when the :auto_close option
# is set, the event's connection is closed with the context's condition.
def delegate_error(method, event)
  event.method = method
  handled = event.dispatch(@handler) || dispatch(:on_error, event)
  return if handled

  dispatch(:on_unhandled, event)
  return unless @opts[:auto_close]

  event.connection.close(event.context.condition)
end
|
#on_container_start(container)
51
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 51
# Forward container start as :on_start, wrapping the container in a new
# Event built with nil for its first two arguments.
def on_container_start(container)
  start_event = Event.new(nil, nil, container)
  delegate(:on_start, start_event)
end
|
#on_container_stop(container)
52
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 52
# Forward container stop as :on_stop, wrapping the container in a new
# Event built with nil for its first two arguments.
def on_container_stop(container)
  stop_event = Event.new(nil, nil, container)
  delegate(:on_stop, stop_event)
end
|
#on_delivery(event)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 106
# Translate a raw delivery event into messaging callbacks.
#
# On a receiving link the delivery is inspected: aborted deliveries are
# reported via :on_aborted and settled; complete deliveries are reported
# via :on_message (or released without notification when the link is
# already locally closed and :auto_accept is set). Afterwards :on_settled
# is delegated if the delivery is settled, and credit is topped up.
#
# On a sending link the tracker's updated state is mapped to
# :on_accepted / :on_rejected / :on_released / :on_modified, followed by
# :on_settled when settled, and the tracker is settled when :auto_settle
# is set.
def on_delivery(event)
  if event.link.receiver?
    # Incoming message. The assignment must be its own statement; fused
    # onto the `if` line it would be passed as an argument to receiver?.
    d = event.delivery
    if d.aborted?
      delegate(:on_aborted, event)
      d.settle
    elsif d.complete?
      if d.link.local_closed? && @opts[:auto_accept]
        d.release
      else
        begin
          delegate(:on_message, event)
          d.accept if @opts[:auto_accept] && !d.settled?
        rescue Qpid::Proton::Reject
          # Raised out of the :on_message dispatch to reject the delivery.
          d.reject
        rescue Qpid::Proton::Release
          # Raised out of the :on_message dispatch to release the delivery.
          d.release(true)
        end
      end
    end
    delegate(:on_settled, event) if d.settled?
    add_credit(event)
  else
    # Outgoing message.
    t = event.tracker
    if t.updated?
      case t.state
      when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
      when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
      when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event)
      when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event)
      end
      delegate(:on_settled, event) if t.settled?
      t.settle if @opts[:auto_settle]
    end
  end
end
|
#on_link_flow(event)
143
144
145
146
147
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 143
# Handle a flow event on a link: top up receiver credit first, then
# delegate :on_sendable when the link is a sender, is open, and has
# credit greater than zero.
def on_link_flow(event)
  add_credit(event)
  link = event.link
  return unless link.sender? && link.open? && link.credit > 0

  delegate(:on_sendable, event)
end
|
#on_link_local_open(event)
Add flow control for link opening events
102
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 102
# Run the inherited on_link_local_open handling, then top up credit for
# the event's receiver.
def on_link_local_open(event)
  super
  add_credit(event)
end
|
#on_link_remote_open(event)
103
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 103
# Run the inherited on_link_remote_open handling, then top up credit for
# the event's receiver.
def on_link_remote_open(event)
  super
  add_credit(event)
end
|
#on_transport_closed(event)
99
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 99
# Pass a transport-closed event straight through to the handler's
# :on_transport_closed callback.
def on_transport_closed(event)
  delegate(:on_transport_closed, event)
end
|
#on_transport_error(event)
98
|
# File 'lib/handler/reactor_messaging_adapter.rb', line 98
# Report a transport error through the error-delegation chain under the
# :on_transport_error callback name.
def on_transport_error(event)
  delegate_error(:on_transport_error, event)
end
|