Skip to content

LangChain SDK

LangChain is a popular framework for building applications with LLMs.

Eunomia provides two LangChain integrations that allow to:

These integrations help the configuration and enforcement of authorization policies on LLM applications that leverage LangChain.

Installation

Install the eunomia-sdk-langchain package via pip:

pip install eunomia-sdk-langchain

SDK Docs

eunomia_sdk_langchain.document_loader.EunomiaLoader

A wrapper around LangChain loaders that sends documents to the Eunomia server.

This class wraps any LangChain document loader and intercepts the document loading process to send their metadata as attributes to the Eunomia server. The Eunomia server assigns an identifier to each document, which can be used for checking access permissions by retrieving the associated document attributes at runtime.

Parameters:

Name Type Description Default
loader BaseLoader

The LangChain loader to wrap.

required
server_host str

The hostname of the Eunomia server.

None
api_key str

The API key to use for the Eunomia server, only required when the server is hosted on cloud.

None
Notes

The user can add additional metadata to the documents to be sent to the Eunomia server with respect to the ones obtained from the loader.

Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/document_loader.py
class EunomiaLoader:
    """
    A wrapper around LangChain loaders that sends documents to the Eunomia server.

    This class wraps any LangChain document loader and intercepts the document loading
    process to send their metadata as attributes to the Eunomia server. The Eunomia server
    assigns an identifier to each document, which can be used for checking access permissions
    by retrieving the associated document attributes at runtime.

    Parameters
    ----------
    loader : BaseLoader
        The LangChain loader to wrap.
    server_host : str, optional
        The hostname of the Eunomia server.
    api_key : str, optional
        The API key to use for the Eunomia server, only required when the server is hosted on cloud.

    Notes
    -----
    The user can add additional metadata to the documents to be sent to the
    Eunomia server with respect to the ones obtained from the loader.
    """

    def __init__(
        self,
        loader: BaseLoader,
        server_host: str | None = None,
        api_key: str | None = None,
    ):
        self._loader = loader
        self._client = EunomiaClient(server_host=server_host, api_key=api_key)

    def _process_document_sync(
        self, doc: Document, additional_metadata: dict = {}
    ) -> Document:
        if not hasattr(doc, "metadata") or doc.metadata is None:
            doc.metadata = {}
        doc.metadata.update(additional_metadata)

        response_data = self._client.register_entity(
            type=enums.EntityType.resource, attributes=doc.metadata
        )
        doc.metadata["eunomia_uri"] = response_data.uri
        return doc

    async def _process_document_async(
        self, doc: Document, additional_metadata: dict = {}
    ) -> Document:
        if not hasattr(doc, "metadata") or doc.metadata is None:
            doc.metadata = {}
        doc.metadata.update(additional_metadata)

        loop = asyncio.get_running_loop()
        response_data = await loop.run_in_executor(
            None,
            lambda: self._client.register_entity(
                type=enums.EntityType.resource, attributes=doc.metadata
            ),
        )
        doc.metadata["eunomia_uri"] = response_data.uri
        return doc

    async def alazy_load(
        self, additional_metadata: dict = {}
    ) -> AsyncIterator[Document]:
        """Load documents lazily and asynchronously, registering them with the Eunomia server.

        Parameters
        ----------
        additional_metadata : dict, optional
            Additional metadata to be sent to the Eunomia server.

        Yields
        ------
        Document
            Documents with Eunomia identifiers added to their metadata as 'eunomia_uri',
            yielded one by one.

        Examples
        --------
        >>> import asyncio
        >>> from langchain_community.document_loaders.csv_loader import CSVLoader
        >>>
        >>> async def process_docs():
        ...     loader = CSVLoader("data.csv")
        ...     wrapped_loader = EunomiaLoader(loader)
        ...     async for doc in wrapped_loader.alazy_load(additional_metadata={"group": "financials"}):
        ...         await process_document(doc)
        >>>
        >>> asyncio.run(process_docs())
        """
        async for doc in self._loader.alazy_load():
            processed_doc = await self._process_document_async(doc, additional_metadata)
            yield processed_doc

    async def aload(self, additional_metadata: dict = {}) -> List[Document]:
        """Load documents asynchronously and register them with the Eunomia server.

        Parameters
        ----------
        additional_metadata : dict, optional
            Additional metadata to be sent to the Eunomia server.

        Returns
        -------
        List[Document]
            The list of loaded documents with Eunomia identifiers added to their metadata
            as 'eunomia_uri'.

        Examples
        --------
        >>> import asyncio
        >>> from langchain_community.document_loaders.csv_loader import CSVLoader
        >>> loader = CSVLoader("data.csv")
        >>> wrapped_loader = EunomiaLoader(loader)
        >>> docs = asyncio.run(wrapped_loader.aload(additional_metadata={"group": "financials"}))
        """
        documents = await self._loader.aload()
        processed_docs = [
            await self._process_document_async(doc, additional_metadata)
            for doc in documents
        ]
        return processed_docs

    def lazy_load(self, additional_metadata: dict = {}) -> Iterator[Document]:
        """Load documents lazily and synchronously, registering them with the Eunomia server.

        Parameters
        ----------
        additional_metadata : dict, optional
            Additional metadata to be sent to the Eunomia server.

        Yields
        ------
        Document
            Documents with Eunomia identifiers added to their metadata as 'eunomia_uri',
            yielded one by one.

        Examples
        --------
        >>> from langchain_community.document_loaders.csv_loader import CSVLoader
        >>> loader = CSVLoader("data.csv")
        >>> wrapped_loader = EunomiaLoader(loader)
        >>> for doc in wrapped_loader.lazy_load(additional_metadata={"group": "financials"}):
        ...     process_document(doc)
        """
        for doc in self._loader.lazy_load():
            processed_doc = self._process_document_sync(doc, additional_metadata)
            yield processed_doc

    def load(self, additional_metadata: dict = {}) -> List[Document]:
        """Load documents synchronously and register them with the Eunomia server.

        Parameters
        ----------
        additional_metadata : dict, optional
            Additional metadata to be sent to the Eunomia server.

        Returns
        -------
        List[Document]
            The list of loaded documents with Eunomia identifiers added to their metadata
            as 'eunomia_uri'.

        Examples
        --------
        >>> from langchain_community.document_loaders.csv_loader import CSVLoader
        >>> loader = CSVLoader("data.csv")
        >>> wrapped_loader = EunomiaLoader(loader)
        >>> docs = wrapped_loader.load(additional_metadata={"group": "financials"})
        """
        documents = self._loader.load()
        processed_docs = [
            self._process_document_sync(doc, additional_metadata) for doc in documents
        ]
        return processed_docs

    def __getattr__(self, name):
        # Delegate any attribute or method lookup to the underlying loader
        # if not explicitly defined in this wrapper.
        return getattr(self._loader, name)

alazy_load(additional_metadata={}) async

Load documents lazily and asynchronously, registering them with the Eunomia server.

Parameters:

Name Type Description Default
additional_metadata dict

Additional metadata to be sent to the Eunomia server.

{}

Yields:

Type Description
Document

Documents with Eunomia identifiers added to their metadata as 'eunomia_uri', yielded one by one.

Examples:

>>> import asyncio
>>> from langchain_community.document_loaders.csv_loader import CSVLoader
>>>
>>> async def process_docs():
...     loader = CSVLoader("data.csv")
...     wrapped_loader = EunomiaLoader(loader)
...     async for doc in wrapped_loader.alazy_load(additional_metadata={"group": "financials"}):
...         await process_document(doc)
>>>
>>> asyncio.run(process_docs())
Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/document_loader.py
async def alazy_load(
    self, additional_metadata: dict = {}
) -> AsyncIterator[Document]:
    """Load documents lazily and asynchronously, registering them with the Eunomia server.

    Parameters
    ----------
    additional_metadata : dict, optional
        Additional metadata to be sent to the Eunomia server.

    Yields
    ------
    Document
        Documents with Eunomia identifiers added to their metadata as 'eunomia_uri',
        yielded one by one.

    Examples
    --------
    >>> import asyncio
    >>> from langchain_community.document_loaders.csv_loader import CSVLoader
    >>>
    >>> async def process_docs():
    ...     loader = CSVLoader("data.csv")
    ...     wrapped_loader = EunomiaLoader(loader)
    ...     async for doc in wrapped_loader.alazy_load(additional_metadata={"group": "financials"}):
    ...         await process_document(doc)
    >>>
    >>> asyncio.run(process_docs())
    """
    async for doc in self._loader.alazy_load():
        processed_doc = await self._process_document_async(doc, additional_metadata)
        yield processed_doc

aload(additional_metadata={}) async

Load documents asynchronously and register them with the Eunomia server.

Parameters:

Name Type Description Default
additional_metadata dict

Additional metadata to be sent to the Eunomia server.

{}

Returns:

Type Description
List[Document]

The list of loaded documents with Eunomia identifiers added to their metadata as 'eunomia_uri'.

Examples:

>>> import asyncio
>>> from langchain_community.document_loaders.csv_loader import CSVLoader
>>> loader = CSVLoader("data.csv")
>>> wrapped_loader = EunomiaLoader(loader)
>>> docs = asyncio.run(wrapped_loader.aload(additional_metadata={"group": "financials"}))
Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/document_loader.py
async def aload(self, additional_metadata: dict = {}) -> List[Document]:
    """Load documents asynchronously and register them with the Eunomia server.

    Parameters
    ----------
    additional_metadata : dict, optional
        Additional metadata to be sent to the Eunomia server.

    Returns
    -------
    List[Document]
        The list of loaded documents with Eunomia identifiers added to their metadata
        as 'eunomia_uri'.

    Examples
    --------
    >>> import asyncio
    >>> from langchain_community.document_loaders.csv_loader import CSVLoader
    >>> loader = CSVLoader("data.csv")
    >>> wrapped_loader = EunomiaLoader(loader)
    >>> docs = asyncio.run(wrapped_loader.aload(additional_metadata={"group": "financials"}))
    """
    documents = await self._loader.aload()
    processed_docs = [
        await self._process_document_async(doc, additional_metadata)
        for doc in documents
    ]
    return processed_docs

lazy_load(additional_metadata={})

Load documents lazily and synchronously, registering them with the Eunomia server.

Parameters:

Name Type Description Default
additional_metadata dict

Additional metadata to be sent to the Eunomia server.

{}

Yields:

Type Description
Document

Documents with Eunomia identifiers added to their metadata as 'eunomia_uri', yielded one by one.

Examples:

>>> from langchain_community.document_loaders.csv_loader import CSVLoader
>>> loader = CSVLoader("data.csv")
>>> wrapped_loader = EunomiaLoader(loader)
>>> for doc in wrapped_loader.lazy_load(additional_metadata={"group": "financials"}):
...     process_document(doc)
Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/document_loader.py
def lazy_load(self, additional_metadata: dict = {}) -> Iterator[Document]:
    """Load documents lazily and synchronously, registering them with the Eunomia server.

    Parameters
    ----------
    additional_metadata : dict, optional
        Additional metadata to be sent to the Eunomia server.

    Yields
    ------
    Document
        Documents with Eunomia identifiers added to their metadata as 'eunomia_uri',
        yielded one by one.

    Examples
    --------
    >>> from langchain_community.document_loaders.csv_loader import CSVLoader
    >>> loader = CSVLoader("data.csv")
    >>> wrapped_loader = EunomiaLoader(loader)
    >>> for doc in wrapped_loader.lazy_load(additional_metadata={"group": "financials"}):
    ...     process_document(doc)
    """
    for doc in self._loader.lazy_load():
        processed_doc = self._process_document_sync(doc, additional_metadata)
        yield processed_doc

load(additional_metadata={})

Load documents synchronously and register them with the Eunomia server.

Parameters:

Name Type Description Default
additional_metadata dict

Additional metadata to be sent to the Eunomia server.

{}

Returns:

Type Description
List[Document]

The list of loaded documents with Eunomia identifiers added to their metadata as 'eunomia_uri'.

Examples:

>>> from langchain_community.document_loaders.csv_loader import CSVLoader
>>> loader = CSVLoader("data.csv")
>>> wrapped_loader = EunomiaLoader(loader)
>>> docs = wrapped_loader.load(additional_metadata={"group": "financials"})
Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/document_loader.py
def load(self, additional_metadata: dict = {}) -> List[Document]:
    """Load documents synchronously and register them with the Eunomia server.

    Parameters
    ----------
    additional_metadata : dict, optional
        Additional metadata to be sent to the Eunomia server.

    Returns
    -------
    List[Document]
        The list of loaded documents with Eunomia identifiers added to their metadata
        as 'eunomia_uri'.

    Examples
    --------
    >>> from langchain_community.document_loaders.csv_loader import CSVLoader
    >>> loader = CSVLoader("data.csv")
    >>> wrapped_loader = EunomiaLoader(loader)
    >>> docs = wrapped_loader.load(additional_metadata={"group": "financials"})
    """
    documents = self._loader.load()
    processed_docs = [
        self._process_document_sync(doc, additional_metadata) for doc in documents
    ]
    return processed_docs

eunomia_sdk_langchain.retriever.EunomiaRetriever

Bases: BaseRetriever

A wrapper around LangChain retrievers that filters allowed documents using the Eunomia server.

This class wraps any LangChain retriever and intercepts the document retrieval process to filter allowed documents using the Eunomia server.

Parameters:

Name Type Description Default
retriever BaseRetriever

The LangChain retriever to wrap.

required
principal PrincipalAccess

The principal to use for the Eunomia server. Defined either with its identifier (uri), attributes or both.

required
server_host str

The hostname of the Eunomia server.

None
api_key str

The API key to use for the Eunomia server, only required when the server is hosted on cloud.

None

Examples:

>>> from eunomia_core import schemas
>>> from eunomia_sdk_langchain.retriever import EunomiaRetriever
>>> from langchain_community.retrievers import BM25Retriever
>>> from langchain_core.documents import Document
>>> retriever = BM25Retriever.from_documents(
...     [
...         Document(page_content="foo", metadata={"confidentiality": "public"}),
...         Document(page_content="bar", metadata={"confidentiality": "public"}),
...         Document(page_content="foo bar", metadata={"confidentiality": "private"}),
...     ]
... )
>>> wrapped_retriever = EunomiaRetriever(
...     retriever=retriever,
...     principal=schemas.PrincipalAccess(uri="test-uri"),
... )
>>> docs = wrapped_retriever.invoke("foo")
Source code in pkgs/sdks/langchain/src/eunomia_sdk_langchain/retriever.py
class EunomiaRetriever(BaseRetriever):
    """
    A wrapper around LangChain retrievers that filters allowed documents using the Eunomia server.

    This class wraps any LangChain retriever and intercepts the document retrieval
    process to filter allowed documents using the Eunomia server.

    Parameters
    ----------
    retriever : BaseRetriever
        The LangChain retriever to wrap.
    principal : schemas.PrincipalAccess
        The principal to use for the Eunomia server. Defined either with its identifier (uri), attributes or both.
    server_host : str, optional
        The hostname of the Eunomia server.
    api_key : str, optional
        The API key to use for the Eunomia server, only required when the server is hosted on cloud.

    Examples
    --------
    >>> from eunomia_core import schemas
    >>> from eunomia_sdk_langchain.retriever import EunomiaRetriever
    >>> from langchain_community.retrievers import BM25Retriever
    >>> from langchain_core.documents import Document
    >>> retriever = BM25Retriever.from_documents(
    ...     [
    ...         Document(page_content="foo", metadata={"confidentiality": "public"}),
    ...         Document(page_content="bar", metadata={"confidentiality": "public"}),
    ...         Document(page_content="foo bar", metadata={"confidentiality": "private"}),
    ...     ]
    ... )
    >>> wrapped_retriever = EunomiaRetriever(
    ...     retriever=retriever,
    ...     principal=schemas.PrincipalAccess(uri="test-uri"),
    ... )
    >>> docs = wrapped_retriever.invoke("foo")
    """

    def __init__(
        self,
        retriever: BaseRetriever,
        principal: schemas.PrincipalAccess,
        server_host: str | None = None,
        api_key: str | None = None,
    ):
        super().__init__()
        self._retriever = retriever
        self._principal = principal
        self._client = EunomiaClient(server_host=server_host, api_key=api_key)

    def _check_docs_access(self, docs: list[Document]) -> list[Document]:
        return [
            doc
            for doc in docs
            if self._client.check_access(
                resource_uri=doc.metadata.pop("eunomia_uri", None),
                resource_attributes=doc.metadata,
                principal_uri=self._principal.uri,
                principal_attributes=self._principal.attributes,
            )
        ]

    def _get_relevant_documents(self, query: str) -> list[Document]:
        docs = self._retriever.invoke(query)
        return self._check_docs_access(docs)

    async def _acheck_doc_access(self, doc: Document) -> tuple[Document, bool]:
        return (
            doc,
            await asyncio.to_thread(
                self._client.check_access,
                resource_uri=doc.metadata.pop("eunomia_uri", None),
                resource_attributes=doc.metadata,
                principal_uri=self._principal.uri,
                principal_attributes=self._principal.attributes,
            ),
        )

    async def _acheck_docs_access(self, docs: list[Document]) -> list[Document]:
        results = await asyncio.gather(*[self._acheck_doc_access(doc) for doc in docs])
        return [doc for doc, has_access in results if has_access]

    async def _aget_relevant_documents(self, query: str) -> list[Document]:
        docs = await self._retriever.ainvoke(query)
        return await self._acheck_docs_access(docs)