Zum Inhalt springen
EN DE

Challenge 8.2: Streaming to Frontend

Wenn ein Workflow 3 Schritte hat und jeder Schritt 5-10 Sekunden dauert — wie zeigst Du dem User den Fortschritt, nicht nur das Endergebnis?

Workflow API fuehrt 3 Steps aus und streamt Fortschritt und Ergebnisse zum Frontend

Statt 30 Sekunden Stille bekommt der User nach jedem Step ein Update. Die ersten zwei Steps laufen im Hintergrund (nur Fortschritt), der dritte Step streamt den Text direkt ins UI.

Ohne Progress-Streaming: User wartet 30 Sekunden auf schwarzen Bildschirm. Keine Rueckmeldung, ob etwas passiert. User drueckt Reload, startet die Pipeline nochmal, verdoppelt die Kosten.

Mit Progress-Streaming: Echtzeit-Updates pro Step. “Recherche laeuft…” → “Zusammenfassung…” → “Formatierung…” → Text erscheint. Der User sieht, dass etwas passiert, und weiss wo im Prozess die Pipeline steht.

Schicht 1: createDataStream für Workflow-Fortschritt

Abschnitt betitelt „Schicht 1: createDataStream für Workflow-Fortschritt“

In Level 7.1 hast Du createDataStream und writeData kennengelernt. Jetzt nutzen wir sie, um Workflow-Steps zu tracken:

import { createDataStream, generateText, streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
export async function POST(req: Request) {
const { topic } = await req.json();
const dataStream = createDataStream({
async execute(dataStream) {
// Step 1: Research
dataStream.writeData({ step: 1, total: 3, label: 'Recherche laeuft...' });
const research = await generateText({
model,
system: 'Du bist ein Research-Assistent. Sammle Fakten zum Thema.',
prompt: `Recherchiere: ${topic}`,
});
dataStream.writeData({ step: 1, total: 3, status: 'done' });
// Step 2: Summarize
dataStream.writeData({ step: 2, total: 3, label: 'Zusammenfassung...' });
const summary = await generateText({
model,
system: 'Fasse die Informationen in 5 Kernaussagen zusammen.',
prompt: research.text,
});
dataStream.writeData({ step: 2, total: 3, status: 'done' });
// Step 3: Format — diesen Step streamen wir als Text
dataStream.writeData({ step: 3, total: 3, label: 'Formatierung...' });
const result = streamText({ // ← streamText statt generateText
model,
system: 'Formatiere als professionelle E-Mail.',
prompt: summary.text,
onFinish() {
dataStream.writeData({ step: 3, total: 3, status: 'done' });
},
});
result.mergeIntoDataStream(dataStream); // ← Text-Stream einfuegen
},
});
return dataStream.toDataStreamResponse();
}

Der entscheidende Trick: Die ersten zwei Steps nutzen generateText (wir brauchen das Ergebnis als String für den nächsten Step). Der letzte Step nutzt streamText + mergeIntoDataStream, damit der User den finalen Text in Echtzeit sieht.

Im Frontend konsumierst Du die Data Parts wie in Level 7.1 — über den data-Array von useChat:

'use client';
import { useChat } from '@ai-sdk/react';
export function PipelineUI() {
const { messages, input, handleInputChange, handleSubmit, data, isLoading } = useChat({
api: '/api/pipeline',
});
// Aktuellen Fortschritt aus Data Parts extrahieren
const progressParts = data?.filter((d: any) => d.step !== undefined) ?? [];
const currentStep = progressParts.at(-1);
return (
<div>
{/* Fortschrittsanzeige */}
{isLoading && currentStep && (
<div className="progress">
<div className="steps">
{[1, 2, 3].map((step) => {
const stepData = progressParts.filter((d: any) => d.step === step);
const isDone = stepData.some((d: any) => d.status === 'done');
const isActive = currentStep.step === step && !isDone;
return (
<div
key={step}
className={`step ${isDone ? 'done' : ''} ${isActive ? 'active' : ''}`}
>
Step {step}: {isDone ? 'Fertig' : isActive ? currentStep.label : 'Ausstehend'}
</div>
);
})}
</div>
<div>Schritt {currentStep.step} von {currentStep.total}</div>
</div>
)}
{/* Messages */}
{messages.map((m) => (
<div key={m.id}>
<strong>{m.role}:</strong> {m.content}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} placeholder="Thema eingeben..." />
</form>
</div>
);
}

Für CLI-Tests ohne Next.js kannst Du den Data Stream direkt konsumieren:

import { createDataStream, generateText, streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
const dataStream = createDataStream({
async execute(dataStream) {
// Step 1
dataStream.writeData({ step: 1, label: 'Recherche...' });
const research = await generateText({
model,
system: 'Recherchiere Fakten zum Thema.',
prompt: 'Recherchiere: Edge Computing',
});
dataStream.writeData({ step: 1, status: 'done' });
// Step 2
dataStream.writeData({ step: 2, label: 'Zusammenfassung...' });
const summary = await generateText({
model,
system: 'Fasse in 3 Saetzen zusammen.',
prompt: research.text,
});
dataStream.writeData({ step: 2, status: 'done' });
// Step 3: Als Text streamen
dataStream.writeData({ step: 3, label: 'Formatierung...' });
const result = streamText({
model,
system: 'Formatiere als kurze E-Mail.',
prompt: summary.text,
onFinish() {
dataStream.writeData({ step: 3, status: 'done' });
},
});
result.mergeIntoDataStream(dataStream);
},
});
// Stream im Terminal lesen
const reader = dataStream.toDataStream().getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
process.stdout.write(decoder.decode(value));
}

Im Terminal siehst Du die Data Parts als JSON-Zeilen und den gestreamten Text — alles in einem Stream, in der richtigen Reihenfolge.

Aufgabe: Erweitere die 3-Step Pipeline aus Challenge 8.1 mit Fortschritts-Streaming. Sende vor und nach jedem Step ein Custom Data Part.

Erstelle die Datei streaming-pipeline.ts und fuehre sie aus mit npx tsx streaming-pipeline.ts.

import { createDataStream, generateText, streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
// TODO 1: Erstelle einen dataStream mit createDataStream
// TODO 2: In der execute-Funktion:
// - Sende ein Data Part { step: 1, label: 'Recherche...' }
// - Fuehre Step 1 (Research) mit generateText aus
// - Sende ein Data Part { step: 1, status: 'done' }
// TODO 3: Wiederhole für Step 2 (Summarize)
// TODO 4: Für Step 3 (Translate):
// - Nutze streamText statt generateText
// - Merge den Stream mit mergeIntoDataStream
// TODO 5: Konsumiere den Stream im Terminal

Checkliste:

  • createDataStream erstellt mit execute Callback
  • Vor jedem Step wird ein Data Part mit Label gesendet
  • Nach jedem Step wird ein Data Part mit Status “done” gesendet
  • Step 1 und 2 nutzen generateText, Step 3 nutzt streamText
  • Der letzte Step wird mit mergeIntoDataStream in den Data Stream eingefuegt
Lösung anzeigen
import { createDataStream, generateText, streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
const topic = 'Kuenstliche Intelligenz in der Medizin';
const dataStream = createDataStream({
async execute(dataStream) {
// Step 1: Research
dataStream.writeData({ step: 1, total: 3, label: 'Recherche laeuft...' });
const research = await generateText({
model,
system: 'Du bist ein Research-Assistent. Sammle Fakten und aktuelle Entwicklungen zum Thema.',
prompt: `Recherchiere: ${topic}`,
});
dataStream.writeData({ step: 1, total: 3, status: 'done', tokens: research.usage.totalTokens });
// Step 2: Summarize
dataStream.writeData({ step: 2, total: 3, label: 'Zusammenfassung...' });
const summary = await generateText({
model,
system: 'Fasse die folgenden Informationen in exakt 5 Kernaussagen zusammen.',
prompt: `Fasse zusammen:\n\n${research.text}`,
});
dataStream.writeData({ step: 2, total: 3, status: 'done', tokens: summary.usage.totalTokens });
// Step 3: Translate (gestreamt)
dataStream.writeData({ step: 3, total: 3, label: 'Uebersetzung...' });
const result = streamText({
model,
system: 'Translate the following German text to English. Keep the professional tone.',
prompt: `Translate:\n\n${summary.text}`,
onFinish({ usage }) {
dataStream.writeData({ step: 3, total: 3, status: 'done', tokens: usage.totalTokens });
},
});
result.mergeIntoDataStream(dataStream);
},
});
// Stream im Terminal lesen
const reader = dataStream.toDataStream().getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
process.stdout.write(decoder.decode(value));
}
console.log('\n--- Pipeline beendet ---');

Erwarteter Output (ungefaehr):

2:[{"step":1,"total":3,"label":"Recherche laeuft..."}]
2:[{"step":1,"total":3,"status":"done","tokens":342}]
2:[{"step":2,"total":3,"label":"Zusammenfassung..."}]
2:[{"step":2,"total":3,"status":"done","tokens":187}]
2:[{"step":3,"total":3,"label":"Uebersetzung..."}]
0:"The "
0:"analysis "
0:"shows..."
2:[{"step":3,"total":3,"status":"done","tokens":156}]
--- Pipeline beendet ---

Erklärung: createDataStream oeffnet einen gemischten Kanal. writeData sendet Custom Data Parts (JSON-Zeilen, Prefix 2:) vor und nach jedem Step. streamText + mergeIntoDataStream fuegt den Text-Stream des letzten Steps ein (Prefix 0:). Der User sieht: Fortschritts-Updates für Step 1 und 2, dann den gestreamten Text von Step 3 — alles in Echtzeit.

Combine: Topic in 3-Step Pipeline, dann createDataStream mit Progress Data Parts und streamText, beides ins Frontend

Uebung: Kombiniere Workflow-Streaming mit smoothStream aus Level 7.3. Für den letzten Step (der den Text streamt):

  1. Nutze experimental_transform: smoothStream() in streamText für fluessigere Textausgabe
  2. Sende zusätzlich zu den Step-Updates ein Data Part mit der Gesamtdauer: { type: 'stats', durationMs: Date.now() - startTime }
  3. Sende ein Data Part mit dem Gesamt-Token-Verbrauch aller drei Steps

Optional Stretch Goal: Baue die Pipeline als Next.js API Route und konsumiere sie mit useChat. Zeige einen visuellen Fortschrittsbalken mit drei Segmenten, die sich nacheinander gruen faerben.

Part of AI Learning — free courses from prompt to production. Jan on LinkedIn