To keep a system stable, we often need to rate-limit incoming requests. Without a limit, a surge of traffic can overwhelm the system; dropping some requests so that the majority can still be served normally is usually the better trade-off.
The most common rate-limiting technique is the token bucket. It is simple to implement and reliable: tokens are added to the bucket at a constant rate, and each incoming request must take a token before it is processed. If the bucket is empty, the request is rejected.
The hard part of using a token bucket is choosing the right rate limit. Set it too high and the system still can't cope; set it too low and the hardware is underutilized. Worse, different machines have different hardware, so one set of token bucket parameters won't fit every server. And even if you find the right parameters, they may stop being right as your system evolves from version to version. Finding a suitable parameter is often very difficult.
We can use smartratelimiter to solve this problem. It finds a suitable rate based on your hardware.
It starts a goroutine that monitors the machine's CPU and memory usage. If either exceeds its warning threshold, it lowers the token generation rate, reducing the traffic let into the system. If CPU and memory usage are well below the thresholds and the real request rate is higher than the token generation rate, the system is considered unsaturated, so the token generation rate is raised and more traffic is let in. This way the limiter converges on the rate that best suits the machine it runs on.
OK, enough talk, let's get to the code.
First, the data structure:
type TokenBucket struct {
	Capacity          int64        // token bucket capacity; allows some tokens to be buffered
	Ch                chan bool    // the token bucket itself
	CountPerSecond    int64        // number of tokens generated per second
	AddTimer          *time.Ticker // timer for adding tokens
	RealPerSecond     int64        // actual requests per second
	Counter           int64        // request counter
	RealPerSenconds   *Queue       // queue caching the requests-per-second samples
	ResetCounterTimer *time.Ticker // timer to reset the counter
	UpdateTimer       *time.Ticker // timer to update the token generation rate
	CpuPercent        float64      // warning threshold for CPU usage
	MemPercent        float64      // warning threshold for memory usage
}
Creating a token bucket:
func NewTokenBucket(capacity int64, countPerSecond int64, cpuPercent float64, memPercent float64) *TokenBucket {
	tokenBucket := &TokenBucket{
		Capacity:          capacity,
		Ch:                make(chan bool, capacity),
		CountPerSecond:    countPerSecond,
		AddTimer:          time.NewTicker(time.Duration(int64(time.Second) / countPerSecond)),
		RealPerSenconds:   &Queue{},
		ResetCounterTimer: time.NewTicker(1 * time.Second),  // reset the counter every second
		UpdateTimer:       time.NewTicker(10 * time.Second), // try to update the token generation rate every 10 seconds by default
		CpuPercent:        cpuPercent,
		MemPercent:        memPercent,
	}
	// start the token-generating goroutine
	go tokenBucket.add()
	// start the goroutine that updates the token generation rate
	go tokenBucket.updateRate()
	// start the goroutine that records the request rate
	go tokenBucket.resetAndSaveCounter()
	return tokenBucket
}
Next, let's look at the core method, which updates the token generation rate:
/**
If CPU or memory usage is greater than its warning threshold, decrease the token generation rate by 10%.
If CPU and memory usage are both below 80% of their warning thresholds, and the real request rate is greater
than the token generation rate, increase the token generation rate by 10%.
*/
func (tokenBucket *TokenBucket) updateRate() {
	for {
		select {
		case <-tokenBucket.UpdateTimer.C:
			v, _ := mem.VirtualMemory()
			// memory usage (renamed from `mem` to avoid shadowing the gopsutil package)
			memUsed := v.UsedPercent
			// cpu usage
			cc, _ := cpu.Percent(10*time.Second, false)
			// compare CPU usage against the CPU threshold (the original compared it against MemPercent by mistake)
			if cc[0] > tokenBucket.CpuPercent || memUsed > tokenBucket.MemPercent {
				tokenBucket.CountPerSecond = tokenBucket.CountPerSecond - tokenBucket.CountPerSecond/10
				tokenBucket.doUpdateRate()
			}
			if cc[0] < tokenBucket.CpuPercent-tokenBucket.CpuPercent/5 &&
				memUsed < tokenBucket.MemPercent-tokenBucket.MemPercent/5 &&
				tokenBucket.RealPerSecond > tokenBucket.CountPerSecond {
				tokenBucket.CountPerSecond = tokenBucket.CountPerSecond + tokenBucket.CountPerSecond/10
				tokenBucket.doUpdateRate()
			}
			fmt.Printf("memory usage: %f, cpu usage: %f, current token bucket rate limit: %d, real rate: %d\n",
				memUsed, cc[0], tokenBucket.CountPerSecond, tokenBucket.RealPerSecond)
		}
	}
}
func (tokenBucket *TokenBucket) doUpdateRate() {
	t := tokenBucket.AddTimer
	defer t.Stop()
	// create a new Ticker at the new rate and replace the old one
	tokenBucket.AddTimer = time.NewTicker(tokenBucket.getDuration())
}
Next, let's see how the actual number of requests per second is measured:
/**
The real request rate is the maximum requests-per-second value observed over the last 10 seconds.
*/
func (tokenBucket *TokenBucket) resetAndSaveCounter() {
	for {
		select {
		case <-tokenBucket.ResetCounterTimer.C:
			// if the queue holds more than 10 samples, discard the oldest until size = 10
			for i := 0; i < len(*tokenBucket.RealPerSenconds)-10; i++ {
				tokenBucket.RealPerSenconds.Pop()
			}
			tokenBucket.RealPerSenconds.Push(tokenBucket.Counter)
			// reset the counter atomically (the original spun on CompareAndSwap with a
			// non-atomic read of Counter as the expected value; a plain atomic store is enough)
			atomic.StoreInt64(&tokenBucket.Counter, 0)
			// take the maximum of the cached samples as the real request rate
			var temp int64
			for _, value := range *tokenBucket.RealPerSenconds {
				if temp < value {
					temp = value
				}
			}
			tokenBucket.RealPerSecond = temp
		}
	}
}
Next, there are two ways to acquire a token:
/**
Try to get a token; return immediately if none is available.
*/
func (tokenBucket *TokenBucket) GetToken() bool {
	atomic.AddInt64(&tokenBucket.Counter, 1)
	select {
	case <-tokenBucket.Ch:
		return true
	default:
		return false
	}
}
/**
Try to get a token; if none is available, wait until the timeout expires.
*/
func (tokenBucket *TokenBucket) TryGetToken(timeout time.Duration) bool {
	atomic.AddInt64(&tokenBucket.Counter, 1)
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case <-tokenBucket.Ch:
		return true
	case <-t.C:
		return false
	}
}
Finally, adding tokens:
func (tokenBucket *TokenBucket) add() {
	for {
		select {
		case <-tokenBucket.AddTimer.C:
			tokenBucket.doAdd()
		}
	}
}

func (tokenBucket *TokenBucket) doAdd() {
	// only add a token if the bucket is not already full
	if int64(len(tokenBucket.Ch)) < tokenBucket.Capacity {
		tokenBucket.Ch <- true
	}
}
Source code address: click here
If this helped you, please star the project on GitHub. Thank you!