Server Task Implementation
Experimental
Tasks are an experimental feature. The API may change without notice.
This guide covers implementing task support in MCP servers, from basic setup to advanced patterns like elicitation and sampling within tasks.
Quick Start
The simplest way to add task support:
from mcp.server import Server
from mcp.server.experimental.task_context import ServerTaskContext
from mcp.types import CallToolResult, CreateTaskResult, TextContent, Tool, ToolExecution, TASK_REQUIRED
# Server instance that the decorated handlers below are registered on.
server = Server("my-server")
server.experimental.enable_tasks() # Registers all task handlers automatically
@server.list_tools()
async def list_tools():
    """Advertise the tools offered by this server.

    The single tool, ``process_data``, declares ``TASK_REQUIRED`` task support.
    """
    process_data_tool = Tool(
        name="process_data",
        description="Process data asynchronously",
        inputSchema={"type": "object", "properties": {"input": {"type": "string"}}},
        execution=ToolExecution(taskSupport=TASK_REQUIRED),
    )
    return [process_data_tool]
@server.call_tool()
async def handle_tool(name: str, arguments: dict) -> CallToolResult | CreateTaskResult:
    """Route a tool call to its handler, or return an error result for unknown names."""
    if name != "process_data":
        unknown = TextContent(type="text", text=f"Unknown: {name}")
        return CallToolResult(content=[unknown], isError=True)
    return await handle_process_data(arguments)
async def handle_process_data(arguments: dict) -> CreateTaskResult:
    """Run ``process_data`` through ``run_task()`` and return the created task."""
    request_ctx = server.request_context
    request_ctx.experimental.validate_task_mode(TASK_REQUIRED)

    async def work(task: ServerTaskContext) -> CallToolResult:
        """Upper-case the ``input`` argument and return it as text content."""
        await task.update_status("Processing...")
        upper_cased = arguments.get("input", "").upper()
        return CallToolResult(content=[TextContent(type="text", text=upper_cased)])

    return await request_ctx.experimental.run_task(work)
That's it. enable_tasks() automatically:
- Creates an in-memory task store
- Registers handlers for `tasks/get`, `tasks/result`, `tasks/list`, `tasks/cancel`
- Updates server capabilities
Tool Declaration
Tools declare task support via the execution.taskSupport field:
from mcp.types import Tool, ToolExecution, TASK_REQUIRED, TASK_OPTIONAL, TASK_FORBIDDEN
# Task support is declared per tool through ToolExecution.taskSupport.
Tool(
    name="my_tool",
    inputSchema={"type": "object"},
    execution=ToolExecution(taskSupport=TASK_REQUIRED),  # or TASK_OPTIONAL, TASK_FORBIDDEN
)
| Value | Meaning |
|---|---|
| `TASK_REQUIRED` | Tool must be called as a task |
| `TASK_OPTIONAL` | Tool supports both sync and task execution |
| `TASK_FORBIDDEN` | Tool cannot be called as a task (default) |
Validate the request matches your tool's requirements:
@server.call_tool()
async def handle_tool(name: str, arguments: dict):
    """Dispatch a tool call according to the tool's declared task support.

    ``required_task_tool`` must be invoked in task mode; ``optional_task_tool``
    runs as a task when the request asks for one and synchronously otherwise.
    Any other name yields an error result.
    """
    ctx = server.request_context

    if name == "required_task_tool":
        ctx.experimental.validate_task_mode(TASK_REQUIRED)  # Raises if not task mode
        return await handle_as_task(arguments)

    if name == "optional_task_tool":
        if ctx.experimental.is_task:
            return await handle_as_task(arguments)
        return handle_sync(arguments)

    # Report unknown tools explicitly rather than falling through and returning None.
    return CallToolResult(content=[TextContent(type="text", text=f"Unknown: {name}")], isError=True)
The run_task Pattern
run_task() is the recommended way to execute task work:
async def handle_my_tool(arguments: dict) -> CreateTaskResult:
    """Execute the tool's work in the background via ``run_task()``."""
    request_ctx = server.request_context
    request_ctx.experimental.validate_task_mode(TASK_REQUIRED)

    async def work(task: ServerTaskContext) -> CallToolResult:
        # Your work here
        done = TextContent(type="text", text="Done")
        return CallToolResult(content=[done])

    return await request_ctx.experimental.run_task(work)
What run_task() does:
- Creates a task in the store
- Spawns your work function in the background
- Returns `CreateTaskResult` immediately
- Auto-completes the task when your function returns
- Auto-fails the task if your function raises
The ServerTaskContext provides:
- `task.task_id` - The task identifier
- `task.update_status(message)` - Update progress
- `task.complete(result)` - Explicitly complete (usually automatic)
- `task.fail(error)` - Explicitly fail
- `task.is_cancelled` - Check if cancellation requested
Status Updates
Keep clients informed of progress:
async def work(task: ServerTaskContext) -> CallToolResult:
    """Process every item, publishing a status message before each one."""
    await task.update_status("Starting...")

    # Count from 1 so the status reads "1/N" ... "N/N".
    for position, item in enumerate(items, start=1):
        await task.update_status(f"Processing {position}/{len(items)}")
        await process_item(item)

    await task.update_status("Finalizing...")
    return CallToolResult(content=[TextContent(type="text", text="Complete")])
Status messages appear in tasks/get responses, letting clients show progress to users.
Elicitation Within Tasks
Tasks can request user input via elicitation. This transitions the task to input_required status.
Form Elicitation
Collect structured data from the user:
async def work(task: ServerTaskContext) -> CallToolResult:
    """Ask the user to confirm the deletion and report the outcome.

    Returns "Files deleted" only when the user accepts the form and ticks
    ``confirm``; every other outcome is reported as "Cancelled".
    """
    await task.update_status("Waiting for confirmation...")
    result = await task.elicit(
        message="Delete these files?",
        requestedSchema={
            "type": "object",
            "properties": {
                "confirm": {"type": "boolean"},
                "reason": {"type": "string"},
            },
            "required": ["confirm"],
        },
    )

    # The result's content is optional, so treat a missing payload as
    # "not confirmed" instead of calling .get() on None.
    form = result.content or {}
    if result.action == "accept" and form.get("confirm"):
        # User confirmed
        return CallToolResult(content=[TextContent(type="text", text="Files deleted")])

    # User declined or cancelled
    return CallToolResult(content=[TextContent(type="text", text="Cancelled")])
URL Elicitation
Direct users to external URLs for OAuth, payments, or other out-of-band flows:
async def work(task: ServerTaskContext) -> CallToolResult:
    """Send the user to GitHub's authorization URL and report how the flow ended."""
    await task.update_status("Waiting for OAuth...")
    outcome = await task.elicit_url(
        message="Please authorize with GitHub",
        url="https://github.com/login/oauth/authorize?client_id=...",
        elicitation_id="oauth-github-123",
    )

    if outcome.action != "accept":
        return CallToolResult(content=[TextContent(type="text", text="OAuth cancelled")])

    # User completed OAuth flow
    return CallToolResult(content=[TextContent(type="text", text="Connected to GitHub")])
Sampling Within Tasks
Tasks can request LLM completions from the client:
from mcp.types import SamplingMessage, TextContent
async def work(task: ServerTaskContext) -> CallToolResult:
    """Request a haiku from the client's LLM and return it as the tool result."""
    await task.update_status("Generating response...")

    prompt = SamplingMessage(
        role="user",
        content=TextContent(type="text", text="Write a haiku about coding"),
    )
    response = await task.create_message(messages=[prompt], max_tokens=100)

    # Only text content carries a haiku; anything else is reported as "Error".
    if isinstance(response.content, TextContent):
        haiku = response.content.text
    else:
        haiku = "Error"
    return CallToolResult(content=[TextContent(type="text", text=haiku)])
Sampling supports additional parameters:
# Extra keyword arguments passed alongside messages and max_tokens.
# NOTE(review): ModelPreferences and ModelHint are not imported in this snippet;
# presumably they come from mcp.types - confirm.
result = await task.create_message(
    messages=[...],
    max_tokens=500,
    system_prompt="You are a helpful assistant",
    temperature=0.7,
    stop_sequences=["\n\n"],
    model_preferences=ModelPreferences(hints=[ModelHint(name="claude-3")]),
)
Cancellation Support
Check for cancellation in long-running work:
async def work(task: ServerTaskContext) -> CallToolResult:
    """Run 1000 steps, checking for a cancellation request before each one.

    Returns a "Cancelled" result as soon as cancellation is observed, and a
    "Complete" result once every step has run.
    """
    total_steps = 1000
    for i in range(total_steps):
        if task.is_cancelled:
            # Clean up and exit
            return CallToolResult(content=[TextContent(type="text", text="Cancelled")])

        # Report 1-based progress ("Step 1/1000" ... "Step 1000/1000") so the
        # final message matches the total instead of stopping at 999.
        await task.update_status(f"Step {i + 1}/{total_steps}")
        await process_step(i)

    return CallToolResult(content=[TextContent(type="text", text="Complete")])
The SDK's default cancel handler updates the task status. Your work function should check is_cancelled periodically.
Custom Task Store
For production, implement TaskStore with persistent storage:
from mcp.shared.experimental.tasks.store import TaskStore
from mcp.types import Task, TaskMetadata, Result
class RedisTaskStore(TaskStore):
    """Skeleton of a TaskStore that persists tasks through a Redis client.

    Method bodies are placeholders; fill them in with real Redis calls.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    async def create_task(self, metadata: TaskMetadata, task_id: str | None = None) -> Task:
        # Create and persist task
        ...

    async def get_task(self, task_id: str) -> Task | None:
        # Retrieve task from Redis
        ...

    # NOTE(review): the parameter list previously ended in a literal `...`,
    # which is not valid Python. `status_message` is assumed to mirror the
    # TaskStore base class - confirm against the installed SDK version.
    async def update_task(
        self,
        task_id: str,
        status: str | None = None,
        status_message: str | None = None,
    ) -> Task:
        # Update and persist
        ...

    async def store_result(self, task_id: str, result: Result) -> None:
        # Store result in Redis
        ...

    async def get_result(self, task_id: str) -> Result | None:
        # Retrieve result
        ...

    # ... implement remaining methods
Use your custom store:
# Pass the custom store to enable_tasks() via its store= argument.
store = RedisTaskStore(redis_client)
server.experimental.enable_tasks(store=store)
Complete Example
A server with multiple task-supporting tools:
from mcp.server import Server
from mcp.server.experimental.task_context import ServerTaskContext
from mcp.types import (
CallToolResult, CreateTaskResult, TextContent, Tool, ToolExecution,
SamplingMessage, TASK_REQUIRED,
)
# Server instance for the demo; task support is enabled with default settings.
server = Server("task-demo")
server.experimental.enable_tasks()
@server.list_tools()
async def list_tools():
    """Advertise the two tools of the demo server, both requiring task mode."""
    confirm_action_tool = Tool(
        name="confirm_action",
        description="Requires user confirmation",
        inputSchema={"type": "object", "properties": {"action": {"type": "string"}}},
        execution=ToolExecution(taskSupport=TASK_REQUIRED),
    )
    generate_text_tool = Tool(
        name="generate_text",
        description="Generate text via LLM",
        inputSchema={"type": "object", "properties": {"prompt": {"type": "string"}}},
        execution=ToolExecution(taskSupport=TASK_REQUIRED),
    )
    return [confirm_action_tool, generate_text_tool]
async def handle_confirm_action(arguments: dict) -> CreateTaskResult:
    """Run ``confirm_action`` as a task that asks the user to confirm first.

    The action named by the ``action`` argument (default "unknown action") is
    reported as executed only when the user accepts the form and ticks
    ``confirm``; otherwise the result is "Cancelled".
    """
    ctx = server.request_context
    ctx.experimental.validate_task_mode(TASK_REQUIRED)
    action = arguments.get("action", "unknown action")

    async def work(task: ServerTaskContext) -> CallToolResult:
        result = await task.elicit(
            message=f"Confirm: {action}?",
            requestedSchema={
                "type": "object",
                "properties": {"confirm": {"type": "boolean"}},
                "required": ["confirm"],
            },
        )

        # The result's content is optional, so treat a missing payload as
        # "not confirmed" instead of calling .get() on None.
        form = result.content or {}
        if result.action == "accept" and form.get("confirm"):
            return CallToolResult(content=[TextContent(type="text", text=f"Executed: {action}")])
        return CallToolResult(content=[TextContent(type="text", text="Cancelled")])

    return await ctx.experimental.run_task(work)
async def handle_generate_text(arguments: dict) -> CreateTaskResult:
    """Run ``generate_text`` as a task that samples a completion from the client.

    Uses the ``prompt`` argument, defaulting to "Hello".
    """
    request_ctx = server.request_context
    request_ctx.experimental.validate_task_mode(TASK_REQUIRED)
    prompt = arguments.get("prompt", "Hello")

    async def work(task: ServerTaskContext) -> CallToolResult:
        await task.update_status("Generating...")
        user_message = SamplingMessage(role="user", content=TextContent(type="text", text=prompt))
        response = await task.create_message(messages=[user_message], max_tokens=200)

        # Only text content can be returned as-is; anything else becomes "Error".
        if isinstance(response.content, TextContent):
            text = response.content.text
        else:
            text = "Error"
        return CallToolResult(content=[TextContent(type="text", text=text)])

    return await request_ctx.experimental.run_task(work)
@server.call_tool()
async def handle_tool(name: str, arguments: dict) -> CallToolResult | CreateTaskResult:
    """Route a tool call to its handler, or return an error result for unknown names."""
    handlers = {
        "confirm_action": handle_confirm_action,
        "generate_text": handle_generate_text,
    }
    handler = handlers.get(name)
    if handler is None:
        return CallToolResult(content=[TextContent(type="text", text=f"Unknown: {name}")], isError=True)
    return await handler(arguments)
Error Handling in Tasks
Tasks handle errors automatically, but you can also fail explicitly:
async def work(task: ServerTaskContext) -> CallToolResult:
    """Run the risky operation, failing the task with a custom message on known errors."""
    try:
        outcome = await risky_operation()
    except PermissionError:
        # Record the custom message first, then let the exception propagate.
        await task.fail("Access denied - insufficient permissions")
        raise
    except TimeoutError:
        await task.fail("Operation timed out after 30 seconds")
        raise
    else:
        return CallToolResult(content=[TextContent(type="text", text=outcome)])
When run_task() catches an exception, it automatically:
- Marks the task as `failed`
- Sets `statusMessage` to the exception message
- Propagates the exception (which is caught by the task group)
For custom error messages, call task.fail() before raising.
HTTP Transport Example
For web applications, use the Streamable HTTP transport:
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount
from mcp.server import Server
from mcp.server.experimental.task_context import ServerTaskContext
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import (
CallToolResult, CreateTaskResult, TextContent, Tool, ToolExecution, TASK_REQUIRED,
)
# Server instance served over HTTP below; task support uses default settings.
server = Server("http-task-server")
server.experimental.enable_tasks()
@server.list_tools()
async def list_tools():
    """Advertise the ``long_operation`` tool, which must be called as a task."""
    long_operation_tool = Tool(
        name="long_operation",
        description="A long-running operation",
        inputSchema={"type": "object", "properties": {"duration": {"type": "number"}}},
        execution=ToolExecution(taskSupport=TASK_REQUIRED),
    )
    return [long_operation_tool]
async def handle_long_operation(arguments: dict) -> CreateTaskResult:
    """Run ``long_operation`` as a task that sleeps one second per step.

    The number of steps is the integer part of the ``duration`` argument
    (default 5).
    """
    request_ctx = server.request_context
    request_ctx.experimental.validate_task_mode(TASK_REQUIRED)
    duration = arguments.get("duration", 5)

    async def work(task: ServerTaskContext) -> CallToolResult:
        import anyio

        total_steps = int(duration)
        for step in range(1, total_steps + 1):
            await task.update_status(f"Step {step}/{total_steps}")
            await anyio.sleep(1)
        return CallToolResult(content=[TextContent(type="text", text=f"Completed after {duration}s")])

    return await request_ctx.experimental.run_task(work)
@server.call_tool()
async def handle_tool(name: str, arguments: dict) -> CallToolResult | CreateTaskResult:
    """Route a tool call to its handler, or return an error result for unknown names."""
    if name != "long_operation":
        unknown = TextContent(type="text", text=f"Unknown: {name}")
        return CallToolResult(content=[unknown], isError=True)
    return await handle_long_operation(arguments)
def create_app():
    """Build a Starlette application that mounts the MCP server at ``/mcp``."""
    manager = StreamableHTTPSessionManager(app=server)

    @asynccontextmanager
    async def lifespan(app: Starlette) -> AsyncIterator[None]:
        # Keep the session manager running for as long as the app is up.
        async with manager.run():
            yield

    mcp_mount = Mount("/mcp", app=manager.handle_request)
    return Starlette(routes=[mcp_mount], lifespan=lifespan)
if __name__ == "__main__":
    # Serve the app locally when this file is run as a script.
    app = create_app()
    uvicorn.run(app, host="127.0.0.1", port=8000)
Testing Task Servers
Test task functionality with the SDK's testing utilities:
import pytest
import anyio
from mcp.client.session import ClientSession
from mcp.types import CallToolResult
@pytest.mark.anyio
async def test_task_tool():
    """Call a tool as a task over in-memory streams and check that it yields content.

    The server and a client session run side by side in one task group,
    connected by a pair of memory object streams.
    """
    server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream(10)
    client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream(10)

    async def run_server():
        await server.run(
            client_to_server_receive,
            server_to_client_send,
            server.create_initialization_options(),
        )

    async def run_client():
        async with ClientSession(server_to_client_receive, client_to_server_send) as session:
            await session.initialize()

            # Call the tool as a task
            result = await session.experimental.call_tool_as_task("my_tool", {"arg": "value"})
            task_id = result.task.taskId
            assert result.task.status == "working"

            # Poll until the task reaches a terminal status
            async for status in session.experimental.poll_task(task_id):
                if status.status in ("completed", "failed", "cancelled"):
                    break

            # Get result
            final = await session.experimental.get_task_result(task_id, CallToolResult)
            assert len(final.content) > 0

        # Stop the server explicitly so the task group (and the test) always
        # finishes once the client is done.
        tg.cancel_scope.cancel()

    async with anyio.create_task_group() as tg:
        tg.start_soon(run_server)
        tg.start_soon(run_client)
Best Practices
Keep Work Functions Focused
# Good: focused work function
async def work(task: ServerTaskContext) -> CallToolResult:
    """Validate the arguments, process them, and return the outcome as text."""
    await task.update_status("Validating...")
    validate_input(arguments)

    await task.update_status("Processing...")
    processed = await process_data(arguments)
    return CallToolResult(content=[TextContent(type="text", text=processed)])
Check Cancellation in Loops
async def work(task: ServerTaskContext) -> CallToolResult:
    """Process ``large_dataset`` item by item, stopping early on cancellation."""
    processed = []
    for entry in large_dataset:
        # Check before each item so a cancellation request ends the loop.
        if task.is_cancelled:
            return CallToolResult(content=[TextContent(type="text", text="Cancelled")])
        outcome = await process(entry)
        processed.append(outcome)
    return CallToolResult(content=[TextContent(type="text", text=str(processed))])
Use Meaningful Status Messages
async def work(task: ServerTaskContext) -> CallToolResult:
    """Process records while publishing descriptive status messages."""
    await task.update_status("Connecting to database...")
    db = await connect()

    await task.update_status("Fetching records (0/1000)...")
    # NOTE(review): `records` is not defined in this snippet; presumably it is
    # fetched through `db` - confirm.
    for done, record in enumerate(records):
        # Publish progress only on every 100th record.
        if done % 100 == 0:
            await task.update_status(f"Processing records ({done}/1000)...")
        await process(record)

    await task.update_status("Finalizing results...")
    return CallToolResult(content=[TextContent(type="text", text="Done")])
Handle Elicitation Responses
async def work(task: ServerTaskContext) -> CallToolResult:
    """Ask the user whether to continue and handle each possible action."""
    result = await task.elicit(message="Continue?", requestedSchema={...})
    action = result.action

    if action == "accept":
        # User accepted, process content
        return await process_accepted(result.content)
    if action == "decline":
        # User explicitly declined
        return CallToolResult(content=[TextContent(type="text", text="User declined")])
    if action == "cancel":
        # User cancelled the elicitation
        return CallToolResult(content=[TextContent(type="text", text="Cancelled")])
Next Steps
- Client Usage - Learn how clients interact with task servers
- Tasks Overview - Review lifecycle and concepts