View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  
20  import java.util.concurrent.atomic.AtomicLong;
21  
/**
 * Utility class that can be used to implement queues with limited
 * capacity (in terms of memory).
 *
 * <p>It maintains an internal counter and provides two operations:
 * {@link #increase(long)} and {@link #decrease(long)}. Increase blocks
 * while the counter is greater than or equal to the threshold and then
 * adds to the counter. Decrease subtracts from the counter and wakes up
 * waiting threads when the counter drops below the threshold.
 *
 * <p>The counter is allowed to exceed the threshold. This happens when
 * the counter is below the threshold and {@code increase} is called with
 * a {@code delta} large enough that the sum passes the threshold. This is
 * intentional and solves two problems:
 * <ul>
 *   <li>a thread calling increase with a big argument is not starved by
 *       other threads calling increase with small arguments;</li>
 *   <li>a thread calling increase with an argument greater than the
 *       threshold does not deadlock, so a throttled queue can still
 *       accept an object that is bigger than the limit.</li>
 * </ul>
 *
 * <p>Thread-safety: every method that reads or writes the counter is
 * {@code synchronized} on this instance, so the "is there room?" check
 * and the following update in {@code increase} are performed under the
 * same monitor as the update in {@code decrease}.
 */
public class SizeBasedThrottler {

  /** Value at or above which {@link #increase(long)} blocks. Always positive. */
  private final long threshold;

  /** Current value of the counter; only accessed while holding this instance's monitor. */
  private final AtomicLong currentSize;

  /**
   * Creates a SizeBasedThrottler with the provided threshold.
   *
   * @param threshold threshold used by this instance, must be greater than 0
   * @throws IllegalArgumentException if {@code threshold} is not positive
   */
  public SizeBasedThrottler(long threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException(
          "Threshold must be greater than 0, got: " + threshold);
    }
    this.threshold = threshold;
    this.currentSize = new AtomicLong(0);
  }

  /**
   * Blocks while the internal counter is greater than or equal to the
   * threshold, then adds {@code delta} to the counter.
   *
   * <p>Only the value of the counter <em>before</em> the addition is
   * compared with the threshold, so the returned value may exceed the
   * threshold. {@code delta} itself is not validated.
   *
   * @param delta increase internal counter by this value
   * @return new value of internal counter
   * @throws InterruptedException when interrupted during waiting
   */
  public synchronized long increase(long delta) throws InterruptedException {
    // The method already owns the monitor, so wait() can be called directly.
    // Loop to re-check the condition after every wake-up: notifyAll() wakes
    // all waiters and an earlier one may have filled the counter up again.
    while (currentSize.get() >= threshold) {
      wait();
    }

    return currentSize.addAndGet(delta);
  }

  /**
   * Decreases value of internal counter. Wakes up waiting threads if this
   * call moved the counter from "at or above threshold" to "below threshold".
   *
   * @param delta decrease internal counter by this value
   * @return new value of internal counter
   */
  public synchronized long decrease(long delta) {
    final long newSize = currentSize.addAndGet(-delta);
    final long previousSize = newSize + delta;

    // Threads can only be waiting if the counter was at or above the
    // threshold before this call; notify only when that state just ended.
    if (newSize < threshold && previousSize >= threshold) {
      notifyAll();
    }

    return newSize;
  }

  /**
   * Returns the current value of the internal counter.
   *
   * @return current value of internal counter
   */
  public synchronized long getCurrentValue() {
    return currentSize.get();
  }

  /**
   * Returns the threshold this instance was created with.
   *
   * @return threshold
   */
  public long getThreshold() {
    return threshold;
  }
}