001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.Abortable; 022import org.apache.hadoop.hbase.HBaseInterfaceAudience; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.conf.ConfigurationObserver; 025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.yetus.audience.InterfaceStability; 028 029/** 030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), 031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to 032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See 033 * below article for explanation of options. 034 * @see <a href= 035 * "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview 036 * on Request Queuing</a> 037 */ 038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 039@InterfaceStability.Evolving 040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { 041 private int port; 042 private final PriorityFunction priority; 043 private final RpcExecutor callExecutor; 044 private final RpcExecutor priorityExecutor; 045 private final RpcExecutor replicationExecutor; 046 047 /** 048 * This executor is only for meta transition 049 */ 050 private final RpcExecutor metaTransitionExecutor; 051 052 /** What level a high priority call is at. */ 053 private final int highPriorityLevel; 054 055 private Abortable abortable = null; 056 057 /** 058 * n * @param handlerCount the number of handler threads that will be used to process calls 059 * @param priorityHandlerCount How many threads for priority handling. 060 * @param replicationHandlerCount How many threads for replication handling. n * @param priority 061 * Function to extract request priority. 062 */ 063 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 064 int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, 065 Abortable server, int highPriorityLevel) { 066 067 int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 068 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 069 int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, 070 priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 071 int maxReplicationQueueLength = 072 conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, 073 replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 074 075 this.priority = priority; 076 this.highPriorityLevel = highPriorityLevel; 077 this.abortable = server; 078 079 String callQueueType = 080 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 081 float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); 082 083 if (callqReadShare > 0) { 084 // at least 1 read handler and 1 write handler 085 callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), 086 maxQueueLength, priority, conf, server); 087 } else { 088 if ( 089 RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) 090 || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf) 091 ) { 092 callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, 093 maxQueueLength, priority, conf, server); 094 } else { 095 callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, 096 priority, conf, server); 097 } 098 } 099 100 float metaCallqReadShare = 101 conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 102 MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); 103 if (metaCallqReadShare > 0) { 104 // different read/write handler for meta, at least 1 read handler and 1 write handler 105 this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ", 106 Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server); 107 } else { 108 // Create 2 queues to help priorityExecutor be more scalable. 109 this.priorityExecutor = priorityHandlerCount > 0 110 ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, 111 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 112 abortable) 113 : null; 114 } 115 this.replicationExecutor = replicationHandlerCount > 0 116 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, 117 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, 118 abortable) 119 : null; 120 121 this.metaTransitionExecutor = metaTransitionHandler > 0 122 ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, 123 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 124 abortable) 125 : null; 126 } 127 128 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 129 int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { 130 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null, 131 highPriorityLevel); 132 } 133 134 /** 135 * Resize call queues; 136 * @param conf new configuration 137 */ 138 @Override 139 public void onConfigurationChange(Configuration conf) { 140 callExecutor.resizeQueues(conf); 141 if (priorityExecutor != null) { 142 priorityExecutor.resizeQueues(conf); 143 } 144 if (replicationExecutor != null) { 145 replicationExecutor.resizeQueues(conf); 146 } 147 if (metaTransitionExecutor != null) { 148 metaTransitionExecutor.resizeQueues(conf); 149 } 150 151 String callQueueType = 152 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 153 if ( 154 RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType) 155 ) { 156 callExecutor.onConfigurationChange(conf); 157 } 158 } 159 160 @Override 161 public void init(Context context) { 162 this.port = context.getListenerAddress().getPort(); 163 } 164 165 @Override 166 public void start() { 167 callExecutor.start(port); 168 if (priorityExecutor != null) { 169 priorityExecutor.start(port); 170 } 171 if (replicationExecutor != null) { 172 replicationExecutor.start(port); 173 } 174 if (metaTransitionExecutor != null) { 175 metaTransitionExecutor.start(port); 176 } 177 178 } 179 180 @Override 181 public void stop() { 182 callExecutor.stop(); 183 if (priorityExecutor != null) { 184 priorityExecutor.stop(); 185 } 186 if (replicationExecutor != null) { 187 replicationExecutor.stop(); 188 } 189 if (metaTransitionExecutor != null) { 190 metaTransitionExecutor.stop(); 191 } 192 193 } 194 195 @Override 196 public boolean dispatch(CallRunner callTask) { 197 RpcCall call = callTask.getRpcCall(); 198 int level = 199 priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null)); 200 if (level == HConstants.PRIORITY_UNSET) { 201 level = HConstants.NORMAL_QOS; 202 } 203 if ( 204 metaTransitionExecutor != null 205 && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS 206 ) { 207 return metaTransitionExecutor.dispatch(callTask); 208 } else if (priorityExecutor != null && level > highPriorityLevel) { 209 return priorityExecutor.dispatch(callTask); 210 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { 211 return replicationExecutor.dispatch(callTask); 212 } else { 213 return callExecutor.dispatch(callTask); 214 } 215 } 216 217 @Override 218 public int getMetaPriorityQueueLength() { 219 return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength(); 220 } 221 222 @Override 223 public int getGeneralQueueLength() { 224 return callExecutor.getQueueLength(); 225 } 226 227 @Override 228 public int getPriorityQueueLength() { 229 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); 230 } 231 232 @Override 233 public int getReplicationQueueLength() { 234 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); 235 } 236 237 @Override 238 public int getActiveRpcHandlerCount() { 239 return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount() 240 + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount(); 241 } 242 243 @Override 244 public int getActiveMetaPriorityRpcHandlerCount() { 245 return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount()); 246 } 247 248 @Override 249 public int getActiveGeneralRpcHandlerCount() { 250 return callExecutor.getActiveHandlerCount(); 251 } 252 253 @Override 254 public int getActivePriorityRpcHandlerCount() { 255 return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()); 256 } 257 258 @Override 259 public int getActiveReplicationRpcHandlerCount() { 260 return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); 261 } 262 263 @Override 264 public long getNumGeneralCallsDropped() { 265 return callExecutor.getNumGeneralCallsDropped(); 266 } 267 268 @Override 269 public long getNumLifoModeSwitches() { 270 return callExecutor.getNumLifoModeSwitches(); 271 } 272 273 @Override 274 public int getWriteQueueLength() { 275 return callExecutor.getWriteQueueLength(); 276 } 277 278 @Override 279 public int getReadQueueLength() { 280 return callExecutor.getReadQueueLength(); 281 } 282 283 @Override 284 public int getScanQueueLength() { 285 return callExecutor.getScanQueueLength(); 286 } 287 288 @Override 289 public int getActiveWriteRpcHandlerCount() { 290 return callExecutor.getActiveWriteHandlerCount(); 291 } 292 293 @Override 294 public int getActiveReadRpcHandlerCount() { 295 return callExecutor.getActiveReadHandlerCount(); 296 } 297 298 @Override 299 public int getActiveScanRpcHandlerCount() { 300 return callExecutor.getActiveScanHandlerCount(); 301 } 302 303 @Override 304 public CallQueueInfo getCallQueueInfo() { 305 String queueName; 306 307 CallQueueInfo callQueueInfo = new CallQueueInfo(); 308 309 if (null != callExecutor) { 310 queueName = "Call Queue"; 311 callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary()); 312 callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary()); 313 } 314 315 if (null != priorityExecutor) { 316 queueName = "Priority Queue"; 317 callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary()); 318 callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary()); 319 } 320 321 if (null != replicationExecutor) { 322 queueName = "Replication Queue"; 323 callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary()); 324 callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary()); 325 } 326 327 if (null != metaTransitionExecutor) { 328 queueName = "Meta Transition Queue"; 329 callQueueInfo.setCallMethodCount(queueName, 330 metaTransitionExecutor.getCallQueueCountsSummary()); 331 callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); 332 } 333 334 return callQueueInfo; 335 } 336 337}