001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.Abortable; 022import org.apache.hadoop.hbase.HBaseInterfaceAudience; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.apache.yetus.audience.InterfaceStability; 026import org.apache.hadoop.hbase.conf.ConfigurationObserver; 027 028/** 029 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), 030 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to 031 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. 032 * See below article for explanation of options. 033 * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a> 034 */ 035@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 036@InterfaceStability.Evolving 037public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { 038 private int port; 039 private final PriorityFunction priority; 040 private final RpcExecutor callExecutor; 041 private final RpcExecutor priorityExecutor; 042 private final RpcExecutor replicationExecutor; 043 044 /** 045 * This executor is only for meta transition 046 */ 047 private final RpcExecutor metaTransitionExecutor; 048 049 /** What level a high priority call is at. */ 050 private final int highPriorityLevel; 051 052 private Abortable abortable = null; 053 054 /** 055 * @param conf 056 * @param handlerCount the number of handler threads that will be used to process calls 057 * @param priorityHandlerCount How many threads for priority handling. 058 * @param replicationHandlerCount How many threads for replication handling. 059 * @param highPriorityLevel 060 * @param priority Function to extract request priority. 061 */ 062 public SimpleRpcScheduler( 063 Configuration conf, 064 int handlerCount, 065 int priorityHandlerCount, 066 int replicationHandlerCount, 067 int metaTransitionHandler, 068 PriorityFunction priority, 069 Abortable server, 070 int highPriorityLevel) { 071 072 int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 073 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 074 int maxPriorityQueueLength = 075 conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength); 076 077 this.priority = priority; 078 this.highPriorityLevel = highPriorityLevel; 079 this.abortable = server; 080 081 String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 082 RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 083 float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); 084 085 if (callqReadShare > 0) { 086 // at least 1 read handler and 1 write handler 087 callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount), 088 maxQueueLength, priority, conf, server); 089 } else { 090 if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) { 091 callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, 092 maxQueueLength, priority, conf, server); 093 } else { 094 callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, 095 priority, conf, server); 096 } 097 } 098 099 // Create 2 queues to help priorityExecutor be more scalable. 100 this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( 101 "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, 102 maxPriorityQueueLength, priority, conf, abortable) : null; 103 this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( 104 "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, 105 maxQueueLength, priority, conf, abortable) : null; 106 107 this.metaTransitionExecutor = metaTransitionHandler > 0 ? 108 new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, 109 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 110 abortable) : 111 null; 112 } 113 114 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 115 int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { 116 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null, 117 highPriorityLevel); 118 } 119 120 /** 121 * Resize call queues; 122 * @param conf new configuration 123 */ 124 @Override 125 public void onConfigurationChange(Configuration conf) { 126 callExecutor.resizeQueues(conf); 127 if (priorityExecutor != null) { 128 priorityExecutor.resizeQueues(conf); 129 } 130 if (replicationExecutor != null) { 131 replicationExecutor.resizeQueues(conf); 132 } 133 if (metaTransitionExecutor != null) { 134 metaTransitionExecutor.resizeQueues(conf); 135 } 136 137 String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 138 RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 139 if (RpcExecutor.isCodelQueueType(callQueueType)) { 140 callExecutor.onConfigurationChange(conf); 141 } 142 } 143 144 @Override 145 public void init(Context context) { 146 this.port = context.getListenerAddress().getPort(); 147 } 148 149 @Override 150 public void start() { 151 callExecutor.start(port); 152 if (priorityExecutor != null) { 153 priorityExecutor.start(port); 154 } 155 if (replicationExecutor != null) { 156 replicationExecutor.start(port); 157 } 158 if (metaTransitionExecutor != null) { 159 metaTransitionExecutor.start(port); 160 } 161 162 } 163 164 @Override 165 public void stop() { 166 callExecutor.stop(); 167 if (priorityExecutor != null) { 168 priorityExecutor.stop(); 169 } 170 if (replicationExecutor != null) { 171 replicationExecutor.stop(); 172 } 173 if (metaTransitionExecutor != null) { 174 metaTransitionExecutor.stop(); 175 } 176 177 } 178 179 @Override 180 public boolean dispatch(CallRunner callTask) throws InterruptedException { 181 RpcCall call = callTask.getRpcCall(); 182 int level = priority.getPriority(call.getHeader(), call.getParam(), 183 call.getRequestUser().orElse(null)); 184 if (level == HConstants.PRIORITY_UNSET) { 185 level = HConstants.NORMAL_QOS; 186 } 187 if (metaTransitionExecutor != null && level == HConstants.META_QOS) { 188 return metaTransitionExecutor.dispatch(callTask); 189 } else if (priorityExecutor != null && level > highPriorityLevel) { 190 return priorityExecutor.dispatch(callTask); 191 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { 192 return replicationExecutor.dispatch(callTask); 193 } else { 194 return callExecutor.dispatch(callTask); 195 } 196 } 197 198 @Override 199 public int getMetaPriorityQueueLength() { 200 return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength(); 201 } 202 203 @Override 204 public int getGeneralQueueLength() { 205 return callExecutor.getQueueLength(); 206 } 207 208 @Override 209 public int getPriorityQueueLength() { 210 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); 211 } 212 213 @Override 214 public int getReplicationQueueLength() { 215 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); 216 } 217 218 @Override 219 public int getActiveRpcHandlerCount() { 220 return callExecutor.getActiveHandlerCount() + 221 (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) + 222 (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()) + 223 (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount()); 224 } 225 226 @Override 227 public long getNumGeneralCallsDropped() { 228 return callExecutor.getNumGeneralCallsDropped(); 229 } 230 231 @Override 232 public long getNumLifoModeSwitches() { 233 return callExecutor.getNumLifoModeSwitches(); 234 } 235 236 @Override 237 public int getWriteQueueLength() { 238 return callExecutor.getWriteQueueLength(); 239 } 240 241 @Override 242 public int getReadQueueLength() { 243 return callExecutor.getReadQueueLength(); 244 } 245 246 @Override 247 public int getScanQueueLength() { 248 return callExecutor.getScanQueueLength(); 249 } 250 251 @Override 252 public int getActiveWriteRpcHandlerCount() { 253 return callExecutor.getActiveWriteHandlerCount(); 254 } 255 256 @Override 257 public int getActiveReadRpcHandlerCount() { 258 return callExecutor.getActiveReadHandlerCount(); 259 } 260 261 @Override 262 public int getActiveScanRpcHandlerCount() { 263 return callExecutor.getActiveScanHandlerCount(); 264 } 265 266 @Override 267 public CallQueueInfo getCallQueueInfo() { 268 String queueName; 269 270 CallQueueInfo callQueueInfo = new CallQueueInfo(); 271 272 if (null != callExecutor) { 273 queueName = "Call Queue"; 274 callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary()); 275 callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary()); 276 } 277 278 if (null != priorityExecutor) { 279 queueName = "Priority Queue"; 280 callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary()); 281 callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary()); 282 } 283 284 if (null != replicationExecutor) { 285 queueName = "Replication Queue"; 286 callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary()); 287 callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary()); 288 } 289 290 if (null != metaTransitionExecutor) { 291 queueName = "Meta Transition Queue"; 292 callQueueInfo.setCallMethodCount(queueName, 293 metaTransitionExecutor.getCallQueueCountsSummary()); 294 callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); 295 } 296 297 return callQueueInfo; 298 } 299 300} 301