微服务如何保证不会出现连锁反应?Go 实现的断路器了解下( 三 )

cep21/circuit 实现的断路器模式有限状态机图解
微服务如何保证不会出现连锁反应?Go 实现的断路器了解下文章插图
异常处理func (c *Circuit) Execute(ctx context.Context, runFunc func(context.Context) error, fallbackFunc func(context.Context, error) error) error 方法返回的 error 可能的情况有:

  • runFunc 返回的 error , 包括 circuit.BadRequest
  • circuit 返回的 - 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"} - 当并发请求超过阈值:- runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"} - fallbackFunc 并发超过阈值: circuit.circuitError{circuitOpen: true, msg: "throttling concurrency to fallbacks"}
  • fallbackFunc 返回的 error
  • nil
其中 fallbackFunc 的 error 参数可能为:
  • runFunc 返回的 error , 除了 circuit.BadRequest
  • circuit 返回的
    • runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"}
    • 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"}
    • 当并发请求超过阈值:
  • nil
业务系统中的实际应用在我的业务系统中 ,OpenToClosed 接口的实现 , 不打算使用 hystrix 默认的进入“半打开”的逻辑:定时放行部分调用 。 因为这样可能会影响到上游的业务请求 , 并且在 fallbackFunc 中 , 我会去调用异地热备服务 。 所以进入“半打开”状态 , 我选择自己实现 OpenToClosed 接口 , 策略如下:
  • 当断路器进入“打开”状态时 , 启动下游服务健康检查定时器 , 通过模拟业务请求调用下游服务
    • 如果调用成功了 , 并且累积成功调用次数达到一定阈值 , 此时 OpenToClosed.Allow(now time.Time) bool 根据一定概率返回 true , 断路器进入“半打开”状态
    • 如果调用失败了 ,OpenToClosed.Allow(now time.Time) bool 返回 false
  • 当断路器进入“半打开”状态
    • 如果调用成功了 , 进行计数 , 达到阈值后 , 断路器进入“关闭”状态 , 并且停止下游健康检查定时器
    • 如果调用失败了 , 断路器回到“打开”状态
实现代码:
package circuitimport ("math/rand""sync""time""github.com/cep21/circuit""github.com/cep21/circuit/v3/faststats")type ConfigureCloser struct {CloseOnSuccessfulAttemptsCount int64ReopenHealthCheck*HealthCheck}type Closer struct {reopenHealthCheck *HealthChecksuccessfulAttemptsfaststats.AtomicInt64closeOnSuccessfulAttemptsCount int64}func NewCloser(config ConfigureCloser) circuit.OpenToClosed {return &Closer{reopenHealthCheck:config.ReopenHealthCheck,closeOnSuccessfulAttemptsCount: config.CloseOnSuccessfulAttemptsCount,}}// start health check when circuit is openedfunc (c *Closer) Opened(now time.Time) {c.reopenHealthCheck.start()}// stop health check when circuit is closedfunc (c *Closer) Closed(now time.Time) {c.reopenHealthCheck.stop()}// half-openfunc (c *Closer) Allow(now time.Time) bool {return c.reopenHealthCheck.ok()}func (c *Closer) Success(now time.Time, duration time.Duration) {c.successfulAttempts.Add(1)}func (c *Closer) ErrBadRequest(now time.Time, duration time.Duration) {}func (c *Closer) ErrInterrupt(now time.Time, duration time.Duration) {}func (c *Closer) ErrConcurrencyLimitReject(now time.Time) {}func (c *Closer) ErrShortCircuit(now time.Time) {}func (c *Closer) ErrFailure(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ErrTimeout(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ShouldClose(now time.Time) bool {return c.successfulAttempts.Get() > c.closeOnSuccessfulAttemptsCount}type ConfigureHealthCheck struct {TickDurationtime.DurationRunfunc() boolThresholdint64AllowProbability int}type HealthCheck struct {runningboolstopSignalCh chan struct{}countfaststats.AtomicInt64musync.MutexconfigConfigureHealthCheck}func NewHealthCheck(config ConfigureHealthCheck) *HealthCheck {return &HealthCheck{stopSignalCh: make(chan struct{}),config:config,}}func (h *HealthCheck) start() {h.mu.Lock()defer h.mu.Unlock()if h.running {return}h.running = trueh.count.Set(0)go func() {tick := time.Tick(h.config.TickDuration)for {select {case


推荐阅读