If an app communicates with remote or networked software (databases, key-value stores, web services and queues, to name a few), those calls can and will fail.

As more failures pile up, our software can become unstable.

A common pattern for dealing with this is the circuit breaker (CircuitBreaker). It's an object with at least two states: open and closed. When closed and called, it wraps remote calls and intercepts failures; when certain conditions are satisfied (for example, too many failures), it changes its state to open. When open, subsequent calls fail immediately.

Here is a simple implementation in Python, using a class as a decorator. Note the __call__ method, acting as the decorating function.

from functools import wraps
from threading import RLock
from time import time


class CircuitOpenError(Exception):
    '''Raised instead of calling the protected function while the circuit is open.'''


class CircuitBreaker(object):
    '''Circuit breaker decorator.

    Wraps a callable and counts consecutive failures, i.e. raised exceptions
    matching ``match``. Once ``n`` failures have been recorded the circuit is
    'open' and calls fail immediately with ``CircuitOpenError``. After
    ``reset_timeout`` seconds without a new failure the circuit becomes
    'half-open' and calls are let through again: a success closes the
    circuit, a failure re-opens it.

    Parameters:
        n: number of recorded failures at which the circuit opens.
        match: exception class (or tuple of classes) counted as a failure.
            Other exceptions propagate without touching the breaker state.
        reset_timeout: seconds after the last failure before an open circuit
            lets calls through again.
    '''

    def __init__(self, n=3, match=Exception, reset_timeout=0.1):
        self.n = n
        self.FailureCase = match
        self.reset_timeout = reset_timeout
        # Re-entrant so `state` and `reset` can be used while the lock is held.
        self.rlock = RLock()
        # Most recently decorated callable; only used by `_decorated`.
        self.fn = None
        self.reset()

    def reset(self):
        '''Close the circuit by forgetting all recorded failures.'''
        with self.rlock:
            self.tries = 0
            self.last_failed = None

    @property
    def state(self):
        '''Current state: 'closed', 'open' or 'half-open'.'''
        with self.rlock:
            if self.tries < self.n:
                return 'closed'
            cooled_down = (
                self.last_failed is not None
                and (time() - self.last_failed) >= self.reset_timeout
            )
            return 'half-open' if cooled_down else 'open'

    def _record_failure(self):
        '''Count one failure and remember when it happened. Returns self.'''
        with self.rlock:
            self.tries += 1
            self.last_failed = time()
        return self

    def __call__(self, fn):
        '''Decorate ``fn`` so that every call goes through this breaker.

        Returns a plain function (not a bound method) so the wrapper keeps
        the metadata of ``fn`` and also works when decorating methods.
        The breaker is reachable as the ``breaker`` attribute of the wrapper.
        '''
        self.fn = fn

        @wraps(fn)
        def guarded(*args, **kwargs):
            # `fn` is captured per wrapper, so one breaker can safely guard
            # several functions without them overwriting each other.
            return self._invoke(fn, args, kwargs)

        guarded.breaker = self
        return guarded

    def _decorated(self, *args, **kwargs):
        '''Call the most recently decorated function through the breaker.'''
        return self._invoke(self.fn, args, kwargs)

    def _invoke(self, fn, args, kwargs):
        '''Run ``fn(*args, **kwargs)`` under the breaker's protection.

        Raises:
            CircuitOpenError: if the circuit is open; ``fn`` is not called.
            Exception: whatever ``fn`` raises is re-raised unchanged.
        '''
        if self.state == 'open':
            raise CircuitOpenError()
        try:
            result = fn(*args, **kwargs)
        except self.FailureCase:
            self._record_failure()
            # Bare raise keeps the original traceback intact.
            raise
        # A success proves the remote side works: clear the failure count so
        # a half-open circuit closes and old failures do not accumulate.
        self.reset()
        return result

And a script to exercise it:

import circuitbreaker as cb
from random import random
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor, as_completed


class CustomException(Exception):
    '''Failure type raised by the simulated remote call and matched by the breaker.'''


@cb.CircuitBreaker(n=2, match=CustomException)
def fetch_something(i):
    '''Simulate a slow, unreliable remote call guarded by a circuit breaker.

    Sleeps between 0.5 and 2.5 seconds, then either raises CustomException
    or returns True, decided by a random draw. The argument ``i`` is accepted
    but not used.
    '''
    latency = random() * 2 + 0.5
    sleep(latency)
    succeeded = random() <= 0.25
    if not succeeded:
        raise CustomException()
    return True


def fetch_and_log_result(i):
    '''Run one simulated request and print its outcome.

    Waits a random 0.5 to 10.5 seconds, calls ``fetch_something(i)`` and
    prints a line for the start and for the result (success, open circuit,
    or any other exception).

    Returns:
        Whatever ``fetch_something`` returns.

    Raises:
        cb.CircuitOpenError: re-raised after logging when the circuit is open.
        Exception: any other error from the call, re-raised after logging.
    '''
    try:
        sleep(random() * 10 + 0.5)
        print('request %d: starting' % i)
        result = fetch_something(i)
        print('request %d: success' % i)
        return result
    except cb.CircuitOpenError:
        print('request %d: Circuit open' % i)
        # Bare raise re-raises the active exception with its traceback intact.
        raise
    except Exception as ex:
        print('request %d: exception %s' % (i, repr(ex)))
        raise

        
def simulate_concurrent_requests():
    '''Fire ten requests through a thread pool and wait for all of them.

    Each request runs ``fetch_and_log_result``, which prints its own outcome,
    so failures are deliberately ignored here.
    '''
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(fetch_and_log_result, i) for i in range(10)]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                # Already reported by fetch_and_log_result. Catching Exception
                # (not a bare except) lets KeyboardInterrupt and SystemExit
                # still stop the simulation.
                pass
               


# Only run the simulation when executed as a script, not when imported.
if __name__ == '__main__':
    simulate_concurrent_requests()