Token bucket implementation with adaptive token generation rate

In order to ensure the stable operation of the system, we often limit the flow of requests. If we do not limit the flow, massive requests are likely to crush the system. It is often a good choice to discard some requests to ensure that most of the requests can respond normally

The most commonly used current limiting method is the token bucket, which is simple to implement and highly reliable. It only needs to add a token to the token bucket at a constant rate. After the request comes, go to the token bucket to get the token. If the token is empty, access will be denied.

But token bucket is difficult to use, because it is difficult to determine the appropriate amount of traffic limit. If it is too large, the system can’t bear it. If it is too small, the hardware performance can’t be fully utilized. Moreover, the hardware of each machine may be inconsistent. You can’t put a set of token bucket parameters on different servers. Even if you find the right parameters, with the iterative update of your system version, the original parameters may not be so suitable. It is often very difficult to find a suitable parameter.

We can use smartratelimiter to solve this problem. It can find a suitable parameter according to your hardware.

It will start a coroutine to monitor the CPU and memory usage of the hardware. If the CPU and memory usage exceeds the warning value, it will reduce the token generation rate and the traffic into the system. If the utilization rate of CPU and memory is too low and the real request rate is higher than the token generation rate, the system is considered to be unsaturated. At this time, the token generation rate will be increased and the flow into the system will be increased. In this way, the most appropriate parameter value can be found according to the hardware condition of the machine.

OK, no more nonsense, just code

Let’s look at the data structure first

type TokenBucket struct {
	Capacity          int64        //Token bucket capacity can cache some tokens
	Ch chan bool //Token bucket
	CountPerSecond int64 //The number of tokens generated per second
	AddTimer *time.Ticker //Timer for adding tokens
	RealPerSecond int64 //actually requests per second
	Counter int64 //Counter
	RealPerSenconds *Queue //Queueue for caching requests per second
	ResetCounterTimer *time.Ticker // timer to reset the counter
	UpdateTimer *time.Ticker //Update the timer that generates the token rate
	CpuPercent float64 // warning value of cpu usage
	MemPercent float64 //Warning value of memory usage
}

Create a token bucket

func NewTokenBucket(capacity int64, countPerSecond int64, cpuPercent float64, memPercent float64) *TokenBucket {
	tokenBucket := &TokenBucket{
		Capacity:          capacity,
		Ch:                make(chan bool, capacity),
		CountPerSecond:    countPerSecond,
		AddTimer:          time.NewTicker(time.Duration(int64(time.Second)/countPerSecond)),
		RealPerSenconds:   &Queue{},
		ResetCounterTimer: time.NewTicker(1 * time.Second),  //每Reset the counter every second
		UpdateTimer: time.NewTicker(10 * time.Second), //default 10 seconds to try to update the token generation rate
		CpuPercent: cpuPercent,
		MemPercent: memPercent,
	}
	//Start the token generation process
	go tokenBucket.add()
	//Start the token generation rate update procedure
	go tokenBucket.updateRate()
	//start the request rate logging concatenation
	go tokenBucket.resetAndSaveCounter()
	return tokenBucket
}

Next, let’s take a look at the core method to update the token bucket generation rate

/**
If the cpu usage or memory usage is greater than the warning value, the token generation rate decreases by 10%.
If the cpu usage and memory usage are less than 80% of the warning value, and the real request token rate is greater than the token generation rate, the token generation rate is increased by 10%.
*/
func (tokenBucket *TokenBucket) updateRate() {
	for {
		select {
		case <-tokenBucket.UpdateTimer.C:
			v, _ := mem.VirtualMemory()
			//Memory usage
			mem := v.UsedPercent
			//cpu usage
			cc, _ := cpu.Percent(10*time.Second, false)
			if cc[0] > tokenBucket.MemPercent || mem > tokenBucket.MemPercent {
				tokenBucket.CountPerSecond = tokenBucket.CountPerSecond - tokenBucket.CountPerSecond/10
				tokenBucket.doUpdateRate()
			}

			if cc[0] < tokenBucket.MemPercent-tokenBucket.MemPercent/5 &&
				mem < tokenBucket.MemPercent-tokenBucket.MemPercent/5 &&
				tokenBucket.RealPerSecond > tokenBucket.CountPerSecond {
				tokenBucket.CountPerSecond = tokenBucket.CountPerSecond + tokenBucket.CountPerSecond/10
				tokenBucket.doUpdateRate()
			}
			fmt.Printf("memory usage: %f,cpu usage: %f,current token bucket speed limit:%d,real speed: %d\n",
				mem, cc[0], tokenBucket.CountPerSecond, tokenBucket.RealPerSecond)
		}
	}

}

func (tokenBucket *TokenBucket) doUpdateRate() {
	t := tokenBucket.AddTimer
	defer t.Stop()
	//Create a new Ticker and replace the original
	tokenBucket.AddTimer = time.NewTicker(tokenBucket.getDuration())
}

Next, let’s see how to get the actual number of requests per second

/**
The real token request rate is the maximum value of requests per second in the last 10s
*/
func (tokenBucket *TokenBucket) resetAndSaveCounter() {
		for {
			select {
			case <-tokenBucket.ResetCounterTimer.C:
				//If the queue is greater than 10, discard the first value stuffed untilsize=10
				for i := 0; i < len(*tokenBucket.RealPerSenconds)-10; i++ {
					tokenBucket.RealPerSenconds.Pop()
				}

				tokenBucket.RealPerSenconds.Push(tokenBucket.Counter)
				//Counter zeroing
				for {
					if atomic.CompareAndSwapInt64(&tokenBucket.Counter, tokenBucket.Counter, 0) {
						break
					}
				}
				var temp int64
				for _, value := range *tokenBucket.RealPerSenconds {
					if temp < value {
						temp = value
					}
				}
				tokenBucket.RealPerSecond = temp
			}
		}

}

Next, there are two ways to get a token

/**
Get the token and return directly if you don't get it
*/
func (tokenBucket *TokenBucket) GetToken() bool {
	atomic.AddInt64(&tokenBucket.Counter, 1)
	select {
	case <-tokenBucket.Ch:
		return true
	default:
		return false
	}
}

/**
Get the token and wait until the timeout if you don't get it
*/
func (tokenBucket *TokenBucket) TryGetToken(timeout time.Duration) bool {
	atomic.AddInt64(&tokenBucket.Counter, 1)
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case <-tokenBucket.Ch:
		return true
	case <-t.C:
		return false
	}
}

Finally, add a token

func (tokenBucket *TokenBucket) add() {
	for {
		select {
		case <-tokenBucket.AddTimer.C:
			tokenBucket.doAdd()
		}
	}
}

func (tokenBucket *TokenBucket) doAdd() {
	if int64(len(tokenBucket.Ch)) < tokenBucket.Capacity {
		tokenBucket.Ch <- true
	}
}

Source code address: click here

If it helps you, please click star on GitHub, thank you!

Read More: