If an app is communicating with remote/networked software (think databases, key-value stores, webservices and queues to name a few) it can and will fail.
As more failures pile up our sofware can become unstable.
A common pattern for dealing with this, is the CircuitBreaker. It's an object with at least two states: open and closed. When closed and called, it wraps remote calls, intercepts failures and when certain conditions are satisfied, changes it's state to open. When open, subsequent calls fail immediately.
Here is a simple implementation in Python, using a class as a decorator.
Note the __call__
method, acting as the decorating function.
from threading import RLock
from time import time
class CircuitOpenError(Exception):
'''Indicates when the circuit is open'''
pass
class CircuitBreaker(object):
'''Circuit breaker decorator'''
def __init__(self, n=3, match=Exception, reset_timeout=0.1):
self.n = n
self.FailureCase = match
self.reset_timeout = reset_timeout
self.rlock = RLock()
self.reset()
def reset(self):
with self.rlock:
self.tries = 0
self.last_failed = None
@property
def state(self):
with self.rlock:
tries = self.tries
last_failed = self.last_failed
reset_timeout = self.reset_timeout
if tries >= self.n:
if last_failed is not None and (time() - last_failed) >= reset_timeout:
return 'half-open'
return 'open'
return 'closed'
def _record_failure(self):
with self.rlock:
self.tries += 1
self.last_failed = time()
return self
def __call__(self, fn):
self.fn = fn
return self._decorated
def _decorated(self, *args, **kwargs):
'''Call our protected function'''
if self.state == 'open':
raise CircuitOpenError()
try:
return self.fn(*args, **kwargs)
except self.FailureCase as e:
self._record_failure()
raise e
except Exception as e:
raise e
And a script to exercise it:
import circuitbreaker as cb
from random import random
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
class CustomException(Exception):
pass
@cb.CircuitBreaker(n=2, match=CustomException)
def fetch_something(i):
sleep(random() * 2 + 0.5)
if random() > 0.25:
raise CustomException()
return True
def fetch_and_log_result(i):
try:
sleep(random() * 10 + 0.5)
print('request %d: starting' % i)
result = fetch_something(i)
print('request %d: success' % i)
return result
except cb.CircuitOpenError as cbe:
print('request %d: Circuit open' % i)
raise cbe
except Exception as ex:
print('request %d: exception %s' % (i, repr(ex)))
raise ex
def simulate_concurrent_requests():
with ThreadPoolExecutor() as executor:
requests = {executor.submit(fetch_and_log_result, i): i for i in range(10)}
for future in as_completed(requests):
try:
future.result()
except:
pass
simulate_concurrent_requests()