Prerequisites: Quick Start complete, pip install mellea, Ollama running locally.

Async methods

Every sync method on MelleaSession has an a-prefixed async counterpart with the same signature and return type:
Sync        | Async
instruct()  | ainstruct()
chat()      | achat()
act()       | aact()
validate()  | avalidate()
query()     | aquery()
transform() | atransform()
# Requires: mellea
# Returns: None
import asyncio
import mellea

async def main():
    m = mellea.start_session()
    result = await m.ainstruct("Write a haiku about concurrency.")
    print(str(result))
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(main())

Parallel generation

ainstruct() returns a ModelOutputThunk immediately — generation starts in the background but the value is not resolved until you call avalue(). This lets you fire multiple generations and resolve them all at once:
# Requires: mellea
# Returns: None
import asyncio
import mellea

async def main():
    m = mellea.start_session()

    # Fire off all three — generation starts for each immediately
    thunk_a = await m.ainstruct("Write a poem about mountains.")
    thunk_b = await m.ainstruct("Write a poem about rivers.")
    thunk_c = await m.ainstruct("Write a poem about forests.")

    # None are resolved yet
    print(thunk_a.is_computed())  # False

    # Resolve all in parallel
    await asyncio.gather(
        thunk_a.avalue(),
        thunk_b.avalue(),
        thunk_c.avalue(),
    )

    print(thunk_a.value)
    print(thunk_b.value)
    print(thunk_c.value)
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(main())
For a list of thunks, wait_for_all_mots is a convenience wrapper:
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.helpers.async_helpers import wait_for_all_mots

async def main():
    m = mellea.start_session()

    thunks = []
    for topic in ["mountains", "rivers", "forests"]:
        thunks.append(await m.ainstruct(f"Write a short poem about {topic}."))

    await wait_for_all_mots(thunks)

    for t in thunks:
        print(t.value)
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(main())
Note: All thunks passed to wait_for_all_mots must belong to the same event loop, which is always the case when using MelleaSession.
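The "start now, resolve later" pattern above can be illustrated without a model backend. The following is a minimal sketch using only asyncio; FakeThunk and its 0.2 second delay are hypothetical stand-ins chosen for illustration and are not part of Mellea. It shows that three generations started up front finish in roughly the time of one when they are resolved together.

```python
import asyncio
import time


class FakeThunk:
    """Hypothetical stand-in for a thunk; not part of Mellea."""

    def __init__(self, text: str, delay: float) -> None:
        # Work is scheduled as soon as the thunk is created
        # (this requires a running event loop).
        self._task = asyncio.create_task(self._generate(text, delay))

    async def _generate(self, text: str, delay: float) -> str:
        await asyncio.sleep(delay)  # simulates model latency
        return text.upper()

    def is_computed(self) -> bool:
        return self._task.done()

    async def avalue(self) -> str:
        # Resolving simply waits for the background task to finish.
        return await self._task


async def demo():
    start = time.perf_counter()
    thunks = [FakeThunk(t, 0.2) for t in ("mountains", "rivers", "forests")]
    pending_at_start = not any(t.is_computed() for t in thunks)
    values = await asyncio.gather(*(t.avalue() for t in thunks))
    return list(values), time.perf_counter() - start, pending_at_start


values, elapsed, pending_at_start = asyncio.run(demo())
print(values, f"{elapsed:.2f}s")
```

Because all three tasks sleep concurrently, the elapsed time stays close to 0.2 seconds rather than 0.6.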

Streaming

Enable streaming by passing ModelOption.STREAM: True in model_options. Consume incremental output chunks with mot.astream():
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.backends import ModelOption

async def main():
    m = mellea.start_session()
    mot = await m.ainstruct(
        "Write a short story about a robot learning to cook.",
        model_options={ModelOption.STREAM: True},
    )

    # Consume chunks as they arrive
    while not mot.is_computed():
        chunk = await mot.astream()
        print(chunk, end="", flush=True)

    print()  # newline after streaming completes

asyncio.run(main())
# Output will vary — LLM responses depend on model and temperature.
How astream() behaves:
  • Each call returns only the new content since the previous call.
  • When the thunk is fully computed (is_computed() returns True), the final astream() call returns the complete value.
  • If the thunk is already computed, astream() returns the full value immediately.
Warning: Do not call astream() from multiple coroutines simultaneously on the same thunk. Each thunk should have a single reader.
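To see why a single reader matters, consider an analogy built on a plain asyncio.Queue. This is a sketch under the assumption that chunks are handed out once each; it does not reproduce Mellea's internals, and the names CHUNKS, reader, and run are hypothetical. With one reader the text is reassembled in full; with two readers each one receives only a fragment.

```python
import asyncio

CHUNKS = ["The ", "robot ", "burned ", "the ", "toast."]
FULL = "".join(CHUNKS)


async def reader(queue: asyncio.Queue) -> str:
    """Collect chunks until the None sentinel arrives."""
    parts = []
    while True:
        chunk = await queue.get()
        if chunk is None:
            return "".join(parts)
        parts.append(chunk)


async def run(n_readers: int) -> list:
    queue = asyncio.Queue()
    readers = [asyncio.create_task(reader(queue)) for _ in range(n_readers)]
    for chunk in CHUNKS:
        await queue.put(chunk)
        await asyncio.sleep(0)  # yield so the readers get a turn
    for _ in range(n_readers):
        await queue.put(None)  # one sentinel per reader
    return list(await asyncio.gather(*readers))


single = asyncio.run(run(1))
split = asyncio.run(run(2))
print(single)  # one reader sees the whole text
print(split)   # two readers each see only part of it
```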

Async and context

Use SimpleContext (the default) with concurrent async requests. Using ChatContext with concurrent requests can cause stale context issues — Mellea logs a warning when this is detected:
WARNING: Not using a SimpleContext with asynchronous requests could cause
unexpected results due to stale contexts. Ensure you await between requests.
If you need ChatContext with async, await each call before starting the next:
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.stdlib.context import ChatContext

async def sequential_chat():
    m = mellea.start_session(ctx=ChatContext())
    r1 = await m.achat("Hello.")
    r2 = await m.achat("Tell me more.")  # safe — r1 is fully resolved
    print(str(r2))
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(sequential_chat())
For parallel generation, use SimpleContext.
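The stale-context problem can be reproduced with a toy model. In this sketch, fake_chat is a hypothetical stand-in (not a Mellea API) that snapshots a shared history when the request starts and appends to it when the reply arrives. Awaiting each call in turn lets the second request see the first exchange; launching both at once does not.

```python
import asyncio


async def fake_chat(history: list, message: str) -> int:
    """Hypothetical chat call; returns how many earlier entries it saw."""
    snapshot = list(history)   # context is captured when the request starts
    await asyncio.sleep(0.01)  # simulates model latency
    history.extend([message, f"reply to {message!r}"])
    return len(snapshot)


async def sequential() -> list:
    history = []
    first = await fake_chat(history, "Hello.")
    second = await fake_chat(history, "Tell me more.")
    return [first, second]


async def concurrent() -> list:
    history = []
    return list(await asyncio.gather(
        fake_chat(history, "Hello."),
        fake_chat(history, "Tell me more."),
    ))


seq = asyncio.run(sequential())
con = asyncio.run(concurrent())
print(seq)  # [0, 2]: the second request saw the first exchange
print(con)  # [0, 0]: both requests started from an empty history
```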

Streaming with per-chunk validation

stream_with_chunking() adds per-chunk validation to a streaming generation. It splits the accumulated text into semantic units (sentences, words, or paragraphs), calls stream_validate() on each chunk in parallel, and can exit early if any requirement returns "fail", which prevents the consumer from seeing invalid content mid-stream.

The primary way to observe a stream_with_chunking() run is via typed StreamEvent objects from result.events():
# Requires: mellea
# Returns: None
import asyncio

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
    ChunkEvent,
    CompletedEvent,
    FullValidationEvent,
    QuickCheckEvent,
    StreamingDoneEvent,
    stream_with_chunking,
)


class MaxSentencesReq(Requirement):
    """Fails if the model generates more than *limit* sentences."""

    def __init__(self, limit: int) -> None:
        super().__init__()
        self._limit = limit
        self._count = 0

    def format_for_llm(self) -> str:
        return f"The response must be at most {self._limit} sentences."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        self._count += 1
        if self._count > self._limit:
            return PartialValidationResult("fail", reason="Too many sentences")
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        return ValidationResult(result=True)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()
    action = Instruction("Write a two-sentence summary of the water cycle.")
    req = MaxSentencesReq(limit=3)

    result = await stream_with_chunking(
        action, m.backend, m.ctx, requirements=[req], chunking="sentence"
    )

    async for event in result.events():
        match event:
            case ChunkEvent():
                print(f"  chunk[{event.chunk_index}]: {event.text!r}")
            case QuickCheckEvent(passed=False):
                print(f"  FAIL at chunk {event.chunk_index}: {event.results}")
            case StreamingDoneEvent():
                print(f"  stream done — {len(event.full_text)} chars")
            case FullValidationEvent():
                print(f"  final: {'pass' if event.passed else 'fail'}")
            case CompletedEvent():
                print(f"  completed — success={event.success}")
            case _:
                pass  # ErrorEvent and other future types

    await result.acomplete()
    print(f"completed={result.completed}, failures={len(result.streaming_failures)}")


asyncio.run(main())
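The idea behind chunking="sentence" is to split the accumulated text into complete sentences and hold back an unfinished tail. A rough sketch follows. The regular expression and the helper split_complete_sentences are assumptions made for illustration; Mellea's own chunker may use different rules.

```python
import re

# Assumed boundary rule: sentence-ending punctuation followed by whitespace.
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def split_complete_sentences(buffer: str):
    """Return (complete_sentences, unfinished_tail) for the text so far."""
    parts = _SENTENCE_END.split(buffer)
    # The last part may be an unfinished sentence, so hold it back.
    *complete, tail = parts
    return complete, tail


pieces = ["Water evap", "orates. It conden", "ses into clouds. Then it ra", "ins."]
emitted = []
buffer = ""
for piece in pieces:
    buffer += piece
    complete, buffer = split_complete_sentences(buffer)
    emitted.extend(complete)  # each of these would be validated as one chunk
if buffer:
    emitted.append(buffer)    # flush whatever remains when the stream ends

print(emitted)
```

Holding back the tail means a validator never sees half a sentence, at the cost of a short delay before each chunk is released.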
If you only need the raw validated text without event metadata, use result.astream() instead:
# Reuses action, m, and req from the example above.
# Run this inside an async function.
result = await stream_with_chunking(
    action, m.backend, m.ctx, requirements=[req], chunking="sentence"
)
async for chunk in result.astream():
    print(chunk)
await result.acomplete()
Both astream() (raw chunks) and events() are available on the same result object. They use independent queues, so you can run them concurrently with asyncio.gather. Both are single-consumer — a second iteration on either will block indefinitely.
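The independent-queue arrangement described above can be pictured as a producer that copies every item into two queues, each with its own consumer. This is a generic asyncio sketch. The names produce and drain are hypothetical, and the sketch does not claim to mirror how the result object is implemented.

```python
import asyncio


async def produce(chunks, queues) -> None:
    """Copy every chunk into each queue, then signal completion."""
    for chunk in chunks:
        for q in queues:
            await q.put(chunk)
    for q in queues:
        await q.put(None)  # sentinel: no more items


async def drain(q: asyncio.Queue, label: str) -> list:
    seen = []
    while (item := await q.get()) is not None:
        seen.append(f"{label}:{item}")
    return seen


async def main():
    text_q, event_q = asyncio.Queue(), asyncio.Queue()
    # One consumer per queue, run side by side with the producer.
    _, texts, events = await asyncio.gather(
        produce(["a", "b", "c"], [text_q, event_q]),
        drain(text_q, "text"),
        drain(event_q, "event"),
    )
    return texts, events


texts, events = asyncio.run(main())
print(texts)
print(events)
```

Each consumer receives the full sequence because it owns its queue. A second consumer on the same queue would wait for items that have already been taken.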

The stream_validate tri-state

Each call to stream_validate returns a PartialValidationResult with one of three values:
Value     | Meaning
"unknown" | No conclusion yet; wait for the full output before judging.
"pass"    | This chunk is valid so far (informational; does not skip final validate()).
"fail"    | Invalid: cancel the stream immediately and record a streaming failure.
After a natural stream end, validate() is called on every non-"fail" requirement (both "pass" and "unknown"). This means "pass" from stream_validate does not replace the final validate() call.
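The decision rule can be written down as a small pure function. This is a sketch of the logic as described here, not Mellea's code. The function name plan and the requirement names are hypothetical. The text above does not say what happens to the remaining requirements after a cancellation, so the sketch schedules no final validation in that case.

```python
def plan(results: dict):
    """Map per-requirement partial results to an action.

    Returns (cancel_stream, streaming_failures, needs_final_validate).
    """
    failures = [name for name, state in results.items() if state == "fail"]
    if failures:
        # Any "fail" cancels the stream and is recorded as a streaming failure.
        return True, failures, []
    # Both "pass" and "unknown" still go through the final validate().
    return False, [], list(results)


print(plan({"length": "pass", "tone": "unknown"}))
print(plan({"length": "fail", "tone": "pass"}))
```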
See also: The Requirements System — Streaming validation

See also: Tutorial 02: Streaming and Async | act() and aact()