Source code for agentc_core.activity.span

import couchbase.exceptions
import functools
import logging
import pydantic
import textwrap
import typing
import uuid

from .logger import ChainLogger
from .logger import DBLogger
from .logger import LocalLogger
from agentc_core.activity.models.content import BeginContent
from agentc_core.activity.models.content import Content
from agentc_core.activity.models.content import EndContent
from agentc_core.activity.models.content import KeyValueContent
from agentc_core.activity.models.log import Log
from agentc_core.config import LocalCatalogConfig
from agentc_core.config import RemoteCatalogConfig
from agentc_core.version import VersionDescriptor

logger = logging.getLogger(__name__)


[docs] class Span(pydantic.BaseModel): """A structured logging context for agent activity. .. card:: Class Description A :py:class:`Span` instance belongs to a tree of other :py:class:`Span` instances, whose root is a :py:class:`GlobalSpan` instance that is constructed using the :py:meth:`Catalog.Span` method. .. attention:: Spans should never be created directly (via constructor), as logs generated by the span must always be associated with a catalog version and some application structure. Below we illustrate how a tree of :py:class:`Span` instances is created: .. code-block:: python import agentc catalog = agentc.Catalog() root_span = catalog.Span(name="root") child_1_span = root_span.new(name="child_1") child_2_span = root_span.new(name="child_2") In practice, you'll likely use different spans for different agents and/or different tasks. Below we give a small LangGraph example using spans for different agents: .. code-block:: python import agentc import langgraph.graph catalog = agentc.Catalog() root_span = catalog.Span(name="flight_planner") def front_desk_agent(...): with root_span.new(name="front_desk_agent") as front_desk_span: ... def route_finding_agent(...): with root_span.new(name="route_finding_agent") as route_finding_span: ... workflow = langgraph.graph.StateGraph() workflow.add_node("front_desk_agent", front_desk_agent) workflow.add_node("route_finding_agent", route_finding_agent) workflow.set_entry_point("front_desk_agent") workflow.add_edge("front_desk_agent", "route_finding_agent") ... """
[docs] class Identifier(pydantic.BaseModel): """The unique identifier for a :py:class:`Span`. .. card:: Class Description A :py:class:`Span` is uniquely identified by two parts: 1. an application-defined multipart name and... 2. a session identifier unique to each run of the application. """ model_config = pydantic.ConfigDict(frozen=True) name: list[str] """ The name of the :py:class:`Span`. Names are built up from the root of the span tree to the leaf, thus the first element of :py:attr:`name` is the name of the root and the last element is the name of the current span (i.e., the leaf). """ session: str """ The session identifier of the :py:class:`Span`. Sessions must be unique to each run of the application. By default, we generate these as UUIDs (see :py:attr:`GlobalSpan.session`). """
logger: typing.Callable[..., Log] """ Method which handles the logging implementation. """ name: str """ Name to bind to each message logged within this span. """ parent: "Span" = None """ Parent span of this span (i.e., the span that had :py:meth:`new` called on it). """ state: typing.Any = None """ A JSON-serializable object that will be logged on entering and exiting this span. """ iterable: typing.Optional[bool] = False """ Flag to indicate whether or not this span should be iterable. """ kwargs: typing.Optional[dict[str, typing.Any]] = None """ Annotations to apply to all messages logged within this span. """ _logs: list[Log] = None @pydantic.model_validator(mode="after") def _initialize_iterable_logger(self) -> typing.Self: if self.iterable: logger.debug(f"Iterable span requested for {str(self.identifier.name)}.") self._logs = list() # The logs captured here (and this instance's children) belong to this specific span. # The "iterable" field itself is not propagated to children. original_logger = self.logger @functools.wraps(original_logger) def iterable_logger(*args, **kwargs) -> typing.Callable[..., Log]: log = original_logger(*args, **kwargs) self._logs.append(log) return log self.logger = iterable_logger return self
[docs] def new(self, name: str, state: typing.Any = None, iterable: bool = False, **kwargs) -> "Span": """Create a new span under the current :py:class:`Span`. .. card:: Method Description Spans require a name and a session (see :py:attr:`identifier`). Aside from :py:attr:`name`, :py:attr:`state`, and :py:attr:`iterable`, you can also pass additional keywords that will be applied as annotations to each :py:meth:`log` call within a span. As an example, the following code illustrates the use of :py:attr:`kwargs` to add a span-wide "alpha" annotation: .. code-block:: python import agentc catalog = agentc.Catalog() root_span = catalog.Span(name="flight_planner") with root_span.new(name="find_airports_task", alpha="SDGD") as child_span: child_span.log(content=agentc.span.UserContent(value="Hello, world!", "beta": "412d")) The example code above will generate the three logs below (for brevity, we only show the ``content`` and ``annotations`` fields): .. code-block:: json { "content": { "kind": "begin" }, "annotations": { "alpha": "SDGD"} } { "content": { "kind": "user", "value": "Hello, world!" }, "annotations": { "alpha": "SDGD", "beta": "412d" } } { "content" : { "kind": "end" }, "annotations": { "alpha": "SDGD" } } :param name: The name of the span. :param state: The starting state of the span. This will be recorded upon entering and exiting the span. :param iterable: Whether this new span should be iterable. By default, this is :python:`False`. :param kwargs: Additional annotations to apply to the span. :return: A new :py:class:`Span` instance. """ # **kwargs take precedence over self.kwargs. if self.kwargs is not None and len(kwargs) > 0: new_kwargs = {**self.kwargs, **kwargs} elif self.kwargs is not None: new_kwargs = self.kwargs elif len(kwargs) > 0: new_kwargs = kwargs else: new_kwargs = None return Span( logger=self.logger, name=name, parent=self, iterable=iterable, state=state or self.state, kwargs=new_kwargs, )
[docs] def log(self, content: Content, **kwargs): """Accept some content (with optional annotations specified by :python:`kwargs`) and generate a corresponding log entry. .. card:: Method Description The heart of the :py:class:`Span` class is the :py:meth:`log` method. This method is used to log events that occur within the span. Users can capture events that occur in popular frameworks like LangChain and LlamaIndex using our helper packages (see :py:mod:`agentc_langchain`, :py:mod:`agentc_langgraph`, and :py:mod:`agentc_llamaindex`) but must use those packages in conjunction with this :py:meth:`log` method to capture the full breadth of their application's activity. See `here <log.html>`__ for a list of all available log content types. Users can also use Python's ``[]`` syntax to write arbitrary JSON-serializable content as a key-value (:py:class:`KeyValueContent`) pair. This is useful for logging arbitrary data like metrics during evaluations. In the example below, we illustrate an example of a system-wide evaluation suite that uses this ``[]`` syntax: .. code-block:: python import my_agent_app import my_output_evaluator import agentc catalog = agentc.Catalog() evaluation_span = catalog.Span(name="evaluation_suite") with open("my-evaluation-suite.json") as fp: for i, line in enumerate(fp): with evaluation_span.new(name=f"evaluation{i}") as span: output = my_agent_app(span) span["positive_sentiment"] = my_output_evaluator.positive(output) span.log( content={ "kind": "key-value", "key": "negative_sentiment", "value": my_output_evaluator.negative(output) }, alpha="SDGD" ) All keywords passed to the :py:meth:`log` method will be applied as annotations to the log entry. In the example above, the ``alpha`` annotation is applied only to the second log entry. For span-wide annotations, use the :py:attr:`kwargs` attribute on :py:meth:`new`. :param content: The content to log. :param kwargs: Additional annotations to apply to the log. """ new_kwargs = {**self.kwargs, **kwargs} if self.kwargs is not None else kwargs identifier: Span.Identifier = self.identifier _log = self.logger(content=content, session_id=identifier.session, span_name=identifier.name, **new_kwargs) if self.iterable: self._logs.append(_log)
@pydantic.computed_field @property def identifier(self) -> "Span.Identifier": """A unique identifier for this span.""" name_stack = [self.name] working = self while working.parent is not None: name_stack += [working.parent.name] working = working.parent return Span.Identifier(name=list(reversed(name_stack)), session=working.session)
[docs] def enter(self) -> typing.Self: """Record a :py:class:`BeginContent` log entry for this span. .. card:: Method Description The :py:meth:`enter` method is to denote the start of the span (optionally logging the incoming state if specified). This method is also called when entering the span using the :python:`with` statement. In the example below, :py:meth:`enter` is called (implicitly). .. code-block:: python import agentc catalog = agentc.Catalog() incoming_state = {"flights": []} with catalog.Span(name="flight_planner", state=incoming_state) as span: flight_planner_implementation() On entering the context, one log is generated possessing the content below: .. code-block:: json { "kind": "begin", "state": {"flights": []} } """ self.log(content=BeginContent() if self.state is None else BeginContent(state=self.state)) return self
[docs] def exit(self): """Record a :py:class:`EndContent` log entry for this span. .. card:: Method Description The :py:meth:`exit` method is to denote the end of the span (optionally logging the outgoing state if specified). This method is also called when exiting the span using the :python:`with` statement *successfully*. In the example below, :py:meth:`exit` is called (implicitly). .. code-block:: python import agentc catalog = agentc.Catalog() incoming_state = {"flights": []} with catalog.Span(name="flight_planner", state=incoming_state) as span: ... = flight_planner_implementation(...) incoming_state["flights"] = [{"flight_number": "AA123", "status": "on_time"}] On exiting the context, one log is generated possessing the content below: .. code-block:: json { "kind": "end", "state": {"flights": [{"flight_number": "AA123", "status": "on_time"}]} } .. note:: The :python:`state` of the span must be JSON-serializable **and** must be mutated in-place. If you are working with immutable state objects, you must set the :py:attr:`state` attribute before exiting the span (i.e., before the :python:`with` statement exits or with :py:meth:`exit` explicitly). .. code-block:: python import agentc catalog = agentc.Catalog() immutable_incoming_state = {"flights": []} with catalog.Span(name="flight_planner", state=incoming_state) as span: ... = flight_planner_implementation(...) span.state = {"flights": [{"flight_number": "AA123", "status": "on_time"}]} """ self.log(content=EndContent() if self.state is None else EndContent(state=self.state))
[docs] def logs(self) -> typing.Iterable[Log]: """Return the logs generated by the tree of :py:class:`Span` nodes rooted from this :py:class:`Span` instance. .. card:: Method Description The :py:meth:`logs` method returns an iterable of all logs generated within the span. This method is also called (implicitly) when iterating over the span (e.g., using a :python:`for` loop). To use this method, you must set the :py:attr:`iterable` attribute to True when instantiating the span: .. code-block:: python import agentc catalog = agentc.Catalog() span = catalog.Span(name="flight_planner", iterable=True) for log in span: match log.content.kind: case "begin": ... .. tip:: Generally, this method should only be used for debugging purposes. This method will keep **all** logs generated by the span in memory. To perform efficient aggregate analysis of your logs, consider querying the ``agent_activity.logs`` collection in your Couchbase cluster using SQL++ instead. """ if not self.iterable: raise ValueError("This span is not iterable. To iterate over logs, set 'iterable'=True on instantiation.") return self._logs
def __enter__(self): return self.enter() def __setitem__(self, key: str, value: typing.Any): self.log(content=KeyValueContent(key=key, value=value)) def __exit__(self, exc_type, exc_val, exc_tb): # We will only record this transition if we are exiting cleanly. if all(x is None for x in [exc_type, exc_val, exc_tb]): self.exit() def __iter__(self): if not self.iterable: raise ValueError("This span is not iterable. To iterate over logs, set 'iterable'=True on instantiation.") yield from self._logs
class GlobalSpan(Span): """An auditor of various events (e.g., LLM completions) given a catalog.""" # Note: this is more of a composite type rather than a union type. config: typing.Union[LocalCatalogConfig, RemoteCatalogConfig] """ Config (configuration) instance associated with this activity. """ session: str = pydantic.Field(default_factory=lambda: uuid.uuid4().hex, frozen=True) """ The run (alternative: session) that this span is associated with. """ version: VersionDescriptor = pydantic.Field(frozen=True) """ Catalog version to bind all messages logged within this auditor. """ logger: typing.Optional[typing.Callable] = None _local_logger: LocalLogger = None _db_logger: DBLogger = None _chain_logger: ChainLogger = None @pydantic.model_validator(mode="after") def _find_local_activity(self) -> typing.Self: if self.config.activity_path is None: try: # Note: this method sets the self.config.activity_path attribute if found. self.config.ActivityPath() except ValueError as e: logger.debug( f"Local activity (folder) not found while trying to initialize a Span instance. " f"Swallowing exception {str(e)}." ) return self @pydantic.model_validator(mode="after") def _initialize_auditor(self) -> typing.Self: if self.config.activity_path is None and self.config.conn_string is None: error_message = textwrap.dedent(""" Could not initialize a local or remote auditor! If this is a new project, please run the command `agentc init` before instantiating an auditor. If you are intending to use a remote-only auditor, please ensure that all of the relevant variables (i.e., conn_string, username, password, and bucket) are set. """) logger.debug(error_message) raise ValueError(error_message) # Finally, instantiate our auditors. if self.config.activity_path is not None: self._local_logger = LocalLogger(cfg=self.config, catalog_version=self.version, **self.kwargs) if self.config.conn_string is not None: try: self._db_logger = DBLogger(cfg=self.config, catalog_version=self.version, **self.kwargs) except (couchbase.exceptions.CouchbaseException, ValueError) as e: logger.warning( f"Could not connect to the Couchbase cluster. " f"Skipping remote auditor and swallowing exception {str(e)}." ) self._db_logger = None # If we have both a local and remote auditor, we'll use both. if self._local_logger is not None and self._db_logger is not None: logger.info("Using both a local auditor and a remote auditor.") self._chain_logger = ChainLogger(self._local_logger, self._db_logger, **self.kwargs) self.logger = self._chain_logger.log elif self._local_logger is not None: logger.info("Using a local auditor (a connection to a remote auditor could not be established).") self.logger = self._local_logger.log elif self._db_logger is not None: logger.info("Using a remote auditor (a local auditor could not be instantiated).") self.logger = self._db_logger.log else: # We should never reach this point (this error is handled above). raise ValueError("Could not instantiate an auditor.") return self def new(self, name: str, state: typing.Any = None, iterable: bool = False, **kwargs) -> Span: """Create a new span under the current :py:class:`GlobalSpan`. :param name: The name of the span. :param state: The starting state of the span. This will be recorded upon entering and exiting the span. :param iterable: Whether this new span should be iterable. :param kwargs: Additional annotations to apply to the span. :return: A new :py:class:`Span` instance. """ return Span(logger=self.logger, name=name, parent=self, state=state, iterable=iterable, kwargs=kwargs)