Source code for pluserable.no_bruteforce

"""Prevent brute force by storing IP addresses in redis."""

from datetime import datetime, timedelta
from typing import Dict, Tuple

from bag.reify import reify
from redis import StrictRedis

from pluserable.strings import get_strings


# https://github.com/ericrasmussen/pyramid_redis_sessions/blob/master/pyramid_redis_sessions/connection.py
def get_default_connection(kerno, url="", redis_client=StrictRedis, **redis_options):
    """Return a Redis connection ready for use, creating it on first call.

    The connection is cached on ``kerno`` as ``kerno.brute_redis``.  When
    that attribute already holds a connection it is returned as is and
    the remaining arguments are ignored.

    ``url`` is a connection string passed straight to
    ``redis_client.from_url`` (``StrictRedis.from_url`` by default).
    Example::

        redis://username:password@localhost:6379/0

    When ``url`` is empty, the client is built as
    ``redis_client(**redis_options)`` instead.
    """
    cached = getattr(kerno, "brute_redis", None)
    if cached is not None:
        return cached

    if not url:
        connection = redis_client(**redis_options)
    else:
        # Options that must not accompany a URL:
        # - password/host/port/db would duplicate settings already in `url`;
        # - unix_socket_path: from_url no longer takes a socket argument,
        #   sockets are encoded in the URL instead, for example
        #   unix://[:password]@/path/to/socket.sock?db=0
        # - connection_pool: no longer valid when loading via URL.
        url_conflicting_options = (
            "password",
            "host",
            "port",
            "db",
            "unix_socket_path",
            "connection_pool",
        )
        for option in url_conflicting_options:
            redis_options.pop(option, None)
        connection = redis_client.from_url(url, **redis_options)

    # Remember the new connection so later calls reuse it.
    kerno.brute_redis = connection
    return connection
class BruteForceAidDummy:
    """Test double for BruteForceAidRedis that never blocks anybody.

    It exposes the same public API as the redis-backed component, so it
    also serves as a compact description of that API.
    """

    def __init__(self, kerno, ip: str) -> None:  # noqa
        """Accept the same arguments as the real component; both are ignored."""

    def store_login_failure(self) -> int:
        """Pretend to record an authentication failure.

        Return the number of seconds the user must wait before retrying.
        This double always answers zero, which is also what the real
        component returns when the feature is not enabled.
        """
        wait_seconds = 0
        return wait_seconds

    def is_login_blocked(self) -> Tuple[int, str]:
        """Report whether the IP address is currently blocked.

        Return a tuple ``(remaining_seconds, error_msg)``.  This double
        always answers ``(0, "")``, i.e. the login attempt is allowed.
        """
        remaining_seconds, error_msg = 0, ""
        return remaining_seconds, error_msg
class BruteForceAidRedis:
    """App component that prevents brute forcing login, storing IPs in redis.

    For each remote IP a redis hash named ``login-<ip>`` holds two fields:
    ``attempts`` (number of consecutive failures) and ``blocked_until``
    (naive UTC timestamp in ISO format).  The hash expires one day after
    the most recent failure, which resets the count.

    The feature is disabled when the ``redis_url`` setting is empty.
    """

    one_day_in_milliseconds = 60 * 60 * 24 * 1000

    # Upper bound for a single block.  The redis key that stores the block
    # expires after one day, so a longer block could never be enforced;
    # capping here also keeps the datetime arithmetic from overflowing.
    max_block_seconds = one_day_in_milliseconds // 1000

    def __init__(self, kerno, ip: str) -> None:  # noqa
        self.kerno = kerno
        self.ip = ip

    @reify
    def name(self):
        """Return the key used in redis to store data about this remote IP."""
        return f"login-{self.ip}"

    @reify
    def redis(self):
        """Return the redis connection shared through the kerno object."""
        return get_default_connection(kerno=self.kerno, url=self.redis_url)

    @reify
    def redis_url(self):
        """Return the configured redis URL; falsy disables this feature."""
        return self.kerno.pluserable_settings["redis_url"]

    @reify
    def seconds_after_login_fail(self) -> int:
        """Return the configured initial duration of a login block."""
        return int(self.kerno.pluserable_settings["seconds_after_login_fail"])

    def pure_get_new_time(
        self, now: datetime, attempts: int = 1
    ) -> Tuple[datetime, int]:
        """Compute the next moment until which this IP is getting blocked.

        Pure method (no IO) that returns a tuple (datetime, int_seconds).

        The first login failure requires the user to wait the configured
        ``seconds_after_login_fail`` and this number doubles each time the
        credentials are found wrong again, up to ``max_block_seconds``.
        Without the cap a large ``attempts`` value would raise
        ``OverflowError`` when building the timedelta or adding it to
        ``now``.
        """
        seconds = self.seconds_after_login_fail * (2 ** (attempts - 1))
        seconds = min(seconds, self.max_block_seconds)
        new_time = now + timedelta(seconds=seconds)
        return (new_time, seconds)

    def store_login_failure(self) -> int:
        """Store an authentication failure in redis.

        Each value stored in redis is a dict with {blocked_until, attempts}.
        The redis key expires one day after the most recent failed attempt;
        during this time the waiting time increases exponentially.

        Return the number of seconds the user must wait before retrying,
        or zero if this feature is not enabled.
        """
        if not self.redis_url:
            return 0
        adict: Dict[bytes, bytes] = self.redis.hgetall(self.name)
        attempts = int(adict[b"attempts"]) + 1 if adict else 1
        new_time, seconds = self.pure_get_new_time(
            now=datetime.utcnow(), attempts=attempts
        )
        if adict:
            # Known IP: move the deadline and bump the counter in place.
            self.redis.hset(
                name=self.name, key="blocked_until", value=new_time.isoformat()
            )
            self.redis.hincrby(name=self.name, key="attempts", amount=1)
        else:
            # First failure for this IP: create the hash.
            self.redis.hmset(
                self.name,
                {
                    "attempts": attempts,
                    "blocked_until": new_time.isoformat(),
                },
            )
        # Restart the one-day countdown after which the record is forgotten.
        self.redis.pexpire(self.name, self.one_day_in_milliseconds)
        return seconds

    def is_login_blocked(self) -> Tuple[int, str]:
        """Find out whether the IP address is currently blocked.

        Return a tuple (remaining_seconds: int, error: str).  The tuple is
        ``(0, "")`` when the login attempt is allowed.
        """
        if not self.redis_url:
            return (0, "")  # feature disabled: allow login attempt
        byts_blocked = self.redis.hget(self.name, "blocked_until")
        if not byts_blocked:
            return (0, "")  # no failure on record: allow login attempt
        blocked_until = datetime.fromisoformat(byts_blocked.decode("utf8"))
        # Naive UTC, matching what store_login_failure() wrote.
        now = datetime.utcnow()
        if blocked_until < now:
            return (0, "")  # block is over: allow login attempt
        # total_seconds() rather than .seconds: the latter silently drops
        # the days component of the difference.
        seconds = int((blocked_until - now).total_seconds())
        # The message template is only looked up once it is really needed.
        template: str = get_strings(self.kerno).login_is_blocked
        return (
            seconds,
            template.format(seconds=seconds, until=str(blocked_until).split(".")[0]),
        )