Challenge 8.2: Streaming to Frontend
Wenn ein Workflow 3 Schritte hat und jeder Schritt 5-10 Sekunden dauert — wie zeigst Du dem User den Fortschritt, nicht nur das Endergebnis?
OVERVIEW
Abschnitt betitelt „OVERVIEW“Statt 30 Sekunden Stille bekommt der User nach jedem Step ein Update. Die ersten zwei Steps laufen im Hintergrund (nur Fortschritt), der dritte Step streamt den Text direkt ins UI.
Ohne Progress-Streaming: User wartet 30 Sekunden auf schwarzen Bildschirm. Keine Rueckmeldung, ob etwas passiert. User drueckt Reload, startet die Pipeline nochmal, verdoppelt die Kosten.
Mit Progress-Streaming: Echtzeit-Updates pro Step. “Recherche laeuft…” → “Zusammenfassung…” → “Formatierung…” → Text erscheint. Der User sieht, dass etwas passiert, und weiss wo im Prozess die Pipeline steht.
WALKTHROUGH
Abschnitt betitelt „WALKTHROUGH“Schicht 1: createDataStream für Workflow-Fortschritt
Abschnitt betitelt „Schicht 1: createDataStream für Workflow-Fortschritt“In Level 7.1 hast Du createDataStream und writeData kennengelernt. Jetzt nutzen wir sie, um Workflow-Steps zu tracken:
import { createDataStream, generateText, streamText } from 'ai';import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
export async function POST(req: Request) { const { topic } = await req.json();
const dataStream = createDataStream({ async execute(dataStream) { // Step 1: Research dataStream.writeData({ step: 1, total: 3, label: 'Recherche laeuft...' });
const research = await generateText({ model, system: 'Du bist ein Research-Assistent. Sammle Fakten zum Thema.', prompt: `Recherchiere: ${topic}`, });
dataStream.writeData({ step: 1, total: 3, status: 'done' });
// Step 2: Summarize dataStream.writeData({ step: 2, total: 3, label: 'Zusammenfassung...' });
const summary = await generateText({ model, system: 'Fasse die Informationen in 5 Kernaussagen zusammen.', prompt: research.text, });
dataStream.writeData({ step: 2, total: 3, status: 'done' });
// Step 3: Format — diesen Step streamen wir als Text dataStream.writeData({ step: 3, total: 3, label: 'Formatierung...' });
const result = streamText({ // ← streamText statt generateText model, system: 'Formatiere als professionelle E-Mail.', prompt: summary.text, onFinish() { dataStream.writeData({ step: 3, total: 3, status: 'done' }); }, });
result.mergeIntoDataStream(dataStream); // ← Text-Stream einfuegen }, });
return dataStream.toDataStreamResponse();}Der entscheidende Trick: Die ersten zwei Steps nutzen generateText (wir brauchen das Ergebnis als String für den nächsten Step). Der letzte Step nutzt streamText + mergeIntoDataStream, damit der User den finalen Text in Echtzeit sieht.
Schicht 2: Frontend-Konsum
Abschnitt betitelt „Schicht 2: Frontend-Konsum“Im Frontend konsumierst Du die Data Parts wie in Level 7.1 — über den data-Array von useChat:
'use client';import { useChat } from '@ai-sdk/react';
export function PipelineUI() { const { messages, input, handleInputChange, handleSubmit, data, isLoading } = useChat({ api: '/api/pipeline', });
// Aktuellen Fortschritt aus Data Parts extrahieren const progressParts = data?.filter((d: any) => d.step !== undefined) ?? []; const currentStep = progressParts.at(-1);
return ( <div> {/* Fortschrittsanzeige */} {isLoading && currentStep && ( <div className="progress"> <div className="steps"> {[1, 2, 3].map((step) => { const stepData = progressParts.filter((d: any) => d.step === step); const isDone = stepData.some((d: any) => d.status === 'done'); const isActive = currentStep.step === step && !isDone; return ( <div key={step} className={`step ${isDone ? 'done' : ''} ${isActive ? 'active' : ''}`} > Step {step}: {isDone ? 'Fertig' : isActive ? currentStep.label : 'Ausstehend'} </div> ); })} </div> <div>Schritt {currentStep.step} von {currentStep.total}</div> </div> )}
{/* Messages */} {messages.map((m) => ( <div key={m.id}> <strong>{m.role}:</strong> {m.content} </div> ))}
<form onSubmit={handleSubmit}> <input value={input} onChange={handleInputChange} placeholder="Thema eingeben..." /> </form> </div> );}Schicht 3: CLI-Variante ohne Frontend
Abschnitt betitelt „Schicht 3: CLI-Variante ohne Frontend“Für CLI-Tests ohne Next.js kannst Du den Data Stream direkt konsumieren:
import { createDataStream, generateText, streamText } from 'ai';import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
const dataStream = createDataStream({ async execute(dataStream) { // Step 1 dataStream.writeData({ step: 1, label: 'Recherche...' }); const research = await generateText({ model, system: 'Recherchiere Fakten zum Thema.', prompt: 'Recherchiere: Edge Computing', }); dataStream.writeData({ step: 1, status: 'done' });
// Step 2 dataStream.writeData({ step: 2, label: 'Zusammenfassung...' }); const summary = await generateText({ model, system: 'Fasse in 3 Saetzen zusammen.', prompt: research.text, }); dataStream.writeData({ step: 2, status: 'done' });
// Step 3: Als Text streamen dataStream.writeData({ step: 3, label: 'Formatierung...' }); const result = streamText({ model, system: 'Formatiere als kurze E-Mail.', prompt: summary.text, onFinish() { dataStream.writeData({ step: 3, status: 'done' }); }, }); result.mergeIntoDataStream(dataStream); },});
// Stream im Terminal lesenconst reader = dataStream.toDataStream().getReader();const decoder = new TextDecoder();
while (true) { const { done, value } = await reader.read(); if (done) break; process.stdout.write(decoder.decode(value));}Im Terminal siehst Du die Data Parts als JSON-Zeilen und den gestreamten Text — alles in einem Stream, in der richtigen Reihenfolge.
Aufgabe: Erweitere die 3-Step Pipeline aus Challenge 8.1 mit Fortschritts-Streaming. Sende vor und nach jedem Step ein Custom Data Part.
Erstelle die Datei streaming-pipeline.ts und fuehre sie aus mit npx tsx streaming-pipeline.ts.
import { createDataStream, generateText, streamText } from 'ai';import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');
// TODO 1: Erstelle einen dataStream mit createDataStream
// TODO 2: In der execute-Funktion:// - Sende ein Data Part { step: 1, label: 'Recherche...' }// - Fuehre Step 1 (Research) mit generateText aus// - Sende ein Data Part { step: 1, status: 'done' }
// TODO 3: Wiederhole für Step 2 (Summarize)
// TODO 4: Für Step 3 (Translate):// - Nutze streamText statt generateText// - Merge den Stream mit mergeIntoDataStream
// TODO 5: Konsumiere den Stream im TerminalCheckliste:
-
createDataStreamerstellt mitexecuteCallback - Vor jedem Step wird ein Data Part mit Label gesendet
- Nach jedem Step wird ein Data Part mit Status “done” gesendet
- Step 1 und 2 nutzen
generateText, Step 3 nutztstreamText - Der letzte Step wird mit
mergeIntoDataStreamin den Data Stream eingefuegt
Lösung anzeigen
import { createDataStream, generateText, streamText } from 'ai';import { anthropic } from '@ai-sdk/anthropic';
const model = anthropic('claude-sonnet-4-5-20250514');const topic = 'Kuenstliche Intelligenz in der Medizin';
const dataStream = createDataStream({ async execute(dataStream) { // Step 1: Research dataStream.writeData({ step: 1, total: 3, label: 'Recherche laeuft...' }); const research = await generateText({ model, system: 'Du bist ein Research-Assistent. Sammle Fakten und aktuelle Entwicklungen zum Thema.', prompt: `Recherchiere: ${topic}`, }); dataStream.writeData({ step: 1, total: 3, status: 'done', tokens: research.usage.totalTokens });
// Step 2: Summarize dataStream.writeData({ step: 2, total: 3, label: 'Zusammenfassung...' }); const summary = await generateText({ model, system: 'Fasse die folgenden Informationen in exakt 5 Kernaussagen zusammen.', prompt: `Fasse zusammen:\n\n${research.text}`, }); dataStream.writeData({ step: 2, total: 3, status: 'done', tokens: summary.usage.totalTokens });
// Step 3: Translate (gestreamt) dataStream.writeData({ step: 3, total: 3, label: 'Uebersetzung...' }); const result = streamText({ model, system: 'Translate the following German text to English. Keep the professional tone.', prompt: `Translate:\n\n${summary.text}`, onFinish({ usage }) { dataStream.writeData({ step: 3, total: 3, status: 'done', tokens: usage.totalTokens }); }, }); result.mergeIntoDataStream(dataStream); },});
// Stream im Terminal lesenconst reader = dataStream.toDataStream().getReader();const decoder = new TextDecoder();
while (true) { const { done, value } = await reader.read(); if (done) break; process.stdout.write(decoder.decode(value));}
console.log('\n--- Pipeline beendet ---');Erwarteter Output (ungefaehr):
2:[{"step":1,"total":3,"label":"Recherche laeuft..."}]2:[{"step":1,"total":3,"status":"done","tokens":342}]2:[{"step":2,"total":3,"label":"Zusammenfassung..."}]2:[{"step":2,"total":3,"status":"done","tokens":187}]2:[{"step":3,"total":3,"label":"Uebersetzung..."}]0:"The "0:"analysis "0:"shows..."2:[{"step":3,"total":3,"status":"done","tokens":156}]
--- Pipeline beendet ---Erklärung: createDataStream oeffnet einen gemischten Kanal. writeData sendet Custom Data Parts (JSON-Zeilen, Prefix 2:) vor und nach jedem Step. streamText + mergeIntoDataStream fuegt den Text-Stream des letzten Steps ein (Prefix 0:). Der User sieht: Fortschritts-Updates für Step 1 und 2, dann den gestreamten Text von Step 3 — alles in Echtzeit.
COMBINE
Abschnitt betitelt „COMBINE“Uebung: Kombiniere Workflow-Streaming mit smoothStream aus Level 7.3. Für den letzten Step (der den Text streamt):
- Nutze
experimental_transform: smoothStream()instreamTextfür fluessigere Textausgabe - Sende zusätzlich zu den Step-Updates ein Data Part mit der Gesamtdauer:
{ type: 'stats', durationMs: Date.now() - startTime } - Sende ein Data Part mit dem Gesamt-Token-Verbrauch aller drei Steps
Optional Stretch Goal: Baue die Pipeline als Next.js API Route und konsumiere sie mit useChat. Zeige einen visuellen Fortschrittsbalken mit drei Segmenten, die sich nacheinander gruen faerben.