Package proton :: Module _tracing
[frames] | no frames]

Source Code for Module proton._tracing

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  import atexit 
 21  import functools 
 22  import os 
 23  import sys 
 24  import time 
 25  import weakref 
 26   
 27  try: 
 28      import opentracing 
 29      import jaeger_client 
 30      from opentracing.ext import tags 
 31      from opentracing.propagation import Format 
 32  except ImportError: 
 33      raise ImportError('proton tracing requires opentracing and jaeger_client modules') 
 34   
 35  import proton 
 36  from proton import Sender as ProtonSender 
 37  from proton.handlers import ( 
 38      OutgoingMessageHandler as ProtonOutgoingMessageHandler, 
 39      IncomingMessageHandler as ProtonIncomingMessageHandler 
 40  ) 
 41   
 42  _tracer = None 
 43  _trace_key = proton.symbol('x-opt-qpid-tracestate') 
 44   
45 -def get_tracer():
46 global _tracer 47 if _tracer is not None: 48 return _tracer 49 exe = sys.argv[0] if sys.argv[0] else 'interactive-session' 50 return init_tracer(os.path.basename(exe))
51
52 -def _fini_tracer():
53 time.sleep(1) 54 c = opentracing.global_tracer().close() 55 while not c.done(): 56 time.sleep(0.5)
57
58 -def init_tracer(service_name):
59 global _tracer 60 if _tracer is not None: 61 return _tracer 62 63 config = jaeger_client.Config( 64 config={}, 65 service_name=service_name, 66 validate=True 67 ) 68 config.initialize_tracer() 69 _tracer = opentracing.global_tracer() 70 # A nasty hack to ensure enough time for the tracing data to be flushed 71 atexit.register(_fini_tracer) 72 return _tracer
73 74
75 -class IncomingMessageHandler(ProtonIncomingMessageHandler):
76 - def on_message(self, event):
77 if self.delegate is not None: 78 tracer = get_tracer() 79 message = event.message 80 receiver = event.receiver 81 connection = event.connection 82 span_tags = { 83 tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER, 84 tags.MESSAGE_BUS_DESTINATION: receiver.source.address, 85 tags.PEER_ADDRESS: connection.connected_address, 86 tags.PEER_HOSTNAME: connection.hostname, 87 'inserted_by': 'proton-message-tracing' 88 } 89 if message.annotations is not None: 90 headers = message.annotations[_trace_key] 91 span_ctx = tracer.extract(Format.TEXT_MAP, headers) 92 with tracer.start_active_span('amqp-delivery-receive', child_of=span_ctx, tags=span_tags): 93 proton._events._dispatch(self.delegate, 'on_message', event) 94 else: 95 with tracer.start_active_span('amqp-delivery-receive', ignore_active_span=True, tags=span_tags): 96 proton._events._dispatch(self.delegate, 'on_message', event)
97
98 -class OutgoingMessageHandler(ProtonOutgoingMessageHandler):
99 - def on_settled(self, event):
100 if self.delegate is not None: 101 delivery = event.delivery 102 state = delivery.remote_state 103 span = delivery.span 104 span.set_tag('delivery-terminal-state', state.name) 105 span.log_kv({'event': 'delivery settled', 'state': state.name}) 106 span.finish() 107 proton._events._dispatch(self.delegate, 'on_settled', event)
108
109 -class Sender(ProtonSender):
110 - def send(self, msg):
111 tracer = get_tracer() 112 connection = self.connection 113 span_tags = { 114 tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER, 115 tags.MESSAGE_BUS_DESTINATION: self.target.address, 116 tags.PEER_ADDRESS: connection.connected_address, 117 tags.PEER_HOSTNAME: connection.hostname, 118 'inserted_by': 'proton-message-tracing' 119 } 120 span = tracer.start_span('amqp-delivery-send', tags=span_tags) 121 headers = {} 122 tracer.inject(span, Format.TEXT_MAP, headers) 123 if msg.annotations is None: 124 msg.annotations = { _trace_key: headers } 125 else: 126 msg.annotations[_trace_key] = headers 127 delivery = ProtonSender.send(self, msg) 128 delivery.span = span 129 span.set_tag('delivery-tag', delivery.tag) 130 return delivery
131 132 # Monkey patch proton for tracing (need to patch both internal and external names) 133 proton._handlers.IncomingMessageHandler = IncomingMessageHandler 134 proton._handlers.OutgoingMessageHandler = OutgoingMessageHandler 135 proton._endpoints.Sender = Sender 136 proton.handlers.IncomingMessageHandler = IncomingMessageHandler 137 proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler 138 proton.Sender = Sender 139