# Source code for clients

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import inspect
import logging
import os

from wampy.constants import (
    CROSSBAR_DEFAULT, DEFAULT_ROLES, DEFAULT_REALM
)
from wampy.errors import WampProtocolError, WampyError
from wampy.session import Session
from wampy.messages import Abort, Challenge
from wampy.message_handler import MessageHandler
from wampy.peers.routers import Router
from wampy.roles.caller import CallProxy, RpcProxy
from wampy.roles.publisher import PublishProxy
from wampy.transports import WebSocket, SecureWebSocket

logger = logging.getLogger("wampy.clients")


class Client(object):
    """ A WAMP Client for use in Python applications, scripts and shells.

    The Client can be used as a context manager: ``start`` is called on
    entry and ``stop`` on exit.
    """

    def __init__(
        self, url=None, cert_path=None, realm=DEFAULT_REALM,
        roles=DEFAULT_ROLES, message_handler=None, name=None,
        router=None,
    ):
        """ A WAMP Client "Peer".

        WAMP is designed for application code to run within Clients,
        i.e. Peers.

        Peers have the roles Callee, Caller, Publisher, and Subscriber.

        Subclass this base class to implement the Roles for your
        application.

        :Parameters:
            url : string
                The URL of the Router Peer.
                This must include protocol, host and port and an optional
                path, e.g. "ws://example.com:8080" or
                "wss://example.com:8080/ws". Note though that "ws" protocol
                defaults to port 8080, and "wss" to 443.
            cert_path : str
                If using ``wss`` protocol, a certificate might be required
                by the Router. If so, provide here.
            realm : str
                The routing namespace to construct the ``Session`` over.
                Defaults to ``realm1``.
            roles : dictionary
                Description of the Roles implemented by the ``Client``.
                Defaults to ``wampy.constants.DEFAULT_ROLES``.
            message_handler : instance
                An instance of ``wampy.message_handler.MessageHandler``,
                or a subclass of. This controls the conversation between
                the two Peers.
            name : string
                Optional name for your ``Client``. Useful for when testing
                your app or for logging.
            router : instance
                An alternative way to connect to a Router rather than
                ``url``. An instance of a Router Peer, e.g.
                ``wampy.peers.routers.Crossbar``.
                This is more configurable and powerful, but requires a copy
                of the Router's config file, making this only really useful
                in single host setups or testing.

        :Raises:
            WampyError
                If both ``url`` and ``router`` are given, or if the
                Router's scheme is neither "ws" nor "wss".

        """
        if url and router:
            raise WampyError(
                'Both ``url`` and ``router`` decide how your client connects '
                'to the Router, and so only one can be defined on '
                'instantiation. Please choose one or the other.'
            )

        # the endpoint of a WAMP Router
        self.url = url or CROSSBAR_DEFAULT

        # the ``realm`` is the administrative domain to route messages over.
        self.realm = realm

        # the ``roles`` define what Roles (features) the Client can act,
        # but also configure behaviour such as auth
        self.roles = roles

        # a Session is a transient conversation between two Peers - a Client
        # and a Router. Here we model the Peer we are going to connect to.
        self.router = router or Router(url=self.url, cert_path=cert_path)

        # wampy uses a decoupled "message handler" to process incoming
        # messages. wampy also provides a very adequate default.
        self.message_handler = message_handler or MessageHandler()

        # this conversation is over a transport. WAMP messages are transmitted
        # as WebSocket messages by default (well, actually... that's because no
        # other transports are supported!)
        if self.router.scheme == "ws":
            self.transport = WebSocket(
                server_url=self.router.url, ipv=self.router.ipv,
            )
        elif self.router.scheme == "wss":
            self.transport = SecureWebSocket(
                server_url=self.router.url, ipv=self.router.ipv,
                certificate_path=self.router.certificate,
            )
        else:
            raise WampyError(
                'Network protocl must be "ws" or "wss"'
            )

        # generally ``name`` is used for debugging and logging only
        self.name = name or self.__class__.__name__

        # no Session exists until ``start`` is called
        self._session = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.stop()

    @property
    def session(self):
        """ The current ``Session``, or ``None`` before ``start``. """
        return self._session

    @property
    def subscription_map(self):
        return self.session.subscription_map

    @property
    def registration_map(self):
        return self.session.registration_map

    @property
    def request_ids(self):
        return self.session.request_ids

    @property
    def call(self):
        return CallProxy(client=self)

    @property
    def rpc(self):
        return RpcProxy(client=self)

    @property
    def publish(self):
        return PublishProxy(client=self)

    def start(self):
        """ Create a ``Session`` with the Router and begin it.

        :Raises:
            WampyError
                If the Router answers with an ABORT, or if a CHALLENGE
                is returned from the handshake.

        """
        # create a Session repr between ourselves and the Router.
        # pass in our ``MessageHandler`` which will process messages
        # before they are passed back to the client.
        self._session = Session(
            client=self,
            router=self.router,
            transport=self.transport,
            message_handler=self.message_handler,
        )

        # establish the session
        message_obj = self.session.begin()

        # raise if Router aborts handshake or we cannot respond to a
        # Challenge.
        if message_obj.WAMP_CODE == Abort.WAMP_CODE:
            raise WampyError(message_obj.message)

        if message_obj.WAMP_CODE == Challenge.WAMP_CODE:
            if 'WAMPYSECRET' not in os.environ:
                raise WampyError(
                    "Wampy requires a client's secret to be "
                    "in the environment as ``WAMPYSECRET``"
                )

            raise WampyError("Failed to handle CHALLENGE")

        logger.debug(
            'client %s has established a session with id "%s"',
            self.name, self.session.id
        )

    def stop(self):
        """ End the ``Session`` (if one was established) and disconnect
        the transport.

        The transport is disconnected even when ending the session
        raises, so that a failing goodbye does not leak the connection;
        the original exception still propagates.
        """
        try:
            if self.session and self.session.id:
                self.session.end()
        finally:
            self.transport.disconnect()

    def send_message(self, message):
        """ Send ``message`` over the current ``Session``. """
        self.session.send_message(message)

    def recv_message(self):
        """ Return the next message received by the current ``Session``. """
        return self.session.recv_message()

    def make_rpc(self, message):
        """ Send ``message`` and return the next message received.

        Any exception raised while receiving is logged and re-raised.
        """
        logger.debug("%s sending message: %s", self.name, message)

        self.session.send_message(message)

        try:
            response = self.session.recv_message()
        except WampProtocolError as wamp_err:
            logger.error(wamp_err)
            raise
        except Exception as exc:
            logger.warning("rpc failed!!")
            logger.exception(str(exc))
            raise

        return response

    def register_roles(self):
        """ Register callees and subscribe handlers found on this class.

        Every callable defined on the class and its bases is inspected:
        those carrying a ``callee`` attribute are registered as
        procedures, those carrying a ``subscriber`` attribute are
        subscribed to their ``topic``.

        Over-ride this if you want to customise how your client
        registers its Roles.
        """
        logger.info("registering roles for: %s", self.name)

        # ``inspect.getmro`` yields classes only, most-derived first, so
        # an over-riding definition is always seen before the one it hides.
        maybe_roles = []
        for base in inspect.getmro(self.__class__):
            if base is object:
                continue

            maybe_roles.extend(
                value for value in base.__dict__.values() if callable(value)
            )

        # a role over-ridden in a subclass is found once per defining
        # class; remember what has been handled so that the same procedure
        # or the same handler/topic pair is only sent to the Router once.
        registered_procedures = set()
        # kept as a list because a topic is not guaranteed to be hashable
        subscribed_handlers = []

        for maybe_role in maybe_roles:
            if hasattr(maybe_role, 'callee'):
                procedure_name = maybe_role.__name__
                if procedure_name not in registered_procedures:
                    registered_procedures.add(procedure_name)
                    invocation_policy = maybe_role.invocation_policy
                    self.session._register_procedure(
                        procedure_name, invocation_policy)

                    logger.debug(
                        '%s registered callee "%s"',
                        self.name, procedure_name,
                    )

            if hasattr(maybe_role, 'subscriber'):
                topic = maybe_role.topic
                handler_name = maybe_role.handler.__name__
                subscription = (topic, handler_name)
                if subscription not in subscribed_handlers:
                    subscribed_handlers.append(subscription)
                    # resolve through the instance so that the most
                    # derived implementation of the handler is used
                    handler = getattr(self, handler_name)
                    self.session._subscribe_to_topic(handler, topic)

                    logger.debug(
                        '%s subscribed to topic "%s"',
                        self.name, topic,
                    )