# Source code for session

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import logging

from wampy.backends import async_adapter
from wampy.errors import ConnectionError, WampProtocolError
from wampy.messages import MESSAGE_TYPE_MAP
from wampy.messages.hello import Hello
from wampy.messages.goodbye import Goodbye
from wampy.messages.register import Register
from wampy.messages.subscribe import Subscribe

# Module-level logger for this file; records are emitted under the
# 'wampy.session' logger name.
logger = logging.getLogger('wampy.session')


class Session(object):
    """ A transient conversation between two Peers attached to a Realm
    and running over a Transport.

    WAMP Sessions are established over a WAMP Connection which is the
    responsibility of the ``Transport`` object.

    Each wampy ``Session`` manages its own WAMP connection via the
    ``Transport``.

    Once the connection is established, the Session is begun when the
    Realm is joined. This is achieved by sending the HELLO message.

    .. note::
        Routing occurs only between WAMP Sessions that have joined the
        same Realm.

    """

    def __init__(self, client, router, transport, message_handler):
        """ A Session between a Client and a Router.

        Constructing a Session has side effects: it connects the
        transport (``transport.connect(upgrade=True)``) and starts the
        background listener for incoming frames.

        :Parameters:
            client : instance
                An instance of :class:`peers.Client`
            router : instance
                An instance of :class:`peers.Router`
            transport : instance
                An instance of ``wampy.transports``.
            message_handler : instance
                An instance of ``wampy.message_handler.MessageHandler``,
                or a subclass of

        """
        self.client = client
        self.router = router
        self.transport = transport
        self.connection = self.transport.connect(upgrade=True)
        self.message_handler = message_handler

        # request id -> pending request details. ``_subscribe_to_topic``
        # stores ``(message, handler)`` and ``_register_procedure`` stores
        # the procedure name.
        self.request_ids = {}
        # Start empty and are cleared again by ``end``; this class never
        # writes entries into them itself.
        # NOTE(review): presumably filled in by the message handler -
        # confirm.
        self.subscription_map = {}
        self.registration_map = {}

        self.session_id = None
        # spawn a green thread to listen for incoming messages over
        # a connection and put them on a queue to be processed
        self._managed_thread = None
        self._message_queue = async_adapter.message_queue
        self._listen(self.connection, self._message_queue)

    @property
    def host(self):
        """ The host of the Router this Session talks to. """
        return self.router.host

    @property
    def port(self):
        """ The port of the Router this Session talks to. """
        return self.router.port

    @property
    def roles(self):
        """ The roles announced by the Client. """
        return self.client.roles

    @property
    def realm(self):
        """ The realm the Client wants to join. """
        return self.client.realm

    @property
    def id(self):
        """ The session id, or ``None`` when no id has been set. """
        return self.session_id

    def begin(self):
        """ Send HELLO and return the first message received in reply. """
        return self._say_hello()

    def end(self):
        """ Say GOODBYE, then reset session state and stop the listener.

        The clean-up always runs, even when the GOODBYE exchange raises,
        so the listener thread is not left running. Calling ``end`` on a
        Session whose listener has already been stopped does not fail on
        the missing thread.

        """
        try:
            self._say_goodbye()
        finally:
            self.subscription_map = {}
            self.registration_map = {}
            self.session_id = None

            # Drop our reference before killing, and only kill when there
            # is still a listener to stop.
            listener = self._managed_thread
            self._managed_thread = None
            if listener is not None:
                listener.kill()

    def send_message(self, message_obj):
        """ Log and send ``message_obj.message`` over the connection.

        :Parameters:
            message_obj : instance
                A message object exposing ``WAMP_CODE`` and ``message``.

        :Raises:
            KeyError
                if ``message_obj.WAMP_CODE`` is not in
                ``MESSAGE_TYPE_MAP``.

        """
        message_type = MESSAGE_TYPE_MAP[message_obj.WAMP_CODE]
        message = message_obj.message

        logger.debug(
            'sending "%s" message: "%s" for client "%s"',
            message_type, message, self.client.name,
        )

        self.connection.send(message)

    def recv_message(self, timeout=5):
        """ Return the next message from ``async_adapter``.

        :Parameters:
            timeout : number
                Passed through to ``async_adapter.receive_message``.

        """
        message = async_adapter.receive_message(timeout=timeout)

        logger.debug(
            'received message: "%s" for client "%s"',
            message.name, self.client.name,
        )

        return message

    def _say_hello(self):
        """ Send HELLO for the client's realm and roles; return the reply. """
        message = Hello(self.realm, self.roles)
        self.send_message(message)
        response = self.recv_message()
        return response

    def _say_goodbye(self):
        """ Best-effort GOODBYE exchange.

        A failure to send is logged as a warning and otherwise ignored.
        A ``WampProtocolError`` while waiting for the reply, or a reply
        that is not a GOODBYE, is logged at debug level and otherwise
        ignored.

        """
        message = Goodbye()
        try:
            self.send_message(message)
        except Exception as exc:
            # we can't be sure what the Exception is here because it will
            # be from the Router implementation
            logger.warning("GOODBYE failed!: %s", exc)
        else:
            try:
                message = self.recv_message(timeout=2)
                if message.WAMP_CODE != Goodbye.WAMP_CODE:
                    raise WampProtocolError(
                        "Unexpected response from GOODBYE message: {}".format(
                            message
                        )
                    )
            except WampProtocolError as exc:
                # Server already gone away? Still best-effort, but leave
                # a trace instead of swallowing the error silently.
                logger.debug("GOODBYE was not acknowledged: %s", exc)

    def _listen(self, connection, message_queue):
        """ Spawn the background loop that reads frames from ``connection``.

        Each non-empty frame's payload is handed to
        ``self.message_handler.handle_message`` in a freshly spawned task.
        The loop stops on ``SystemExit``, ``KeyboardInterrupt``,
        ``ConnectionError`` or ``WampProtocolError``.

        ``message_queue`` is accepted for interface compatibility but is
        not used by the loop.

        """
        def connection_handler():
            while True:
                try:
                    frame = connection.receive()
                    if frame:
                        message = frame.payload
                        async_adapter.spawn(
                            self.message_handler.handle_message,
                            message, self.client
                        )
                except (
                    SystemExit, KeyboardInterrupt, ConnectionError,
                    WampProtocolError,
                ):
                    break

        gthread = async_adapter.spawn(connection_handler)
        self._managed_thread = gthread

    def _subscribe_to_topic(self, handler, topic):
        """ Send SUBSCRIBE for ``topic`` and remember the pending request.

        On success ``(message, handler)`` is stored in
        ``self.request_ids`` under the message's request id.

        :Raises:
            WampProtocolError
                if sending the message fails for any reason.

        """
        message = Subscribe(topic=topic)
        request_id = message.request_id

        try:
            self.send_message(message)
        except Exception as exc:
            raise WampProtocolError(
                "failed to subscribe to {}: \"{}\"".format(
                    topic, exc)
            )

        self.request_ids[request_id] = message, handler

    def _register_procedure(self, procedure_name, invocation_policy="single"):
        """ Register a "procedure" on a Client as callable over the Router.

        On success the procedure name is stored in ``self.request_ids``
        under the message's request id.

        :Raises:
            WampProtocolError
                if sending the message raises ``ValueError``.

        """
        options = {"invoke": invocation_policy}
        message = Register(procedure=procedure_name, options=options)
        request_id = message.request_id

        try:
            self.send_message(message)
        except ValueError:
            # Exceptions do not %-format their arguments, so build the
            # message explicitly.
            raise WampProtocolError(
                "failed to register callee: {}".format(procedure_name)
            )

        self.request_ids[request_id] = procedure_name