跳至主要内容
小龙虾小龙虾AI
🤖

微服务模式

提供有关分解单体、服务间通信、数据管理和微服务架构中的弹性的指导和模式。

下载568
星标2
版本1.0.0
开发工具
安全通过
💬Prompt

技能说明


name: microservices-patterns model: reasoning

Microservices Patterns

WHAT

Patterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.

WHEN

  • Decomposing a monolith into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed transactions
  • Building resilient distributed systems

KEYWORDS

microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS


Decision Framework: When to Use Microservices

If you have...Then...
Small team (<5 devs), simple domainStart with monolith
Need independent deployment/scalingConsider microservices
Multiple teams, clear domain boundariesMicroservices work well
Tight deadlines, unknown requirementsMonolith first, extract later

Rule of thumb: If you can't define clear service boundaries, you're not ready for microservices.


Service Decomposition Patterns

Pattern 1: By Business Capability

Organize services around business functions, not technical layers.

E-commerce Example:
├── order-service       # Order lifecycle
├── payment-service     # Payment processing
├── inventory-service   # Stock management
├── shipping-service    # Fulfillment
└── notification-service # Emails, SMS

Pattern 2: Strangler Fig (Monolith Migration)

Gradually extract from monolith without big-bang rewrites.

1. Identify bounded context to extract
2. Create new microservice
3. Route new traffic to microservice
4. Gradually migrate existing functionality
5. Remove from monolith when complete
# API Gateway routing during migration
async def route_orders(request):
    if request.path.startswith("/api/orders/v2"):
        return await new_order_service.forward(request)
    else:
        return await legacy_monolith.forward(request)

Communication Patterns

Pattern 1: Synchronous (REST/gRPC)

Use for: Queries, when you need immediate response.

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=5.0)
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get(self, path: str):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}")
        response.raise_for_status()
        return response.json()

# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get(f"/payments/{payment_id}")

Pattern 2: Asynchronous (Events)

Use for: Commands, when eventual consistency is acceptable.

from aiokafka import AIOKafkaProducer
import json

@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict

class EventBus:
    def __init__(self, bootstrap_servers: List[str]):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    async def publish(self, event: DomainEvent):
        await self.producer.send_and_wait(
            event.event_type,  # Topic = event type
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

# Order service publishes
await event_bus.publish(DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={"order_id": order.id, "customer_id": order.customer_id}
))

# Inventory service subscribes and reacts
async def handle_order_created(event_data: dict):
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]
    await reserve_inventory(order_id, items)

When to Use Each

SynchronousAsynchronous
Need immediate responseFire-and-forget
Simple query/responseLong-running operations
Low latency requiredDecoupling is priority
Tight coupling acceptableEventual consistency OK

Data Patterns

Database Per Service

Each service owns its data. No shared databases.

order-service     → orders_db (PostgreSQL)
payment-service   → payments_db (PostgreSQL)
product-service   → products_db (MongoDB)
analytics-service → analytics_db (ClickHouse)

Saga Pattern (Distributed Transactions)

For operations spanning multiple services that need rollback capability.

class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation

class OrderFulfillmentSaga:
    def __init__(self):
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation),
        ]
    
    async def execute(self, order_data: dict) -> SagaResult:
        completed_steps = []
        context = {"order_data": order_data}
        
        for step in self.steps:
            try:
                result = await step.action(context)
                if not result.success:
                    await self.compensate(completed_steps, context)
                    return SagaResult(status="failed", error=result.error)
                completed_steps.append(step)
                context.update(result.data)
            except Exception as e:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=str(e))
        
        return SagaResult(status="completed", data=context)
    
    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception as e:
                # Log but continue compensating
                logger.error(f"Compensation failed for {step.name}: {e}")

Resilience Patterns

Circuit Breaker

Fail fast when a service is down. Prevents cascade failures.

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None
    
    async def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Service unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
    
    def _on_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
    
    def _should_attempt_reset(self) -> bool:
        return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout)

# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_payment_service(data: dict):
    return await breaker.call(payment_client.post, "/payments", json=data)

Retry with Exponential Backoff

For transient failures.

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
)
async def fetch_user(user_id: str):
    response = await client.get(f"/users/{user_id}")
    response.raise_for_status()
    return response.json()

Bulkhead

Isolate resources to limit impact of failures.

import asyncio

class Bulkhead:
    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call(self, func: Callable, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

# Limit concurrent calls to each service
payment_bulkhead = Bulkhead(max_concurrent=10)
inventory_bulkhead = Bulkhead(max_concurrent=20)

result = await payment_bulkhead.call(payment_service.charge, amount)

API Gateway Pattern

Single entry point for all clients.

from fastapi import FastAPI, Depends, HTTPException
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    def __init__(self):
        self.clients = {
            "orders": httpx.AsyncClient(base_url="http://order-service:8000"),
            "payments": httpx.AsyncClient(base_url="http://payment-service:8001"),
            "inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"),
        }
    
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def forward(self, service: str, path: str, **kwargs):
        client = self.clients[service]
        response = await client.request(**kwargs, url=path)
        response.raise_for_status()
        return response.json()
    
    async def aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        results = await asyncio.gather(
            self.forward("orders", f"/orders/{order_id}", method="GET"),
            self.forward("payments", f"/payments/order/{order_id}", method="GET"),
            self.forward("inventory", f"/reservations/order/{order_id}", method="GET"),
            return_exceptions=True
        )
        
        return {
            "order": results[0] if not isinstance(results[0], Exception) else None,
            "payment": results[1] if not isinstance(results[1], Exception) else None,
            "inventory": results[2] if not isinstance(results[2], Exception) else None,
        }

gateway = APIGateway()

@app.get("/api/orders/{order_id}")
async def get_order_aggregate(order_id: str):
    return await gateway.aggregate(order_id)

Health Checks

Every service needs liveness and readiness probes.

@app.get("/health/live")
async def liveness():
    """Is the process running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Can we serve traffic?"""
    checks = {
        "database": await check_database(),
        "cache": await check_redis(),
    }
    
    all_healthy = all(checks.values())
    status = "ready" if all_healthy else "not_ready"
    
    return {"status": status, "checks": checks}

NEVER

  • Shared Databases: Creates tight coupling, defeats the purpose
  • Synchronous Chains: A → B → C → D = fragile, slow
  • No Circuit Breakers: One service down takes everything down
  • Distributed Monolith: Services that must deploy together
  • Ignoring Network Failures: Assume the network WILL fail
  • No Compensation Logic: Can't undo failed distributed transactions
  • Starting with Microservices: Always start with a well-structured monolith
  • Chatty Services: Too many inter-service calls = latency death

如何使用「微服务模式」?

  1. 打开小龙虾AI(Web 或 iOS App)
  2. 点击上方「立即使用」按钮,或在对话框中输入任务描述
  3. 小龙虾AI 会自动匹配并调用「微服务模式」技能完成任务
  4. 结果即时呈现,支持继续对话优化

相关技能