Simple CircuitBreaker in Python
panos Oct. 31, 2017, midnight
If an app is communicating with remote/networked software (think databases, key-value stores, web services and queues, to name a few), that communication can and will fail.
As more failures pile up, our software can become unstable.
A common pattern for dealing with this is the CircuitBreaker. It's an object with at least two states: open and closed. When closed and called, it wraps remote calls, intercepts failures and, when certain conditions are satisfied, changes its state to open. When open, subsequent calls fail immediately.
Here is a simple implementation in Python, using a class as a decorator.
Note the __call__ method, acting as the decorating function.
from functools import wraps
from threading import RLock
from time import time
class CircuitOpenError(Exception):
    '''Raised in place of the protected call while the circuit is open.'''
class CircuitBreaker(object):
    '''Circuit breaker usable as a decorator.

    The breaker counts failures of the wrapped callable. Once ``n``
    failures have been recorded without an intervening success the
    circuit is 'open' and calls raise ``CircuitOpenError`` without
    running the wrapped callable. After ``reset_timeout`` seconds
    without a new failure the circuit is 'half-open': calls are let
    through again; a success closes the circuit, a failure re-opens it.

    Parameters:
        n: number of recorded failures at which the circuit opens.
        match: exception class (or tuple of classes) that counts as a
            failure. Other exceptions propagate without being counted.
        reset_timeout: seconds that must pass after the last recorded
            failure before an open circuit becomes half-open.

    The counters are read and written under ``self.rlock``. The wrapped
    call itself runs outside the lock, so several concurrent callers
    may pass through while the circuit is half-open.
    '''

    def __init__(self, n=3, match=Exception, reset_timeout=0.1):
        self.n = n
        self.FailureCase = match
        self.reset_timeout = reset_timeout
        self.rlock = RLock()
        self.reset()

    def reset(self):
        '''Forget all recorded failures, returning the circuit to 'closed'.'''
        with self.rlock:
            self.tries = 0
            self.last_failed = None

    @property
    def state(self):
        '''Current state: 'closed', 'open' or 'half-open'.'''
        with self.rlock:
            if self.tries < self.n:
                return 'closed'
            cooled_down = (
                self.last_failed is not None
                and (time() - self.last_failed) >= self.reset_timeout
            )
            return 'half-open' if cooled_down else 'open'

    def _record_failure(self):
        '''Count one failure and remember when it happened.'''
        with self.rlock:
            self.tries += 1
            self.last_failed = time()
        return self

    def _record_success(self):
        '''Clear the failure count after a successful call.

        Without this the count would only ever grow, and a tripped
        breaker would report 'half-open' forever instead of closing.
        '''
        with self.rlock:
            if self.tries:
                self.reset()
        return self

    def __call__(self, fn):
        '''Decorate ``fn`` so that every call goes through this breaker.

        Returns a plain function (not a bound method of the breaker), so
        the wrapper keeps ``fn``'s name/docstring and also works when
        ``fn`` is defined as a method of another class.
        '''
        # Kept so that code reading ``breaker.fn`` or calling
        # ``breaker._decorated`` keeps working.
        self.fn = fn

        @wraps(fn)
        def protected(*args, **kwargs):
            # Call the captured ``fn`` rather than ``self.fn`` so that
            # reusing one breaker for several functions does not make
            # every wrapper call the most recently decorated one.
            return self._call_protected(fn, args, kwargs)

        # The breaker used to be reachable through the returned bound
        # method; expose it explicitly for state inspection.
        protected.breaker = self
        return protected

    def _decorated(self, *args, **kwargs):
        '''Call the most recently decorated function through the breaker.'''
        return self._call_protected(self.fn, args, kwargs)

    def _call_protected(self, fn, args, kwargs):
        '''Run ``fn(*args, **kwargs)`` subject to the circuit state.

        Raises:
            CircuitOpenError: if the circuit is open; ``fn`` is not run.
            Exception: whatever ``fn`` raises is re-raised unchanged;
                only instances of ``self.FailureCase`` are counted.
        '''
        if self.state == 'open':
            raise CircuitOpenError()
        try:
            result = fn(*args, **kwargs)
        except self.FailureCase:
            self._record_failure()
            # Bare raise keeps the original traceback intact.
            raise
        self._record_success()
        return result
And a script to exercise it:
import circuitbreaker as cb
from random import random
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
class CustomException(Exception):
    '''Failure raised by the simulated remote call; the breaker counts it.'''
@cb.CircuitBreaker(n=2, match=CustomException)
def fetch_something(i):
    '''Simulate a slow, unreliable remote call.

    Sleeps for a random 0.5-2.5 seconds, then raises CustomException
    whenever a second random draw exceeds 0.25; otherwise returns True.
    The argument ``i`` is not used by the body.
    '''
    latency = random() * 2 + 0.5
    sleep(latency)
    succeeded = random() <= 0.25
    if succeeded:
        return True
    raise CustomException()
def fetch_and_log_result(i):
    '''Run one simulated request and print what happened to it.

    Waits a random 0.5-10.5 seconds, calls ``fetch_something(i)`` and
    prints a line for the start and for the outcome (success, circuit
    open, or any other exception).

    Returns:
        Whatever ``fetch_something(i)`` returns.

    Raises:
        cb.CircuitOpenError: re-raised after logging when the breaker
            rejects the call.
        Exception: any other exception from the call, re-raised after
            logging.
    '''
    try:
        sleep(random() * 10 + 0.5)
        print('request %d: starting' % i)
        result = fetch_something(i)
        print('request %d: success' % i)
        return result
    except cb.CircuitOpenError:
        print('request %d: Circuit open' % i)
        # Bare raise re-raises the active exception with its traceback.
        raise
    except Exception as ex:
        print('request %d: exception %s' % (i, repr(ex)))
        raise
def simulate_concurrent_requests():
    '''Submit ten requests to a thread pool and wait for all of them.

    Each request runs ``fetch_and_log_result(i)`` for ``i`` in 0..9.
    Exceptions raised by a request are discarded here so that one
    failing request does not stop the others from being collected.
    '''
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(fetch_and_log_result, i) for i in range(10)]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                # Deliberately ignored. Catch Exception rather than using
                # a bare except so KeyboardInterrupt/SystemExit still
                # stop the simulation.
                pass
# Only run the simulation when executed as a script, so importing this
# module does not start threads and sleep as a side effect.
if __name__ == '__main__':
    simulate_concurrent_requests()
 Kountanis
      Kountanis