modules
Top-level package for centraal-client-flow.
connections
special
¶
cosmosdb
¶
Modulo de conexión a cosmos.
CosmosDBSingleton
¶
Singleton class for Cosmos DB client.
Source code in centraal_client_flow/connections/cosmosdb.py
class CosmosDBSingleton:
"""Singleton class for Cosmos DB client."""
_instance: Optional["CosmosDBSingleton"] = None
_lock: Lock = Lock()
def __new__(
cls,
connection_string: Optional[str] = None,
database_name: Optional[str] = None,
) -> "CosmosDBSingleton":
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
connection_string: Optional[str] = None,
database_name: Optional[str] = None,
) -> None:
if not hasattr(self, "_initialized"):
self._initialized = False
self.client: Optional[CosmosClient] = None
self.database: Optional[CosmosClient] = None
self.connection_string = connection_string or os.getenv(
"COSMOS_CONNECTION_STRING"
)
self.database_name = database_name or os.getenv("DATABASE_NAME")
def _initialize(self) -> None:
"""Initialize the Cosmos DB client and database."""
if self.client is None or self.database is None:
if not self.connection_string or not self.database_name:
raise ValueError("Connection string and database name must be provided")
self.client = CosmosClient.from_connection_string(self.connection_string)
self.database = self.client.get_database_client(self.database_name)
self._initialized = True
def get_container_client(self, container_name: str) -> ContainerProxy:
"""Get a container client."""
self._initialize()
return self.database.get_container_client(container_name)
def set_mock_client(
self, mock_client: CosmosClient, mock_database: CosmosClient
) -> None:
"""Set a mock client and database for testing purposes."""
self.client = mock_client
self.database = mock_database
__new__(cls, connection_string=None, database_name=None)
special
staticmethod
¶
Create and return a new object. See help(type) for accurate signature.
Source code in centraal_client_flow/connections/cosmosdb.py
def __new__(
cls,
connection_string: Optional[str] = None,
database_name: Optional[str] = None,
) -> "CosmosDBSingleton":
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
get_container_client(self, container_name)
¶
Get a container client.
Source code in centraal_client_flow/connections/cosmosdb.py
def get_container_client(self, container_name: str) -> ContainerProxy:
"""Get a container client."""
self._initialize()
return self.database.get_container_client(container_name)
set_mock_client(self, mock_client, mock_database)
¶
Set a mock client and database for testing purposes.
Source code in centraal_client_flow/connections/cosmosdb.py
def set_mock_client(
self, mock_client: CosmosClient, mock_database: CosmosClient
) -> None:
"""Set a mock client and database for testing purposes."""
self.client = mock_client
self.database = mock_database
service_bus
¶
Conexiones a service bus.
IServiceBusClient (Protocol)
¶
Interfaz.
Source code in centraal_client_flow/connections/service_bus.py
@runtime_checkable
class IServiceBusClient(Protocol):
"""Interfaz."""
client: Optional[ServiceBusClient] = None
connection_str: Optional[str] = None
def send_message_to_queue(self, message: dict, session_id: str, queue_name: str):
"""Envía un mensaje a la cola de Service Bus especificada.
Args:
message: El mensaje a enviar representado como un diccionario.
session_id: ID de sesión para el mensaje. Debe ser el id del modelo.
queue_name: Nombre de la cola a la que se enviará el mensaje.
"""
send_message_to_queue(self, message, session_id, queue_name)
¶
Envía un mensaje a la cola de Service Bus especificada.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message |
dict |
El mensaje a enviar representado como un diccionario. |
required |
session_id |
str |
ID de sesión para el mensaje. Debe ser el id del modelo. |
required |
queue_name |
str |
Nombre de la cola a la que se enviará el mensaje. |
required |
Source code in centraal_client_flow/connections/service_bus.py
def send_message_to_queue(self, message: dict, session_id: str, queue_name: str):
"""Envía un mensaje a la cola de Service Bus especificada.
Args:
message: El mensaje a enviar representado como un diccionario.
session_id: ID de sesión para el mensaje. Debe ser el id del modelo.
queue_name: Nombre de la cola a la que se enviará el mensaje.
"""
ServiceBusClientSingleton (IServiceBusClient)
¶
Singleton para manejar la conexión a Azure Service Bus.
No se cachea el sender: se obtiene uno nuevo por envío (with client.get_queue_sender) para evitar el error AttributeError por sesión AMQP cerrada (issue #32967).
ServiceBusSender y la recreación del cliente bajo reintentos no son seguros entre
hilos: un threading.Lock a nivel de instancia serializa send_message_to_queue,
el uso de get_sender y close.
Source code in centraal_client_flow/connections/service_bus.py
class ServiceBusClientSingleton(IServiceBusClient):
"""Singleton para manejar la conexión a Azure Service Bus.
No se cachea el sender: se obtiene uno nuevo por envío (with client.get_queue_sender)
para evitar el error AttributeError por sesión AMQP cerrada (issue #32967).
``ServiceBusSender`` y la recreación del cliente bajo reintentos no son seguros entre
hilos: un ``threading.Lock`` a nivel de instancia serializa ``send_message_to_queue``,
el uso de ``get_sender`` y ``close``.
"""
_instance = None
_lock: threading.Lock
client: Optional[ServiceBusClient] = None
connection_str: Optional[str] = None
logging_enable: bool = False
MAX_RETRIES = 3
RETRY_DELAY = 1
def __new__(cls, connection_str: str, logging_enable: bool = False):
"""Crea una instancia única de ServiceBusClientSingleton si no existe.
Args:
connection_str: Cadena de conexión a Azure Service Bus.
logging_enable: Si True, habilita logging del cliente (útil para depurar
timeouts/conexión, p. ej. "Failed to initiate the connection due to exception").
"""
if cls._instance is None:
cls._instance = super(ServiceBusClientSingleton, cls).__new__(cls)
cls._instance._lock = threading.Lock()
cls._instance.connection_str = connection_str
cls._instance.logging_enable = logging_enable
cls._instance._initialize_client()
return cls._instance
def _initialize_client(self):
"""Initialize the Service Bus client with retry logic."""
for attempt in range(self.MAX_RETRIES):
try:
self.client = ServiceBusClient.from_connection_string(
self.connection_str,
logging_enable=self.logging_enable,
)
logger.info("Successfully initialized Service Bus client")
return
except (ServiceBusError, ServiceBusConnectionError, AttributeError) as e:
if attempt == self.MAX_RETRIES - 1:
logger.error(
f"Failed to initialize Service Bus client after {self.MAX_RETRIES} attempts: {str(e)}")
raise
logger.warning(
f"Attempt {attempt + 1} failed to initialize Service Bus client: {str(e)}")
time.sleep(self.RETRY_DELAY)
def _ensure_client_connection(self) -> ServiceBusClient:
"""Ensure the client is connected and valid."""
if not self.client:
self._initialize_client()
return self.client
def synchronize(self):
"""Context manager que toma el mismo lock que envíos y ``get_sender``.
Usar al acceder a ``client`` directamente (p. ej. ``get_topic_sender``) para
no competir con otros hilos que usan el singleton.
"""
return self._lock
def get_sender(self, queue_name: str):
"""Obtiene un sender para la cola (retrocompatibilidad).
Cada llamada devuelve un sender nuevo (no cacheado) para evitar sesión AMQP
cerrada (issue #32967). Usar como context manager y cerrar después de usar:
with service_bus_client.get_sender(queue_name) as sender:
sender.send_messages(msg)
El lock del singleton se mantiene durante todo el bloque ``with`` para evitar
condiciones de carrera entre hilos (p. ej. Azure Functions worker).
"""
return _LockedQueueSenderContext(self, queue_name)
def send_message_to_queue(self, message: dict, session_id: str, queue_name: str):
"""Envía un mensaje a la cola de Service Bus especificada con retry logic.
Obtiene un sender nuevo por operación (no cacheado) para evitar reutilizar
una conexión/sesión AMQP cerrada que provoca AttributeError en create_sender_link.
"""
with self._lock:
for attempt in range(self.MAX_RETRIES):
try:
client = self._ensure_client_connection()
with client.get_queue_sender(queue_name) as sender:
msg = ServiceBusMessage(body=json.dumps(message))
msg.session_id = session_id
sender.send_messages(msg)
logger.debug(
f"Successfully sent message to queue {queue_name} with session_id {session_id}")
return
except (ServiceBusError, ServiceBusConnectionError, ServiceRequestError, AttributeError) as e:
if attempt == self.MAX_RETRIES - 1:
logger.error(
f"Failed to send message to queue {queue_name} after {self.MAX_RETRIES} attempts: {str(e)}")
raise e
logger.warning(
f"Attempt {attempt + 1} failed to send message to queue {queue_name}: {str(e)}")
time.sleep(self.RETRY_DELAY)
self._initialize_client()
def close(self):
"""Cierra la conexión con Azure Service Bus."""
with self._lock:
try:
if self.client:
try:
self.client.close()
logger.info("Successfully closed Service Bus client")
except Exception as e:
logger.warning(f"Error closing client: {str(e)}")
self.client = None
logger.info("Successfully closed all Service Bus connections")
except Exception as e:
logger.error(f"Error during Service Bus cleanup: {str(e)}")
raise
__new__(cls, connection_str, logging_enable=False)
special
staticmethod
¶
Crea una instancia única de ServiceBusClientSingleton si no existe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection_str |
str |
Cadena de conexión a Azure Service Bus. |
required |
logging_enable |
bool |
Si True, habilita logging del cliente (útil para depurar timeouts/conexión, p. ej. "Failed to initiate the connection due to exception"). |
False |
Source code in centraal_client_flow/connections/service_bus.py
def __new__(cls, connection_str: str, logging_enable: bool = False):
"""Crea una instancia única de ServiceBusClientSingleton si no existe.
Args:
connection_str: Cadena de conexión a Azure Service Bus.
logging_enable: Si True, habilita logging del cliente (útil para depurar
timeouts/conexión, p. ej. "Failed to initiate the connection due to exception").
"""
if cls._instance is None:
cls._instance = super(ServiceBusClientSingleton, cls).__new__(cls)
cls._instance._lock = threading.Lock()
cls._instance.connection_str = connection_str
cls._instance.logging_enable = logging_enable
cls._instance._initialize_client()
return cls._instance
close(self)
¶
Cierra la conexión con Azure Service Bus.
Source code in centraal_client_flow/connections/service_bus.py
def close(self):
"""Cierra la conexión con Azure Service Bus."""
with self._lock:
try:
if self.client:
try:
self.client.close()
logger.info("Successfully closed Service Bus client")
except Exception as e:
logger.warning(f"Error closing client: {str(e)}")
self.client = None
logger.info("Successfully closed all Service Bus connections")
except Exception as e:
logger.error(f"Error during Service Bus cleanup: {str(e)}")
raise
get_sender(self, queue_name)
¶
Obtiene un sender para la cola (retrocompatibilidad).
Cada llamada devuelve un sender nuevo (no cacheado) para evitar sesión AMQP cerrada (issue #32967). Usar como context manager y cerrar después de usar:
1 2 | |
El lock del singleton se mantiene durante todo el bloque with para evitar
condiciones de carrera entre hilos (p. ej. Azure Functions worker).
Source code in centraal_client_flow/connections/service_bus.py
def get_sender(self, queue_name: str):
"""Obtiene un sender para la cola (retrocompatibilidad).
Cada llamada devuelve un sender nuevo (no cacheado) para evitar sesión AMQP
cerrada (issue #32967). Usar como context manager y cerrar después de usar:
with service_bus_client.get_sender(queue_name) as sender:
sender.send_messages(msg)
El lock del singleton se mantiene durante todo el bloque ``with`` para evitar
condiciones de carrera entre hilos (p. ej. Azure Functions worker).
"""
return _LockedQueueSenderContext(self, queue_name)
send_message_to_queue(self, message, session_id, queue_name)
¶
Envía un mensaje a la cola de Service Bus especificada con retry logic.
Obtiene un sender nuevo por operación (no cacheado) para evitar reutilizar una conexión/sesión AMQP cerrada que provoca AttributeError en create_sender_link.
Source code in centraal_client_flow/connections/service_bus.py
def send_message_to_queue(self, message: dict, session_id: str, queue_name: str):
"""Envía un mensaje a la cola de Service Bus especificada con retry logic.
Obtiene un sender nuevo por operación (no cacheado) para evitar reutilizar
una conexión/sesión AMQP cerrada que provoca AttributeError en create_sender_link.
"""
with self._lock:
for attempt in range(self.MAX_RETRIES):
try:
client = self._ensure_client_connection()
with client.get_queue_sender(queue_name) as sender:
msg = ServiceBusMessage(body=json.dumps(message))
msg.session_id = session_id
sender.send_messages(msg)
logger.debug(
f"Successfully sent message to queue {queue_name} with session_id {session_id}")
return
except (ServiceBusError, ServiceBusConnectionError, ServiceRequestError, AttributeError) as e:
if attempt == self.MAX_RETRIES - 1:
logger.error(
f"Failed to send message to queue {queue_name} after {self.MAX_RETRIES} attempts: {str(e)}")
raise e
logger.warning(
f"Attempt {attempt + 1} failed to send message to queue {queue_name}: {str(e)}")
time.sleep(self.RETRY_DELAY)
self._initialize_client()
synchronize(self)
¶
Context manager que toma el mismo lock que envíos y get_sender.
Usar al acceder a client directamente (p. ej. get_topic_sender) para
no competir con otros hilos que usan el singleton.
Source code in centraal_client_flow/connections/service_bus.py
def synchronize(self):
"""Context manager que toma el mismo lock que envíos y ``get_sender``.
Usar al acceder a ``client`` directamente (p. ej. ``get_topic_sender``) para
no competir con otros hilos que usan el singleton.
"""
return self._lock
events
special
¶
Codigo compartido por los submodulos.
EventProcessor (LoggerMixin, ABC)
¶
Clase base abstracta para procesadores de eventos.
Source code in centraal_client_flow/events/__init__.py
class EventProcessor(LoggerMixin, ABC):
"""Clase base abstracta para procesadores de eventos."""
@abstractmethod
def process_event(self, event: BaseModel) -> EventoBase:
"""
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
event: Objeto que corresponde a modelo pydantic.
"""
process_event(self, event)
¶
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event |
BaseModel |
Objeto que corresponde a modelo pydantic. |
required |
Source code in centraal_client_flow/events/__init__.py
@abstractmethod
def process_event(self, event: BaseModel) -> EventoBase:
"""
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
event: Objeto que corresponde a modelo pydantic.
"""
PullProcessor (LoggerMixin, ABC)
¶
Clase base abstracta para procesadores de eventos.
Source code in centraal_client_flow/events/__init__.py
class PullProcessor(LoggerMixin, ABC):
"""Clase base abstracta para procesadores de eventos."""
@abstractmethod
def get_data(self) -> List[BaseModel]:
"""
Obtiene la informacion
"""
@abstractmethod
def process_event(self, event_data: BaseModel) -> EventoBase:
"""
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
event_data: Objeto que corresponde a modelo pydantic.
"""
get_data(self)
¶
Obtiene la informacion
Source code in centraal_client_flow/events/__init__.py
@abstractmethod
def get_data(self) -> List[BaseModel]:
"""
Obtiene la informacion
"""
process_event(self, event_data)
¶
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_data |
BaseModel |
Objeto que corresponde a modelo pydantic. |
required |
Source code in centraal_client_flow/events/__init__.py
@abstractmethod
def process_event(self, event_data: BaseModel) -> EventoBase:
"""
Procesa el evento recibido. y retorna el modelo de EventoBase
Parameters:
event_data: Objeto que corresponde a modelo pydantic.
"""
processor
¶
Definicion de clase EventProcessor.
EventProcessor (ABC)
¶
Clase para procesar eventos. Esta clase estandariza la manera de procesar eventos basado en pydantic.
La clase sirve coom clase a heredar y debe implementar un metodo: - process_event que general puede recibir cualquier tipo de dato pero debe devolver un evento validado (EventoBase) o una lista de evento validados (List[EventoBase]).
el usuario solo tiene implemenetar ese metodo ya que luego la clase se encarga de: 1. controlar errores de validación, para hacer un logger adecuado 2. enviar el evento a la cola de eventos de manera asincrona(usando batch si es necesario).
Source code in centraal_client_flow/events/processor.py
class EventProcessor(ABC):
"""Clase para procesar eventos.
Esta clase estandariza la manera de procesar eventos basado en pydantic.
La clase sirve coom clase a heredar y debe implementar un metodo:
- process_event que general puede recibir cualquier tipo de dato pero debe devolver un evento validado (EventoBase) o una lista
de evento validados (List[EventoBase]).
el usuario solo tiene implemenetar ese metodo ya que luego la clase se encarga de:
1. controlar errores de validación, para hacer un logger adecuado
2. enviar el evento a la cola de eventos de manera asincrona(usando batch si es necesario).
"""
@abstractmethod
def process_event(self, event: Any) -> Union[EventoBase, List[EventoBase]]:
"""Procesa un evento."""
pass
def handle_event(self, data: Any) -> None:
try:
eventos = self.process_event(data)
if not isinstance(eventos, list):
eventos = [eventos]
self.send_to_queue(eventos)
except ValidationError as ve:
# Aquí manejarías el logging adecuado de los errores de validación
print(f"Error de validación: {ve}")
except Exception as e:
# Manejo de otras excepciones
print(f"Error al procesar el evento: {e}")
def send_to_queue(self, eventos: List[EventoBase]) -> None:
"""Envia un evento a la cola de eventos."""
pass
process_event(self, event)
¶
Procesa un evento.
Source code in centraal_client_flow/events/processor.py
@abstractmethod
def process_event(self, event: Any) -> Union[EventoBase, List[EventoBase]]:
"""Procesa un evento."""
pass
send_to_queue(self, eventos)
¶
Envia un evento a la cola de eventos.
Source code in centraal_client_flow/events/processor.py
def send_to_queue(self, eventos: List[EventoBase]) -> None:
"""Envia un evento a la cola de eventos."""
pass
receiver
¶
Módulo para recibir eventos desde una fuente externa y procesarlos a través de Azure Functions.
EventFunctionBuilder
¶
Clase para construir y registrar funciones de Azure para recibir y procesar eventos.
Esta clase permite construir dinámicamente funciones de Azure que se desencadenan por solicitudes HTTP POST para recibir y procesar eventos utilizando un modelo de evento específico y un procesador de eventos.
Source code in centraal_client_flow/events/receiver.py
class EventFunctionBuilder:
"""
Clase para construir y registrar funciones de Azure para recibir y procesar eventos.
Esta clase permite construir dinámicamente funciones de Azure que se desencadenan
por solicitudes HTTP POST para recibir y procesar eventos utilizando un modelo de evento
específico y un procesador de eventos.
"""
def __init__(
self,
function_name: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
processor: EventProcessor,
event_model: type[BaseModel],
):
"""
Inicializa un constructor de funciones para eventos con los parámetros especificados.
Args:
function_name: Nombre único de la función que se va a registrar.
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
processor: Procesador de eventos que hereda de EventProcessor.
event_model: Modelo Pydantic para validar y parsear los eventos.
"""
self.function_name = function_name
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
self.processor = processor
self.event_model = event_model
def build_function(self):
"""
Construye la función de Azure para recibir y procesar eventos.
Returns:
Una función que procesa solicitudes HTTP POST, valida los eventos recibidos
y los envía a una cola de Service Bus.
"""
def receive_event(req: HttpRequest) -> HttpResponse:
event_data = req.get_json()
logging.info("validando informacion")
event = self.event_model.model_validate(event_data)
event_validado = self.processor.process_event(event)
data_validada = event_validado.model_dump(mode="json", exclude_none=True)
logging.info("enviando informacion")
self.service_bus_client.send_message_to_queue(
data_validada, str(event_validado.id), self.queue_name
)
return HttpResponse(
f"Evento de {self.event_source} es procesado.", status_code=200
)
return receive_event
def register_function(self, bp: Blueprint):
"""
Registra la función construida en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
Returns:
El Blueprint actualizado con la función registrada.
"""
receive_event = self.build_function()
bp.function_name(name=self.function_name)(
bp.route(methods=["POST"])(receive_event)
)
return bp
__init__(self, function_name, event_source, queue_name, service_bus_client, processor, event_model)
special
¶
Inicializa un constructor de funciones para eventos con los parámetros especificados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
function_name |
str |
Nombre único de la función que se va a registrar. |
required |
event_source |
str |
Nombre de la fuente del evento. |
required |
queue_name |
str |
Nombre de la cola de Service Bus donde se enviarán los mensajes. |
required |
service_bus_client |
IServiceBusClient |
Cliente de Service Bus para enviar mensajes. |
required |
processor |
EventProcessor |
Procesador de eventos que hereda de EventProcessor. |
required |
event_model |
type[pydantic.main.BaseModel] |
Modelo Pydantic para validar y parsear los eventos. |
required |
Source code in centraal_client_flow/events/receiver.py
def __init__(
self,
function_name: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
processor: EventProcessor,
event_model: type[BaseModel],
):
"""
Inicializa un constructor de funciones para eventos con los parámetros especificados.
Args:
function_name: Nombre único de la función que se va a registrar.
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
processor: Procesador de eventos que hereda de EventProcessor.
event_model: Modelo Pydantic para validar y parsear los eventos.
"""
self.function_name = function_name
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
self.processor = processor
self.event_model = event_model
build_function(self)
¶
Construye la función de Azure para recibir y procesar eventos.
Returns:
| Type | Description |
|---|---|
Una función que procesa solicitudes HTTP POST, valida los eventos recibidos y los envía a una cola de Service Bus. |
Source code in centraal_client_flow/events/receiver.py
def build_function(self):
"""
Construye la función de Azure para recibir y procesar eventos.
Returns:
Una función que procesa solicitudes HTTP POST, valida los eventos recibidos
y los envía a una cola de Service Bus.
"""
def receive_event(req: HttpRequest) -> HttpResponse:
event_data = req.get_json()
logging.info("validando informacion")
event = self.event_model.model_validate(event_data)
event_validado = self.processor.process_event(event)
data_validada = event_validado.model_dump(mode="json", exclude_none=True)
logging.info("enviando informacion")
self.service_bus_client.send_message_to_queue(
data_validada, str(event_validado.id), self.queue_name
)
return HttpResponse(
f"Evento de {self.event_source} es procesado.", status_code=200
)
return receive_event
register_function(self, bp)
¶
Registra la función construida en el Blueprint proporcionado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bp |
Blueprint |
Blueprint de Azure Functions donde se registrará la función. |
required |
Returns:
| Type | Description |
|---|---|
El Blueprint actualizado con la función registrada. |
Source code in centraal_client_flow/events/receiver.py
def register_function(self, bp: Blueprint):
"""
Registra la función construida en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
Returns:
El Blueprint actualizado con la función registrada.
"""
receive_event = self.build_function()
bp.function_name(name=self.function_name)(
bp.route(methods=["POST"])(receive_event)
)
return bp
Recieve
¶
Clase para manejar la recepción y procesamiento de eventos desde una fuente específica.
Esta clase define y registra reglas de procesamiento de eventos utilizando una fuente de eventos, un modelo de evento, y un procesador de eventos.
Source code in centraal_client_flow/events/receiver.py
class Recieve:
"""
Clase para manejar la recepción y procesamiento de eventos desde una fuente específica.
Esta clase define y registra reglas de procesamiento de eventos utilizando una fuente de eventos,
un modelo de evento, y un procesador de eventos.
"""
def __init__(
self,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
):
"""
Inicializa una instancia de Receiver.
Args:
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
"""
self.function_name = f"{event_source.lower()}_receive_event"
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
def register_function(
self,
bp: Blueprint,
processor: EventProcessor,
event_model: type[BaseModel],
) -> None:
"""
Crea y registra una función para recibir y procesar eventos en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
processor: Instancia de una clase que hereda de EventProcessor.
event_model: Modelo Pydantic para validar y parsear el evento.
Returns:
El Blueprint actualizado con la función de procesamiento de eventos registrada.
"""
builder = EventFunctionBuilder(
function_name=self.function_name,
event_source=self.event_source,
queue_name=self.queue_name,
service_bus_client=self.service_bus_client,
processor=processor,
event_model=event_model,
)
builder.register_function(bp)
__init__(self, event_source, queue_name, service_bus_client)
special
¶
Inicializa una instancia de Receiver.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_source |
str |
Nombre de la fuente del evento. |
required |
queue_name |
str |
Nombre de la cola de Service Bus donde se enviarán los mensajes. |
required |
service_bus_client |
IServiceBusClient |
Cliente de Service Bus para enviar mensajes. |
required |
Source code in centraal_client_flow/events/receiver.py
def __init__(
self,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
):
"""
Inicializa una instancia de Receiver.
Args:
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
"""
self.function_name = f"{event_source.lower()}_receive_event"
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
register_function(self, bp, processor, event_model)
¶
Crea y registra una función para recibir y procesar eventos en el Blueprint proporcionado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bp |
Blueprint |
Blueprint de Azure Functions donde se registrará la función. |
required |
processor |
EventProcessor |
Instancia de una clase que hereda de EventProcessor. |
required |
event_model |
type[pydantic.main.BaseModel] |
Modelo Pydantic para validar y parsear el evento. |
required |
Returns:
| Type | Description |
|---|---|
None |
El Blueprint actualizado con la función de procesamiento de eventos registrada. |
Source code in centraal_client_flow/events/receiver.py
def register_function(
self,
bp: Blueprint,
processor: EventProcessor,
event_model: type[BaseModel],
) -> None:
"""
Crea y registra una función para recibir y procesar eventos en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
processor: Instancia de una clase que hereda de EventProcessor.
event_model: Modelo Pydantic para validar y parsear el evento.
Returns:
El Blueprint actualizado con la función de procesamiento de eventos registrada.
"""
builder = EventFunctionBuilder(
function_name=self.function_name,
event_source=self.event_source,
queue_name=self.queue_name,
service_bus_client=self.service_bus_client,
processor=processor,
event_model=event_model,
)
builder.register_function(bp)
timer
¶
Módulo para recibir eventos desde una fuente externa y procesarlos a través de Azure Functions.
Pull
¶
Clase para manejar la ejecución de tareas programadas y el envío de datos a Service Bus.
Esta clase define y registra reglas de procesamiento de tareas utilizando un temporizador, un procesador de eventos y un cliente de Service Bus.
Source code in centraal_client_flow/events/timer.py
class Pull:
"""
Clase para manejar la ejecución de tareas programadas y el envío de datos a Service Bus.
Esta clase define y registra reglas de procesamiento de tareas utilizando un temporizador,
un procesador de eventos y un cliente de Service Bus.
"""
def __init__(
self,
schedule: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
):
"""
Inicializa una instancia de Pull.
Args:
schedule: Cadena que define el horario del temporizador (CRON).
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
"""
self.function_name = f"{event_source.lower()}_scheduled_event"
self.schedule = schedule
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
def register_function(
self,
bp: Blueprint,
processor: PullProcessor,
) -> None:
"""
Crea y registra una función programada para ejecutar tareas periódicamente en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
processor: Instancia de una clase que hereda de PullProcessor.
Returns:
El Blueprint actualizado con la función programada registrada.
"""
builder = TimerFunctionBuilder(
function_name=self.function_name,
schedule=self.schedule,
event_source=self.event_source,
queue_name=self.queue_name,
service_bus_client=self.service_bus_client,
processor=processor,
)
builder.register_function(bp)
__init__(self, schedule, event_source, queue_name, service_bus_client)
special
¶
Inicializa una instancia de Pull.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schedule |
str |
Cadena que define el horario del temporizador (CRON). |
required |
event_source |
str |
Nombre de la fuente del evento. |
required |
queue_name |
str |
Nombre de la cola de Service Bus donde se enviarán los mensajes. |
required |
service_bus_client |
IServiceBusClient |
Cliente de Service Bus para enviar mensajes. |
required |
Source code in centraal_client_flow/events/timer.py
def __init__(
self,
schedule: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
):
"""
Inicializa una instancia de Pull.
Args:
schedule: Cadena que define el horario del temporizador (CRON).
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
"""
self.function_name = f"{event_source.lower()}_scheduled_event"
self.schedule = schedule
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
register_function(self, bp, processor)
¶
Crea y registra una función programada para ejecutar tareas periódicamente en el Blueprint proporcionado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bp |
Blueprint |
Blueprint de Azure Functions donde se registrará la función. |
required |
processor |
PullProcessor |
Instancia de una clase que hereda de PullProcessor. |
required |
Returns:
| Type | Description |
|---|---|
None |
El Blueprint actualizado con la función programada registrada. |
Source code in centraal_client_flow/events/timer.py
def register_function(
self,
bp: Blueprint,
processor: PullProcessor,
) -> None:
"""
Crea y registra una función programada para ejecutar tareas periódicamente en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
processor: Instancia de una clase que hereda de PullProcessor.
Returns:
El Blueprint actualizado con la función programada registrada.
"""
builder = TimerFunctionBuilder(
function_name=self.function_name,
schedule=self.schedule,
event_source=self.event_source,
queue_name=self.queue_name,
service_bus_client=self.service_bus_client,
processor=processor,
)
builder.register_function(bp)
TimerFunctionBuilder
¶
Clase para construir y registrar funciones de Azure programadas.
Esta clase permite construir dinámicamente funciones de Azure que se desencadenan por un temporizador, procesan eventos y los envían a un Service Bus.
Source code in centraal_client_flow/events/timer.py
class TimerFunctionBuilder:
"""
Clase para construir y registrar funciones de Azure programadas.
Esta clase permite construir dinámicamente funciones de Azure que se desencadenan
por un temporizador, procesan eventos y los envían a un Service Bus.
"""
def __init__(
self,
function_name: str,
schedule: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
processor: PullProcessor,
):
"""
Inicializa un constructor de funciones programadas con los parámetros especificados.
Args:
function_name: Nombre único de la función que se va a registrar.
schedule: Cadena que define el horario del temporizador (CRON).
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
processor: Procesador de eventos que hereda de PullProcessor.
"""
self.function_name = function_name
self.schedule = schedule
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
self.processor = processor
def build_function(self):
"""
Construye la función de Azure programada para ejecutar tareas periódicamente.
Returns:
Una función que se ejecuta en base a un temporizador, procesa datos y los envía a una cola de Service Bus.
"""
def timer_function(mytimer: TimerRequest):
if mytimer.past_due:
logging.info("The timer is past due!")
event_data = self.processor.get_data()
for event_in_data in event_data:
try:
event_validado = self.processor.process_event(event_in_data)
data_validada = event_validado.model_dump(
mode="json", exclude_none=True
)
self.service_bus_client.send_message_to_queue(
data_validada, str(event_validado.id), self.queue_name
)
except ValidationError as e:
logging.error("Error en %s, excepción:\n%s", event_in_data, e)
return timer_function
def register_function(self, bp: Blueprint):
"""
Registra la función construida en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
Returns:
El Blueprint actualizado con la función registrada.
"""
timer_function = self.build_function()
bp.function_name(name=self.function_name)(
bp.schedule(
schedule=self.schedule, arg_name="mytimer", run_on_startup=False
)(timer_function)
)
return bp
__init__(self, function_name, schedule, event_source, queue_name, service_bus_client, processor)
special
¶
Inicializa un constructor de funciones programadas con los parámetros especificados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
function_name |
str |
Nombre único de la función que se va a registrar. |
required |
schedule |
str |
Cadena que define el horario del temporizador (CRON). |
required |
event_source |
str |
Nombre de la fuente del evento. |
required |
queue_name |
str |
Nombre de la cola de Service Bus donde se enviarán los mensajes. |
required |
service_bus_client |
IServiceBusClient |
Cliente de Service Bus para enviar mensajes. |
required |
processor |
PullProcessor |
Procesador de eventos que hereda de PullProcessor. |
required |
Source code in centraal_client_flow/events/timer.py
def __init__(
self,
function_name: str,
schedule: str,
event_source: str,
queue_name: str,
service_bus_client: IServiceBusClient,
processor: PullProcessor,
):
"""
Inicializa un constructor de funciones programadas con los parámetros especificados.
Args:
function_name: Nombre único de la función que se va a registrar.
schedule: Cadena que define el horario del temporizador (CRON).
event_source: Nombre de la fuente del evento.
queue_name: Nombre de la cola de Service Bus donde se enviarán los mensajes.
service_bus_client: Cliente de Service Bus para enviar mensajes.
processor: Procesador de eventos que hereda de PullProcessor.
"""
self.function_name = function_name
self.schedule = schedule
self.event_source = event_source
self.queue_name = queue_name
self.service_bus_client = service_bus_client
self.processor = processor
build_function(self)
¶
Construye la función de Azure programada para ejecutar tareas periódicamente.
Returns:
| Type | Description |
|---|---|
Una función que se ejecuta en base a un temporizador, procesa datos y los envía a una cola de Service Bus. |
Source code in centraal_client_flow/events/timer.py
def build_function(self):
"""
Construye la función de Azure programada para ejecutar tareas periódicamente.
Returns:
Una función que se ejecuta en base a un temporizador, procesa datos y los envía a una cola de Service Bus.
"""
def timer_function(mytimer: TimerRequest):
if mytimer.past_due:
logging.info("The timer is past due!")
event_data = self.processor.get_data()
for event_in_data in event_data:
try:
event_validado = self.processor.process_event(event_in_data)
data_validada = event_validado.model_dump(
mode="json", exclude_none=True
)
self.service_bus_client.send_message_to_queue(
data_validada, str(event_validado.id), self.queue_name
)
except ValidationError as e:
logging.error("Error en %s, excepción:\n%s", event_in_data, e)
return timer_function
register_function(self, bp)
¶
Registra la función construida en el Blueprint proporcionado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bp |
Blueprint |
Blueprint de Azure Functions donde se registrará la función. |
required |
Returns:
| Type | Description |
|---|---|
El Blueprint actualizado con la función registrada. |
Source code in centraal_client_flow/events/timer.py
def register_function(self, bp: Blueprint):
"""
Registra la función construida en el Blueprint proporcionado.
Args:
bp: Blueprint de Azure Functions donde se registrará la función.
Returns:
El Blueprint actualizado con la función registrada.
"""
timer_function = self.build_function()
bp.function_name(name=self.function_name)(
bp.schedule(
schedule=self.schedule, arg_name="mytimer", run_on_startup=False
)(timer_function)
)
return bp
helpers
special
¶
cosmos
¶
Helpers para interactuar con cosmos.
save_model_to_cosmos(cosmos_db, container_name, model_instance)
¶
Saves a Pydantic model instance to the specified Cosmos DB container.
Source code in centraal_client_flow/helpers/cosmos.py
def save_model_to_cosmos(
cosmos_db: CosmosDBSingleton, container_name: str, model_instance: Type[BaseModel]
) -> dict:
"""Saves a Pydantic model instance to the specified Cosmos DB container."""
container_client = cosmos_db.get_container_client(container_name)
item_written = write_model_to_cosmos(container_client, model_instance)
return item_written
write_model_to_cosmos(container_client, model_instance)
¶
Writes a Pydantic model instance to the specified Cosmos DB container.
Source code in centraal_client_flow/helpers/cosmos.py
def write_model_to_cosmos(
container_client: ContainerProxy, model_instance: Type[BaseModel]
) -> dict:
"""Writes a Pydantic model instance to the specified Cosmos DB container."""
return container_client.upsert_item(
body=model_instance.model_dump(mode="json", exclude_none=True)
)
logger
¶
Utilidades para logger.
LoggerMixin
¶
Clase base abstracta para proveer funcionalidad de logging.
Source code in centraal_client_flow/helpers/logger.py
class LoggerMixin:
"""Clase base abstracta para proveer funcionalidad de logging."""
def __init__(self, logger: Optional[logging.Logger] = None):
"""
Inicializa la clase base con un logger opcional.
Parameters:
logger: Instancia de logging.Logger para registrar eventos.
"""
self.logger = logger or logging.getLogger(self.__class__.__name__)
__init__(self, logger=None)
special
¶
Inicializa la clase base con un logger opcional.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
logger |
Optional[logging.Logger] |
Instancia de logging.Logger para registrar eventos. |
None |
Source code in centraal_client_flow/helpers/logger.py
def __init__(self, logger: Optional[logging.Logger] = None):
"""
Inicializa la clase base con un logger opcional.
Parameters:
logger: Instancia de logging.Logger para registrar eventos.
"""
self.logger = logger or logging.getLogger(self.__class__.__name__)
pydantic
¶
Helpers relacionados con pydantic.
built_valid_json_str_with_aditional_info(error_message, additional_info)
¶
Construye una cadena JSON válida con información adicional.
Source code in centraal_client_flow/helpers/pydantic.py
def built_valid_json_str_with_aditional_info(
error_message: str, additional_info: str
) -> str:
"""
Construye una cadena JSON válida con información adicional.
"""
valid_dict = {"error_validacion": error_message}
if additional_info:
valid_dict["error_validacion_detalle"] = additional_info
return json.dumps(valid_dict, ensure_ascii=False)
serialize_validation_errors(errors)
¶
Serializa los errores de validación en una cadena JSON adecuada para Cosmos DB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
errors |
list[pydantic_core.ErrorDetails] |
Lista de diccionarios que describen los errores de validación. |
required |
Returns:
| Type | Description |
|---|---|
str |
Cadena JSON que representa los errores de validación. |
Source code in centraal_client_flow/helpers/pydantic.py
def serialize_validation_errors(errors: list[ErrorDetails]) -> str:
"""
Serializa los errores de validación en una cadena JSON adecuada para Cosmos DB.
Args:
errors: Lista de diccionarios que describen los errores de validación.
Returns:
Cadena JSON que representa los errores de validación.
"""
return json.dumps(errors, default=_custom_serializer, ensure_ascii=False)
rules
special
¶
Comun para rules.
NoHayReglas (Exception)
¶
Excepción personalizada cuando no existen reglas.
Source code in centraal_client_flow/rules/__init__.py
class NoHayReglas(Exception):
"""Excepción personalizada cuando no existen reglas."""
def __init__(self, mensaje: str):
super().__init__(mensaje)
self.mensaje = mensaje
integration
special
¶
processor
¶
Reglas de integración.
IntegrationRule
¶
Clase para definir y registrar reglas de integración basadas en topics de Service Bus.
Esta clase encapsula la lógica para definir una regla de integración utilizando un topic, una suscripción y una estrategia de integración específica.
Source code in centraal_client_flow/rules/integration/processor.py
class IntegrationRule:
"""
Clase para definir y registrar reglas de integración basadas en topics de Service Bus.
Esta clase encapsula la lógica para definir una regla de integración utilizando un topic,
una suscripción y una estrategia de integración específica.
"""
def __init__(
self,
topic_name: str,
connection_str: str,
subscription_name: str,
integration_strategy: IntegrationStrategy,
model_unficado: type[EntradaEsquemaUnificado],
):
"""
Inicializa una regla de integración con los parámetros especificados.
Args:
topic_name: Nombre del topic de Service Bus que se utilizará para la integración.
connection_str: Cadena de conexión para el Service Bus.
subscription_name: Nombre de la suscripción en el topic de Service Bus.
integration_strategy: Estrategia de integración a aplicar en los mensajes procesados.
model_unficado: Modelo de esquema unificado para validar y mapear los mensajes recibidos.
"""
if integration_strategy.name is not None:
self.function_name = (
f"{integration_strategy.name.lower()}_{topic_name}_intrule"
)
else:
self.function_name = f"{topic_name}_intrule"
self.topic_name = topic_name
self.connection_str = connection_str
self.subscription_name = subscription_name
self.integration_strategy = integration_strategy
self.model_unficado = model_unficado
self.id_esquema: Optional[IDModel] = None
def run(
self, message: ServiceBusMessage | dict, logger: logging.Logger
) -> Optional[StrategyResult]:
"""Ejecuta la regla de integración."""
if isinstance(message, ServiceBusMessage):
message = json.loads(message.message)
try:
message_esquema = self.model_unficado.model_validate(message)
self.id_esquema = message_esquema.id
output_model = self.integration_strategy.modelo_unificado_mapping(
message_esquema
)
except ValidationError as e:
error_val_cosmos_friendly = serialize_validation_errors(e.errors())
logger.error(
"Error antes de integración en validación %s",
error_val_cosmos_friendly,
exc_info=True,
)
return StrategyResult(
success=False,
response={"error_validacion": error_val_cosmos_friendly},
bodysent={"error_validacion": True},
)
return self.integration_strategy.integrate(output_model)
def register_log(
self,
result: StrategyResult,
cosmos_client: CosmosDBSingleton,
container_name: str,
):
container = cosmos_client.get_container_client(container_name)
if self.id_esquema is not None:
entry = AuditoriaEntryIntegracion(
id=self.id_esquema,
regla=self.function_name,
contenido=result.bodysent,
sucess=result.success,
response=result.response,
)
item_written = container.upsert_item(
entry.model_dump(mode="json", exclude_none=True),
)
return item_written
raise ValueError("No es posible usar registro del log.")
__init__(self, topic_name, connection_str, subscription_name, integration_strategy, model_unficado)
special
¶
Inicializa una regla de integración con los parámetros especificados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic_name |
str |
Nombre del topic de Service Bus que se utilizará para la integración. |
required |
connection_str |
str |
Cadena de conexión para el Service Bus. |
required |
subscription_name |
str |
Nombre de la suscripción en el topic de Service Bus. |
required |
integration_strategy |
IntegrationStrategy |
Estrategia de integración a aplicar en los mensajes procesados. |
required |
model_unficado |
type[centraal_client_flow.models.schemas.EntradaEsquemaUnificado] |
Modelo de esquema unificado para validar y mapear los mensajes recibidos. |
required |
Source code in centraal_client_flow/rules/integration/processor.py
def __init__(
self,
topic_name: str,
connection_str: str,
subscription_name: str,
integration_strategy: IntegrationStrategy,
model_unficado: type[EntradaEsquemaUnificado],
):
"""
Inicializa una regla de integración con los parámetros especificados.
Args:
topic_name: Nombre del topic de Service Bus que se utilizará para la integración.
connection_str: Cadena de conexión para el Service Bus.
subscription_name: Nombre de la suscripción en el topic de Service Bus.
integration_strategy: Estrategia de integración a aplicar en los mensajes procesados.
model_unficado: Modelo de esquema unificado para validar y mapear los mensajes recibidos.
"""
if integration_strategy.name is not None:
self.function_name = (
f"{integration_strategy.name.lower()}_{topic_name}_intrule"
)
else:
self.function_name = f"{topic_name}_intrule"
self.topic_name = topic_name
self.connection_str = connection_str
self.subscription_name = subscription_name
self.integration_strategy = integration_strategy
self.model_unficado = model_unficado
self.id_esquema: Optional[IDModel] = None
run(self, message, logger)
¶
Ejecuta la regla de integración.
Source code in centraal_client_flow/rules/integration/processor.py
def run(
self, message: ServiceBusMessage | dict, logger: logging.Logger
) -> Optional[StrategyResult]:
"""Ejecuta la regla de integración."""
if isinstance(message, ServiceBusMessage):
message = json.loads(message.message)
try:
message_esquema = self.model_unficado.model_validate(message)
self.id_esquema = message_esquema.id
output_model = self.integration_strategy.modelo_unificado_mapping(
message_esquema
)
except ValidationError as e:
error_val_cosmos_friendly = serialize_validation_errors(e.errors())
logger.error(
"Error antes de integración en validación %s",
error_val_cosmos_friendly,
exc_info=True,
)
return StrategyResult(
success=False,
response={"error_validacion": error_val_cosmos_friendly},
bodysent={"error_validacion": True},
)
return self.integration_strategy.integrate(output_model)
strategy
¶
Estrategias.
IntegrationStrategy (ABC)
¶
Clase Abstracta para definir estrategias de integracion.
Source code in centraal_client_flow/rules/integration/strategy.py
class IntegrationStrategy(ABC):
"""Clase Abstracta para definir estrategias de integracion."""
name: Optional[str] = None
logger: logging.Logger
def __init__(
self, logger: Optional[logging.Logger] = None, name: Optional[str] = None
):
"""
Inicializa la estrategia de integración con un logger opcional.
Parameters:
logger: Instancia opcional de logging.Logger.
"""
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.name = name
@abstractmethod
def modelo_unificado_mapping(
self, message: EntradaEsquemaUnificado
) -> Optional[BaseModel]:
"""Mapea el mensaje de entrada a un modelo unificado de salida.
Args:
message: El mensaje de entrada a ser mapeado.
Returns:
Un modelo Pydantic que representa la salida mapeada.
"""
@abstractmethod
def integrate(self, output_model: Optional[BaseModel]) -> Optional[Any]:
"""Realiza la integración utilizando el modelo de salida.
Args:
output_model: El modelo de datos ya mapeado que se enviará a la integración.
Returns:
La respuesta de la integración, generalmente en formato JSON.
"""
__init__(self, logger=None, name=None)
special
¶
Inicializa la estrategia de integración con un logger opcional.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
logger |
Optional[logging.Logger] |
Instancia opcional de logging.Logger. |
None |
Source code in centraal_client_flow/rules/integration/strategy.py
def __init__(
self, logger: Optional[logging.Logger] = None, name: Optional[str] = None
):
"""
Inicializa la estrategia de integración con un logger opcional.
Parameters:
logger: Instancia opcional de logging.Logger.
"""
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.name = name
integrate(self, output_model)
¶
Realiza la integración utilizando el modelo de salida.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
output_model |
Optional[pydantic.main.BaseModel] |
El modelo de datos ya mapeado que se enviará a la integración. |
required |
Returns:
| Type | Description |
|---|---|
Optional[Any] |
La respuesta de la integración, generalmente en formato JSON. |
Source code in centraal_client_flow/rules/integration/strategy.py
@abstractmethod
def integrate(self, output_model: Optional[BaseModel]) -> Optional[Any]:
"""Realiza la integración utilizando el modelo de salida.
Args:
output_model: El modelo de datos ya mapeado que se enviará a la integración.
Returns:
La respuesta de la integración, generalmente en formato JSON.
"""
modelo_unificado_mapping(self, message)
¶
Mapea el mensaje de entrada a un modelo unificado de salida.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message |
EntradaEsquemaUnificado |
El mensaje de entrada a ser mapeado. |
required |
Returns:
| Type | Description |
|---|---|
Optional[pydantic.main.BaseModel] |
Un modelo Pydantic que representa la salida mapeada. |
Source code in centraal_client_flow/rules/integration/strategy.py
@abstractmethod
def modelo_unificado_mapping(
self, message: EntradaEsquemaUnificado
) -> Optional[BaseModel]:
"""Mapea el mensaje de entrada a un modelo unificado de salida.
Args:
message: El mensaje de entrada a ser mapeado.
Returns:
Un modelo Pydantic que representa la salida mapeada.
"""
OAuthConfigPassFlow
dataclass
¶
Configuración necesaria para la autenticación OAuth 2.0 con grant_type=password.
Source code in centraal_client_flow/rules/integration/strategy.py
@dataclass
class OAuthConfigPassFlow:
"""Configuración necesaria para la autenticación OAuth 2.0 con grant_type=password."""
client_id: str
client_secret: str
username: str
password: str
token_resource: str
api_url: str
use_url_params_for_auth: bool = True
OAuthTokenPass
dataclass
¶
Representa el token OAuth obtenido tras la autenticación.
Source code in centraal_client_flow/rules/integration/strategy.py
@dataclass
class OAuthTokenPass:
"""Representa el token OAuth obtenido tras la autenticación."""
access_token: str
instance_url: str
id: str
token_type: str
issued_at: int
signature: str
expires_in: int = 1800
def __post_init__(self):
if isinstance(self.issued_at, str):
self.issued_at = int(self.issued_at)
RESTIntegration (IntegrationStrategy)
¶
Estrategia de integracion basada en REST.
Source code in centraal_client_flow/rules/integration/strategy.py
class RESTIntegration(IntegrationStrategy):
"""Estrategia de integracion basada en REST."""
def __init__(
self,
oauth_config: OAuthConfigPassFlow,
method: str = "POST",
resource: str = "",
mapping_function: Optional[
Callable[[EntradaEsquemaUnificado], Optional[BaseModel]]
] = None,
logger: Optional[logging.Logger] = None,
):
"""Inicializa una instancia de RESTIntegration con la configuración de OAuth y
los parámetros REST.
Args:
oauth_config: Configuración necesaria para autenticarse con OAuth 2.0.
method: El método HTTP que se utilizará para la integración
(por ejemplo, 'POST', 'PATCH').
resource: El recurso específico de la API con el cual se interactuará.
mapping_function: Una función opcional que define cómo mapear un
`EntradaEsquemaUnificado` a un modelo Pydantic.
"""
if mapping_function is not None:
super().__init__(
logger=logger, name=f"{method}_{mapping_function.__name__}"
)
self.oauth_config = oauth_config
self.method = method
self.resource = resource
self.mapping_function = mapping_function
self.response_processor = lambda r, m: StrategyResult(
success=True, response=r, bodysent=m
)
self._token: Optional[OAuthTokenPass] = None
def _authenticate(self) -> OAuthTokenPass:
"""Autentica usando OAuth 2.0 con grant_type=password y obtiene un token de acceso.
Returns:
Un objeto `OAuthTokenPass` que contiene el token de acceso y otra información relevante.
"""
auth_data = {
"grant_type": "password",
"client_id": self.oauth_config.client_id,
"client_secret": self.oauth_config.client_secret,
"username": self.oauth_config.username,
"password": self.oauth_config.password,
}
if self.oauth_config.use_url_params_for_auth:
token_url = f"{self.oauth_config.api_url}/{self.oauth_config.token_resource}?{urlencode(auth_data)}"
response = requests.post(token_url, headers={}, timeout=30)
else:
token_url = (
f"{self.oauth_config.api_url}/{self.oauth_config.token_resource}"
)
response = requests.post(token_url, data=auth_data, headers={}, timeout=30)
response.raise_for_status()
token_data = response.json()
self._token = OAuthTokenPass(**token_data)
return self._token
def _get_token(self) -> Optional[str]:
"""Obtiene el token actual o lo renueva si ha expirado.
Returns:
El token de acceso en formato de cadena.
"""
if self._token is None:
self._authenticate()
if self._token is not None:
current_time = int(time.time())
expiration_time = self._token.issued_at / 1000 + self._token.expires_in
if current_time >= expiration_time:
self.logger.info("El token ha expirado. Renovando...")
self._authenticate()
return self._token.access_token
raise ValueError("error en autenticación")
def modelo_unificado_mapping(
self, message: EntradaEsquemaUnificado
) -> Optional[BaseModel]:
"""Mapea el mensaje de entrada a un modelo unificado de salida utilizando la
función de mapeo proporcionada.
Args:
message: El mensaje de entrada que será mapeado.
Returns:
Un modelo Pydantic que representa la salida mapeada.
Raises:
TypeError: Si el mensaje no es una instancia de `EntradaEsquemaUnificado`.
NotImplementedError: Si no se ha proporcionado una función de mapeo personalizada.
"""
if self.mapping_function:
if not isinstance(message, EntradaEsquemaUnificado):
raise TypeError(
"El mensaje debe ser una instancia de EntradaEsquemaUnificado"
)
return self.mapping_function(message)
raise NotImplementedError(
"No se ha proporcionado una función de mapeo personalizada."
)
def integrate(self, output_model: Optional[BaseModel]) -> Optional[StrategyResult]:
"""Realiza la integración utilizando el modelo de salida mapeado.
Args:
output_model: El modelo de datos ya mapeado que se enviará a la integración.
Returns:
La respuesta defina en response_processor.
Raises:
HTTPError: Si la solicitud HTTP a la API falla.
"""
token = self._get_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
url = f"{self.oauth_config.api_url}/{self.resource}"
if output_model is not None:
response = requests.request(
self.method,
url,
json=output_model.model_dump(mode="json", exclude_none=True),
headers=headers,
timeout=300,
)
response.raise_for_status()
return self.response_processor(response, output_model)
self.logger.info("Evento es ignorado.")
return StrategyResult(
success=True,
response={"evento_ignorado": True},
bodysent={"evento_ignorado": True},
)
def set_response_processor(
self, processor: Callable[[requests.Response, BaseModel], StrategyResult]
):
"""Configura un procesamiento de la respuesta."""
self.response_processor = processor
__init__(self, oauth_config, method='POST', resource='', mapping_function=None, logger=None)
special
¶
Inicializa una instancia de RESTIntegration con la configuración de OAuth y los parámetros REST.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
oauth_config |
OAuthConfigPassFlow |
Configuración necesaria para autenticarse con OAuth 2.0. |
required |
method |
str |
El método HTTP que se utilizará para la integración (por ejemplo, 'POST', 'PATCH'). |
'POST' |
resource |
str |
El recurso específico de la API con el cual se interactuará. |
'' |
mapping_function |
Optional[Callable[[centraal_client_flow.models.schemas.EntradaEsquemaUnificado], Optional[pydantic.main.BaseModel]]] |
Una función opcional que define cómo mapear un
|
None |
Source code in centraal_client_flow/rules/integration/strategy.py
def __init__(
self,
oauth_config: OAuthConfigPassFlow,
method: str = "POST",
resource: str = "",
mapping_function: Optional[
Callable[[EntradaEsquemaUnificado], Optional[BaseModel]]
] = None,
logger: Optional[logging.Logger] = None,
):
"""Inicializa una instancia de RESTIntegration con la configuración de OAuth y
los parámetros REST.
Args:
oauth_config: Configuración necesaria para autenticarse con OAuth 2.0.
method: El método HTTP que se utilizará para la integración
(por ejemplo, 'POST', 'PATCH').
resource: El recurso específico de la API con el cual se interactuará.
mapping_function: Una función opcional que define cómo mapear un
`EntradaEsquemaUnificado` a un modelo Pydantic.
"""
if mapping_function is not None:
super().__init__(
logger=logger, name=f"{method}_{mapping_function.__name__}"
)
self.oauth_config = oauth_config
self.method = method
self.resource = resource
self.mapping_function = mapping_function
self.response_processor = lambda r, m: StrategyResult(
success=True, response=r, bodysent=m
)
self._token: Optional[OAuthTokenPass] = None
integrate(self, output_model)
¶
Realiza la integración utilizando el modelo de salida mapeado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
output_model |
Optional[pydantic.main.BaseModel] |
El modelo de datos ya mapeado que se enviará a la integración. |
required |
Returns:
| Type | Description |
|---|---|
Optional[centraal_client_flow.rules.integration.strategy.StrategyResult] |
La respuesta defina en response_processor. |
Exceptions:
| Type | Description |
|---|---|
HTTPError |
Si la solicitud HTTP a la API falla. |
Source code in centraal_client_flow/rules/integration/strategy.py
def integrate(self, output_model: Optional[BaseModel]) -> Optional[StrategyResult]:
"""Realiza la integración utilizando el modelo de salida mapeado.
Args:
output_model: El modelo de datos ya mapeado que se enviará a la integración.
Returns:
La respuesta defina en response_processor.
Raises:
HTTPError: Si la solicitud HTTP a la API falla.
"""
token = self._get_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
url = f"{self.oauth_config.api_url}/{self.resource}"
if output_model is not None:
response = requests.request(
self.method,
url,
json=output_model.model_dump(mode="json", exclude_none=True),
headers=headers,
timeout=300,
)
response.raise_for_status()
return self.response_processor(response, output_model)
self.logger.info("Evento es ignorado.")
return StrategyResult(
success=True,
response={"evento_ignorado": True},
bodysent={"evento_ignorado": True},
)
modelo_unificado_mapping(self, message)
¶
Mapea el mensaje de entrada a un modelo unificado de salida utilizando la función de mapeo proporcionada.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message |
EntradaEsquemaUnificado |
El mensaje de entrada que será mapeado. |
required |
Returns:
| Type | Description |
|---|---|
Optional[pydantic.main.BaseModel] |
Un modelo Pydantic que representa la salida mapeada. |
Exceptions:
| Type | Description |
|---|---|
TypeError |
Si el mensaje no es una instancia de |
NotImplementedError |
Si no se ha proporcionado una función de mapeo personalizada. |
Source code in centraal_client_flow/rules/integration/strategy.py
def modelo_unificado_mapping(
self, message: EntradaEsquemaUnificado
) -> Optional[BaseModel]:
"""Mapea el mensaje de entrada a un modelo unificado de salida utilizando la
función de mapeo proporcionada.
Args:
message: El mensaje de entrada que será mapeado.
Returns:
Un modelo Pydantic que representa la salida mapeada.
Raises:
TypeError: Si el mensaje no es una instancia de `EntradaEsquemaUnificado`.
NotImplementedError: Si no se ha proporcionado una función de mapeo personalizada.
"""
if self.mapping_function:
if not isinstance(message, EntradaEsquemaUnificado):
raise TypeError(
"El mensaje debe ser una instancia de EntradaEsquemaUnificado"
)
return self.mapping_function(message)
raise NotImplementedError(
"No se ha proporcionado una función de mapeo personalizada."
)
set_response_processor(self, processor)
¶
Configura un procesamiento de la respuesta.
Source code in centraal_client_flow/rules/integration/strategy.py
def set_response_processor(
self, processor: Callable[[requests.Response, BaseModel], StrategyResult]
):
"""Configura un procesamiento de la respuesta."""
self.response_processor = processor
StrategyResult
dataclass
¶
Resultado de Estrategia.
Source code in centraal_client_flow/rules/integration/strategy.py
@dataclass
class StrategyResult:
"""Resultado de Estrategia."""
success: bool
response: dict
bodysent: dict
v2
¶
Implementación de la regla de integración v2.
IntegrationResult
dataclass
¶
Resultado de integración.
Source code in centraal_client_flow/rules/integration/v2.py
@dataclass
class IntegrationResult:
"""Resultado de integración."""
success: bool
response: dict
bodysent: dict
def __post_init__(self):
"""Valida que bodysent no sea un diccionario vacío."""
if not self.bodysent:
raise ValueError("bodysent no puede ser un diccionario vacío")
if not self.response:
raise ValueError("response no puede ser un diccionario vacío")
__post_init__(self)
special
¶
Valida que bodysent no sea un diccionario vacío.
Source code in centraal_client_flow/rules/integration/v2.py
def __post_init__(self):
"""Valida que bodysent no sea un diccionario vacío."""
if not self.bodysent:
raise ValueError("bodysent no puede ser un diccionario vacío")
if not self.response:
raise ValueError("response no puede ser un diccionario vacío")
IntegrationRule (ABC)
¶
Implementación de la regla de integración v2. Esta implementación es una regla tiene el objetivo de simplicar la implementación, se observa que en los casos de uso la estrategias de integración no son valiosas y dan muy poca flexibilidad. Adicional en general las reglas de integración son un concpeto mas directo y claro para representar la logica de transformación del modelo unficaido a lo que necesita el sistema destino, esto ayudara a que el usuario implemente directamente la transformación y operaciones necesarias sin necesidad de definirla en un objeto diferente.
La reglas integración se implementa como una clase abstracta con metodos compartidos, el usuario solo debera
implementar el metodo abstracto integrate, que debe recibir el mensaje del topic, codificarlo mediante el modelo unificado
y hacer la implemetación que requiera (inlcuido mapping o logicas adicionales para realizar la integración),
y el set de body_sent que se enviara a la auditoria de cosmos, par asaber que se envio al sistema destino.
con el unico requisito de devolver un IntegrationResult, que indicara el resultado
de la transformación.
la clase abstracta tendra la implementación de metodos concretos:
run: se encarga de ejecutar integrate y realizar el logging a la auditoria de cosmos.
register_log: se encarga de realizar el logging de la auditoria de cosmos.
Source code in centraal_client_flow/rules/integration/v2.py
class IntegrationRule(ABC):
"""Implementación de la regla de integración v2.
Esta implementación es una regla tiene el objetivo de simplicar la implementación,
se observa que en los casos de uso la estrategias de integración no son valiosas y dan
muy poca flexibilidad. Adicional en general las reglas de integración son un concpeto mas directo y
claro para representar la logica de transformación del modelo unficaido a lo que necesita el sistema
destino, esto ayudara a que el usuario implemente directamente la transformación y operaciones
necesarias sin necesidad de definirla en un objeto diferente.
La reglas integración se implementa como una clase abstracta con metodos compartidos, el usuario solo debera
implementar el metodo abstracto `integrate`, que debe recibir el mensaje del topic, codificarlo mediante el modelo unificado
y hacer la implemetación que requiera (inlcuido mapping o logicas adicionales para realizar la integración),
y el set de body_sent que se enviara a la auditoria de cosmos, par asaber que se envio al sistema destino.
con el unico requisito de devolver un IntegrationResult, que indicara el resultado
de la transformación.
la clase abstracta tendra la implementación de metodos concretos:
run: se encarga de ejecutar `integrate` y realizar el logging a la auditoria de cosmos.
register_log: se encarga de realizar el logging de la auditoria de cosmos.
"""
def __init__(
self,
name: str,
model_unficado: type[EntradaEsquemaUnificado],
logger: logging.Logger,
container_name_aud: str,
):
"""
Inicializa una regla de integración.
Args:
name: Nombre del topic de Service Bus que se utilizará para la integración.
model_unficado: Modelo de esquema unificado para validar y mapear los mensajes recibidos.
container_name_aud: Nombre del contenedor de la auditoria de cosmos.
"""
self.name = name
self.model_unficado = model_unficado
self.logger = logger
self.id_esquema = None
self.container_name_aud = container_name_aud
self.body_sent = {}
@abstractmethod
def integrate(
self, entrada_esquema_unificado: EntradaEsquemaUnificado
) -> Optional[IntegrationResult]:
pass
def _validate_modelo_unificado(
self, message: dict
) -> Union[IntegrationResult, EntradaEsquemaUnificado]:
try:
message_esquema = self.model_unficado.model_validate(message)
self.id_esquema = message_esquema.id
return message_esquema
except ValidationError as e:
error_val_cosmos_friendly = serialize_validation_errors(e.errors())
response = built_valid_json_str_with_aditional_info(
error_val_cosmos_friendly,
f"Mensaje no cumple con el esquema {self.model_unficado.__name__}",
)
self.logger.error(
"Error en validación del modelo unificado %s",
error_val_cosmos_friendly,
exc_info=True,
)
return IntegrationResult(
success=False,
response=response,
bodysent={"error_validacion": True},
)
def run(
self,
message: Union[ServiceBusMessage, dict],
cosmos_client: CosmosDBSingleton,
):
"""Ejecuta la regla de integración."""
if isinstance(message, ServiceBusMessage):
message = json.loads(message.get_body().decode("utf-8"))
message_esquema = self._validate_modelo_unificado(message)
if isinstance(message_esquema, IntegrationResult):
raise ValueError(
f"Error en validación del modelo unificado. Se recibe un mensaje no valido {message_esquema}"
)
try:
self.id_esquema = message_esquema.id
result = self._retry_with_exponential_backoff(
self.integrate, message_esquema
)
if self.body_sent is None:
raise ValueError(
"No se ha definido el body_sent. Integrate debe setear el atributo body_sent."
)
except ValidationError as e:
error_val_cosmos_friendly = serialize_validation_errors(e.errors())
self.logger.error(
"Error de validación en integración %s",
error_val_cosmos_friendly,
exc_info=True,
)
result = IntegrationResult(
success=False,
response={"error_validacion": error_val_cosmos_friendly},
bodysent={"error_validacion": True},
)
except Exception as e:
self.logger.error(
"Error en integración %s",
e,
exc_info=True,
)
raise e
self.register_log(result, cosmos_client)
return result
def register_log(
self,
result: IntegrationResult,
cosmos_client: CosmosDBSingleton,
):
container = cosmos_client.get_container_client(self.container_name_aud)
if self.id_esquema is not None:
entry = AuditoriaEntryIntegracion(
id=self.id_esquema,
regla=self.name,
contenido=result.bodysent,
sucess=result.success,
response=result.response,
)
item_written = container.upsert_item(
entry.model_dump(mode="json", exclude_none=True),
)
return item_written
raise ValueError("No es posible usar registro del log.")
def _retry_with_exponential_backoff(
self, func, *args, max_retries=3, base_delay=1, **kwargs
):
"""Retries a function with exponential backoff."""
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt < max_retries - 1:
delay = base_delay * (2**attempt)
self.logger.warning(
"Retrying due to error: %s. Attempt %d/%d. Retrying in %d seconds...",
e,
attempt + 1,
max_retries,
delay,
)
time.sleep(delay)
else:
self.logger.error(
"Max retries reached. Last error: %s", e, exc_info=True
)
raise e
__init__(self, name, model_unficado, logger, container_name_aud)
special
¶
Inicializa una regla de integración.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
str |
Nombre del topic de Service Bus que se utilizará para la integración. |
required |
model_unficado |
type[centraal_client_flow.models.schemas.EntradaEsquemaUnificado] |
Modelo de esquema unificado para validar y mapear los mensajes recibidos. |
required |
container_name_aud |
str |
Nombre del contenedor de la auditoria de cosmos. |
required |
Source code in centraal_client_flow/rules/integration/v2.py
def __init__(
self,
name: str,
model_unficado: type[EntradaEsquemaUnificado],
logger: logging.Logger,
container_name_aud: str,
):
"""
Inicializa una regla de integración.
Args:
name: Nombre del topic de Service Bus que se utilizará para la integración.
model_unficado: Modelo de esquema unificado para validar y mapear los mensajes recibidos.
container_name_aud: Nombre del contenedor de la auditoria de cosmos.
"""
self.name = name
self.model_unficado = model_unficado
self.logger = logger
self.id_esquema = None
self.container_name_aud = container_name_aud
self.body_sent = {}
run(self, message, cosmos_client)
¶
Ejecuta la regla de integración.
Source code in centraal_client_flow/rules/integration/v2.py
def run(
self,
message: Union[ServiceBusMessage, dict],
cosmos_client: CosmosDBSingleton,
):
"""Ejecuta la regla de integración."""
if isinstance(message, ServiceBusMessage):
message = json.loads(message.get_body().decode("utf-8"))
message_esquema = self._validate_modelo_unificado(message)
if isinstance(message_esquema, IntegrationResult):
raise ValueError(
f"Error en validación del modelo unificado. Se recibe un mensaje no valido {message_esquema}"
)
try:
self.id_esquema = message_esquema.id
result = self._retry_with_exponential_backoff(
self.integrate, message_esquema
)
if self.body_sent is None:
raise ValueError(
"No se ha definido el body_sent. Integrate debe setear el atributo body_sent."
)
except ValidationError as e:
error_val_cosmos_friendly = serialize_validation_errors(e.errors())
self.logger.error(
"Error de validación en integración %s",
error_val_cosmos_friendly,
exc_info=True,
)
result = IntegrationResult(
success=False,
response={"error_validacion": error_val_cosmos_friendly},
bodysent={"error_validacion": True},
)
except Exception as e:
self.logger.error(
"Error en integración %s",
e,
exc_info=True,
)
raise e
self.register_log(result, cosmos_client)
return result
update
¶
Módulo para las reglas de actualización.
Rule
dataclass
¶
Representa una regla de procesamiento que asocia un modelo Pydantic con un procesador y los tópicos relevantes.
Attributes:
| Name | Type | Description |
|---|---|---|
model |
Type[centraal_client_flow.models.schemas.EventoBase] |
El tipo de modelo Pydantic que la regla procesa. |
processor |
UpdateProcessor |
El procesador que manejará la lógica de actualización. |
topics |
Set[str] |
Los tópicos a los que la regla está asociada. |
name |
str |
El nombre asignado a la regla basado en el nombre de la clase del modelo. |
Source code in centraal_client_flow/rules/update.py
@dataclass
class Rule:
"""
Representa una regla de procesamiento que asocia un modelo Pydantic con un procesador
y los tópicos relevantes.
Attributes:
model: El tipo de modelo Pydantic que la regla procesa.
processor: El procesador que manejará la lógica de actualización.
topics: Los tópicos a los que la regla está asociada.
name: El nombre asignado a la regla basado en el nombre de la clase del modelo.
"""
model: Type[EventoBase]
processor: UpdateProcessor
topics: Set[str]
name: str = ""
def __post_init__(self):
"""Inicializa el nombre de la regla basado en el nombre de la clase del modelo."""
self.name = self.model.__name__
def process_rule(
self, data: EventoBase, current: Optional[EntradaEsquemaUnificado]
) -> EntradaEsquemaUnificado:
"""
Procesa una entrada de datos usando la regla definida.
Parameters:
data: El evento que se procesará.
current: El registro actual a ser actualizado.
Returns:
EntradaEsquemaUnificado: El registro actualizado.
"""
data_copy = data.model_copy(deep=True)
current_copy = current.model_copy(deep=True) if current else None
result = self.processor.process_message(data_copy, current_copy)
return result
__post_init__(self)
special
¶
Inicializa el nombre de la regla basado en el nombre de la clase del modelo.
Source code in centraal_client_flow/rules/update.py
def __post_init__(self):
"""Inicializa el nombre de la regla basado en el nombre de la clase del modelo."""
self.name = self.model.__name__
process_rule(self, data, current)
¶
Procesa una entrada de datos usando la regla definida.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
EventoBase |
El evento que se procesará. |
required |
current |
Optional[centraal_client_flow.models.schemas.EntradaEsquemaUnificado] |
El registro actual a ser actualizado. |
required |
Returns:
| Type | Description |
|---|---|
EntradaEsquemaUnificado |
El registro actualizado. |
Source code in centraal_client_flow/rules/update.py
def process_rule(
self, data: EventoBase, current: Optional[EntradaEsquemaUnificado]
) -> EntradaEsquemaUnificado:
"""
Procesa una entrada de datos usando la regla definida.
Parameters:
data: El evento que se procesará.
current: El registro actual a ser actualizado.
Returns:
EntradaEsquemaUnificado: El registro actualizado.
"""
data_copy = data.model_copy(deep=True)
current_copy = current.model_copy(deep=True) if current else None
result = self.processor.process_message(data_copy, current_copy)
return result
RuleProcessor
¶
Clase que orquesta el procesamiento de reglas y la interacción con Service Bus y Cosmos DB.
Source code in centraal_client_flow/rules/update.py
class RuleProcessor:
"""Clase que orquesta el procesamiento de reglas y la interacción con Service Bus y Cosmos DB."""
def __init__(
self,
queue_name: str,
unified_container_name: str,
auditoria_container_name: str,
service_bus_client: IServiceBusClient,
cosmos_client: CosmosDBSingleton,
rule_selector: RuleSelector,
):
self.queue_name = queue_name
self.unified_container_name = unified_container_name
self.auditoria_container_name = auditoria_container_name
self.service_bus_client = service_bus_client
self.cosmos_client = cosmos_client
self.rule_selector = rule_selector
def save_unified_model(
self,
new_data: EntradaEsquemaUnificado,
) -> dict:
"""
Guarda el modelo de EntradaEsquemaUnificado actualizado en Cosmos DB.
Parameters:
new_data: El modelo actualizado de EntradaEsquemaUnificado.
Returns:
EntradaEsquemaUnificado: El modelo almacenado en la base de datos.
"""
container = self.cosmos_client.get_container_client(self.unified_container_name)
item_written = container.upsert_item(
new_data.model_dump(mode="json", exclude_none=True)
)
return item_written
def record_auditoria(self, changes: List[AuditoriaEntry]):
"""
Registra los cambios detectados en el contenedor de auditoría de Cosmos DB.
Parameters:
changes: Lista de entradas de auditoría que contienen los cambios detectados.
"""
container = self.cosmos_client.get_container_client(
self.auditoria_container_name
)
for change in changes:
container.create_item(
change.model_dump(mode="json", exclude_none=True),
enable_automatic_id_generation=True,
)
def get_current_entrada(
self, id_entrada: IDModel, model_unificado: EntradaEsquemaUnificado
) -> Optional[EntradaEsquemaUnificado]:
"""
Recupera el registro actual desde Cosmos DB basado en el ID proporcionado.
Parameters:
id_entrada: El ID del registro que se desea recuperar.
Returns:
Optional[EntradaEsquemaUnificado]: El registro actual, si existe.
"""
container = self.cosmos_client.get_container_client(self.unified_container_name)
query = f"SELECT * FROM c WHERE c.id = '{id_entrada.model_dump()}'"
current_items = list(
container.query_items(query, enable_cross_partition_query=True)
)
if current_items:
return model_unificado.model_validate(current_items[0])
return None
def detect_changes(
self,
current_data: Optional[EntradaEsquemaUnificado],
updated_data: EntradaEsquemaUnificado,
id_model: IDModel,
regla_name: str,
) -> List[AuditoriaEntry]:
"""
Detecta cambios entre los datos actuales y los actualizados en el modelo EntradaEsquemaUnificado.
Parameters:
current_data: Los datos actuales en la base de datos.
updated_data: Los datos actualizados a comparar.
id_model: El ID del modelo que se está procesando.
Returns:
List[AuditoriaEntry]: Lista de entradas de auditoría que reflejan los cambios detectados.
"""
changes = []
def _log_changes(
subesquema_name: str, old_value: Any, new_value: Any, field_name: str
):
"""Función auxiliar para registrar cambios detectados."""
changes.append(
AuditoriaEntry(
id_entrada=id_model,
subesquema=subesquema_name,
campo=field_name,
new_value=new_value,
old_value=old_value,
regla=regla_name,
)
)
if current_data is None:
# No hay datos actuales, se registran todos los campos como cambios
for field_name in updated_data.model_fields_set:
new_value = getattr(updated_data, field_name)
if isinstance(new_value, BaseModel) and not (
isinstance(new_value, IDModel)
):
# Es un modelo Pydantic anidado (subesquema)
for sub_field_name in new_value.model_fields_set:
sub_field_value = getattr(new_value, sub_field_name)
_log_changes(field_name, None, sub_field_value, sub_field_name)
else:
# Si es principal es "root"
_log_changes("root", None, new_value, field_name)
else:
for field_name in updated_data.model_fields_set:
old_value = getattr(current_data, field_name)
new_value = getattr(updated_data, field_name)
if isinstance(new_value, BaseModel):
# Es un modelo Pydantic anidado (subesquema)
for sub_field_name in new_value.model_fields_set:
sub_old_value = (
getattr(old_value, sub_field_name, None)
if old_value
else None
)
sub_new_value = getattr(new_value, sub_field_name, None)
if sub_old_value != sub_new_value:
_log_changes(
field_name, sub_old_value, sub_new_value, sub_field_name
)
else:
if old_value != new_value:
_log_changes("root", old_value, new_value, field_name)
if not changes:
changes.append(
AuditoriaEntry(
id_entrada=id_model,
subesquema="No Changes",
campo="Ninguno",
new_value="No cambios",
old_value="No cambios",
regla=regla_name,
)
)
return changes
def publish_to_topics(
self,
processed_data: EntradaEsquemaUnificado,
topic_names: List[str],
):
"""
Publica los datos procesados a los tópicos de Service Bus relevantes.
Parameters:
processed_data: Los datos procesados que se enviarán.
topic_names: Lista de tópicos a los que se enviarán los datos.
"""
bus = self.service_bus_client
lock_cm = (
bus.synchronize()
if isinstance(bus, ServiceBusClientSingleton)
else nullcontext()
)
with lock_cm:
client = bus.client
for topic_name in topic_names:
body = processed_data.model_dump(mode="json", exclude_none=True)
with client.get_topic_sender(topic_name=topic_name) as sender:
message = SBMessage(body=json.dumps(body))
sender.send_messages(message)
detect_changes(self, current_data, updated_data, id_model, regla_name)
¶
Detecta cambios entre los datos actuales y los actualizados en el modelo EntradaEsquemaUnificado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current_data |
Optional[centraal_client_flow.models.schemas.EntradaEsquemaUnificado] |
Los datos actuales en la base de datos. |
required |
updated_data |
EntradaEsquemaUnificado |
Los datos actualizados a comparar. |
required |
id_model |
IDModel |
El ID del modelo que se está procesando. |
required |
Returns:
| Type | Description |
|---|---|
List[AuditoriaEntry] |
Lista de entradas de auditoría que reflejan los cambios detectados. |
Source code in centraal_client_flow/rules/update.py
def detect_changes(
self,
current_data: Optional[EntradaEsquemaUnificado],
updated_data: EntradaEsquemaUnificado,
id_model: IDModel,
regla_name: str,
) -> List[AuditoriaEntry]:
"""
Detecta cambios entre los datos actuales y los actualizados en el modelo EntradaEsquemaUnificado.
Parameters:
current_data: Los datos actuales en la base de datos.
updated_data: Los datos actualizados a comparar.
id_model: El ID del modelo que se está procesando.
Returns:
List[AuditoriaEntry]: Lista de entradas de auditoría que reflejan los cambios detectados.
"""
changes = []
def _log_changes(
subesquema_name: str, old_value: Any, new_value: Any, field_name: str
):
"""Función auxiliar para registrar cambios detectados."""
changes.append(
AuditoriaEntry(
id_entrada=id_model,
subesquema=subesquema_name,
campo=field_name,
new_value=new_value,
old_value=old_value,
regla=regla_name,
)
)
if current_data is None:
# No hay datos actuales, se registran todos los campos como cambios
for field_name in updated_data.model_fields_set:
new_value = getattr(updated_data, field_name)
if isinstance(new_value, BaseModel) and not (
isinstance(new_value, IDModel)
):
# Es un modelo Pydantic anidado (subesquema)
for sub_field_name in new_value.model_fields_set:
sub_field_value = getattr(new_value, sub_field_name)
_log_changes(field_name, None, sub_field_value, sub_field_name)
else:
# Si es principal es "root"
_log_changes("root", None, new_value, field_name)
else:
for field_name in updated_data.model_fields_set:
old_value = getattr(current_data, field_name)
new_value = getattr(updated_data, field_name)
if isinstance(new_value, BaseModel):
# Es un modelo Pydantic anidado (subesquema)
for sub_field_name in new_value.model_fields_set:
sub_old_value = (
getattr(old_value, sub_field_name, None)
if old_value
else None
)
sub_new_value = getattr(new_value, sub_field_name, None)
if sub_old_value != sub_new_value:
_log_changes(
field_name, sub_old_value, sub_new_value, sub_field_name
)
else:
if old_value != new_value:
_log_changes("root", old_value, new_value, field_name)
if not changes:
changes.append(
AuditoriaEntry(
id_entrada=id_model,
subesquema="No Changes",
campo="Ninguno",
new_value="No cambios",
old_value="No cambios",
regla=regla_name,
)
)
return changes
get_current_entrada(self, id_entrada, model_unificado)
¶
Recupera el registro actual desde Cosmos DB basado en el ID proporcionado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
id_entrada |
IDModel |
El ID del registro que se desea recuperar. |
required |
Returns:
| Type | Description |
|---|---|
Optional[EntradaEsquemaUnificado] |
El registro actual, si existe. |
Source code in centraal_client_flow/rules/update.py
def get_current_entrada(
self, id_entrada: IDModel, model_unificado: EntradaEsquemaUnificado
) -> Optional[EntradaEsquemaUnificado]:
"""
Recupera el registro actual desde Cosmos DB basado en el ID proporcionado.
Parameters:
id_entrada: El ID del registro que se desea recuperar.
Returns:
Optional[EntradaEsquemaUnificado]: El registro actual, si existe.
"""
container = self.cosmos_client.get_container_client(self.unified_container_name)
query = f"SELECT * FROM c WHERE c.id = '{id_entrada.model_dump()}'"
current_items = list(
container.query_items(query, enable_cross_partition_query=True)
)
if current_items:
return model_unificado.model_validate(current_items[0])
return None
publish_to_topics(self, processed_data, topic_names)
¶
Publica los datos procesados a los tópicos de Service Bus relevantes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
processed_data |
EntradaEsquemaUnificado |
Los datos procesados que se enviarán. |
required |
topic_names |
List[str] |
Lista de tópicos a los que se enviarán los datos. |
required |
Source code in centraal_client_flow/rules/update.py
def publish_to_topics(
self,
processed_data: EntradaEsquemaUnificado,
topic_names: List[str],
):
"""
Publica los datos procesados a los tópicos de Service Bus relevantes.
Parameters:
processed_data: Los datos procesados que se enviarán.
topic_names: Lista de tópicos a los que se enviarán los datos.
"""
bus = self.service_bus_client
lock_cm = (
bus.synchronize()
if isinstance(bus, ServiceBusClientSingleton)
else nullcontext()
)
with lock_cm:
client = bus.client
for topic_name in topic_names:
body = processed_data.model_dump(mode="json", exclude_none=True)
with client.get_topic_sender(topic_name=topic_name) as sender:
message = SBMessage(body=json.dumps(body))
sender.send_messages(message)
record_auditoria(self, changes)
¶
Registra los cambios detectados en el contenedor de auditoría de Cosmos DB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
changes |
List[centraal_client_flow.models.schemas.AuditoriaEntry] |
Lista de entradas de auditoría que contienen los cambios detectados. |
required |
Source code in centraal_client_flow/rules/update.py
def record_auditoria(self, changes: List[AuditoriaEntry]):
"""
Registra los cambios detectados en el contenedor de auditoría de Cosmos DB.
Parameters:
changes: Lista de entradas de auditoría que contienen los cambios detectados.
"""
container = self.cosmos_client.get_container_client(
self.auditoria_container_name
)
for change in changes:
container.create_item(
change.model_dump(mode="json", exclude_none=True),
enable_automatic_id_generation=True,
)
save_unified_model(self, new_data)
¶
Guarda el modelo de EntradaEsquemaUnificado actualizado en Cosmos DB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
new_data |
EntradaEsquemaUnificado |
El modelo actualizado de EntradaEsquemaUnificado. |
required |
Returns:
| Type | Description |
|---|---|
EntradaEsquemaUnificado |
El modelo almacenado en la base de datos. |
Source code in centraal_client_flow/rules/update.py
def save_unified_model(
self,
new_data: EntradaEsquemaUnificado,
) -> dict:
"""
Guarda el modelo de EntradaEsquemaUnificado actualizado en Cosmos DB.
Parameters:
new_data: El modelo actualizado de EntradaEsquemaUnificado.
Returns:
EntradaEsquemaUnificado: El modelo almacenado en la base de datos.
"""
container = self.cosmos_client.get_container_client(self.unified_container_name)
item_written = container.upsert_item(
new_data.model_dump(mode="json", exclude_none=True)
)
return item_written
RuleSelector
¶
Clase encargada de seleccionar y aplicar reglas de procesamiento sobre los eventos.
Source code in centraal_client_flow/rules/update.py
class RuleSelector:
"""Clase encargada de seleccionar y aplicar reglas de procesamiento sobre los eventos."""
def __init__(self, modelo_unificado: EntradaEsquemaUnificado):
self.rules: List[Rule] = []
self.modelo_unificado = modelo_unificado
def register_rule(self, rule: Rule):
"""
Registra una nueva regla para su uso futuro en el procesamiento de eventos.
Parameters:
rule: La regla que se va a registrar.
"""
self._validate_rule(rule)
self.rules.append(rule)
def _validate_rule(self, rule: Rule):
"""
Valida que los tópicos de la regla coincidan con los subesquemas en el modelo unificado.
Parameters:
rule: La regla a validar.
Raises:
ValueError: Si algún tópico de la regla no corresponde a un subesquema válido.
"""
model_fields = set(self.modelo_unificado.model_fields)
for t in rule.topics:
if t == "root":
pass
elif t not in model_fields:
raise ValueError(
f"El tópico {t} debe corresponder a un subesquema {model_fields}"
)
def select_rule(self, data: dict) -> Tuple[EventoBase, Rule]:
"""
Selecciona la regla adecuada para los datos proporcionados.
Parameters:
data: Un diccionario con los datos a validar y procesar.
Returns:
Tuple[EventoBase, Rule]: Los datos validados y la regla seleccionada.
Raises:
NoHayReglas: Si no se encuentra una regla válida para los datos proporcionados.
"""
for rule in self.rules:
try:
validated_data = rule.model.model_validate(data)
return validated_data, rule
except ValidationError:
continue
raise NoHayReglas(f"No se encontró una regla válida para {data}.")
def get_topics_by_changes(
self,
rule_topics: Set[str],
changes: List[AuditoriaEntry],
include_root: bool = False,
) -> List[str]:
"""
Selecciona los tópicos relevantes basados en los cambios detectados.
Parameters:
rule_topics: Conjunto de tópicos definidos en la regla.
changes: Lista de entradas de auditoría con los cambios detectados.
Returns:
List[str]: Lista de tópicos que necesitan ser notificados.
"""
topics_to_notify = set()
for change in changes:
if change.subesquema in rule_topics:
if include_root:
topics_to_notify.add(change.subesquema)
elif change.subesquema != "root":
topics_to_notify.add(change.subesquema)
return list(topics_to_notify)
get_topics_by_changes(self, rule_topics, changes, include_root=False)
¶
Selecciona los tópicos relevantes basados en los cambios detectados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
rule_topics |
Set[str] |
Conjunto de tópicos definidos en la regla. |
required |
changes |
List[centraal_client_flow.models.schemas.AuditoriaEntry] |
Lista de entradas de auditoría con los cambios detectados. |
required |
Returns:
| Type | Description |
|---|---|
List[str] |
Lista de tópicos que necesitan ser notificados. |
Source code in centraal_client_flow/rules/update.py
def get_topics_by_changes(
self,
rule_topics: Set[str],
changes: List[AuditoriaEntry],
include_root: bool = False,
) -> List[str]:
"""
Selecciona los tópicos relevantes basados en los cambios detectados.
Parameters:
rule_topics: Conjunto de tópicos definidos en la regla.
changes: Lista de entradas de auditoría con los cambios detectados.
Returns:
List[str]: Lista de tópicos que necesitan ser notificados.
"""
topics_to_notify = set()
for change in changes:
if change.subesquema in rule_topics:
if include_root:
topics_to_notify.add(change.subesquema)
elif change.subesquema != "root":
topics_to_notify.add(change.subesquema)
return list(topics_to_notify)
register_rule(self, rule)
¶
Registra una nueva regla para su uso futuro en el procesamiento de eventos.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
rule |
Rule |
La regla que se va a registrar. |
required |
Source code in centraal_client_flow/rules/update.py
def register_rule(self, rule: Rule):
"""
Registra una nueva regla para su uso futuro en el procesamiento de eventos.
Parameters:
rule: La regla que se va a registrar.
"""
self._validate_rule(rule)
self.rules.append(rule)
select_rule(self, data)
¶
Selecciona la regla adecuada para los datos proporcionados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
dict |
Un diccionario con los datos a validar y procesar. |
required |
Returns:
| Type | Description |
|---|---|
Tuple[EventoBase, Rule] |
Los datos validados y la regla seleccionada. |
Exceptions:
| Type | Description |
|---|---|
NoHayReglas |
Si no se encuentra una regla válida para los datos proporcionados. |
Source code in centraal_client_flow/rules/update.py
def select_rule(self, data: dict) -> Tuple[EventoBase, Rule]:
"""
Selecciona la regla adecuada para los datos proporcionados.
Parameters:
data: Un diccionario con los datos a validar y procesar.
Returns:
Tuple[EventoBase, Rule]: Los datos validados y la regla seleccionada.
Raises:
NoHayReglas: Si no se encuentra una regla válida para los datos proporcionados.
"""
for rule in self.rules:
try:
validated_data = rule.model.model_validate(data)
return validated_data, rule
except ValidationError:
continue
raise NoHayReglas(f"No se encontró una regla válida para {data}.")
UpdateProcessor (LoggerMixin, ABC)
¶
Clase base abstracta para procesadores de eventos.
Source code in centraal_client_flow/rules/update.py
class UpdateProcessor(LoggerMixin, ABC):
"""Clase base abstracta para procesadores de eventos."""
@abstractmethod
def process_message(
self, event: EventoBase, current_registro: Optional[EntradaEsquemaUnificado]
) -> EntradaEsquemaUnificado:
"""
Procesa el evento recibido y retorna un modelo actualizado de EntradaEsquemaUnificado.
Parameters:
event: El evento que contiene la información del cambio, basado en un modelo Pydantic.
current_registro: El registro actual que será actualizado.
Returns:
EntradaEsquemaUnificado: El registro actualizado después de aplicar el evento.
"""
process_message(self, event, current_registro)
¶
Procesa el evento recibido y retorna un modelo actualizado de EntradaEsquemaUnificado.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event |
EventoBase |
El evento que contiene la información del cambio, basado en un modelo Pydantic. |
required |
current_registro |
Optional[centraal_client_flow.models.schemas.EntradaEsquemaUnificado] |
El registro actual que será actualizado. |
required |
Returns:
| Type | Description |
|---|---|
EntradaEsquemaUnificado |
El registro actualizado después de aplicar el evento. |
Source code in centraal_client_flow/rules/update.py
@abstractmethod
def process_message(
self, event: EventoBase, current_registro: Optional[EntradaEsquemaUnificado]
) -> EntradaEsquemaUnificado:
"""
Procesa el evento recibido y retorna un modelo actualizado de EntradaEsquemaUnificado.
Parameters:
event: El evento que contiene la información del cambio, basado en un modelo Pydantic.
current_registro: El registro actual que será actualizado.
Returns:
EntradaEsquemaUnificado: El registro actualizado después de aplicar el evento.
"""