HystrixRollingNumber
阅读数:139 评论数:0
跳转到新版页面分类
python/Java
正文
HystrixRollingNumber利用了LongAdder,也借鉴了LongAdder分段的思想,即分段统计,比如说要统计qps,即1秒内的请求总数,可以将1s的时间分成10段,每段100ms,在第一个100ms内,写入第一段中统计数,在第二个100ms内,写入第二段中的统计数……,这样如果要统计当前时间的qps,通过计算这10段的总和即可。
Bucket
给定时间桶内的计数器,也即是我们上面所说的段,它有三个重要的属性值:
(1)final long windowStart
记录了该Bucket所属的时间段的开始时间。
(2)final LongAdder[] adderForCounterType
数组中每元素代表了一种事件类型的计数值。
(3)final LongMaxUpdate[] updateForCounterType;
ListState
它是个不可变类,每次BucketCircularArray状态改变的时候,会创建一个并且会原子地设置到BucketCircularArray中,它用来处理复合操作。因为ListState是个不可变类,遵循不可变类的原则:
(1)Fields为final
(2)copy on write,写方法返回新的ListState
ListState算是个助手类,维持了一个Bucket数组,定义了一些围绕着Bucket数组的有用操作,并且自身是个不可变类,天然的线程安全属性。
BucketCircularArray
环形数组,它在添加Bucket失败时,不重试,完全忽略这次失败,因为CAS失败是其它其他线程在执行adding或removing操作,如果时间片流逝了,可以通过下次调用getCurrentBucket进行补偿。
HystrixRollingNumber
用来统计一段时间内的计数。
(1)private final Time time
当前时间毫秒值
(2)final int timeInMilliseconds
统计的时间长度(毫秒单位)
(3)final int numberOfBuckets
bucket的数量(分成多少段进行统计)
(4)final int bucketSizeInMillseconds
每个bucket所对应的时间片(毫秒为单位)
(5)final BucketCircularArray buckets
帮助维持环形数组桶
Bucket getCurrentBucket() {
// 获取当前的毫秒时间
long currentTime = time.getCurrentTimeInMillis();
//获取最后一个Bucket(即最新一个Bucket)
Bucket currentBucket = buckets.peekLast();
if (currentBucket != null && currentTime < currentBucket.windowStart + this.bucketSizeInMillseconds) {
//如果当前时间是在currentBucket对应的时间窗口内,直接返回currentBucket
return currentBucket;
}
/* if we didn't find the current bucket above, then we have to create one */
//如果当前时间对应的Bucket不存在,我们需要创建一个
if (newBucketLock.tryLock()) {
//尝试获取一次锁
try {
if (buckets.peekLast() == null) {
// the list is empty so create the first bucket
//首次创建
Bucket newBucket = new Bucket(currentTime);
buckets.addLast(newBucket);
return newBucket;
} else {
// We go into a loop so that it will create as many buckets as needed to catch up to the current time
// as we want the buckets complete even if we don't have transactions during a period of time.
// 将创建一个或者多个Bucket,直到Bucket代表的时间窗口赶上当前时间
for (int i = 0; i < numberOfBuckets; i++) {
// we have at least 1 bucket so retrieve it
Bucket lastBucket = buckets.peekLast();
if (currentTime < lastBucket.windowStart + this.bucketSizeInMillseconds) {
// if we're within the bucket 'window of time' return the current one
// NOTE: We do not worry if we are BEFORE the window in a weird case of where thread scheduling causes that to occur,
// we'll just use the latest as long as we're not AFTER the window
return lastBucket;
} else if (currentTime - (lastBucket.windowStart + this.bucketSizeInMillseconds) > timeInMilliseconds) {
// the time passed is greater than the entire rolling counter so we want to clear it all and start from scratch
reset();
// recursively call getCurrentBucket which will create a new bucket and return it
return getCurrentBucket();
} else { // we're past the window so we need to create a new bucket
// create a new bucket and add it as the new 'last'
buckets.addLast(new Bucket(lastBucket.windowStart + this.bucketSizeInMillseconds));
// add the lastBucket values to the cumulativeSum
cumulativeSum.addBucket(lastBucket);
}
}
// we have finished the for-loop and created all of the buckets, so return the lastBucket now
return buckets.peekLast();
}
} finally {
//释放锁
newBucketLock.unlock();
}
} else {
//如果获取不到锁,尝试获取最新一个Bucket
currentBucket = buckets.peekLast();
if (currentBucket != null) {
//如果不为null,直接返回最新Bucket
// we didn't get the lock so just return the latest bucket while another thread creates the next one
return currentBucket;
} else {
//多个线程同时创建第一个Bucket,尝试等待,递归调用getCurrentBucket
// the rare scenario where multiple threads raced to create the very first bucket
// wait slightly and then use recursion while the other thread finishes creating a bucket
try {
Thread.sleep(5);
} catch (Exception e) {
// ignore
}
return getCurrentBucket();
}
}
}