001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.Abortable; 022import org.apache.hadoop.hbase.HBaseInterfaceAudience; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.apache.yetus.audience.InterfaceStability; 026import org.apache.hadoop.hbase.conf.ConfigurationObserver; 027import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction; 028 029/** 030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), 031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to 032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. 033 * See below article for explanation of options. 034 * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a> 035 */ 036@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 037@InterfaceStability.Evolving 038public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { 039 private int port; 040 private final PriorityFunction priority; 041 private final RpcExecutor callExecutor; 042 private final RpcExecutor priorityExecutor; 043 private final RpcExecutor replicationExecutor; 044 045 /** 046 * This executor is only for meta transition 047 */ 048 private final RpcExecutor metaTransitionExecutor; 049 050 /** What level a high priority call is at. */ 051 private final int highPriorityLevel; 052 053 private Abortable abortable = null; 054 055 /** 056 * @param conf 057 * @param handlerCount the number of handler threads that will be used to process calls 058 * @param priorityHandlerCount How many threads for priority handling. 059 * @param replicationHandlerCount How many threads for replication handling. 060 * @param highPriorityLevel 061 * @param priority Function to extract request priority. 062 */ 063 public SimpleRpcScheduler( 064 Configuration conf, 065 int handlerCount, 066 int priorityHandlerCount, 067 int replicationHandlerCount, 068 int metaTransitionHandler, 069 PriorityFunction priority, 070 Abortable server, 071 int highPriorityLevel) { 072 073 int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 074 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 075 int maxPriorityQueueLength = 076 conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength); 077 078 this.priority = priority; 079 this.highPriorityLevel = highPriorityLevel; 080 this.abortable = server; 081 082 String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 083 RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 084 float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); 085 086 if (callqReadShare > 0) { 087 // at least 1 read handler and 1 write handler 088 callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount), 089 maxQueueLength, priority, conf, server); 090 } else { 091 if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) { 092 callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, 093 maxQueueLength, priority, conf, server); 094 } else { 095 callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, 096 priority, conf, server); 097 } 098 } 099 100 float metaCallqReadShare = 101 conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 102 MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); 103 if (metaCallqReadShare > 0) { 104 // different read/write handler for meta, at least 1 read handler and 1 write handler 105 this.priorityExecutor = 106 new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount), 107 maxPriorityQueueLength, priority, conf, server); 108 } else { 109 // Create 2 queues to help priorityExecutor be more scalable. 110 this.priorityExecutor = priorityHandlerCount > 0 ? 111 new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, 112 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 113 abortable) : 114 null; 115 } 116 this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( 117 "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, 118 maxQueueLength, priority, conf, abortable) : null; 119 120 this.metaTransitionExecutor = metaTransitionHandler > 0 ? 121 new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, 122 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 123 abortable) : 124 null; 125 } 126 127 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 128 int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { 129 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null, 130 highPriorityLevel); 131 } 132 133 /** 134 * Resize call queues; 135 * @param conf new configuration 136 */ 137 @Override 138 public void onConfigurationChange(Configuration conf) { 139 callExecutor.resizeQueues(conf); 140 if (priorityExecutor != null) { 141 priorityExecutor.resizeQueues(conf); 142 } 143 if (replicationExecutor != null) { 144 replicationExecutor.resizeQueues(conf); 145 } 146 if (metaTransitionExecutor != null) { 147 metaTransitionExecutor.resizeQueues(conf); 148 } 149 150 String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 151 RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 152 if (RpcExecutor.isCodelQueueType(callQueueType)) { 153 callExecutor.onConfigurationChange(conf); 154 } 155 } 156 157 @Override 158 public void init(Context context) { 159 this.port = context.getListenerAddress().getPort(); 160 } 161 162 @Override 163 public void start() { 164 callExecutor.start(port); 165 if (priorityExecutor != null) { 166 priorityExecutor.start(port); 167 } 168 if (replicationExecutor != null) { 169 replicationExecutor.start(port); 170 } 171 if (metaTransitionExecutor != null) { 172 metaTransitionExecutor.start(port); 173 } 174 175 } 176 177 @Override 178 public void stop() { 179 callExecutor.stop(); 180 if (priorityExecutor != null) { 181 priorityExecutor.stop(); 182 } 183 if (replicationExecutor != null) { 184 replicationExecutor.stop(); 185 } 186 if (metaTransitionExecutor != null) { 187 metaTransitionExecutor.stop(); 188 } 189 190 } 191 192 @Override 193 public boolean dispatch(CallRunner callTask) throws InterruptedException { 194 RpcCall call = callTask.getRpcCall(); 195 int level = priority.getPriority(call.getHeader(), call.getParam(), 196 call.getRequestUser().orElse(null)); 197 if (level == HConstants.PRIORITY_UNSET) { 198 level = HConstants.NORMAL_QOS; 199 } 200 if (metaTransitionExecutor != null && 201 level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS) { 202 return metaTransitionExecutor.dispatch(callTask); 203 } else if (priorityExecutor != null && level > highPriorityLevel) { 204 return priorityExecutor.dispatch(callTask); 205 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { 206 return replicationExecutor.dispatch(callTask); 207 } else { 208 return callExecutor.dispatch(callTask); 209 } 210 } 211 212 @Override 213 public int getMetaPriorityQueueLength() { 214 return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength(); 215 } 216 217 @Override 218 public int getGeneralQueueLength() { 219 return callExecutor.getQueueLength(); 220 } 221 222 @Override 223 public int getPriorityQueueLength() { 224 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); 225 } 226 227 @Override 228 public int getReplicationQueueLength() { 229 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); 230 } 231 232 @Override 233 public int getActiveRpcHandlerCount() { 234 return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount() 235 + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount(); 236 } 237 238 @Override 239 public int getActiveMetaPriorityRpcHandlerCount() { 240 return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount()); 241 } 242 243 @Override 244 public int getActiveGeneralRpcHandlerCount() { 245 return callExecutor.getActiveHandlerCount(); 246 } 247 248 @Override 249 public int getActivePriorityRpcHandlerCount() { 250 return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()); 251 } 252 253 @Override 254 public int getActiveReplicationRpcHandlerCount() { 255 return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); 256 } 257 258 @Override 259 public long getNumGeneralCallsDropped() { 260 return callExecutor.getNumGeneralCallsDropped(); 261 } 262 263 @Override 264 public long getNumLifoModeSwitches() { 265 return callExecutor.getNumLifoModeSwitches(); 266 } 267 268 @Override 269 public int getWriteQueueLength() { 270 return callExecutor.getWriteQueueLength(); 271 } 272 273 @Override 274 public int getReadQueueLength() { 275 return callExecutor.getReadQueueLength(); 276 } 277 278 @Override 279 public int getScanQueueLength() { 280 return callExecutor.getScanQueueLength(); 281 } 282 283 @Override 284 public int getActiveWriteRpcHandlerCount() { 285 return callExecutor.getActiveWriteHandlerCount(); 286 } 287 288 @Override 289 public int getActiveReadRpcHandlerCount() { 290 return callExecutor.getActiveReadHandlerCount(); 291 } 292 293 @Override 294 public int getActiveScanRpcHandlerCount() { 295 return callExecutor.getActiveScanHandlerCount(); 296 } 297 298 @Override 299 public CallQueueInfo getCallQueueInfo() { 300 String queueName; 301 302 CallQueueInfo callQueueInfo = new CallQueueInfo(); 303 304 if (null != callExecutor) { 305 queueName = "Call Queue"; 306 callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary()); 307 callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary()); 308 } 309 310 if (null != priorityExecutor) { 311 queueName = "Priority Queue"; 312 callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary()); 313 callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary()); 314 } 315 316 if (null != replicationExecutor) { 317 queueName = "Replication Queue"; 318 callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary()); 319 callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary()); 320 } 321 322 if (null != metaTransitionExecutor) { 323 queueName = "Meta Transition Queue"; 324 callQueueInfo.setCallMethodCount(queueName, 325 metaTransitionExecutor.getCallQueueCountsSummary()); 326 callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); 327 } 328 329 return callQueueInfo; 330 } 331 332} 333