Skip to main content
In this tutorial you take the feedback analysis pipeline from Tutorial 01 and make it production-ready: non-blocking async calls, token-by-token streaming to a UI, and concurrent batch processing. By the end you will have covered:
  • ainstruct() and the async session method naming convention
  • ModelOption.STREAM and mot.astream() for incremental output
  • wait_for_all_mots for fan-out concurrent generation
  • Context behaviour with concurrent async calls
Prerequisites: Tutorial 01 complete, pip install mellea, Ollama running locally with granite4:micro downloaded.

Step 1: Your first async call

Every sync method on MelleaSession has an a-prefixed async counterpart with the same signature and return type. Replace instruct() with ainstruct() and wrap the call in async def:
import asyncio
import mellea

async def main():
    m = mellea.start_session()
    result = await m.ainstruct(
        "Summarise this customer feedback in one sentence: "
        "The onboarding was confusing and took far too long. "
        "Support was helpful once I got through."
    )
    print(str(result))
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(main())
ainstruct() returns a ModelOutputThunk. await-ing it starts generation immediately; str(result) resolves the value when it is ready. Every other method follows the same pattern: achat(), aact(), aquery(), atransform(), avalidate().

Step 2: Streaming tokens

Enable streaming by passing ModelOption.STREAM: True in model_options. Consume chunks with mot.astream() as they arrive — useful for displaying output progressively rather than waiting for the full response:
import asyncio
import mellea
from mellea.backends import ModelOption

async def stream_summary(feedback: str) -> str:
    m = mellea.start_session()
    mot = await m.ainstruct(
        "Summarise this customer feedback in one sentence: {{text}}",
        user_variables={"text": feedback},
        model_options={ModelOption.STREAM: True},
    )

    chunks = []
    while not mot.is_computed():
        chunk = await mot.astream()
        print(chunk, end="", flush=True)
        chunks.append(chunk)
    print()  # newline after streaming completes

    return "".join(chunks)

asyncio.run(stream_summary(
    "The onboarding was confusing and took far too long. "
    "Support was helpful once I got through."
))
# Output will vary — LLM responses depend on model and temperature.
How astream() works:
  • Each call returns only the new content since the previous call.
  • When generation is complete, is_computed() returns True and the final astream() call returns the remaining content.
  • Do not call astream() from multiple coroutines on the same thunk simultaneously.

Step 3: Concurrent batch processing

The pipeline from Tutorial 01 processes one feedback item at a time, and each call blocks until the previous one completes. With ainstruct() you can fire all calls immediately and resolve them together. Use wait_for_all_mots to await a list of thunks concurrently:
import asyncio
import mellea
from mellea.helpers.async_helpers import wait_for_all_mots

FEEDBACK_BATCH = [
    "The onboarding was confusing and took far too long. Support was helpful once I got through.",
    "Product works great but the mobile app crashes frequently. No response from support.",
    "Fast delivery, exactly as described. Will order again.",
    "Billing charged me twice. Still waiting for a refund after two weeks.",
]

async def summarise_batch(items: list[str]) -> list[str]:
    m = mellea.start_session()

    # Fire all summarisation calls immediately — none waits for the others.
    thunks = []
    for item in items:
        thunk = await m.ainstruct(
            "Summarise this customer feedback in one sentence: {{text}}",
            user_variables={"text": item},
        )
        thunks.append(thunk)

    # None are resolved yet — all are generating in parallel.
    await wait_for_all_mots(thunks)

    # All thunks are now resolved.
    return [t.value for t in thunks]

summaries = asyncio.run(summarise_batch(FEEDBACK_BATCH))
for summary in summaries:
    print(summary)
# Output will vary — LLM responses depend on model and temperature.
The four requests are in flight simultaneously. Total wall-clock time is roughly the latency of the slowest single call, rather than the sum of all four.

Step 4: Mixing parallel and sequential steps

Some pipeline steps are independent; others depend on earlier results. You can resolve dependencies explicitly without blocking unrelated work. In the Tutorial 01 pipeline, extract_issues is independent of summarize — both take the raw feedback. Run them in parallel, then feed the resolved summary into classify_sentiment:
import asyncio
from typing import Literal

import mellea
from mellea import generative
from mellea.helpers.async_helpers import wait_for_all_mots


@generative
def classify_sentiment(summary: str) -> Literal["positive", "negative", "mixed"]:
    """Classify the overall sentiment of the customer feedback summary."""


async def analyze_feedback(feedback: str) -> None:
    m = mellea.start_session()

    # Fire summarise and extract_issues in parallel — both take raw feedback.
    summary_thunk = await m.ainstruct(
        "Summarise this customer feedback in one sentence: {{text}}",
        user_variables={"text": feedback},
    )
    issues_thunk = await m.ainstruct(
        "Extract JSON with main_complaint, positive_aspect, and urgency from: {{text}}",
        user_variables={"text": feedback},
    )

    await wait_for_all_mots([summary_thunk, issues_thunk])

    summary = summary_thunk.value

    # classify_sentiment depends on the resolved summary — run it after.
    sentiment = classify_sentiment(m, summary=summary)

    print(f"Summary:   {summary}")
    print(f"Sentiment: {str(sentiment)}")
    print(f"Issues:    {issues_thunk.value}")
    # Output will vary — LLM responses depend on model and temperature.


asyncio.run(analyze_feedback(
    "The onboarding was confusing and took far too long. "
    "Support was helpful once I got through."
))

Step 5: Context and concurrency

By default start_session() uses SimpleContext, which is safe for concurrent async calls. If you switch to ChatContext, Mellea logs a warning because concurrent writes can corrupt the context state:
WARNING: Not using a SimpleContext with asynchronous requests could cause
unexpected results due to stale contexts. Ensure you await between requests.
Note: This warning appears whenever ChatContext is used with async methods, even if you await each call sequentially. It is safe to ignore when you ensure each call is fully resolved before starting the next.
If you need ChatContext (for multi-turn conversation), await each call before starting the next:
import asyncio
import mellea
from mellea.stdlib.context import ChatContext

async def sequential_chat():
    m = mellea.start_session(ctx=ChatContext())
    r1 = await m.achat("Hello.")
    r2 = await m.achat("Tell me more.")  # safe — r1 is fully resolved
    print(str(r2))
    # Output will vary — LLM responses depend on model and temperature.

asyncio.run(sequential_chat())
For parallel generation, keep the default SimpleContext.

What you built

PatternWhat it gives you
ainstruct() / achat() / aact()Non-blocking LLM calls
ModelOption.STREAM + astream()Token-by-token output for responsive UIs
wait_for_all_motsFan-out: all thunks resolve concurrently
Explicit dependency orderingSequential where needed, parallel everywhere else
SimpleContext (default)Safe concurrent access with no state corruption

See also: Async and Streaming (full API reference) | Tutorial 03: Using Generative Slots