nuropb_gw.handler_manager

Module Contents

Classes

HandlerManager

HandlerManager is responsible for the management of consumer connection handlers.

Data

logger

API

nuropb_gw.handler_manager.logger

None

class nuropb_gw.handler_manager.HandlerManager(service_name: str, instance_id: str, on_handler_message_received: Optional[Callable[[..., Any], Any]] = None)

HandlerManager is responsible for the management of consumer connection handlers, and does the following:

  • Registers handlers for incoming connections

  • Unregisters handlers for closed connections

  • Validates authorization of new connections

  • Receives messages from handlers

  • Sends messages to handlers

  • Sends messages to:

    • all registered handlers

    • one specific handler

    • all handlers registered to one user or a set of specific users

Handlers can be any WebSocket or TCP connection; however, the design targets WebSocket connections.

NOTE: An authorized user might have many concurrent connections. This is the reason for the user_id to session_id mapping, which allows directed user communications to span all of that user's connections.
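The user_id to session_id mapping described in the note can be sketched as follows. This is a minimal illustration and not the nuropb_gw implementation: FakeHandler, SessionRegistry and handlers_for_user are hypothetical names, and the sketch assumes each handler exposes session_id and user_id attributes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass
class FakeHandler:
    """Hypothetical stand-in for a connection handler."""
    session_id: str
    user_id: str


class SessionRegistry:
    """Illustrative registry only; not the real HandlerManager."""

    def __init__(self) -> None:
        self.session_id_to_handler: Dict[str, FakeHandler] = {}
        self.user_id_to_session_ids: Dict[str, Set[str]] = defaultdict(set)

    def register(self, handler: FakeHandler) -> None:
        self.session_id_to_handler[handler.session_id] = handler
        self.user_id_to_session_ids[handler.user_id].add(handler.session_id)

    def unregister(self, handler: FakeHandler) -> None:
        self.session_id_to_handler.pop(handler.session_id, None)
        sessions = self.user_id_to_session_ids.get(handler.user_id)
        if sessions is not None:
            sessions.discard(handler.session_id)
            if not sessions:
                # Drop the user entry once the last connection is gone.
                del self.user_id_to_session_ids[handler.user_id]

    def handlers_for_user(self, user_id: str) -> List[FakeHandler]:
        # .get() does not create an entry for unknown users.
        session_ids = self.user_id_to_session_ids.get(user_id, set())
        return [self.session_id_to_handler[s] for s in sorted(session_ids)]


registry = SessionRegistry()
registry.register(FakeHandler("s1", "alice"))
registry.register(FakeHandler("s2", "alice"))
registry.register(FakeHandler("s3", "bob"))
alice_sessions = [h.session_id for h in registry.handlers_for_user("alice")]
```

Keeping a set of session ids per user means one lookup yields every open connection for that user, which is what lets a directed message reach all of them.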

Initialization

_service_name: str

None

_instance_id: str

None

_session_id_to_handler: Dict[str, Handler]

None

_user_id_to_session_ids: Dict[str, Set[str]]

None

_on_handler_message_received: Callable[[..., Any], Any] | None

None

A callback set by the ServiceMeshManager to receive incoming messages from handlers. These incoming messages are processed asynchronously by the ServiceMeshManager. Incoming messages are not queued or explicitly persisted before being processed and forwarded to the service mesh.

_max_concurrent_connections: int

None

async on_handler_message(handler, message: str) -> None

Called from a handler when a new message is received over the handler's connection.

NOTE: The response from the self._on_handler_message_received call is awaited. Messages received from the handler are not queued and are processed in order. However, due to the asynchronous nature of the service mesh, responses back to the handler can arrive out of the order in which the messages were sent.

If the response time for that call is N seconds and a second message is received from the handler immediately after the first, the second message could be processed before the response to the first message is received.

If the response time for the second message is less than N seconds, its response will be transmitted back to the handler before the response to the first message.
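The ordering behaviour described above can be reproduced with a small asyncio sketch. It assumes each incoming message is handled in its own task; slow_mesh_call and on_message are hypothetical stand-ins and the delays are arbitrary, so this illustrates the effect rather than the gateway's actual code.

```python
import asyncio
from typing import List


async def slow_mesh_call(message: str, delay: float) -> str:
    """Hypothetical stand-in for the awaited service mesh callback."""
    await asyncio.sleep(delay)
    return f"response to {message}"


async def on_message(message: str, delay: float, sent: List[str]) -> None:
    # Awaiting one response does not block other tasks, so a later
    # message with a faster response can finish first.
    response = await slow_mesh_call(message, delay)
    sent.append(response)  # records the order of transmission


async def main() -> List[str]:
    sent: List[str] = []
    first = asyncio.create_task(on_message("first", 0.2, sent))
    second = asyncio.create_task(on_message("second", 0.05, sent))
    await asyncio.gather(first, second)
    return sent


transmitted = asyncio.run(main())
```

Here the first message takes 0.2 seconds and the second 0.05 seconds, so the second response is recorded before the first. A client that needs to pair responses with requests would have to carry its own correlation identifier in the message.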

JSON decoding / encoding: decoding is handled here, encoding is handled directly by the handler.

Parameters:
  • handler – Handler instance

  • message – JSON encoded string

Returns:

async to_handler_message(handler, message) -> None

Sends a service mesh message to the handler. This method is passed as a callback to the service mesh manager.

JSON encoding is handled directly by the handler; JSON decoding is handled by self.on_handler_message.
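The split of responsibilities for JSON can be pictured as below. This is a sketch under assumptions: EchoHandler, its write method and decode_incoming are hypothetical names, and the message fields are invented for illustration.

```python
import json
from typing import Any, Dict, List


class EchoHandler:
    """Hypothetical handler that records what it would write to the socket."""

    def __init__(self) -> None:
        self.written: List[str] = []

    def write(self, message: Dict[str, Any]) -> None:
        # Encoding happens in the handler, just before the write.
        self.written.append(json.dumps(message))


def decode_incoming(raw: str) -> Dict[str, Any]:
    # Decoding happens on the manager side, when the message is received.
    return json.loads(raw)


handler = EchoHandler()
payload = decode_incoming('{"method": "ping", "id": 1}')
handler.write({"result": "pong", "id": payload["id"]})
```

With this arrangement, everything between the two boundaries works with decoded Python objects, and only the handler touches the wire format on the way out.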

Parameters:
  • handler

  • message

Returns:

get_handler(session_id: str) -> Optional[Handler]

Returns the handler for the given session_id, or None if not found

register(handler: Handler) -> None

Registers a handler

unregister(handler: Handler) -> None

Unregisters a handler

async send_message(session_id: str, message: str) -> None

Sends the message to the handler for the given session_id

async send_message_to_user_ids(user_ids: List[str], message: str) -> None

Sends the message to all handlers for all given user_ids. There is no intention to support an additional method for a single user; for that use case, provide a list with one user_id.
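A fan-out over user ids might look like the following sketch. RecordingHandler, send_to_user_ids and the two dictionaries passed in are hypothetical; in particular, collecting session ids in a set to avoid duplicate delivery is an assumption of this example, not documented behaviour.

```python
import asyncio
from typing import Dict, List, Set


class RecordingHandler:
    """Hypothetical handler that records the messages it is asked to send."""

    def __init__(self, session_id: str) -> None:
        self.session_id = session_id
        self.outbox: List[str] = []

    async def send(self, message: str) -> None:
        self.outbox.append(message)


async def send_to_user_ids(
    user_ids: List[str],
    message: str,
    user_id_to_session_ids: Dict[str, Set[str]],
    session_id_to_handler: Dict[str, RecordingHandler],
) -> int:
    # Collect session ids in a set so a user_id repeated in the input
    # list does not get the message twice on the same connection.
    targets: Set[str] = set()
    for user_id in user_ids:
        targets |= user_id_to_session_ids.get(user_id, set())
    handlers = [session_id_to_handler[s] for s in targets if s in session_id_to_handler]
    await asyncio.gather(*(h.send(message) for h in handlers))
    return len(handlers)


handlers = {s: RecordingHandler(s) for s in ("s1", "s2", "s3")}
sessions = {"alice": {"s1", "s2"}, "bob": {"s3"}}
delivered = asyncio.run(send_to_user_ids(["alice", "alice"], "hello", sessions, handlers))
```

The single-user case is the same call with a one-element list, for example send_to_user_ids(["bob"], ...), which matches the stated intention not to provide a separate method.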

async send_message_to_all(message: str)

Sends the message to all handlers

close(session_id: str)

Close a handler

close_user_ids(user_ids: List[str])

Close all handlers for the user_ids provided

close_all()

Close all handlers