001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.Abortable; 022import org.apache.hadoop.hbase.HBaseInterfaceAudience; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.conf.ConfigurationObserver; 025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.yetus.audience.InterfaceStability; 028 029/** 030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), 031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to 032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See 033 * below article for explanation of options. 034 * @see <a href= 035 * "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview 036 * on Request Queuing</a> 037 */ 038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 039@InterfaceStability.Evolving 040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { 041 private int port; 042 private final PriorityFunction priority; 043 private final RpcExecutor callExecutor; 044 private final RpcExecutor priorityExecutor; 045 private final RpcExecutor replicationExecutor; 046 047 /** 048 * This executor is only for meta transition 049 */ 050 private final RpcExecutor metaTransitionExecutor; 051 052 private final RpcExecutor bulkloadExecutor; 053 054 /** What level a high priority call is at. */ 055 private final int highPriorityLevel; 056 057 private Abortable abortable = null; 058 059 /** 060 * @param handlerCount the number of handler threads that will be used to process calls 061 * @param priorityHandlerCount How many threads for priority handling. 062 * @param replicationHandlerCount How many threads for replication handling. 063 * @param priority Function to extract request priority. 064 */ 065 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 066 int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, 067 Abortable server, int highPriorityLevel) { 068 int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, 069 HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); 070 int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 071 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 072 int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, 073 priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 074 int maxReplicationQueueLength = 075 conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, 076 replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 077 int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, 078 bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 079 080 this.priority = priority; 081 this.highPriorityLevel = highPriorityLevel; 082 this.abortable = server; 083 084 String callQueueType = 085 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 086 float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); 087 088 if (callqReadShare > 0) { 089 // at least 1 read handler and 1 write handler 090 callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), 091 maxQueueLength, priority, conf, server); 092 } else { 093 if ( 094 RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) 095 || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf) 096 ) { 097 callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, 098 maxQueueLength, priority, conf, server); 099 } else { 100 callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, 101 priority, conf, server); 102 } 103 } 104 105 float metaCallqReadShare = 106 conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 107 MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); 108 if (metaCallqReadShare > 0) { 109 // different read/write handler for meta, at least 1 read handler and 1 write handler 110 this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ", 111 Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server); 112 } else { 113 // Create 2 queues to help priorityExecutor be more scalable. 114 this.priorityExecutor = priorityHandlerCount > 0 115 ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, 116 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 117 abortable) 118 : null; 119 } 120 this.replicationExecutor = replicationHandlerCount > 0 121 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, 122 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, 123 abortable) 124 : null; 125 this.metaTransitionExecutor = metaTransitionHandler > 0 126 ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, 127 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 128 abortable) 129 : null; 130 this.bulkloadExecutor = bulkLoadHandlerCount > 0 131 ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount, 132 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf, 133 abortable) 134 : null; 135 } 136 137 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 138 int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { 139 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null, 140 highPriorityLevel); 141 } 142 143 /** 144 * Resize call queues; 145 * @param conf new configuration 146 */ 147 @Override 148 public void onConfigurationChange(Configuration conf) { 149 callExecutor.resizeQueues(conf); 150 if (priorityExecutor != null) { 151 priorityExecutor.resizeQueues(conf); 152 } 153 if (replicationExecutor != null) { 154 replicationExecutor.resizeQueues(conf); 155 } 156 if (metaTransitionExecutor != null) { 157 metaTransitionExecutor.resizeQueues(conf); 158 } 159 if (bulkloadExecutor != null) { 160 bulkloadExecutor.resizeQueues(conf); 161 } 162 163 String callQueueType = 164 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 165 if ( 166 RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType) 167 ) { 168 callExecutor.onConfigurationChange(conf); 169 } 170 } 171 172 @Override 173 public void init(Context context) { 174 this.port = context.getListenerAddress().getPort(); 175 } 176 177 @Override 178 public void start() { 179 callExecutor.start(port); 180 if (priorityExecutor != null) { 181 priorityExecutor.start(port); 182 } 183 if (replicationExecutor != null) { 184 replicationExecutor.start(port); 185 } 186 if (metaTransitionExecutor != null) { 187 metaTransitionExecutor.start(port); 188 } 189 if (bulkloadExecutor != null) { 190 bulkloadExecutor.start(port); 191 } 192 193 } 194 195 @Override 196 public void stop() { 197 callExecutor.stop(); 198 if (priorityExecutor != null) { 199 priorityExecutor.stop(); 200 } 201 if (replicationExecutor != null) { 202 replicationExecutor.stop(); 203 } 204 if (metaTransitionExecutor != null) { 205 metaTransitionExecutor.stop(); 206 } 207 if (bulkloadExecutor != null) { 208 bulkloadExecutor.stop(); 209 } 210 211 } 212 213 @Override 214 public boolean dispatch(CallRunner callTask) { 215 RpcCall call = callTask.getRpcCall(); 216 int level = 217 priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null)); 218 if (level == HConstants.PRIORITY_UNSET) { 219 level = HConstants.NORMAL_QOS; 220 } 221 if ( 222 metaTransitionExecutor != null 223 && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS 224 ) { 225 return metaTransitionExecutor.dispatch(callTask); 226 } else if (priorityExecutor != null && level > highPriorityLevel) { 227 return priorityExecutor.dispatch(callTask); 228 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { 229 return replicationExecutor.dispatch(callTask); 230 } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) { 231 return bulkloadExecutor.dispatch(callTask); 232 } else { 233 return callExecutor.dispatch(callTask); 234 } 235 } 236 237 @Override 238 public int getMetaPriorityQueueLength() { 239 return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength(); 240 } 241 242 @Override 243 public int getGeneralQueueLength() { 244 return callExecutor.getQueueLength(); 245 } 246 247 @Override 248 public int getPriorityQueueLength() { 249 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); 250 } 251 252 @Override 253 public int getReplicationQueueLength() { 254 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); 255 } 256 257 @Override 258 public int getBulkLoadQueueLength() { 259 return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength(); 260 } 261 262 @Override 263 public int getActiveRpcHandlerCount() { 264 return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount() 265 + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount() 266 + getActiveBulkLoadRpcHandlerCount(); 267 } 268 269 @Override 270 public int getActiveMetaPriorityRpcHandlerCount() { 271 return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount()); 272 } 273 274 @Override 275 public int getActiveGeneralRpcHandlerCount() { 276 return callExecutor.getActiveHandlerCount(); 277 } 278 279 @Override 280 public int getActivePriorityRpcHandlerCount() { 281 return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()); 282 } 283 284 @Override 285 public int getActiveReplicationRpcHandlerCount() { 286 return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); 287 } 288 289 @Override 290 public int getActiveBulkLoadRpcHandlerCount() { 291 return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount(); 292 } 293 294 @Override 295 public long getNumGeneralCallsDropped() { 296 return callExecutor.getNumGeneralCallsDropped(); 297 } 298 299 @Override 300 public long getNumLifoModeSwitches() { 301 return callExecutor.getNumLifoModeSwitches(); 302 } 303 304 @Override 305 public int getWriteQueueLength() { 306 return callExecutor.getWriteQueueLength(); 307 } 308 309 @Override 310 public int getReadQueueLength() { 311 return callExecutor.getReadQueueLength(); 312 } 313 314 @Override 315 public int getScanQueueLength() { 316 return callExecutor.getScanQueueLength(); 317 } 318 319 @Override 320 public int getActiveWriteRpcHandlerCount() { 321 return callExecutor.getActiveWriteHandlerCount(); 322 } 323 324 @Override 325 public int getActiveReadRpcHandlerCount() { 326 return callExecutor.getActiveReadHandlerCount(); 327 } 328 329 @Override 330 public int getActiveScanRpcHandlerCount() { 331 return callExecutor.getActiveScanHandlerCount(); 332 } 333 334 @Override 335 public CallQueueInfo getCallQueueInfo() { 336 String queueName; 337 338 CallQueueInfo callQueueInfo = new CallQueueInfo(); 339 340 if (null != callExecutor) { 341 queueName = "Call Queue"; 342 callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary()); 343 callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary()); 344 } 345 346 if (null != priorityExecutor) { 347 queueName = "Priority Queue"; 348 callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary()); 349 callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary()); 350 } 351 352 if (null != replicationExecutor) { 353 queueName = "Replication Queue"; 354 callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary()); 355 callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary()); 356 } 357 358 if (null != metaTransitionExecutor) { 359 queueName = "Meta Transition Queue"; 360 callQueueInfo.setCallMethodCount(queueName, 361 metaTransitionExecutor.getCallQueueCountsSummary()); 362 callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); 363 } 364 365 if (null != bulkloadExecutor) { 366 queueName = "BulkLoad Queue"; 367 callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary()); 368 callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary()); 369 } 370 371 return callQueueInfo; 372 } 373 374}