# Copyright (c) 2011-2021 Vanguard Computer Technology Labs <answers@vctlabs.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
"""
This is a Python module to provide 'thin' client/server classes implementing
a light-weight IPC mechanism using JSON formatting and the Redis server as
'message bus'.
"""
import json
import os
import tempfile
import time
from importlib.metadata import version
from pathlib import Path
import redis
from redis import ConnectionPool, StrictRedis
# instead of global pdb import, add this where you want to start debugger:
# import pdb; pdb.set_trace()
__version__ = version('redis_ipc')
# exceptions
[docs]
class RedisIpcExc(Exception):
"""
Generic redis-ipc error, used with one of the exception definitions
defined below.
"""
NoRedis = RedisIpcExc('redis server not available')
NotDict = RedisIpcExc('redis message was not a Python dictionary')
BadMessage = RedisIpcExc('redis message not a recognizable message')
MsgTimeout = RedisIpcExc('redis message request timed out')
# module-level functions and variables
[docs]
def is_jsonable(obj):
"""
Test if an object can be dumped as JSON.
:param obj: object to test
:return: True if dumpable else False
"""
try:
json.dumps(obj)
return True
except (TypeError, OverflowError):
return False
[docs]
def is_unjsonable(obj):
"""
Test if an object can be loaded as JSON.
:param obj: object to test
:return: True if loadable else False
"""
try:
json.loads(obj)
return True
except (TypeError, ValueError):
return False
[docs]
def pdic2jdic(pdic):
"""
Covert an object to JSON.
:param pdic: a dictionary
:return: a JSON string
"""
if not (is_jsonable(pdic) and isinstance(pdic, dict)):
raise BadMessage
return json.dumps(pdic)
[docs]
def jdic2pdic(jstr):
"""
Convert JSON to a dictionary, list, etc.
:param jstr: a JSON string
:return: an object
"""
if not is_unjsonable(jstr):
raise NotDict
return json.loads(jstr)
# default socket path or address should only be used in a trusted/isolated
# environment
[docs]
def get_runtimepath():
"""
Get the runtime socket path.
:return: socket path string
"""
temp_dir = tempfile.gettempdir()
run_dir = os.getenv('RIPC_RUNTIME_DIR', temp_dir)
return os.path.join(run_dir, 'redis-ipc', 'socket')
[docs]
def get_serveraddr():
"""
Get the redis server address if defined in ENV (should be either
a resolvable hostname or ``localhost``).
:return: address or None
"""
if os.getenv('RIPC_TEST_ENV'):
return os.getenv('RIPC_SERVER_ADDR')
return None
ripc_socket_path = get_runtimepath()
ripc_server_address = get_serveraddr()
[docs]
def redis_connect(socket_path=ripc_socket_path, server_addr=ripc_server_address):
"""
Attempt to open a connection to the Redis server, and raise an exception
if this does not work. Return the connection object if successful.
:param socket_path: path to redis socket
:param server_addr: address of redis server
:return: client object
:raises: NoRedis
"""
if not Path(socket_path).is_socket():
raise_msg = f'Socket path {socket_path} is not a valid socket'
raise RedisIpcExc(raise_msg)
try:
if not server_addr:
pool = ConnectionPool.from_url(f'unix://{socket_path}')
else:
pool = ConnectionPool.from_url(f'redis://{socket_path}')
client = StrictRedis(connection_pool=pool)
except redis.exceptions.ConnectionError as exc:
raise NoRedis from exc
return client
[docs]
class RedisClient:
"""
Provide a friendly component name for calling program (e.g. how it is
labeled on system architecture diagrams as opposed to exact executable
name). Allows IPC from multiple threads in a multi-threaded program.
:param component: name of component
:param thread: friendly name for specific thread of execution
"""
def __init__(self, component, thread='main'):
self.component = component
self.thread = thread
# process number of this component (a python program)
self.process_number = os.getpid()
# construct name of queue where replies to commands should arrive
self.results_queue = f'queues.results.{component}.{thread}'
# initialize redis connection
self.redis_conn = redis_connect()
def __generate_msg_id(self):
# unique id for message
# component name, process number, timestamp
timestamp = str(time.time()) # floating timestamp
msg_id = self.component + ':' + str(self.process_number) + ':' + timestamp
return msg_id, timestamp
[docs]
def redis_ipc_send_and_receive(self, dest, cmd, tmout):
"""
:param dest: name of the component to handle this command (string)
:param cmd: the command to send (dictionary)
:param tmout: timeout for receiving a response (float seconds)
"""
# add standard fields to the command dictionary
late_news = self.__generate_msg_id() # id and timestamp
cmd['timestamp'] = late_news[1] # just the timestamp
cmd['component'] = self.component
cmd['thread'] = self.thread
cmd['tid'] = self.process_number
cmd['results_queue'] = self.results_queue
cmd['command_id'] = late_news[0] # the id includes the timestamp
# calculate name of command queue
dest_queue = f'queues.commands.{dest}'
# send off the command message # still a Python dictionary
self.__redis_ipc_send_command(dest_queue, cmd)
# wait on results queue for the answer
# an exception is raised by the request function if it times out
response = self.__redis_ipc_receive_reply(cmd, tmout)
return response
def __redis_ipc_send_command(self, dest_queue, cmd):
"""
This routine does not block, it just sends the command to the back
of the queue.
:param dest_queue: command queue serviced by destination component
:param cmd: command known to the receiving component
"""
# turn command into a JSON dictionary before sending it
msg = pdic2jdic(cmd)
# send it via Redis
self.redis_conn.rpush(dest_queue, msg) # no waiting
def __redis_ipc_receive_reply(self, cmd, tmout):
"""
A proper response is a JSON string (dictionary), turn it back into
a dictionary. If the request timed out, the response is empty,
and an exception will be raised. if a non-empty value was received,
then::
if it is not the response to the specified command
try again
else
return this result
:param cmd: command for which we await a reply
:param tmout: timeout for receiving a response (float seconds)
"""
# use self.results_queue as name of queue to wait on
# throw out received messages until reply["command_id"] == cmd["command_id"]
while True:
redis_reply = self.redis_conn.blpop(self.results_queue, tmout)
if redis_reply is None:
raise MsgTimeout
decoded_reply = jdic2pdic(redis_reply[1])
if decoded_reply['command_id'] != cmd['command_id']:
continue # skip this message, not our response
# take it
return decoded_reply # good enough
[docs]
class RedisServer:
"""
Provide a friendly component name for calling program (e.g. how it is
labeled on system architecture diagrams as opposed to exact executable
name).
:param component: name of component
"""
def __init__(self, component):
# global redis_connect
self.component = component
# process number of this component (a python program)
self.process_number = os.getpid()
# construct name of queue where commands should arrive
self.command_queue = f'queues.commands.{component}'
# initialize redis connection
self.redis_conn = redis_connect()
[docs]
def redis_ipc_receive_command(self):
"""
Blocks for command string to arrive in own command queue.
:return: dictionary
"""
# get serialized command message
redis_reply = self.redis_conn.blpop(self.command_queue)
decoded_reply = jdic2pdic(redis_reply[1])
return decoded_reply
[docs]
def redis_ipc_send_reply(self, cmd, result):
"""
This routine does not block, it just sends the reply to the back
of the queue.
:param cmd: command that was processed so result is now available
:return result: the generated result
"""
# command contains name of reply queue
dest_queue = cmd['results_queue']
# tie reply to its command with matching command_id
result['command_id'] = cmd['command_id']
# turn result into a JSON string before sending it
msg = pdic2jdic(result)
# send it via Redis
self.redis_conn.rpush(dest_queue, msg)