Skip to content

Resources#

Resources are external dependencies such as memory, LLMs, query engines or chat history instances that will be injected into workflow steps at runtime.

Resources are a powerful way of binding workflow steps to Python objects that we otherwise would need to create by hand every time. For performance reasons, by default resources are cached for a workflow, meaning the same resource instance is passed to every step where it's injected. It's important to master this concept because cached and non-cached resources can lead to unexpected behaviour, let's see it in detail.

Resources are cached by default#

First of all, to use resources within our code, we need to import Resource from the resource submodule:

from llama_index.core.workflow.resource import Resource
from llama_index.core.workflow import (
    Event,
    step,
    StartEvent,
    StopEvent,
    Workflow,
)

Resource wraps a function or callable that must return an object of the same type as the one in the resource definition, let's see an example:

from typing import Annotated
from llama_index.core.memory import Memory


def get_memory(*args, **kwargs) -> Memory:
    return Memory.from_defaults("user_id_123", token_limit=60000)


resource = Annotated[Memory, Resource(get_memory)]

In the example above, Annotated[Memory, Resource(get_memory) defines a resource of type Memory that will be provided at runtime by the get_memory() function. A resource defined like this can be injected into a step by passing it as a method parameter:

import random

from typing import Union
from llama_index.core.llms import ChatMessage

RANDOM_MESSAGES = [
    "Hello World!",
    "Python is awesome!",
    "Resources are great!",
]


class CustomStartEvent(StartEvent):
    message: str


class SecondEvent(Event):
    message: str


class ThirdEvent(Event):
    message: str


class WorkflowWithMemory(Workflow):
    @step
    async def first_step(
        self,
        ev: CustomStartEvent,
        memory: Annotated[Memory, Resource(get_memory)],
    ) -> SecondEvent:
        await memory.aput(
            ChatMessage.from_str(
                role="user", content="First step: " + ev.message
            )
        )
        return SecondEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])

    @step
    async def second_step(
        self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> Union[ThirdEvent, StopEvent]:
        await memory.aput(
            ChatMessage(role="assistant", content="Second step: " + ev.message)
        )
        if random.randint(0, 1) == 0:
            return ThirdEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])
        else:
            messages = await memory.aget_all()
            return StopEvent(result=messages)

    @step
    async def third_step(
        self, ev: ThirdEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> StopEvent:
        await memory.aput(
            ChatMessage(role="user", content="Third step: " + ev.message)
        )
        messages = await memory.aget_all()
        return StopEvent(result=messages)

As you can see, each step has access to the memory resource and can write to it. It's important to note that get_memory() will be called only once, and the same memory instance will be injected into the different steps. We can see this is the case by running the workflow:

wf = WorkflowWithMemory(disable_validation=True)


async def main():
    messages = await wf.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    for m in messages:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

A potential result for this might be:

First step: Happy birthday!
Second step: Python is awesome!
Third step: Hello World!

This shows that each step added its message to a global memory, which is exactly what we were expecting!

Note that resources are preserved across steps of the same workflow instance, but not across different workflows. If we were to run two WorkflowWithMemory instances, get_memory would be called one time for each workflow and as a result their memories would be separate and independent:

wf1 = WorkflowWithMemory(disable_validation=True)
wf2 = WorkflowWithMemory(disable_validation=True)


async def main():
    messages1 = await wf1.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    messages2 = await wf1.run(
        start_event=CustomStartEvent(message="Happy New Year!")
    )
    for m in messages1:
        print(m.blocks[0].text)
    print("===================")
    for m in messages2:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

This is a possible output:

First step: Happy birthday!
Second step: Resources are great!
===================
First step: Happy New Year!
Second step: Python is awesome!

Disable resource caching#

If we pass cache=False to Resource when defining a resource, the wrapped function is called every time the resource is injected into a step. This behaviour can be desirable at times, let's see a simple example using a custom Counter class:

from pydantic import BaseModel, Field


class Counter(BaseModel):
    counter: int = Field(description="A simple counter", default=0)

    async def increment(self) -> None:
        self.counter += 1


def get_counter() -> Counter:
    return Counter()


class SecondEvent(Event):
    count: int


class WorkflowWithCounter(Workflow):
    @step
    async def first_step(
        self,
        ev: StartEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> SecondEvent:
        await counter.increment()
        return SecondEvent(count=counter.counter)

    @step
    async def second_step(
        self,
        ev: SecondEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> StopEvent:
        print("Counter at first step: ", ev.count)
        await counter.increment()
        print("Counter at second step: ", counter.counter)
        return StopEvent(result="End of Workflow")

If we now run this workflow, we will get out:

Counter at first step:  1
Counter at second step:  1

A note about stateful and stateless resources#

As we have seen, cached resources are expected to be stateful, meaning that they can maintain their state across different workflow runs and different steps, unless otherwise specified. But this doesn't mean we can consider a resource stateless only because we disable caching. Let's see an example:

global_mem = Memory.from_defaults("global_id", token_limit=60000)


def get_memory(*args, **kwargs) -> Memory:
    return global_mem

If we disable caching with Annotated[Memory, Resource(get_memory, cache=False)], the function get_memory is going to be called multiple times but the resource instance will be always the same. Such a resource should be considered stateful not regarding its caching behaviour.

Now that we've mastered resources, let's take a look at observability and debugging in workflows.