Source code for ari.client

# Copyright 2013-2022 The Wazo Authors  (see the AUTHORS file)
# SPDX-License-Identifier: GPL-3.0-or-later

"""ARI client library.
"""

import json
import logging
from collections import defaultdict

import swaggerpy.client
from six.moves import urllib

from .model import (
    Bridge,
    Channel,
    DeviceState,
    Endpoint,
    LiveRecording,
    Playback,
    Repository,
    Sound,
    StoredRecording,
)

log = logging.getLogger(__name__)


[docs]class Client: """ARI Client object. :param base_url: Base URL for accessing Asterisk. :param http_client: HTTP client interface. """ def __init__(self, base_url, http_client): self.base_url = base_url url = urllib.parse.urljoin(base_url, "ari/api-docs/resources.json") self.swagger = swaggerpy.client.SwaggerClient(url, http_client=http_client) self.repositories = { name: Repository(self, name, api) for (name, api) in self.swagger.resources.items() } # Extract models out of the events resource events = [ api['api_declaration'] for api in self.swagger.api_docs['apis'] if api['name'] == 'events' ] if events: self.event_models = events[0]['models'] else: self.event_models = {} self.websockets = set() self.event_listeners = {} self.exception_handler = lambda ex: log.exception( "Event listener threw exception" ) self._app_registered_callbacks = defaultdict(list) self._app_deregistered_callbacks = defaultdict(list) def __getattr__(self, item): """Exposes repositories as fields of the client. :param item: Field name """ repo = self.get_repo(item) if not repo: raise AttributeError("'%r' object has no attribute '%s'" % (self, item)) return repo
[docs] def close(self): """Close this ARI client. This method will close any currently open WebSockets, and close the underlying Swaggerclient. """ for ws in self.websockets: ws.send_close() self.swagger.close()
[docs] def get_repo(self, name): """Get a specific repo by name. :param name: Name of the repo to get :return: Repository, or None if not found. :rtype: ari.model.Repository """ return self.repositories.get(name)
def __run(self, ws): """Drains all messages from a WebSocket, sending them to the client's listeners. :param ws: WebSocket to drain. """ # TypeChecker false positive on iter(callable, sentinel) -> iterator # Fixed in plugin v3.0.1 # noinspection PyTypeChecker for msg_str in iter(lambda: ws.recv(), None): if not msg_str: log.debug("Invalid empty event") continue msg_json = json.loads(msg_str) self.on_stasis_event(msg_json)
[docs] def on_stasis_event(self, event): if not isinstance(event, dict): log.error('Invalid event not a dict: %s', event) return try: listeners = list(self.event_listeners.get(event['type'], [])) except KeyError: log.error('Invalid event no "type" key: %s', event) return for listener in listeners: # noinspection PyBroadException try: callback, args, kwargs = listener args = args or () kwargs = kwargs or {} callback(event, *args, **kwargs) except Exception as e: self.exception_handler(e)
[docs] def run(self, apps): """Connect to the WebSocket and begin processing messages. This method will block until all messages have been received from the WebSocket, or until this client has been closed. :param apps: Application (or list of applications) to connect for :type apps: str or list of str """ if isinstance(apps, list): apps = ','.join(apps) ws = self.swagger.events.eventWebsocket(app=apps) self.websockets.add(ws) self._execute_app_registered_callbacks(apps) try: self.__run(ws) finally: self._execute_app_deregistered_callbacks(apps) ws.close() self.websockets.remove(ws)
def _execute_app_deregistered_callbacks(self, apps): self._execute_app_callbacks(apps, self._app_deregistered_callbacks) def _execute_app_registered_callbacks(self, apps): self._execute_app_callbacks(apps, self._app_registered_callbacks) def _execute_app_callbacks(self, apps, callback_map): for app in apps.split(','): for fn, args, kwargs in callback_map[app]: try: fn(*args, **kwargs) except Exception as e: self.exception_handler(e)
[docs] def on_event(self, event_type, event_cb, *args, **kwargs): """Register callback for events with given type. :param event_type: String name of the event to register for. :param event_cb: Callback function :type event_cb: (dict) -> None :param args: Arguments to pass to event_cb :param kwargs: Keyword arguments to pass to event_cb """ listeners = self.event_listeners.setdefault(event_type, list()) for cb in listeners: if event_cb == cb[0]: listeners.remove(cb) callback_obj = (event_cb, args, kwargs) listeners.append(callback_obj) client = self class EventUnsubscriber(object): """Class to allow events to be unsubscribed.""" def close(self): """Unsubscribe the associated event callback.""" if callback_obj in client.event_listeners[event_type]: client.event_listeners[event_type].remove(callback_obj) return EventUnsubscriber()
[docs] def on_object_event( self, event_type, event_cb, factory_fn, model_id, *args, **kwargs ): """Register callback for events with the given type. Event fields of the given model_id type are passed along to event_cb. If multiple fields of the event have the type model_id, a dict is passed mapping the field name to the model object. :param event_type: String name of the event to register for. :param event_cb: Callback function :type event_cb: (Obj, dict) -> None or (dict[str, Obj], dict) -> :param factory_fn: Function for creating Obj from JSON :param model_id: String id for Obj from Swagger models. :param args: Arguments to pass to event_cb :param kwargs: Keyword arguments to pass to event_cb """ # Find the associated model from the Swagger declaration event_model = self.event_models.get(event_type) if not event_model: raise ValueError("Cannot find event model '%s'" % event_type) # Extract the fields that are of the expected type obj_fields = [ k for (k, v) in event_model['properties'].items() if v['type'] == model_id ] if not obj_fields: raise ValueError( "Event model '%s' has no fields of type %s" % (event_type, model_id) ) def extract_objects(event, *args, **kwargs): """Extract objects of a given type from an event. :param event: Event :param args: Arguments to pass to the event callback :param kwargs: Keyword arguments to pass to the event callback """ # Extract the fields which are of the expected type obj = { obj_field: factory_fn(self, event[obj_field]) for obj_field in obj_fields if event.get(obj_field) } # If there's only one field in the schema, just pass that along if len(obj_fields) == 1: if obj: obj = list(obj.values())[0] else: obj = None event_cb(obj, event, *args, **kwargs) return self.on_event(event_type, extract_objects, *args, **kwargs)
[docs] def on_application_registered(self, application_name, fn, *args, **kwargs): """Register callback for application registered events :param application_name: String name of the stasis application :param fn: Callback function :type fn: (\\*args, \\**kwargs) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ self._app_registered_callbacks[application_name].append((fn, args, kwargs))
[docs] def on_application_deregistered(self, application_name, fn, *args, **kwargs): """Register callback for application deregistered events :param application_name: String name of the stasis application :param fn: Callback function :type fn: (\\*args, \\**kwargs) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ self._app_deregistered_callbacks[application_name].append((fn, args, kwargs))
[docs] def on_channel_event(self, event_type, fn, *args, **kwargs): """Register callback for Channel related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (Channel, dict) -> None or (list[Channel], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event(event_type, fn, Channel, 'Channel', *args, **kwargs)
[docs] def on_bridge_event(self, event_type, fn, *args, **kwargs): """Register callback for Bridge related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (Bridge, dict) -> None or (list[Bridge], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event(event_type, fn, Bridge, 'Bridge', *args, **kwargs)
[docs] def on_playback_event(self, event_type, fn, *args, **kwargs): """Register callback for Playback related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (Playback, dict) -> None or (list[Playback], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event(event_type, fn, Playback, 'Playback', *args, **kwargs)
[docs] def on_live_recording_event(self, event_type, fn, *args, **kwargs): """Register callback for LiveRecording related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (LiveRecording, dict) -> None or (list[LiveRecording], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event( event_type, fn, LiveRecording, 'LiveRecording', *args, **kwargs )
[docs] def on_stored_recording_event(self, event_type, fn, *args, **kwargs): """Register callback for StoredRecording related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (StoredRecording, dict) -> None or (list[StoredRecording], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event( event_type, fn, StoredRecording, 'StoredRecording', *args, **kwargs )
[docs] def on_endpoint_event(self, event_type, fn, *args, **kwargs): """Register callback for Endpoint related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (Endpoint, dict) -> None or (list[Endpoint], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event(event_type, fn, Endpoint, 'Endpoint', *args, **kwargs)
[docs] def on_device_state_event(self, event_type, fn, *args, **kwargs): """Register callback for DeviceState related events :param event_type: String name of the event to register for. :param fn: Callback function :type fn: (DeviceState, dict) -> None or (list[DeviceState], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event( event_type, fn, DeviceState, 'DeviceState', *args, **kwargs )
[docs] def on_sound_event(self, event_type, fn, *args, **kwargs): """Register callback for Sound related events :param event_type: String name of the event to register for. :param fn: Sound function :type fn: (Sound, dict) -> None or (list[Sound], dict) -> None :param args: Arguments to pass to fn :param kwargs: Keyword arguments to pass to fn """ return self.on_object_event(event_type, fn, Sound, 'Sound', *args, **kwargs)