# Menu Search

# agent.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import cqpid
from qmf2 import *


class ExampleAgent(AgentHandler):
  """
  Example QMFv2 agent implemented as a single class that inherits AgentHandler.

  It does not use a separate thread since once set up, it is driven strictly by
  incoming method calls.

  Typical use: construct it with a broker URL, call setupSchema() and
  populateData(), then hand control to the inherited run() (or start()) and
  finally call shutdown().
  """

  def __init__(self, url):
    """
    Open a messaging connection to the broker at `url` and an agent session on it.

    If configuring or opening the agent session fails, the already-opened
    connection is closed before the exception is re-raised, so a failed
    construction does not leave a connection behind.
    """
    self.session = None

    ##
    ## Create and open a messaging connection to a broker.
    ##
    self.connection = cqpid.Connection(url, "{reconnect:True}")
    self.connection.open()

    ##
    ## Create, configure, and open a QMFv2 agent session using the connection.
    ##
    try:
      session = AgentSession(self.connection, "{interval:30}")
      session.setVendor('profitron.com')
      session.setProduct('blastinator')
      session.setAttribute('attr1', 1000)
      session.open()
    except Exception:
      # Nobody will ever get a reference to this half-built agent, so the
      # connection has to be released here.
      self.connection.close()
      raise
    self.session = session

    ##
    ## Initialize the parent class.
    ##
    AgentHandler.__init__(self, self.session)


  def shutdown(self):
    """
    Clean up the session and connection.

    The connection is closed even when closing the session raises.
    """
    try:
      if self.session:
        self.session.close()
    finally:
      self.connection.close()


  def method(self, handle, methodName, args, subtypes, addr, userId):
    """
    Handle incoming method calls.

    handle     -- the call being serviced; return arguments are added to it and
                  it is passed back to the session to report success or failure.
    methodName -- name of the invoked method ("stop", "echo", "event", "fail"
                  or "create_child").
    args       -- input arguments, indexed by argument name.
    subtypes   -- unused here.
    addr       -- address of the object the method was invoked on; only the
                  control object created by populateData() has methods.
    userId     -- unused here.

    Every call is answered: calls aimed at another object, calls to an unknown
    method, and any error raised while servicing a call are all reported back
    through session.raiseException() rather than being dropped silently.
    """
    if addr == self.controlAddr:
      self.control.methodCount += 1

      try:
        if methodName == "stop":
          # Acknowledge first, then ask the handler to stop.
          self.session.methodSuccess(handle)
          self.cancel()

        elif methodName == "echo":
          handle.addReturnArgument("sequence", args["sequence"])
          handle.addReturnArgument("map", args["map"])
          self.session.methodSuccess(handle)

        elif methodName == "event":
          ev = Data(self.sch_event)
          ev.text = args['text']
          self.session.raiseEvent(ev, args['severity'])
          self.session.methodSuccess(handle)

        elif methodName == "fail":
          # This method always answers with an exception: either a plain
          # string or a structured object built from the exception schema.
          if args['useString']:
            self.session.raiseException(handle, args['stringVal'])
          else:
            ex = Data(self.sch_exception)
            ex.whatHappened = "It Failed"
            ex.howBad = 75
            ex.details = args['details']
            self.session.raiseException(handle, ex)

        elif methodName == "create_child":
          name = args['name']
          child = Data(self.sch_child)
          child.name = name
          # Use a distinct local so the `addr` parameter is not overwritten.
          childAddr = self.session.addData(child, name)
          handle.addReturnArgument("childAddr", childAddr.asMap())
          self.session.methodSuccess(handle)

        else:
          self.session.raiseException(handle, "Unknown method: %s" % methodName)

      except Exception as e:
        # Report the failure (e.g. a missing input argument) to the caller.
        # KeyboardInterrupt/SystemExit are deliberately left to propagate.
        self.session.raiseException(handle, "%r" % e)

    else:
      # Only the control object declares methods in this agent's schema.
      self.session.raiseException(
          handle, "Method '%s' is not supported on this object" % methodName)


  def setupSchema(self):
    """
    Create and register the schema for this agent.

    Builds four schemata in package "com.profitron.bntor" (exception, control,
    child and event), stores them on the instance as sch_exception,
    sch_control, sch_child and sch_event, and registers them with the session.
    """
    package = "com.profitron.bntor"

    ##
    ## Declare a schema for a structured exception that can be used in failed
    ## method invocations.
    ##
    self.sch_exception = Schema(SCHEMA_TYPE_DATA, package, "exception")
    self.sch_exception.addProperty(SchemaProperty("whatHappened", SCHEMA_DATA_STRING))
    self.sch_exception.addProperty(SchemaProperty("howBad", SCHEMA_DATA_INT))
    self.sch_exception.addProperty(SchemaProperty("details", SCHEMA_DATA_MAP))

    ##
    ## Declare a control object to test methods against.
    ##
    self.sch_control = Schema(SCHEMA_TYPE_DATA, package, "control")
    self.sch_control.addProperty(SchemaProperty("state", SCHEMA_DATA_STRING))
    self.sch_control.addProperty(SchemaProperty("methodCount", SCHEMA_DATA_INT))

    stopMethod = SchemaMethod("stop", desc="Stop Agent")
    stopMethod.addArgument(SchemaProperty("message", SCHEMA_DATA_STRING, direction=DIR_IN))
    self.sch_control.addMethod(stopMethod)

    echoMethod = SchemaMethod("echo", desc="Echo Arguments")
    echoMethod.addArgument(SchemaProperty("sequence", SCHEMA_DATA_INT, direction=DIR_IN_OUT))
    echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT))
    self.sch_control.addMethod(echoMethod)

    eventMethod = SchemaMethod("event", desc="Raise an Event")
    eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN))
    eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN))
    self.sch_control.addMethod(eventMethod)

    failMethod = SchemaMethod("fail", desc="Expected to Fail")
    failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN))
    failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN))
    failMethod.addArgument(SchemaProperty("details", SCHEMA_DATA_MAP, direction=DIR_IN))
    self.sch_control.addMethod(failMethod)

    createMethod = SchemaMethod("create_child", desc="Create Child Object")
    createMethod.addArgument(SchemaProperty("name", SCHEMA_DATA_STRING, direction=DIR_IN))
    createMethod.addArgument(SchemaProperty("childAddr", SCHEMA_DATA_MAP, direction=DIR_OUT))
    self.sch_control.addMethod(createMethod)

    ##
    ## Declare a child object
    ##
    self.sch_child = Schema(SCHEMA_TYPE_DATA, package, "child")
    self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING))

    ##
    ## Declare the event class
    ##
    self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event")
    self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING))

    ##
    ## Register our schemata with the agent session.
    ##
    for schema in (self.sch_exception, self.sch_control,
                   self.sch_child, self.sch_event):
      self.session.registerSchema(schema)


  def populateData(self):
    """
    Create a control object and give it to the agent session to manage.

    Requires setupSchema() to have run first (it reads self.sch_control).
    The address returned by the session is kept in self.controlAddr so that
    method() can recognise calls aimed at the control object.
    """
    self.control = Data(self.sch_control)
    self.control.state = "OPERATIONAL"
    self.control.methodCount = 0
    self.controlAddr = self.session.addData(self.control, "singleton")


##
## Run the example agent against a broker on localhost.  Any failure is
## reported on stdout; once the agent has been constructed, its session and
## connection are always released, even if setup or the run loop raises.
##
try:
  agent = ExampleAgent("localhost")
  try:
    agent.setupSchema()
    agent.populateData()
    agent.run()  # Use agent.start() to launch the agent in a separate thread
  finally:
    agent.shutdown()
except Exception as e:
  # Single-argument print() gives the same output on Python 2 and Python 3.
  print("Exception Caught: %s" % e)

# Download this file