10. Limitando tasa de clientes

Si estás construyendo una API para uso público, es bastante probable que desees implementar algún tipo de límite de frecuencia para evitar que los clientes realicen demasiadas solicitudes demasiado rápido y pongan una carga excesiva en tu servidor.

En esta seccion vamos a crear un middleware que nos ayude con eso.

Básicamente, queremos que este middleware verifique cuántas solicitudes se han recibido en los últimos ‘N’ segundos y, si han sido demasiadas, debería enviar al cliente una respuesta 429 Too Many Requests. Posicionaremos este middleware antes de nuestros manejadores principales de la aplicación, de modo que realice esta verificación antes de realizar cualquier procesamiento costoso, como decodificar un cuerpo de solicitud JSON o consultar nuestra base de datos.

Aprenderás:

  • Sobre los principios detrás de los algoritmos de limitación de frecuencia con cubo de tokens y cómo podemos aplicarlos en el contexto de una API o aplicación web.
  • Cómo crear middleware para limitar la frecuencia de las solicitudes a tus puntos finales de API, primero mediante la creación de un único limitador global de frecuencia y luego extendiéndolo para admitir la limitación por cliente basada en la dirección IP.
  • Cómo hacer que el comportamiento del limitador de frecuencia sea configurable en tiempo de ejecución, incluida la posibilidad de desactivar por completo el limitador de frecuencia con fines de prueba.

10.1 Limitación de tasa global

Vamos a construir las cosas lentamente y comenzar creando un único limitador de frecuencia global para nuestra aplicación. Esto considerará todas las solicitudes que recibe nuestra API (en lugar de tener limitadores de frecuencia separados para cada cliente individual).

En lugar de escribir nuestra propia lógica de limitación de frecuencia desde cero, lo cual sería bastante complejo y consumiría mucho tiempo, podemos aprovechar el paquete x/time/rate para ayudarnos en esto. Este paquete proporciona una implementación probada y probada de un limitador de frecuencia con token bucket.

Si estás siguiendo, por favor, descarga la última versión de este paquete de la siguiente manera:

$ go get golang.org/x/time/rate@latest
go: downloading golang.org/x/time v0.3.0
go: added golang.org/x/time v0.3.0

Vamos a tomarnos un momento para explicar como funciona el token-bucket funciona. La descripcion oficial de x/time/rate dice:

Un Limiter controla con qué frecuencia se permiten que ocurran eventos. Implementa un “cubo de tokens” de tamaño b, inicialmente lleno y rellenado a una tasa de r tokens por segundo.

Colocándolo en el contexto de nuestra aplicación API…

  • Cada vez que recibamos una solicitud HTTP, eliminaremos un token del bucket.
  • Tendremos un bucket que comienza con b tokens en él.
  • Cada 1/r segundos, se agrega un token de nuevo al bucket, hasta un máximo de b tokens en total.
  • Si recibimos una solicitud HTTP y el bucket está vacío, entonces deberíamos devolver una respuesta 429 Too Many Requests.

En la práctica, esto significa que nuestra aplicación permitiría un máximo de ‘b’ solicitudes HTTP en rápida sucesión, pero con el tiempo permitiría un promedio de ‘r’ solicitudes por segundo.

Para crear un limitador de frecuencia con bucket de tokens utilizando x/time/rate, necesitaremos utilizar la función NewLimiter(). Esta tiene una firma que se ve así:

// Note that the Limit type is an 'alias' for float64.
func NewLimiter(r Limit, b int) *Limiter

Entonces, si queremos crear un limitador de frecuencia que permita un promedio de 2 solicitudes por segundo, con un máximo de 4 solicitudes en un solo ‘burst’, podríamos hacerlo con el siguiente código:

// Allow 2 requests per second, with a maximum of 4 requests in a burst.
limiter := rate.NewLimiter(2, 4)

10.1 Haciendo cumplir un limite de tasa global

De acuerdo, con esa explicación de alto nivel fuera del camino, sumerjámonos en un poco de código y veamos cómo funciona en la práctica.

Una de las cosas agradables sobre el patrón de middleware que estamos utilizando es que es sencillo incluir código de ‘inicialización’ que se ejecuta solo una vez cuando envolvemos algo con el middleware, en lugar de ejecutarse en cada solicitud que maneja el middleware.

func (app *application) ejemploMiddleware(next http.Handler) http.Handler {
    // Cualquier código aquí se ejecutará solo una vez, cuando envolvamos algo con el middleware.
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Cualquier código aquí se ejecutará para cada solicitud que maneje el middleware.
        next.ServeHTTP(w, r)
    })
}

En nuestro caso, crearemos un nuevo método de middleware llamado rateLimit() que crea un nuevo limitador de frecuencia como parte del código de ‘inicialización’ y luego utiliza este limitador de frecuencia para cada solicitud que maneje posteriormente.

Si estás siguiendo, abre el archivo cmd/api/middleware.go y crea el middleware de la siguiente manera:

func (app *application) rateLimit(next http.Handler) http.Handler {
	// Initialize a new rate limiter which allows an average of 2 requests per second,
	// with a maximum of 4 requests in a single ‘burst’.
	limiter := rate.NewLimiter(2, 4)
	// The function we are returning is a closure, which 'closes over' the limiter
	// variable.
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Call limiter.Allow() to see if the request is permitted, and if it's not,
		// then we call the rateLimitExceededResponse() helper to return a 429 Too Many
		// Requests response (we will create this helper in a minute).
		if !limiter.Allow() {
			app.rateLimitExceededResponse(w, r)
			return
		}
		next.ServeHTTP(w, r)
	})
}

En este código, cada vez que llamamos al método Allow() en el limitador de frecuencia, exactamente un token se consumirá del cubo. Si no hay tokens restantes en el cubo, entonces Allow() devolverá false y eso actúa como el desencadenante para enviar al cliente una respuesta 429 Too Many Requests.

También es importante destacar que el código detrás del método Allow() está protegido por un mutex y es seguro para su uso concurrente.

Ahora vayamos al archivo cmd/api/errors.go y creemos el helper rateLimitExceededResponse(). Así:

func (app *application) rateLimitExceededResponse(w http.ResponseWriter, r *http.Request) {
	message := "rate limit exceeded"
	app.errorResponse(w, r, http.StatusTooManyRequests, message)
}

Luego, por último, en el archivo cmd/api/routes.go queremos agregar el middleware rateLimit() a nuestra cadena de middleware. Esto debe colocarse después de nuestro middleware de recuperación de pánico (para que cualquier pánico en rateLimit() se recupere), pero de lo contrario, queremos que se use lo antes posible para evitar un trabajo innecesario para nuestro servidor.

Ve adelante y actualiza el archivo de acuerdo a esto:

func (app *application) routes() http.Handler {
	router := httprouter.New()

	router.NotFound = http.HandlerFunc(app.notFoundResponse)

	router.MethodNotAllowed = http.HandlerFunc(app.methodNotAllowedResponse)

	router.HandlerFunc(http.MethodGet, "/v1/healthcheck", app.healthcheckHandler)

	// Add the route for the GET /v1/movies endpoint.
	router.HandlerFunc(http.MethodGet, "/v1/movies", app.listMoviesHandler)
	router.HandlerFunc(http.MethodPost, "/v1/movies", app.createMovieHandler)
	router.HandlerFunc(http.MethodGet, "/v1/movies/:id", app.showMovieHandler)
	router.HandlerFunc(http.MethodPatch, "/v1/movies/:id", app.updateMovieHandler)
	router.HandlerFunc(http.MethodDelete, "/v1/movies/:id", app.deleteMovieHandler)

	// Wrap the router with the rateLimit() middleware.
	return app.recoverPanic(app.rateLimit(router))
}

Ahora deberíamos estar listos para probar esto. Reinicia la API y luego en otra ventana de terminal ejecuta el siguiente comando para enviar un lote de 6 solicitudes a nuestro punto final GET /v1/healthcheck en rápida sucesión. Deberías obtener respuestas que se vean así:

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "error": "rate limit exceeded"
}
{
  "error": "rate limit exceeded"
}

Podemos observar que los primeros 4 solicitudes tienen éxito, debido a que nuestro limitador está configurado para permitir un ‘burst’ de 4 solicitudes en rápida sucesión. Sin embargo, una vez que se utilizaron esas 4 solicitudes, los tokens en el cubo se agotaron y nuestra API comenzó a devolver la respuesta de error “rate limit exceeded”.

Si esperas un segundo y vuelves a ejecutar este comando, deberías encontrar que algunas solicitudes en el segundo lote vuelven a tener éxito, ya que el cubo de tokens se vuelve a llenar a una tasa de dos tokens cada segundo.

10.2 Limite de tasa basado en IP

Utilizar un limitador de frecuencia global puede ser útil cuando deseas imponer un límite estricto en la tasa total de solicitudes a tu API, y no te importa de dónde provienen las solicitudes. Pero generalmente es más común querer un limitador de frecuencia individual para cada cliente, de modo que un cliente problemático que realiza demasiadas solicitudes no afecte a los demás.

Una forma conceptualmente sencilla de implementar esto es crear un mapa en memoria de limitadores de frecuencia, utilizando la dirección IP de cada cliente como clave del mapa. Cada vez que un nuevo cliente realiza una solicitud a nuestra API, inicializaremos un nuevo limitador de frecuencia y lo agregaremos al mapa. Para cualquier solicitud posterior, recuperaremos el limitador de frecuencia del cliente desde el mapa y verificaremos si la solicitud está permitida llamando a su método Allow(), tal como hicimos anteriormente.

Pero hay algo de lo que debemos ser conscientes: de forma predeterminada, los mapas no son seguros para su uso concurrente. Esto es un problema para nosotros porque nuestro middleware rateLimit() puede estar ejecutándose en varias goroutines al mismo tiempo (recuerda, el servidor HTTP de Go maneja cada solicitud HTTP en su propia goroutine).

Desde el blog de Go: “Los mapas no son seguros para su uso concurrente: no está definido qué sucede cuando los lees y escribes simultáneamente. Si necesitas leer y escribir en un mapa desde goroutines que se ejecutan concurrentemente, los accesos deben ser mediados por algún tipo de mecanismo de sincronización”.

Entonces, para solucionar esto, necesitaremos sincronizar el acceso al mapa de limitadores de frecuencia utilizando un sync.Mutex (un candado de exclusión mutua), de modo que solo una goroutine pueda leer o escribir en el mapa en cualquier momento.

Es importante destacar que el funcionamiento de los mutex y cómo utilizarlos puede resultar bastante confuso si no los has encontrado antes, y es imposible explicarlo completamente en unas pocas frases cortas. He escrito un artículo mucho más detallado llamado Comprendiendo los Mutexes que proporciona una explicación adecuada. Si aún no te sientes seguro con los mutexes, te recomiendo leer esto antes de continuar.

Bien, si estás siguiendo, vamos a adentrarnos en el código y actualizar nuestro middleware rateLimit() para implementar esto.

func (app *application) rateLimit(next http.Handler) http.Handler {
	// Declare a mutex and a map to hold the clients' IP addresses and rate limiters.
	var (
		mu      sync.Mutex
		clients = make(map[string]*rate.Limiter)
	)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Extract the client's IP address from the request.
		ip, _, err := net.SplitHostPort(r.RemoteAddr)
		if err != nil {
			app.serverErrorResponse(w, r, err)
			return
		}
		// Lock the mutex to prevent this code from being executed concurrently.
		mu.Lock()
		// Check to see if the IP address already exists in the map. If it doesn't, then
		// initialize a new rate limiter and add the IP address and limiter to the map.
		if _, found := clients[ip]; !found {
			clients[ip] = rate.NewLimiter(2, 4)
		}
		// Call the Allow() method on the rate limiter for the current IP address. If
		// the request isn't allowed, unlock the mutex and send a 429 Too Many Requests
		// response, just like before.
		if !clients[ip].Allow() {
			mu.Unlock()
			app.rateLimitExceededResponse(w, r)
			return
		}
		// Very importantly, unlock the mutex before calling the next handler in the
		// chain. Notice that we DON'T use defer to unlock the mutex, as that would mean
		// that the mutex isn't unlocked until all the handlers downstream of this
		// middleware have also returned.
		mu.Unlock()
		next.ServeHTTP(w, r)
	})
}

10.2 Eliminando viejos limitados

El código anterior funcionará, pero hay un pequeño problema: el mapa de clientes crecerá indefinidamente, ocupando más recursos con cada nueva dirección IP y limitador de frecuencia que agregamos.

Para evitar esto, actualicemos nuestro código para que también registremos la última vez vista para cada cliente. Luego, ejecutaremos una goroutine en segundo plano en la que eliminaremos periódicamente a cualquier cliente que no hayamos visto recientemente del mapa de clientes.

Para que esto funcione, necesitaremos crear una estructura de cliente personalizada que contenga tanto el limitador de frecuencia como la última vez vista para cada cliente, y lanzar la goroutine de limpieza en segundo plano al inicializar el middleware.

Así:

func (app *application) rateLimit(next http.Handler) http.Handler {
	// Define a client struct to hold the rate limiter and last seen time for each
	// client.
	type client struct {
		limiter  *rate.Limiter
		lastSeen time.Time
	}
	var (
		mu sync.Mutex
		// Update the map so the values are pointers to a client struct.
		clients = make(map[string]*client)
	)
	// Launch a background goroutine which removes old entries from the clients map once
	// every minute.
	go func() {
		for {
			time.Sleep(time.Minute)
			// Lock the mutex to prevent any rate limiter checks from happening while
			// the cleanup is taking place.
			mu.Lock()
			// Loop through all clients. If they haven't been seen within the last three// Loop through all clients. If they haven't been seen within the last three
			// minutes, delete the corresponding entry from the map.
			for ip, client := range clients {
				if time.Since(client.lastSeen) > 3*time.Minute {
					delete(clients, ip)
				}
			}
			// Importantly, unlock the mutex when the cleanup is complete.
			mu.Unlock()
		}
	}()
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ip, _, err := net.SplitHostPort(r.RemoteAddr)
		if err != nil {
			app.serverErrorResponse(w, r, err)
			return
		}
		mu.Lock()
		if _, found := clients[ip]; !found {
			// Create and add a new client struct to the map if it doesn't already exist.
			clients[ip] = &client{limiter: rate.NewLimiter(2, 4)}
		}
		// Update the last seen time for the client.
		clients[ip].lastSeen = time.Now()
		if !clients[ip].limiter.Allow() {
			mu.Unlock()
			app.rateLimitExceededResponse(w, r)
			return
		}
		mu.Unlock()
		next.ServeHTTP(w, r)
	})
}

En este punto, si reinicias la API y vuelves a intentar hacer un lote de solicitudes en rápida sucesión, deberías encontrar que el limitador de frecuencia sigue funcionando correctamente desde la perspectiva de un cliente individual, al igual que lo hacía antes.

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "error": "rate limit exceeded"
}
{
  "error": "rate limit exceeded"
}


10.2 Informacion adicional

10.2 Aplicaciones distribuidas

Este patrón de limitación de frecuencia solo funcionará si tu aplicación de API se está ejecutando en una sola máquina. Si tu infraestructura es distribuida, con tu aplicación ejecutándose en múltiples servidores detrás de un equilibrador de carga, entonces necesitarás utilizar un enfoque alternativo.

Si estás utilizando HAProxy o Nginx como equilibrador de carga o proxy inverso, ambos tienen funcionalidades incorporadas para limitar la frecuencia que probablemente sería sensato utilizar. Alternativamente, podrías utilizar una base de datos rápida como Redis para mantener un recuento de solicitudes para los clientes, ejecutándola en un servidor con el cual todos tus servidores de aplicación pueden comunicarse.

10.3 Configuración de los limitadores de velocidad

En este momento, nuestros valores de solicitudes por segundo y ráfaga están codificados directamente en el middleware rateLimit(). Esto está bien, pero sería más flexible si fueran configurables en tiempo de ejecución en su lugar. Del mismo modo, sería útil tener una manera sencilla de desactivar completamente la limitación de frecuencia (lo cual es útil cuando deseas ejecutar pruebas de rendimiento o realizar pruebas de carga, cuando todas las solicitudes podrían provenir de un pequeño número de direcciones IP).

Para hacer estas cosas configurables, volvamos a nuestro archivo cmd/api/main.go y actualicemos la estructura de configuración y las banderas de línea de comandos de la siguiente manera:

package main

...


type config struct {
	port int
	env  string
	db   struct {
		dsn          string
		maxOpenConns int
		maxIdleConns int
		maxIdleTime  time.Duration
	}
	// Add a new limiter struct containing fields for the requests-per-second and burst
	// values, and a boolean field which we can use to enable/disable rate limiting
	// altogether.
	limiter struct {
		rps     float64
		burst   int
		enabled bool
	}
}

func main() {
	var cfg config

	flag.IntVar(&cfg.port, "port", 4000, "API server port")
	flag.StringVar(&cfg.env, "env", "development", "Environment (development|staging|production)")

	flag.StringVar(&cfg.db.dsn, "db-dsn", "postgres://greenlight:pa55word@localhost/greenlight", "PostgreSQL DSN")

	flag.IntVar(&cfg.db.maxOpenConns, "db-max-open-conns", 25, "PostgreSQL max open connections")
	flag.IntVar(&cfg.db.maxIdleConns, "db-max-idle-conns", 25, "PostgreSQL max idle connections")
	flag.DurationVar(&cfg.db.maxIdleTime, "db-max-idle-time", 15*time.Minute, "PostgreSQL max connection idle time")

	// Create command line flags to read the setting values into the config struct.
	// Notice that we use true as the default for the 'enabled' setting?
	flag.Float64Var(&cfg.limiter.rps, "limiter-rps", 2, "Rate limiter maximum requests per second")
	flag.IntVar(&cfg.limiter.burst, "limiter-burst", 4, "Rate limiter maximum burst")
	flag.BoolVar(&cfg.limiter.enabled, "limiter-enabled", true, "Enable rate limiter")
	flag.Parse()

	...
}

Luego, actualicemos nuestro middleware rateLimit() para utilizar estas configuraciones, de la siguiente manera:

func (app *application) rateLimit(next http.Handler) http.Handler {
	// Define a client struct to hold the rate limiter and last seen time for each
	// client.
	type client struct {
		limiter  *rate.Limiter
		lastSeen time.Time
	}
	var (
		mu sync.Mutex
		// Update the map so the values are pointers to a client struct.
		clients = make(map[string]*client)
	)
	// Launch a background goroutine which removes old entries from the clients map once
	// every minute.
	go func() {
		for {
			time.Sleep(time.Minute)
			// Lock the mutex to prevent any rate limiter checks from happening while
			// the cleanup is taking place.
			mu.Lock()
			// Loop through all clients. If they haven't been seen within the last three// Loop through all clients. If they haven't been seen within the last three
			// minutes, delete the corresponding entry from the map.
			for ip, client := range clients {
				if time.Since(client.lastSeen) > 3*time.Minute {
					delete(clients, ip)
				}
			}
			// Importantly, unlock the mutex when the cleanup is complete.
			mu.Unlock()
		}
	}()
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Only carry out the check if rate limiting is enabled.
		if app.config.limiter.enabled {
			ip, _, err := net.SplitHostPort(r.RemoteAddr)
			if err != nil {
				app.serverErrorResponse(w, r, err)
				return
			}
			mu.Lock()
			if _, found := clients[ip]; !found {
				clients[ip] = &client{
					// Use the requests-per-second and burst values from the config
					// struct.
					limiter: rate.NewLimiter(rate.Limit(app.config.limiter.rps), app.config.limiter.burst),
				}
			}
			clients[ip].lastSeen = time.Now()
			if !clients[ip].limiter.Allow() {
				mu.Unlock()
				app.rateLimitExceededResponse(w, r)
				return
			}
			mu.Unlock()
		}
		next.ServeHTTP(w, r)
	})
}

Una vez hecho eso, probemos ejecutando la API con la bandera -limiter-burst y el valor de ráfaga reducido a 2:

$ go run ./cmd/api/ -limiter-burst=2
time=2023-09-10T10:59:13.722+02:00 level=INFO msg="database connection pool established"
time=2023-09-10T10:59:13.722+02:00 level=INFO msg="starting server" addr=:4000 env=development

Si emites nuevamente un lote de seis solicitudes en rápida sucesión, ahora deberías encontrar que solo las primeras dos tienen éxito:

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
{
  "error": "rate limit exceeded"
}
{
  "error": "rate limit exceeded"
}
{
  "error": "rate limit exceeded"
}
{
  "error": "rate limit exceeded"
}

De manera similar, puedes intentar desactivar por completo el limitador de frecuencia con la bandera -limiter-enabled=false, así:

$ go run ./cmd/api/ -limiter-enabled=false
time=2023-09-10T10:59:13.722+02:00 level=INFO msg="database connection pool established"
time=2023-09-10T10:59:13.722+02:00 level=INFO msg="starting server" addr=:4000 env=development

Y deberías encontrar que todas las solicitudes se completan con éxito ahora, sin importar cuántas hagas.

Y deberías encontrar que todas las solicitudes se completan con éxito ahora, sin importar cuántas hagas.

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}
...
{
  "status": "available",
  "system_info": {
    "environment": "development",
    "version": "1.0.0"
  }
}

Post Relacionados