熔断后的服务恢复与预热机制
你有没有遇到过这种情况:某个服务挂了一段时间,熔断器打开了。后来服务恢复了,但请求打过去,发现服务又挂了。
为什么?服务刚启动,JVM 刚完成类加载,数据库连接池正在建立,缓存还没预热……一下子涌进来大量请求,直接把服务压垮了。
这就是为什么熔断后不能简单地「打开闸门」,需要渐进式恢复和服务预热。
熔断恢复的挑战
冷启动问题
服务重启后的资源状态:
│
│ ╭──── 资源峰值
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
│ ╭─╯
├───┴───────────────────────────────────▶ 时间
│ │ │
│ │ │
│ 服务启动 资源建立
│ (熔断恢复) (预热期)

服务刚启动时:
- 类加载器正在加载类
- 数据库连接正在建立
- 缓存是空的
- JIT 编译器正在编译热点代码
流量冲击问题
熔断关闭后的流量变化:
请求数
│
正常流量 ─┐
│
熔断期间 ────┘ ≈ 0
│
│
熔断恢复 ────┐
│
│ ╭─ 瞬时涌入 ─╮
├─╯ ╰─╮
│ │
└────────────────┴───────▶ 时间

如果熔断恢复后直接涌入全部流量,会导致:
- 响应时间急剧增加
- 资源耗尽
- 再次触发熔断
半开状态设计
标准半开状态
java
public class CircuitBreaker {
    // Self-contained logger (java.util.logging); swap for SLF4J/Lombok in production.
    private static final java.util.logging.Logger log =
            java.util.logging.Logger.getLogger(CircuitBreaker.class.getName());

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final int halfOpenRequests;   // max probe requests while HALF_OPEN
    private final Duration breakDuration; // how long to stay OPEN before probing
    private volatile int halfOpenAttemptCount = 0;
    // FIX: read by shouldAttemptReset() but never declared in the original.
    private volatile long lastFailureTime = 0L;

    /**
     * @param halfOpenRequests maximum number of trial requests allowed in HALF_OPEN
     * @param breakDuration    time to remain OPEN before attempting recovery
     */
    public CircuitBreaker(int halfOpenRequests, Duration breakDuration) {
        // FIX: the final fields had no initializer — the original could not compile.
        this.halfOpenRequests = halfOpenRequests;
        this.breakDuration = breakDuration;
    }

    public enum State {
        CLOSED,    // 正常
        OPEN,      // 熔断
        HALF_OPEN  // 半开
    }

    /** Returns true if the caller may send this request downstream. */
    public boolean allowRequest() {
        // Read the state once so the checks below cannot interleave with a transition.
        State current = state.get();
        if (current == State.CLOSED) {
            return true;
        }
        if (current == State.OPEN) {
            // After the break window elapses, move to HALF_OPEN and admit a probe.
            if (shouldAttemptReset()) {
                transitionToHalfOpen();
                return true;
            }
            return false;
        }
        if (current == State.HALF_OPEN) {
            // FIX: missing State. qualifier (bare HALF_OPEN did not resolve), and the
            // attempt counter was never incremented, so the probe limit was never
            // enforced. The increment is not atomic; it is a best-effort upper bound.
            if (halfOpenAttemptCount < halfOpenRequests) {
                halfOpenAttemptCount++;
                return true;
            }
            return false;
        }
        return false;
    }

    /** Records a downstream failure: trips the breaker and restarts the break timer. */
    public void recordFailure() {
        lastFailureTime = System.currentTimeMillis();
        state.set(State.OPEN);
    }

    private boolean shouldAttemptReset() {
        return System.currentTimeMillis() - lastFailureTime >= breakDuration.toMillis();
    }

    private void transitionToHalfOpen() {
        state.set(State.HALF_OPEN);
        halfOpenAttemptCount = 0;
        log.info("熔断器进入半开状态");
    }
}

智能半开状态
标准半开状态的问题是:所有请求同时涌入。我们需要渐进式半开。
java
public class ProgressiveCircuitBreaker {
    // FIX: the State enum was referenced but never defined in this class.
    public enum State { CLOSED, OPEN, HALF_OPEN }

    // Self-contained logger (java.util.logging); swap for SLF4J/Lombok in production.
    private static final java.util.logging.Logger log =
            java.util.logging.Logger.getLogger(ProgressiveCircuitBreaker.class.getName());

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);

    // Half-open configuration.
    private final int maxHalfOpenRequests = 10;      // 半开状态最多允许的请求数
    private final double successRateThreshold = 0.8; // 成功率达到 80% 才关闭熔断
    // NOTE(review): a break window was implied but never declared — 30s is a guess; confirm.
    private final Duration breakDuration = Duration.ofSeconds(30);

    // Half-open statistics.
    private final AtomicInteger halfOpenSuccess = new AtomicInteger(0);
    private final AtomicInteger halfOpenFailure = new AtomicInteger(0);
    // FIX: the semaphore behind tryAcquire()/release() was used but never declared.
    private final Semaphore permits = new Semaphore(0);
    private volatile int currentPermits = 1;         // 当前允许的请求数
    private volatile long lastFailureTime = 0L;

    /** Returns true if the caller may send this request downstream. */
    public boolean allowRequest() {
        State current = state.get();
        if (current == State.CLOSED) {
            return true;
        }
        if (current == State.OPEN) {
            if (shouldAttemptReset()) {
                transitionToHalfOpen();
            }
            // The transition only opens the probe window; this request is still rejected.
            return false;
        }
        if (current == State.HALF_OPEN) { // FIX: missing State. qualifier
            // Progressive: only as many probes as the current permit budget allows.
            return permits.tryAcquire();
        }
        return false;
    }

    private boolean shouldAttemptReset() {
        return System.currentTimeMillis() - lastFailureTime >= breakDuration.toMillis();
    }

    private void transitionToHalfOpen() {
        state.set(State.HALF_OPEN);
        halfOpenSuccess.set(0);
        halfOpenFailure.set(0);
        permits.drainPermits(); // FIX: discard stale permits from any previous round
        currentPermits = 1;     // 从 1 开始
        permits.release(currentPermits);
    }

    private void transitionToClosed() {
        state.set(State.CLOSED);
        log.info("熔断器恢复关闭");
    }

    private void transitionToOpen() {
        lastFailureTime = System.currentTimeMillis();
        state.set(State.OPEN);
        log.info("熔断器重新打开");
    }

    /** Reports a successful probe; may close the breaker or widen the probe window. */
    public void recordSuccess() {
        if (state.get() == State.HALF_OPEN) {
            int success = halfOpenSuccess.incrementAndGet();
            // total >= 1 here, so the division below cannot be by zero.
            int total = success + halfOpenFailure.get();
            if (success * 1.0 / total >= successRateThreshold) {
                transitionToClosed();
            } else if (currentPermits < maxHalfOpenRequests) {
                // Gradually admit more probes.
                increasePermits();
            }
        }
    }

    /** Reports a failed probe; re-trips the breaker when failures dominate. */
    public void recordFailure() {
        if (state.get() == State.HALF_OPEN) {
            int failure = halfOpenFailure.incrementAndGet();
            int total = halfOpenSuccess.get() + failure;
            // Too many failures: fall back to OPEN.
            if (failure * 1.0 / total > (1 - successRateThreshold)) {
                transitionToOpen();
            }
        }
    }

    private void increasePermits() {
        // Exponential widening: 1 → 2 → 4 → 8, capped at maxHalfOpenRequests.
        int newPermits = Math.min(currentPermits * 2, maxHalfOpenRequests);
        permits.release(newPermits - currentPermits);
        currentPermits = newPermits;
    }
}

服务预热
1. 连接池预热
java
@Service
public class ConnectionPoolWarmer {
    @Autowired
    private DataSource dataSource;

    // Cheap statements used to validate each freshly opened connection.
    private final List<String> criticalQueries = List.of(
            "SELECT 1",
            "SELECT COUNT(*) FROM users LIMIT 1",
            "SELECT COUNT(*) FROM orders LIMIT 1"
    );

    /**
     * Warms the database connection pool at application startup: opens minimumIdle
     * connections in parallel and runs a few cheap queries on each. Best-effort —
     * individual failures are logged, never propagated, so startup cannot break here.
     */
    @PostConstruct
    public void warmUp() {
        log.info("开始预热数据库连接池...");
        // FIX: guard the cast — a non-Hikari DataSource previously threw
        // ClassCastException and aborted application startup.
        if (!(dataSource instanceof HikariDataSource)) {
            log.warn("非 HikariDataSource,跳过连接池预热");
            return;
        }
        HikariDataSource hikari = (HikariDataSource) dataSource;
        int targetConnections = hikari.getMinimumIdle();
        if (targetConnections <= 0) {
            // FIX: newFixedThreadPool(0) throws IllegalArgumentException.
            return;
        }
        ExecutorService executor = Executors.newFixedThreadPool(targetConnections);
        for (int i = 0; i < targetConnections; i++) {
            final int index = i;
            executor.submit(() -> {
                // FIX: the Statement was never closed — hold it in try-with-resources.
                try (Connection conn = dataSource.getConnection();
                     Statement stmt = conn.createStatement()) {
                    // Run the probe queries to confirm the connection is usable.
                    for (String query : criticalQueries) {
                        stmt.execute(query);
                    }
                    log.debug("连接 {} 预热完成", index);
                } catch (Exception e) {
                    // Best-effort: log and continue warming the other connections.
                    log.warn("连接 {} 预热失败: {}", index, e.getMessage());
                }
            });
        }
        executor.shutdown();
        try {
            // FIX: the await result was ignored — report a timeout instead of
            // unconditionally claiming the warm-up finished.
            if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
                log.info("数据库连接池预热完成,共 {} 个连接", targetConnections);
            } else {
                log.warn("连接池预热超时,部分连接可能未就绪");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2. 缓存预热
java
@Service
public class CacheWarmer {
    @Autowired
    private CacheManager cacheManager;
    @Autowired
    private ProductService productService;
    @Autowired
    private UserService userService;
    // FIX: used by warmConfigCache() but never declared/injected in the original.
    @Autowired
    private ConfigService configService;

    /**
     * Warms the product, user and config caches in parallel at startup and blocks
     * until all three finish, so readiness is not reported with cold caches.
     */
    @PostConstruct
    public void warmUp() {
        log.info("开始预热缓存...");
        CompletableFuture.allOf(
                warmProductCache(),
                warmUserCache(),
                warmConfigCache()
        ).join();
        log.info("缓存预热完成");
    }

    /** Preloads hot products; each individual miss is logged and skipped. */
    private CompletableFuture<Void> warmProductCache() {
        return CompletableFuture.runAsync(() -> {
            List<Long> hotProductIds = productService.getHotProductIds();
            // FIX: getCache() may return null for an unconfigured cache — fetch it
            // once and bail out instead of NPE-ing inside the loop.
            Cache cache = cacheManager.getCache("product");
            if (cache == null) {
                log.warn("缓存 product 未配置,跳过预热");
                return;
            }
            for (Long productId : hotProductIds) {
                try {
                    Product product = productService.getProductById(productId);
                    // Push into the cache proactively.
                    cache.put(productId, product);
                } catch (Exception e) {
                    log.warn("商品 {} 缓存预热失败", productId);
                }
            }
            log.info("商品缓存预热完成,共 {} 条", hotProductIds.size());
        });
    }

    /** Preloads active users; each individual miss is logged and skipped. */
    private CompletableFuture<Void> warmUserCache() {
        return CompletableFuture.runAsync(() -> {
            List<Long> activeUserIds = userService.getActiveUserIds();
            Cache cache = cacheManager.getCache("user");
            if (cache == null) {
                log.warn("缓存 user 未配置,跳过预热");
                return;
            }
            for (Long userId : activeUserIds) {
                try {
                    User user = userService.getUserById(userId);
                    cache.put(userId, user);
                } catch (Exception e) {
                    log.warn("用户 {} 缓存预热失败", userId);
                }
            }
            log.info("用户缓存预热完成,共 {} 条", activeUserIds.size());
        });
    }

    /** Preloads all configuration entries. */
    private CompletableFuture<Void> warmConfigCache() {
        return CompletableFuture.runAsync(() -> {
            Map<String, String> configs = configService.getAllConfigs();
            // FIX: Spring's Cache type is not generic — Cache<String, String> did
            // not compile. put(Object, Object) accepts the entries directly.
            Cache configCache = cacheManager.getCache("config");
            if (configCache == null) {
                log.warn("缓存 config 未配置,跳过预热");
                return;
            }
            configs.forEach(configCache::put);
            log.info("配置缓存预热完成,共 {} 条", configs.size());
        });
    }
}

3. JIT 预热
java
@Service
public class JITWarmer {
    // FIX: getForObject is an instance method — the original called it statically
    // on the RestTemplate class, which does not compile.
    private final RestTemplate restTemplate = new RestTemplate();

    // NOTE(review): RestTemplate requires an absolute URL; warming over loopback
    // is assumed here — confirm the port/base path for your deployment.
    private static final String BASE_URL = "http://localhost:8080";

    /**
     * Exercises the hottest endpoints in a tight loop so the JIT compiler promotes
     * their handler code before real traffic arrives. Errors are intentionally
     * ignored: only executing the code paths matters, not the responses.
     */
    @PostConstruct
    public void warmUp() {
        log.info("开始 JIT 预热...");
        warmEndpoint("/api/product/list");
        warmEndpoint("/api/product/detail");
        warmEndpoint("/api/order/create");
        warmEndpoint("/api/user/info");
        log.info("JIT 预热完成");
    }

    /** Calls one endpoint repeatedly to push it past the JIT compilation threshold. */
    private void warmEndpoint(String endpoint) {
        log.debug("预热接口: {}", endpoint);
        for (int i = 0; i < 10000; i++) {
            try {
                restTemplate.getForObject(BASE_URL + endpoint, String.class);
            } catch (Exception ignored) {
                // Best-effort: a failed call still exercises most of the code path.
            }
        }
    }
}

渐进式流量恢复
1. 令牌桶控制
java
@Service
public class GradualRecoveryService {
    private final AtomicInteger currentRps = new AtomicInteger(0);
    private final int maxRps;       // ceiling for the ramp-up
    private final int increaseStep; // RPS added per healthy interval
    private final TokenBucket rateLimiter;

    public GradualRecoveryService(int maxRps, int increaseStep) {
        this.maxRps = maxRps;
        this.increaseStep = increaseStep;
        // FIX: seeding the bucket with maxRps defeated the gradual ramp-up —
        // start at the first step and let increaseTraffic() widen it over time.
        int initialRps = Math.min(increaseStep, maxRps);
        this.currentRps.set(initialRps);
        this.rateLimiter = new TokenBucket(initialRps);
    }

    /**
     * Returns whether this request fits in the current rate budget.
     */
    public boolean allowRequest() {
        return rateLimiter.tryAcquire();
    }

    /**
     * Raises the allowed RPS by one step, capped at maxRps.
     */
    public void increaseTraffic() {
        // FIX: a single failed compareAndSet silently dropped the adjustment —
        // retry until the update lands or there is nothing left to do.
        while (true) {
            int current = currentRps.get();
            int newRps = Math.min(current + increaseStep, maxRps);
            if (newRps == current) {
                return; // already at the ceiling
            }
            if (currentRps.compareAndSet(current, newRps)) {
                rateLimiter.setRate(newRps);
                log.info("流量增加: {} -> {}", current, newRps);
                return;
            }
            // Lost a race with another adjuster; re-read and retry.
        }
    }

    /**
     * Halves the allowed RPS (floor of 1) when the breaker trips.
     */
    public void decreaseTraffic() {
        while (true) {
            int current = currentRps.get();
            int newRps = Math.max(current / 2, 1);
            if (newRps == current) {
                return; // already at the floor
            }
            if (currentRps.compareAndSet(current, newRps)) {
                rateLimiter.setRate(newRps);
                log.info("流量减少: {} -> {}", current, newRps);
                return;
            }
        }
    }
}

2. 调度器控制
java
@Configuration
public class RecoveryScheduler {
    @Autowired
    private GradualRecoveryService recoveryService;
    @Autowired
    private MetricsService metricsService;

    /**
     * Periodically ramps traffic back up, but only while the live health metrics
     * stay within their thresholds; otherwise logs and waits for the next tick.
     */
    @Scheduled(fixedRate = 10000) // every 10 seconds
    public void increaseTrafficIfHealthy() {
        Metrics current = metricsService.getCurrentMetrics();
        if (!isHealthy(current)) {
            log.warn("指标不健康,暂停流量增加: errorRate={}, latency={}",
                current.getErrorRate(), current.getAvgLatency());
            return;
        }
        recoveryService.increaseTraffic();
    }

    /** Health gate: error rate under 1%, average latency under 500 ms, P99 under 1 s. */
    private boolean isHealthy(Metrics current) {
        if (current.getErrorRate() >= 0.01) {
            return false;
        }
        if (current.getAvgLatency() >= 500) {
            return false;
        }
        return current.getP99Latency() < 1000;
    }
}

3. 基于指标的动态调整
java
@Service
public class AdaptiveRecoveryService {
    private final AtomicInteger currentRps = new AtomicInteger(0);
    private final MetricsService metricsService;
    // FIX: getMaxRps() was called but never defined — back it with an explicit ceiling.
    private final int maxRps;

    public AdaptiveRecoveryService(MetricsService metricsService, int maxRps) {
        // FIX: the final metricsService field was never initialized (compile error).
        this.metricsService = metricsService;
        this.maxRps = maxRps;
    }

    private int getMaxRps() {
        return maxRps;
    }

    /**
     * Adjusts the allowed RPS from live metrics:
     * error rate > 5%  -> quarter the traffic;
     * error rate > 1%  -> halve it;
     * avg latency > 1s -> shave 20%;
     * excellent (error < 0.1%, latency < 200 ms) -> double, capped at maxRps.
     */
    public void adjustTraffic() {
        Metrics metrics = metricsService.getCurrentMetrics();
        int current = currentRps.get();
        if (metrics.getErrorRate() > 0.05) {
            int newRps = Math.max(current / 4, 1);
            currentRps.set(newRps);
            log.warn("错误率过高,流量降至: {}", newRps);
        } else if (metrics.getErrorRate() > 0.01) {
            int newRps = Math.max(current / 2, 1);
            currentRps.set(newRps);
            log.warn("错误率偏高,流量降至: {}", newRps);
        } else if (metrics.getAvgLatency() > 1000) {
            // * 8 / 10 keeps the 20% cut in exact integer arithmetic.
            int newRps = Math.max(current * 8 / 10, 1);
            currentRps.set(newRps);
            log.warn("延迟过高,流量降至: {}", newRps);
        } else if (metrics.getErrorRate() < 0.001 && metrics.getAvgLatency() < 200) {
            int newRps = Math.min(current * 2, getMaxRps());
            currentRps.set(newRps);
            log.info("指标优秀,流量增至: {}", newRps);
        }
        // NOTE(review): get-then-set is not atomic; fine if adjustTraffic() runs on a
        // single scheduler thread — confirm before adding more callers.
    }
}

健康检查
1. 健康检查端点
java
@RestController
@RequestMapping("/health")
public class HealthController {
    @Autowired
    private DataSource dataSource;
    @Autowired
    private CacheManager cacheManager;
    @Autowired
    private ExternalService externalService;

    /**
     * Aggregated health report covering database, cache and the external service.
     * Overall status is "UP" only when every dependency check passes, else "DEGRADED".
     */
    @GetMapping
    public HealthStatus health() {
        HealthStatus status = new HealthStatus();
        // Database check — successfully acquiring a connection IS the test; the
        // Connection variable is intentionally unused and closed immediately.
        try (Connection conn = dataSource.getConnection()) {
            status.setDatabase(true);
        } catch (Exception e) {
            status.setDatabase(false);
            status.setDatabaseError(e.getMessage());
        }
        // Cache check — a put/evict round-trip on the default cache.
        // NOTE(review): getCache("default") returning null surfaces here as a failed
        // check via the NPE — confirm that is the intended semantics.
        try {
            String testKey = "health:check";
            cacheManager.getCache("default").put(testKey, "ok");
            cacheManager.getCache("default").evict(testKey);
            status.setCache(true);
        } catch (Exception e) {
            status.setCache(false);
            status.setCacheError(e.getMessage());
        }
        // External dependency check (delegated to the service's own probe).
        status.setExternalService(externalService.isHealthy());
        // Overall status: UP only when all three checks passed.
        boolean allHealthy = status.isDatabase() &&
            status.isCache() &&
            status.isExternalService();
        status.setStatus(allHealthy ? "UP" : "DEGRADED");
        return status;
    }

    /**
     * Readiness endpoint: 200 when fully healthy, 503 otherwise so the load
     * balancer / kubelet removes the instance from rotation.
     */
    @GetMapping("/ready")
    public Response ready() {
        HealthStatus status = health();
        if ("UP".equals(status.getStatus())) {
            return Response.ok(status);
        }
        return Response.status(503).body(status);
    }

    /** Liveness endpoint: OK as long as the process can answer at all. */
    @GetMapping("/live")
    public Response live() {
        // Process liveness only — deliberately no dependency checks here.
        return Response.ok("OK");
    }
}

2. Kubernetes 探针配置
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  template:
    spec:
      containers:
        - name: myapp
          image: myapp:v2
          # Liveness probe: restart the container if the process is dead
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 30   # start checking 30s after boot
            periodSeconds: 10         # check every 10s
            timeoutSeconds: 5         # 5s timeout per check
            failureThreshold: 3       # 3 consecutive failures -> restart
          # Readiness probe: gate traffic until the app is ready to serve
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 60   # start checking 60s after boot
            periodSeconds: 5          # check every 5s
            timeoutSeconds: 3         # 3s timeout per check
            failureThreshold: 3       # 3 consecutive failures -> remove from service
          # Startup probe: allow slow startup before liveness checks kick in
          startupProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 0
            periodSeconds: 10
            failureThreshold: 30      # up to 5 minutes (30 × 10s) to finish startup

3. 探针友好的代码
java
// FIX: @GetMapping is only honored on @Controller / @RestController beans — on a
// bare @SpringBootApplication class the endpoint was never registered.
@RestController
@SpringBootApplication
public class Application {
    @Autowired
    private ConnectionPoolWarmer connectionPoolWarmer;
    @Autowired
    private CacheWarmer cacheWarmer;
    @Autowired
    private JITWarmer jitWarmer;

    // Flipped to true only after all warm-up steps finish; read by the readiness probe.
    private volatile boolean ready = false;

    /**
     * Runs the warm-up pipeline sequentially — connections first, then caches,
     * then JIT — and only then marks the instance ready for traffic.
     */
    @PostConstruct
    public void init() {
        connectionPoolWarmer.warmUp();
        cacheWarmer.warmUp();
        jitWarmer.warmUp();
        ready = true;
        log.info("应用预热完成,已就绪");
    }

    /** Readiness probe target: 503 while warming up, 200 once ready. */
    @GetMapping("/health/ready")
    public Response ready() {
        if (!ready) {
            return Response.status(503).body("warming up");
        }
        return Response.ok("ready");
    }
}

综合恢复策略
恢复流程
熔断器打开
│
│ 等待熔断时间
▼
进入半开状态
│
├──▶ 允许 1 个请求通过
│ │
│ ├──▶ 成功
│ │ │
│ │ 允许更多请求(2 → 4 → 8...)
│ │ │
│ │ 成功率 > 80% ──▶ 关闭熔断,全量恢复
│ │ │
│ │ 成功率 < 80% ──▶ 继续半开或回到熔断
│ │
│ └──▶ 失败
│ │
│ 回到熔断状态
│ │
│ 重置熔断计时器
▼
根据指标动态调整

代码实现
java
@Service
public class CircuitBreakerRecoveryManager {
    private final Map<String, CircuitBreaker> breakers;
    private final MetricsService metricsService;
    private final LoadBalancer loadBalancer;
    // FIX: both collaborators were used below but never declared in the original.
    private final AlertManager alertManager;
    private final RecoveryService recoveryService;

    public CircuitBreakerRecoveryManager(Map<String, CircuitBreaker> breakers,
                                         MetricsService metricsService,
                                         LoadBalancer loadBalancer,
                                         AlertManager alertManager,
                                         RecoveryService recoveryService) {
        // FIX: the final fields had no initializer — the original could not compile.
        this.breakers = breakers;
        this.metricsService = metricsService;
        this.loadBalancer = loadBalancer;
        this.alertManager = alertManager;
        this.recoveryService = recoveryService;
    }

    /**
     * Dispatches a breaker state-machine edge to the matching recovery routine.
     *
     * @param name       breaker / downstream service name
     * @param transition which edge of the state machine fired
     */
    public void handleStateTransition(String name, StateTransition transition) {
        switch (transition) {
            case OPEN_TO_HALF_OPEN:
                handleOpenToHalfOpen(name);
                break;
            case HALF_OPEN_TO_CLOSED:
                handleHalfOpenToClosed(name);
                break;
            case HALF_OPEN_TO_OPEN:
                handleHalfOpenToOpen(name);
                break;
            default:
                // FIX: surface unknown transitions instead of silently ignoring them.
                log.warn("未知的状态转换: {}", transition);
                break;
        }
    }

    /** OPEN -> HALF_OPEN: route only a trickle of traffic to the probing service, alert ops. */
    private void handleOpenToHalfOpen(String name) {
        log.info("熔断器 {} 进入半开状态", name);
        // 10% weight keeps the probe volume from overwhelming a still-warming service.
        loadBalancer.adjustWeight(name, 0.1);
        alertManager.send(Alert.builder()
            .level(AlertLevel.WARNING)
            .title("熔断器进入半开")
            .message("服务 " + name + " 正在尝试恢复")
            .build());
    }

    /** HALF_OPEN -> CLOSED: probing succeeded — restore full weight and normal traffic. */
    private void handleHalfOpenToClosed(String name) {
        log.info("熔断器 {} 恢复关闭", name);
        loadBalancer.adjustWeight(name, 1.0);
        recoveryService.restoreTraffic(name, 1.0);
    }

    /** HALF_OPEN -> OPEN: probing failed — cut the service off entirely and record it. */
    private void handleHalfOpenToOpen(String name) {
        log.warn("熔断器 {} 半开状态下再次熔断", name);
        loadBalancer.adjustWeight(name, 0);
        recoveryService.recordFailure(name);
    }

    /**
     * Ramps traffic back up while error rate stays below 1% and average latency
     * below 500 ms; pauses the ramp otherwise.
     */
    public void progressiveRecovery(String name) {
        Metrics metrics = metricsService.getMetrics(name);
        if (metrics.getErrorRate() < 0.01 && metrics.getAvgLatency() < 500) {
            double factor = recoveryService.getRecoveryFactor(name);
            recoveryService.increaseTraffic(name, factor);
        } else {
            recoveryService.pauseRecovery(name);
        }
    }
}

思考题:
服务预热需要时间,但如果预热期间大量请求进来怎么办?如何设计预热机制让用户无感知?
熔断恢复时,如果上游服务已经恢复了,但下游依赖还没准备好(比如数据库连接池还没建好),会发生什么?如何处理?
渐进式流量恢复的节奏如何控制?太快会导致再次熔断,太慢又影响用户体验。如何找到一个平衡点?
如何设计一个「智能熔断器」,能够根据历史数据自动调整熔断阈值和恢复策略?
