001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import org.apache.yetus.audience.InterfaceAudience; 021import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 022 023/** 024 * Per-peer per-node throttling controller for replication: enabled if 025 * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed 026 * to peer within each cycle won't exceed 'bandwidth' bytes 027 */ 028@InterfaceAudience.Private 029public class ReplicationThrottler { 030 private boolean enabled; 031 private double bandwidth; 032 private long cyclePushSize; 033 private long cycleStartTick; 034 035 /** 036 * ReplicationThrottler constructor 037 * If bandwidth less than 1, throttling is disabled 038 * @param bandwidth per cycle(100ms) 039 */ 040 public ReplicationThrottler(final double bandwidth) { 041 this.bandwidth = bandwidth; 042 this.enabled = this.bandwidth > 0; 043 if (this.enabled) { 044 this.cyclePushSize = 0; 045 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 046 } 047 } 048 049 /** 050 * If throttling is enabled 051 * @return true if throttling is enabled 052 */ 053 public boolean isEnabled() { 054 return this.enabled; 055 } 056 057 /** 058 * Get how long the caller should sleep according to the current size and 059 * current cycle's total push size and start tick, return the sleep interval 060 * for throttling control. 061 * @param size is the size of edits to be pushed 062 * @return sleep interval for throttling control 063 */ 064 public long getNextSleepInterval(final int size) { 065 if (!this.enabled) { 066 return 0; 067 } 068 069 long sleepTicks = 0; 070 long now = EnvironmentEdgeManager.currentTime(); 071 // 1. if cyclePushSize exceeds bandwidth, we need to sleep some 072 // following cycles to amortize, this case can occur when a single push 073 // exceeds the bandwidth 074 if ((double)this.cyclePushSize > bandwidth) { 075 double cycles = Math.ceil((double)this.cyclePushSize / bandwidth); 076 long shouldTillTo = this.cycleStartTick + (long)(cycles * 100); 077 if (shouldTillTo > now) { 078 sleepTicks = shouldTillTo - now; 079 } else { 080 // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here! 081 this.cycleStartTick = now; 082 } 083 this.cyclePushSize = 0; 084 } else { 085 long nextCycleTick = this.cycleStartTick + 100; //a cycle is 100ms 086 if (now >= nextCycleTick) { 087 // 2. switch to next cycle if the current cycle has passed 088 this.cycleStartTick = now; 089 this.cyclePushSize = 0; 090 } else if (this.cyclePushSize > 0 && 091 (double)(this.cyclePushSize + size) >= bandwidth) { 092 // 3. delay the push to next cycle if exceeds throttling bandwidth. 093 // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case 094 // where a cycle's first push size(currentSize) > bandwidth 095 sleepTicks = nextCycleTick - now; 096 this.cyclePushSize = 0; 097 } 098 } 099 return sleepTicks; 100 } 101 102 /** 103 * Add current size to the current cycle's total push size 104 * @param size is the current size added to the current cycle's 105 * total push size 106 */ 107 public void addPushSize(final int size) { 108 if (this.enabled) { 109 this.cyclePushSize += size; 110 } 111 } 112 113 /** 114 * Reset the cycle start tick to NOW 115 */ 116 public void resetStartTick() { 117 if (this.enabled) { 118 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 119 } 120 } 121 122 public void setBandwidth(double bandwidth) { 123 this.bandwidth = bandwidth; 124 this.enabled = this.bandwidth > 0; 125 } 126}