Package proton :: Module _message
[frames | no frames]

Source Code for Module proton._message

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, pn_error_text, pn_message, \ 
 23      pn_message_annotations, pn_message_body, pn_message_clear, pn_message_correlation_id, pn_message_decode, \ 
 24      pn_message_encode, pn_message_error, pn_message_free, pn_message_get_address, pn_message_get_content_encoding, \ 
 25      pn_message_get_content_type, pn_message_get_creation_time, pn_message_get_delivery_count, \ 
 26      pn_message_get_expiry_time, pn_message_get_group_id, pn_message_get_group_sequence, pn_message_get_priority, \ 
 27      pn_message_get_reply_to, pn_message_get_reply_to_group_id, pn_message_get_subject, pn_message_get_ttl, \ 
 28      pn_message_get_user_id, pn_message_id, pn_message_instructions, pn_message_is_durable, pn_message_is_first_acquirer, \ 
 29      pn_message_is_inferred, pn_message_properties, pn_message_set_address, pn_message_set_content_encoding, \ 
 30      pn_message_set_content_type, pn_message_set_creation_time, pn_message_set_delivery_count, pn_message_set_durable, \ 
 31      pn_message_set_expiry_time, pn_message_set_first_acquirer, pn_message_set_group_id, pn_message_set_group_sequence, \ 
 32      pn_message_set_inferred, pn_message_set_priority, pn_message_set_reply_to, pn_message_set_reply_to_group_id, \ 
 33      pn_message_set_subject, pn_message_set_ttl, pn_message_set_user_id 
 34   
 35  from . import _compat 
 36  from ._common import isinteger, millis2secs, secs2millis, unicode2utf8, utf82unicode 
 37  from ._data import Data, symbol, ulong 
 38  from ._endpoints import Link 
 39  from ._exceptions import EXCEPTIONS, MessageException 
 40   
 41  # 
 42  # Hack to provide Python2 <---> Python3 compatibility 
 43  try: 
 44      unicode() 
 45  except NameError: 
 46      unicode = str 
 47   
 48   
class Message(object):
    """The L{Message} class is a mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self, body=None, **kwargs):
        """
        @param body: initial value of the C{body} attribute
        @param kwargs: Message property name/value pairs to initialise the Message
        @raise AttributeError: if a keyword does not name an existing attribute
        """
        self._msg = pn_message()
        self._id = Data(pn_message_id(self._msg))
        self._correlation_id = Data(pn_message_correlation_id(self._msg))
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = body
        for k, v in _compat.iteritems(kwargs):
            getattr(self, k)  # Raise exception if it's not a valid attribute.
            setattr(self, k, v)

    def __del__(self):
        # _msg is absent if __init__ failed before assigning it; in that case
        # there is nothing to free.
        if hasattr(self, "_msg"):
            pn_message_free(self._msg)
            del self._msg

    def _check(self, err):
        """
        Translate a negative return code into an exception.

        @param err: return code from a pn_message_* call
        @return: C{err} unchanged when it is not negative
        @raise MessageException: or the class registered for C{err} in
            EXCEPTIONS, when C{err} is negative
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, MessageException)
            raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
        else:
            return err

    def _check_property_keys(self):
        """
        Ensure every key of C{self.properties} is a text string.

        Text keys are left alone. Python 2 byte-string keys are decoded as
        UTF-8 and the entry is re-inserted under the resulting unicode key.
        Any other key type is rejected, and in that case no key is changed.

        @raise MessageException: if a key is neither text nor a Python 2
            byte string
        """
        # Collect the renames first and apply them afterwards: the dict must
        # not be modified while its keys are being iterated.
        renamed = []
        for k in self.properties.keys():
            if isinstance(k, unicode):
                # py2 unicode, py3 str (via hack definition)
                continue
            elif isinstance(k, str):
                # py2 str only (py3 str is handled above): the key is binary,
                # so decode it to text. Calling encode() here would hand back
                # a byte string again and leave the key binary.
                renamed.append((k, k.decode('utf-8')))
            else:
                raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
        for old_key, new_key in renamed:
            self.properties[new_key] = self.properties.pop(old_key)

    def _pre_encode(self):
        """
        Copy the Python-side sections (instructions, annotations, properties
        and body) into the underlying message before it is encoded. A section
        whose attribute is None is cleared and left empty.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        inst.clear()
        if self.instructions is not None:
            inst.put_object(self.instructions)
        ann.clear()
        if self.annotations is not None:
            ann.put_object(self.annotations)
        props.clear()
        if self.properties is not None:
            self._check_property_keys()
            props.put_object(self.properties)
        body.clear()
        if self.body is not None:
            body.put_object(self.body)

    def _post_decode(self):
        """
        Load the sections of the underlying message (instructions,
        annotations, properties and body) into the matching Python
        attributes. A section with no content yields None.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        if inst.next():
            self.instructions = inst.get_object()
        else:
            self.instructions = None
        if ann.next():
            self.annotations = ann.get_object()
        else:
            self.annotations = None
        if props.next():
            self.properties = props.get_object()
        else:
            self.properties = None
        if body.next():
            self.body = body.get_object()
        else:
            self.body = None

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields will be reset to
        their default values.
        """
        pn_message_clear(self._msg)
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = None

    def _is_inferred(self):
        return pn_message_is_inferred(self._msg)

    def _set_inferred(self, value):
        self._check(pn_message_set_inferred(self._msg, bool(value)))

    inferred = property(_is_inferred, _set_inferred, doc="""
    The inferred flag for a message indicates how the message content
    is encoded into AMQP sections. If inferred is true then binary and
    list values in the body of the message will be encoded as AMQP DATA
    and AMQP SEQUENCE sections, respectively. If inferred is false,
    then all values in the body of the message will be encoded as AMQP
    VALUE sections regardless of their type.
    """)

    def _is_durable(self):
        return pn_message_is_durable(self._msg)

    def _set_durable(self, value):
        self._check(pn_message_set_durable(self._msg, bool(value)))

    durable = property(_is_durable, _set_durable,
                       doc="""
    The durable property indicates that the message should be held durably
    by any intermediaries taking responsibility for the message.
    """)

    def _get_priority(self):
        return pn_message_get_priority(self._msg)

    def _set_priority(self, value):
        self._check(pn_message_set_priority(self._msg, value))

    priority = property(_get_priority, _set_priority,
                        doc="""
    The priority of the message.
    """)

    def _get_ttl(self):
        # The underlying message stores milliseconds; the property is in seconds.
        return millis2secs(pn_message_get_ttl(self._msg))

    def _set_ttl(self, value):
        self._check(pn_message_set_ttl(self._msg, secs2millis(value)))

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
    The time to live of the message measured in seconds. Expired messages
    may be dropped.
    """)

    def _is_first_acquirer(self):
        return pn_message_is_first_acquirer(self._msg)

    def _set_first_acquirer(self, value):
        self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
    True iff the recipient is the first to acquire the message.
    """)

    def _get_delivery_count(self):
        return pn_message_get_delivery_count(self._msg)

    def _set_delivery_count(self, value):
        self._check(pn_message_set_delivery_count(self._msg, value))

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
    The number of delivery attempts made for this message.
    """)

    def _get_id(self):
        return self._id.get_object()

    def _set_id(self, value):
        # Integer ids are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._id.rewind()
        self._id.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
    The id of the message.
    """)

    def _get_user_id(self):
        return pn_message_get_user_id(self._msg)

    def _set_user_id(self, value):
        self._check(pn_message_set_user_id(self._msg, value))

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
    The user id of the message creator.
    """)

    def _get_address(self):
        return utf82unicode(pn_message_get_address(self._msg))

    def _set_address(self, value):
        self._check(pn_message_set_address(self._msg, unicode2utf8(value)))

    address = property(_get_address, _set_address,
                       doc="""
    The address of the message.
    """)

    def _get_subject(self):
        return utf82unicode(pn_message_get_subject(self._msg))

    def _set_subject(self, value):
        self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))

    subject = property(_get_subject, _set_subject,
                       doc="""
    The subject of the message.
    """)

    def _get_reply_to(self):
        return utf82unicode(pn_message_get_reply_to(self._msg))

    def _set_reply_to(self, value):
        self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
    The reply-to address for the message.
    """)

    def _get_correlation_id(self):
        return self._correlation_id.get_object()

    def _set_correlation_id(self, value):
        # Integer ids are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._correlation_id.rewind()
        self._correlation_id.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
    The correlation-id for the message.
    """)

    def _get_content_type(self):
        return symbol(utf82unicode(pn_message_get_content_type(self._msg)))

    def _set_content_type(self, value):
        self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
    The content-type of the message.
    """)

    def _get_content_encoding(self):
        return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))

    def _set_content_encoding(self, value):
        self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
    The content-encoding of the message.
    """)

    def _get_expiry_time(self):
        return millis2secs(pn_message_get_expiry_time(self._msg))

    def _set_expiry_time(self, value):
        self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
    The expiry time of the message.
    """)

    def _get_creation_time(self):
        return millis2secs(pn_message_get_creation_time(self._msg))

    def _set_creation_time(self, value):
        self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
    The creation time of the message.
    """)

    def _get_group_id(self):
        return utf82unicode(pn_message_get_group_id(self._msg))

    def _set_group_id(self, value):
        self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
    The group id of the message.
    """)

    def _get_group_sequence(self):
        return pn_message_get_group_sequence(self._msg)

    def _set_group_sequence(self, value):
        self._check(pn_message_set_group_sequence(self._msg, value))

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
    The sequence of the message within its group.
    """)

    def _get_reply_to_group_id(self):
        return utf82unicode(pn_message_get_reply_to_group_id(self._msg))

    def _set_reply_to_group_id(self, value):
        self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
    The group-id for any replies.
    """)

    def encode(self):
        """
        Encodes the message and returns the encoded data.

        The Python-side sections are first copied into the underlying
        message. Encoding starts with a 16 byte buffer and the size is
        doubled for as long as pn_message_encode reports PN_OVERFLOW.

        @return: the data produced by pn_message_encode
        @raise MessageException: or the class registered in EXCEPTIONS, if
            encoding fails with any error other than overflow, or if a
            property key is not a string
        """
        self._pre_encode()
        sz = 16
        while True:
            err, data = pn_message_encode(self._msg, sz)
            if err == PN_OVERFLOW:
                sz *= 2
                continue
            else:
                self._check(err)
                return data

    def decode(self, data):
        """
        Decodes C{data} into this message and refreshes the instructions,
        annotations, properties and body attributes from the result.

        @param data: encoded message data, passed to pn_message_decode
        @raise MessageException: or the class registered in EXCEPTIONS, if
            decoding fails
        """
        self._check(pn_message_decode(self._msg, data))
        self._post_decode()

    def send(self, sender, tag=None):
        """
        Encodes the message and sends it on the given sender as a new
        delivery. The delivery is settled immediately when the sender's
        snd_settle_mode is Link.SND_SETTLED.

        @param sender: the sending link to send the message on
        @param tag: the delivery tag; when not given (or falsy),
            sender.delivery_tag() supplies one
        @return: the delivery created for the message
        """
        dlv = sender.delivery(tag or sender.delivery_tag())
        encoded = self.encode()
        sender.stream(encoded)
        sender.advance()
        if sender.snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        return dlv

    def recv(self, link):
        """
        Receives and decodes the message content for the current delivery
        from the link. Upon success it will return the current delivery
        for the link. If there is no current delivery, or if the current
        delivery is incomplete, or if the link is not a receiver, it will
        return None.

        @type link: Link
        @param link: the link to receive a message from
        @return: the delivery associated with the decoded message (or None)
        """
        if link.is_sender:
            return None
        dlv = link.current
        if not dlv or dlv.partial:
            return None
        dlv.encoded = link.recv(dlv.pending)
        link.advance()
        # the sender has already forgotten about the delivery, so we might
        # as well too
        if link.remote_snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        self.decode(dlv.encoded)
        return dlv

    def __repr__(self):
        # Only attributes with a truthy value are listed.
        props = []
        for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                     "priority", "first_acquirer", "delivery_count", "id",
                     "correlation_id", "user_id", "group_id", "group_sequence",
                     "reply_to_group_id", "instructions", "annotations",
                     "properties", "body"):
            value = getattr(self, attr)
            if value:
                props.append("%s=%r" % (attr, value))
        return "Message(%s)" % ", ".join(props)
438