001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import java.util.concurrent.ConcurrentHashMap; 021import java.util.concurrent.ConcurrentMap; 022 023import org.apache.hadoop.conf.Configured; 024import org.apache.hadoop.hbase.HBaseInterfaceAudience; 025import org.apache.hadoop.hbase.Stoppable; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029import org.apache.hadoop.hbase.regionserver.RegionServerServices; 030import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032 033@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 034public abstract class PressureAwareThroughputController extends Configured implements 035 ThroughputController, Stoppable { 036 private static final Logger LOG = 037 LoggerFactory.getLogger(PressureAwareThroughputController.class); 038 039 /** 040 * Stores the information of one controlled compaction. 041 */ 042 private static final class ActiveOperation { 043 044 private final long startTime; 045 046 private long lastControlTime; 047 048 private long lastControlSize; 049 050 private long totalSize; 051 052 private long numberOfSleeps; 053 054 private long totalSleepTime; 055 056 // prevent too many debug log 057 private long lastLogTime; 058 059 ActiveOperation() { 060 long currentTime = EnvironmentEdgeManager.currentTime(); 061 this.startTime = currentTime; 062 this.lastControlTime = currentTime; 063 this.lastLogTime = currentTime; 064 } 065 } 066 067 protected long maxThroughputUpperBound; 068 069 protected long maxThroughputLowerBound; 070 071 protected OffPeakHours offPeakHours; 072 073 protected long controlPerSize; 074 075 protected int tuningPeriod; 076 077 private volatile double maxThroughput; 078 private volatile double maxThroughputPerOperation; 079 080 protected final ConcurrentMap<String, ActiveOperation> activeOperations = new ConcurrentHashMap<>(); 081 082 @Override 083 public abstract void setup(final RegionServerServices server); 084 085 protected String throughputDesc(long deltaSize, long elapsedTime) { 086 return throughputDesc((double) deltaSize / elapsedTime * 1000); 087 } 088 089 protected String throughputDesc(double speed) { 090 if (speed >= 1E15) { // large enough to say it is unlimited 091 return "unlimited"; 092 } else { 093 return String.format("%.2f MB/second", speed / 1024 / 1024); 094 } 095 } 096 097 @Override 098 public void start(String opName) { 099 activeOperations.put(opName, new ActiveOperation()); 100 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 101 } 102 103 @Override 104 public long control(String opName, long size) throws InterruptedException { 105 ActiveOperation operation = activeOperations.get(opName); 106 operation.totalSize += size; 107 long deltaSize = operation.totalSize - operation.lastControlSize; 108 if (deltaSize < controlPerSize) { 109 return 0; 110 } 111 long now = EnvironmentEdgeManager.currentTime(); 112 long minTimeAllowed = (long) (deltaSize / maxThroughputPerOperation * 1000); // ms 113 long elapsedTime = now - operation.lastControlTime; 114 operation.lastControlSize = operation.totalSize; 115 if (elapsedTime >= minTimeAllowed) { 116 operation.lastControlTime = EnvironmentEdgeManager.currentTime(); 117 return 0; 118 } 119 // too fast 120 long sleepTime = minTimeAllowed - elapsedTime; 121 if (LOG.isDebugEnabled()) { 122 // do not log too much 123 if (now - operation.lastLogTime > 5L * 1000) { 124 LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns"); 125 LOG.debug(opName + " sleep=" + sleepTime + "ms because current throughput is " 126 + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " 127 + throughputDesc(maxThroughputPerOperation) + ", already slept " 128 + operation.numberOfSleeps + " time(s) and total slept time is " 129 + operation.totalSleepTime + " ms till now."); 130 operation.lastLogTime = now; 131 } 132 } 133 Thread.sleep(sleepTime); 134 operation.numberOfSleeps++; 135 operation.totalSleepTime += sleepTime; 136 operation.lastControlTime = EnvironmentEdgeManager.currentTime(); 137 return sleepTime; 138 } 139 140 @Override 141 public void finish(String opName) { 142 ActiveOperation operation = activeOperations.remove(opName); 143 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 144 long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime; 145 LOG.info(opName + " average throughput is " 146 + throughputDesc(operation.totalSize, elapsedTime) + ", slept " 147 + operation.numberOfSleeps + " time(s) and total slept time is " 148 + operation.totalSleepTime + " ms. " + activeOperations.size() 149 + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput())); 150 } 151 152 private volatile boolean stopped = false; 153 154 @Override 155 public void stop(String why) { 156 stopped = true; 157 } 158 159 @Override 160 public boolean isStopped() { 161 return stopped; 162 } 163 164 public double getMaxThroughput() { 165 return maxThroughput; 166 } 167 168 public void setMaxThroughput(double maxThroughput) { 169 this.maxThroughput = maxThroughput; 170 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 171 } 172}