View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import org.apache.hadoop.hbase.classification.InterfaceAudience;
21  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
22  
23  /**
24   * Per-peer per-node throttling controller for replication: enabled if
25   * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed
26   * to peer within each cycle won't exceed 'bandwidth' bytes
27   */
28  @InterfaceAudience.Private
29  public class ReplicationThrottler {
30    private final boolean enabled;
31    private final double bandwidth;
32    private long cyclePushSize;
33    private long cycleStartTick;
34  
35    /**
36     * ReplicationThrottler constructor
37     * If bandwidth less than 1, throttling is disabled
38     * @param bandwidth per cycle(100ms)
39     */
40    public ReplicationThrottler(final double bandwidth) {
41      this.bandwidth = bandwidth;
42      this.enabled = this.bandwidth > 0;
43      if (this.enabled) {
44        this.cyclePushSize = 0;
45        this.cycleStartTick = EnvironmentEdgeManager.currentTime();
46      }
47    }
48  
49    /**
50     * If throttling is enabled
51     * @return true if throttling is enabled
52     */
53    public boolean isEnabled() {
54      return this.enabled;
55    }
56  
57    /**
58     * Get how long the caller should sleep according to the current size and
59     * current cycle's total push size and start tick, return the sleep interval
60     * for throttling control.
61     * @param size is the size of edits to be pushed
62     * @return sleep interval for throttling control
63     */
64    public long getNextSleepInterval(final int size) {
65      if (!this.enabled) {
66        return 0;
67      }
68  
69      long sleepTicks = 0;
70      long now = EnvironmentEdgeManager.currentTime();
71      // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
72      //    following cycles to amortize, this case can occur when a single push
73      //    exceeds the bandwidth
74      if ((double)this.cyclePushSize > bandwidth) {
75        double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
76        long shouldTillTo = this.cycleStartTick + (long)(cycles * 100);
77        if (shouldTillTo > now) {
78          sleepTicks = shouldTillTo - now;
79        } else {
80          // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
81          this.cycleStartTick = now;
82        }
83        this.cyclePushSize = 0;
84      } else {
85        long nextCycleTick = this.cycleStartTick + 100;  //a cycle is 100ms
86        if (now >= nextCycleTick) {
87          // 2. switch to next cycle if the current cycle has passed
88          this.cycleStartTick = now;
89          this.cyclePushSize = 0;
90        } else if (this.cyclePushSize > 0 &&
91            (double)(this.cyclePushSize + size) >= bandwidth) {
92          // 3. delay the push to next cycle if exceeds throttling bandwidth.
93          //    enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
94          //    where a cycle's first push size(currentSize) > bandwidth
95          sleepTicks = nextCycleTick - now;
96          this.cyclePushSize = 0;
97        }
98      }
99      return sleepTicks;
100   }
101 
102   /**
103    * Add current size to the current cycle's total push size
104    * @param size is the current size added to the current cycle's
105    * total push size
106    */
107   public void addPushSize(final int size) {
108     if (this.enabled) {
109       this.cyclePushSize += size;
110     }
111   }
112 
113   /**
114    * Reset the cycle start tick to NOW
115    */
116   public void resetStartTick() {
117     if (this.enabled) {
118       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
119     }
120   }
121 }