Documentation Index
Fetch the complete documentation index at: https://ibm-llm-runtime-aaf3a78b.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Prerequisites: Quick Start complete,
pip install mellea, Ollama running locally.
Async methods
Every sync method on MelleaSession has an a-prefixed async counterpart with the
same signature and return type:
| Sync | Async |
|---|
instruct() | ainstruct() |
chat() | achat() |
act() | aact() |
validate() | avalidate() |
query() | aquery() |
transform() | atransform() |
# Requires: mellea
# Returns: None
import asyncio
import mellea
async def main():
m = mellea.start_session()
result = await m.ainstruct("Write a haiku about concurrency.")
print(str(result))
# Output will vary — LLM responses depend on model and temperature.
asyncio.run(main())
Parallel generation
ainstruct() returns a ModelOutputThunk immediately — generation starts in the
background but the value is not resolved until you call avalue(). This lets you
fire multiple generations and resolve them all at once:
# Requires: mellea
# Returns: None
import asyncio
import mellea
async def main():
m = mellea.start_session()
# Fire off all three — generation starts for each immediately
thunk_a = await m.ainstruct("Write a poem about mountains.")
thunk_b = await m.ainstruct("Write a poem about rivers.")
thunk_c = await m.ainstruct("Write a poem about forests.")
# None are resolved yet
print(thunk_a.is_computed()) # False
# Resolve all in parallel
await asyncio.gather(
thunk_a.avalue(),
thunk_b.avalue(),
thunk_c.avalue(),
)
print(thunk_a.value)
print(thunk_b.value)
print(thunk_c.value)
# Output will vary — LLM responses depend on model and temperature.
asyncio.run(main())
For a list of thunks, wait_for_all_mots is a convenience wrapper:
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.helpers.async_helpers import wait_for_all_mots
async def main():
m = mellea.start_session()
thunks = []
for topic in ["mountains", "rivers", "forests"]:
thunks.append(await m.ainstruct(f"Write a short poem about {topic}."))
await wait_for_all_mots(thunks)
for t in thunks:
print(t.value)
# Output will vary — LLM responses depend on model and temperature.
asyncio.run(main())
Note: All thunks passed to wait_for_all_mots must belong to the same event
loop, which is always the case when using MelleaSession.
Streaming
Enable streaming by passing ModelOption.STREAM: True in model_options. Consume
incremental output chunks with mot.astream():
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.backends import ModelOption
async def main():
m = mellea.start_session()
mot = await m.ainstruct(
"Write a short story about a robot learning to cook.",
model_options={ModelOption.STREAM: True},
)
# Consume chunks as they arrive
while not mot.is_computed():
chunk = await mot.astream()
print(chunk, end="", flush=True)
print() # newline after streaming completes
asyncio.run(main())
# Output will vary — LLM responses depend on model and temperature.
How astream() behaves:
- Each call returns only the new content since the previous call.
- When the thunk is fully computed (
is_computed() returns True), the final
astream() call returns the complete value.
- If the thunk is already computed,
astream() returns the full value immediately.
Warning: Do not call astream() from multiple coroutines simultaneously on
the same thunk. Each thunk should have a single reader.
Async and context
Use SimpleContext (the default) with concurrent async requests. Using ChatContext
with concurrent requests can cause stale context issues — Mellea logs a warning
when this is detected:
WARNING: Not using a SimpleContext with asynchronous requests could cause
unexpected results due to stale contexts. Ensure you await between requests.
If you need ChatContext with async, await each call before starting the next:
# Requires: mellea
# Returns: None
import asyncio
import mellea
from mellea.stdlib.context import ChatContext
async def sequential_chat():
m = mellea.start_session(ctx=ChatContext())
r1 = await m.achat("Hello.")
r2 = await m.achat("Tell me more.") # safe — r1 is fully resolved
print(str(r2))
# Output will vary — LLM responses depend on model and temperature.
asyncio.run(sequential_chat())
For parallel generation, use SimpleContext.
Streaming with per-chunk validation
stream_with_chunking() adds per-chunk validation to a streaming generation.
It splits the accumulated text into semantic units (sentences, words, or
paragraphs), calls stream_validate() on each chunk in parallel, and can
exit early if any requirement returns "fail" — preventing the consumer from
seeing invalid content mid-stream.
The primary way to observe a stream_with_chunking() run is via typed
StreamEvent objects from result.events():
# Requires: mellea
# Returns: None
import asyncio
from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
ChunkEvent,
CompletedEvent,
FullValidationEvent,
QuickCheckEvent,
StreamingDoneEvent,
stream_with_chunking,
)
class MaxSentencesReq(Requirement):
"""Fails if the model generates more than *limit* sentences."""
def __init__(self, limit: int) -> None:
super().__init__()
self._limit = limit
self._count = 0
def format_for_llm(self) -> str:
return f"The response must be at most {self._limit} sentences."
async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
self._count += 1
if self._count > self._limit:
return PartialValidationResult("fail", reason="Too many sentences")
return PartialValidationResult("unknown")
async def validate(
self, backend: Backend, ctx: Context, *, format=None, model_options=None
) -> ValidationResult:
return ValidationResult(result=True)
async def main() -> None:
from mellea.stdlib.session import start_session
m = start_session()
action = Instruction("Write a two-sentence summary of the water cycle.")
req = MaxSentencesReq(limit=3)
result = await stream_with_chunking(
action, m.backend, m.ctx, requirements=[req], chunking="sentence"
)
async for event in result.events():
match event:
case ChunkEvent():
print(f" chunk[{event.chunk_index}]: {event.text!r}")
case QuickCheckEvent(passed=False):
print(f" FAIL at chunk {event.chunk_index}: {event.results}")
case StreamingDoneEvent():
print(f" stream done — {len(event.full_text)} chars")
case FullValidationEvent():
print(f" final: {'pass' if event.passed else 'fail'}")
case CompletedEvent():
print(f" completed — success={event.success}")
case _:
pass # ErrorEvent and other future types
await result.acomplete()
print(f"completed={result.completed}, failures={len(result.streaming_failures)}")
asyncio.run(main())
If you only need the raw validated text without event metadata, use
result.astream() instead:
result = await stream_with_chunking(
action, m.backend, m.ctx, requirements=[req], chunking="sentence"
)
async for chunk in result.astream():
print(chunk)
await result.acomplete()
Both astream() (raw chunks) and events() are available on the same result
object. They use independent queues, so you can run them concurrently with
asyncio.gather. Both are single-consumer — a second iteration on either
will block indefinitely.
The stream_validate tri-state
Each call to stream_validate returns a PartialValidationResult with one of
three values:
| Value | Meaning |
|---|
"unknown" | No conclusion yet — wait for the full output before judging. |
"pass" | This chunk is valid so far (informational; does not skip final validate()). |
"fail" | Invalid — cancel the stream immediately and record a streaming failure. |
After a natural stream end, validate() is called on every non-"fail"
requirement (both "pass" and "unknown"). This means "pass" from
stream_validate does not replace the final validate() call.
See also: The Requirements System — Streaming validation
See also: Tutorial 02: Streaming and Async | act() and aact()