001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 021import org.apache.yetus.audience.InterfaceAudience; 022 023/** 024 * Per-peer per-node throttling controller for replication: enabled if bandwidth > 0, a cycle = 025 * 100ms, by throttling we guarantee data pushed to peer within each cycle won't exceed 'bandwidth' 026 * bytes 027 */ 028@InterfaceAudience.Private 029public class ReplicationThrottler { 030 private boolean enabled; 031 private double bandwidth; 032 private long cyclePushSize; 033 private long cycleStartTick; 034 035 /** 036 * ReplicationThrottler constructor If bandwidth less than 1, throttling is disabled 037 * @param bandwidth per cycle(100ms) 038 */ 039 public ReplicationThrottler(final double bandwidth) { 040 this.bandwidth = bandwidth; 041 this.enabled = this.bandwidth > 0; 042 if (this.enabled) { 043 this.cyclePushSize = 0; 044 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 045 } 046 } 047 048 /** 049 * If throttling is enabled 050 * @return true if throttling is enabled 051 */ 052 public boolean isEnabled() { 053 return this.enabled; 054 } 055 056 /** 057 * Get how long the caller should sleep according to the current size and current cycle's total 058 * push size and start tick, return the sleep interval for throttling control. 059 * @param size is the size of edits to be pushed 060 * @return sleep interval for throttling control 061 */ 062 public long getNextSleepInterval(final int size) { 063 if (!this.enabled) { 064 return 0; 065 } 066 067 long sleepTicks = 0; 068 long now = EnvironmentEdgeManager.currentTime(); 069 // 1. if cyclePushSize exceeds bandwidth, we need to sleep some 070 // following cycles to amortize, this case can occur when a single push 071 // exceeds the bandwidth 072 if ((double) this.cyclePushSize > bandwidth) { 073 double cycles = Math.ceil((double) this.cyclePushSize / bandwidth); 074 long shouldTillTo = this.cycleStartTick + (long) (cycles * 100); 075 if (shouldTillTo > now) { 076 sleepTicks = shouldTillTo - now; 077 } else { 078 // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here! 079 this.cycleStartTick = now; 080 } 081 this.cyclePushSize = 0; 082 } else { 083 long nextCycleTick = this.cycleStartTick + 100; // a cycle is 100ms 084 if (now >= nextCycleTick) { 085 // 2. switch to next cycle if the current cycle has passed 086 this.cycleStartTick = now; 087 this.cyclePushSize = 0; 088 } else if (this.cyclePushSize > 0 && (double) (this.cyclePushSize + size) >= bandwidth) { 089 // 3. delay the push to next cycle if exceeds throttling bandwidth. 090 // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case 091 // where a cycle's first push size(currentSize) > bandwidth 092 sleepTicks = nextCycleTick - now; 093 this.cyclePushSize = 0; 094 } 095 } 096 return sleepTicks; 097 } 098 099 /** 100 * Add current size to the current cycle's total push size 101 * @param size is the current size added to the current cycle's total push size 102 */ 103 public void addPushSize(final int size) { 104 if (this.enabled) { 105 this.cyclePushSize += size; 106 } 107 } 108 109 /** 110 * Reset the cycle start tick to NOW 111 */ 112 public void resetStartTick() { 113 if (this.enabled) { 114 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 115 } 116 } 117 118 public void setBandwidth(double bandwidth) { 119 this.bandwidth = bandwidth; 120 this.enabled = this.bandwidth > 0; 121 } 122}