001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
021import org.apache.yetus.audience.InterfaceAudience;
022
023/**
024 * Per-peer per-node throttling controller for replication: enabled if bandwidth > 0, a cycle =
025 * 100ms, by throttling we guarantee data pushed to peer within each cycle won't exceed 'bandwidth'
026 * bytes
027 */
028@InterfaceAudience.Private
029public class ReplicationThrottler {
030  private boolean enabled;
031  private double bandwidth;
032  private long cyclePushSize;
033  private long cycleStartTick;
034
035  /**
036   * ReplicationThrottler constructor If bandwidth less than 1, throttling is disabled
037   * @param bandwidth per cycle(100ms)
038   */
039  public ReplicationThrottler(final double bandwidth) {
040    this.bandwidth = bandwidth;
041    this.enabled = this.bandwidth > 0;
042    if (this.enabled) {
043      this.cyclePushSize = 0;
044      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
045    }
046  }
047
048  /**
049   * If throttling is enabled
050   * @return true if throttling is enabled
051   */
052  public boolean isEnabled() {
053    return this.enabled;
054  }
055
056  /**
057   * Get how long the caller should sleep according to the current size and current cycle's total
058   * push size and start tick, return the sleep interval for throttling control.
059   * @param size is the size of edits to be pushed
060   * @return sleep interval for throttling control
061   */
062  public long getNextSleepInterval(final int size) {
063    if (!this.enabled) {
064      return 0;
065    }
066
067    long sleepTicks = 0;
068    long now = EnvironmentEdgeManager.currentTime();
069    // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
070    // following cycles to amortize, this case can occur when a single push
071    // exceeds the bandwidth
072    if ((double) this.cyclePushSize > bandwidth) {
073      double cycles = Math.ceil((double) this.cyclePushSize / bandwidth);
074      long shouldTillTo = this.cycleStartTick + (long) (cycles * 100);
075      if (shouldTillTo > now) {
076        sleepTicks = shouldTillTo - now;
077      } else {
078        // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
079        this.cycleStartTick = now;
080      }
081      this.cyclePushSize = 0;
082    } else {
083      long nextCycleTick = this.cycleStartTick + 100; // a cycle is 100ms
084      if (now >= nextCycleTick) {
085        // 2. switch to next cycle if the current cycle has passed
086        this.cycleStartTick = now;
087        this.cyclePushSize = 0;
088      } else if (this.cyclePushSize > 0 && (double) (this.cyclePushSize + size) >= bandwidth) {
089        // 3. delay the push to next cycle if exceeds throttling bandwidth.
090        // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
091        // where a cycle's first push size(currentSize) > bandwidth
092        sleepTicks = nextCycleTick - now;
093        this.cyclePushSize = 0;
094      }
095    }
096    return sleepTicks;
097  }
098
099  /**
100   * Add current size to the current cycle's total push size
101   * @param size is the current size added to the current cycle's total push size
102   */
103  public void addPushSize(final int size) {
104    if (this.enabled) {
105      this.cyclePushSize += size;
106    }
107  }
108
109  /**
110   * Reset the cycle start tick to NOW
111   */
112  public void resetStartTick() {
113    if (this.enabled) {
114      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
115    }
116  }
117
118  public void setBandwidth(double bandwidth) {
119    this.bandwidth = bandwidth;
120    this.enabled = this.bandwidth > 0;
121  }
122}