001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import org.apache.yetus.audience.InterfaceAudience;
021import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
022
023/**
024 * Per-peer per-node throttling controller for replication: enabled if
025 * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed
026 * to peer within each cycle won't exceed 'bandwidth' bytes
027 */
028@InterfaceAudience.Private
029public class ReplicationThrottler {
030  private boolean enabled;
031  private double bandwidth;
032  private long cyclePushSize;
033  private long cycleStartTick;
034
035  /**
036   * ReplicationThrottler constructor
037   * If bandwidth less than 1, throttling is disabled
038   * @param bandwidth per cycle(100ms)
039   */
040  public ReplicationThrottler(final double bandwidth) {
041    this.bandwidth = bandwidth;
042    this.enabled = this.bandwidth > 0;
043    if (this.enabled) {
044      this.cyclePushSize = 0;
045      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
046    }
047  }
048
049  /**
050   * If throttling is enabled
051   * @return true if throttling is enabled
052   */
053  public boolean isEnabled() {
054    return this.enabled;
055  }
056
057  /**
058   * Get how long the caller should sleep according to the current size and
059   * current cycle's total push size and start tick, return the sleep interval
060   * for throttling control.
061   * @param size is the size of edits to be pushed
062   * @return sleep interval for throttling control
063   */
064  public long getNextSleepInterval(final int size) {
065    if (!this.enabled) {
066      return 0;
067    }
068
069    long sleepTicks = 0;
070    long now = EnvironmentEdgeManager.currentTime();
071    // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
072    //    following cycles to amortize, this case can occur when a single push
073    //    exceeds the bandwidth
074    if ((double)this.cyclePushSize > bandwidth) {
075      double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
076      long shouldTillTo = this.cycleStartTick + (long)(cycles * 100);
077      if (shouldTillTo > now) {
078        sleepTicks = shouldTillTo - now;
079      } else {
080        // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
081        this.cycleStartTick = now;
082      }
083      this.cyclePushSize = 0;
084    } else {
085      long nextCycleTick = this.cycleStartTick + 100;  //a cycle is 100ms
086      if (now >= nextCycleTick) {
087        // 2. switch to next cycle if the current cycle has passed
088        this.cycleStartTick = now;
089        this.cyclePushSize = 0;
090      } else if (this.cyclePushSize > 0 &&
091          (double)(this.cyclePushSize + size) >= bandwidth) {
092        // 3. delay the push to next cycle if exceeds throttling bandwidth.
093        //    enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
094        //    where a cycle's first push size(currentSize) > bandwidth
095        sleepTicks = nextCycleTick - now;
096        this.cyclePushSize = 0;
097      }
098    }
099    return sleepTicks;
100  }
101
102  /**
103   * Add current size to the current cycle's total push size
104   * @param size is the current size added to the current cycle's
105   * total push size
106   */
107  public void addPushSize(final int size) {
108    if (this.enabled) {
109      this.cyclePushSize += size;
110    }
111  }
112
113  /**
114   * Reset the cycle start tick to NOW
115   */
116  public void resetStartTick() {
117    if (this.enabled) {
118      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
119    }
120  }
121
122  public void setBandwidth(double bandwidth) {
123    this.bandwidth = bandwidth;
124    this.enabled = this.bandwidth > 0;
125  }
126}