001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.apache.hadoop.hbase.conf.ConfigurationObserver;
027import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
028
029/**
030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc.
033 * See below article for explanation of options.
034 * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a>
035 */
036@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
037@InterfaceStability.Evolving
038public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
039  private int port;
040  private final PriorityFunction priority;
041  private final RpcExecutor callExecutor;
042  private final RpcExecutor priorityExecutor;
043  private final RpcExecutor replicationExecutor;
044
045  /**
046   * This executor is only for meta transition
047   */
048  private final RpcExecutor metaTransitionExecutor;
049
050  /** What level a high priority call is at. */
051  private final int highPriorityLevel;
052
053  private Abortable abortable = null;
054
055  /**
056   * @param conf
057   * @param handlerCount the number of handler threads that will be used to process calls
058   * @param priorityHandlerCount How many threads for priority handling.
059   * @param replicationHandlerCount How many threads for replication handling.
060   * @param highPriorityLevel
061   * @param priority Function to extract request priority.
062   */
063  public SimpleRpcScheduler(
064      Configuration conf,
065      int handlerCount,
066      int priorityHandlerCount,
067      int replicationHandlerCount,
068      int metaTransitionHandler,
069      PriorityFunction priority,
070      Abortable server,
071      int highPriorityLevel) {
072
073    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
074        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
075    int maxPriorityQueueLength =
076        conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
077
078    this.priority = priority;
079    this.highPriorityLevel = highPriorityLevel;
080    this.abortable = server;
081
082    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
083      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
084    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
085
086    if (callqReadShare > 0) {
087      // at least 1 read handler and 1 write handler
088      callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
089        maxQueueLength, priority, conf, server);
090    } else {
091      if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
092        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
093            maxQueueLength, priority, conf, server);
094      } else {
095        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
096            priority, conf, server);
097      }
098    }
099
100    float metaCallqReadShare =
101        conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
102            MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
103    if (metaCallqReadShare > 0) {
104      // different read/write handler for meta, at least 1 read handler and 1 write handler
105      this.priorityExecutor =
106          new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount),
107              maxPriorityQueueLength, priority, conf, server);
108    } else {
109      // Create 2 queues to help priorityExecutor be more scalable.
110      this.priorityExecutor = priorityHandlerCount > 0 ?
111          new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
112              RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
113              abortable) :
114          null;
115    }
116    this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
117        "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
118        maxQueueLength, priority, conf, abortable) : null;
119
120    this.metaTransitionExecutor = metaTransitionHandler > 0 ?
121        new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
122            RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
123            abortable) :
124        null;
125  }
126
127  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
128      int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
129    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
130        highPriorityLevel);
131  }
132
133  /**
134   * Resize call queues;
135   * @param conf new configuration
136   */
137  @Override
138  public void onConfigurationChange(Configuration conf) {
139    callExecutor.resizeQueues(conf);
140    if (priorityExecutor != null) {
141      priorityExecutor.resizeQueues(conf);
142    }
143    if (replicationExecutor != null) {
144      replicationExecutor.resizeQueues(conf);
145    }
146    if (metaTransitionExecutor != null) {
147      metaTransitionExecutor.resizeQueues(conf);
148    }
149
150    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
151      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
152    if (RpcExecutor.isCodelQueueType(callQueueType)) {
153      callExecutor.onConfigurationChange(conf);
154    }
155  }
156
157  @Override
158  public void init(Context context) {
159    this.port = context.getListenerAddress().getPort();
160  }
161
162  @Override
163  public void start() {
164    callExecutor.start(port);
165    if (priorityExecutor != null) {
166      priorityExecutor.start(port);
167    }
168    if (replicationExecutor != null) {
169      replicationExecutor.start(port);
170    }
171    if (metaTransitionExecutor != null) {
172      metaTransitionExecutor.start(port);
173    }
174
175  }
176
177  @Override
178  public void stop() {
179    callExecutor.stop();
180    if (priorityExecutor != null) {
181      priorityExecutor.stop();
182    }
183    if (replicationExecutor != null) {
184      replicationExecutor.stop();
185    }
186    if (metaTransitionExecutor != null) {
187      metaTransitionExecutor.stop();
188    }
189
190  }
191
192  @Override
193  public boolean dispatch(CallRunner callTask) throws InterruptedException {
194    RpcCall call = callTask.getRpcCall();
195    int level = priority.getPriority(call.getHeader(), call.getParam(),
196        call.getRequestUser().orElse(null));
197    if (level == HConstants.PRIORITY_UNSET) {
198      level = HConstants.NORMAL_QOS;
199    }
200    if (metaTransitionExecutor != null &&
201      level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS) {
202      return metaTransitionExecutor.dispatch(callTask);
203    } else if (priorityExecutor != null && level > highPriorityLevel) {
204      return priorityExecutor.dispatch(callTask);
205    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
206      return replicationExecutor.dispatch(callTask);
207    } else {
208      return callExecutor.dispatch(callTask);
209    }
210  }
211
212  @Override
213  public int getMetaPriorityQueueLength() {
214    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
215  }
216
217  @Override
218  public int getGeneralQueueLength() {
219    return callExecutor.getQueueLength();
220  }
221
222  @Override
223  public int getPriorityQueueLength() {
224    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
225  }
226
227  @Override
228  public int getReplicationQueueLength() {
229    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
230  }
231
232  @Override
233  public int getActiveRpcHandlerCount() {
234    return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
235        + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
236  }
237
238  @Override
239  public int getActiveMetaPriorityRpcHandlerCount() {
240    return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
241  }
242
243  @Override
244  public int getActiveGeneralRpcHandlerCount() {
245    return callExecutor.getActiveHandlerCount();
246  }
247
248  @Override
249  public int getActivePriorityRpcHandlerCount() {
250    return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount());
251  }
252
253  @Override
254  public int getActiveReplicationRpcHandlerCount() {
255    return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
256  }
257
258  @Override
259  public long getNumGeneralCallsDropped() {
260    return callExecutor.getNumGeneralCallsDropped();
261  }
262
263  @Override
264  public long getNumLifoModeSwitches() {
265    return callExecutor.getNumLifoModeSwitches();
266  }
267
268  @Override
269  public int getWriteQueueLength() {
270    return callExecutor.getWriteQueueLength();
271  }
272
273  @Override
274  public int getReadQueueLength() {
275    return callExecutor.getReadQueueLength();
276  }
277
278  @Override
279  public int getScanQueueLength() {
280    return callExecutor.getScanQueueLength();
281  }
282
283  @Override
284  public int getActiveWriteRpcHandlerCount() {
285    return callExecutor.getActiveWriteHandlerCount();
286  }
287
288  @Override
289  public int getActiveReadRpcHandlerCount() {
290    return callExecutor.getActiveReadHandlerCount();
291  }
292
293  @Override
294  public int getActiveScanRpcHandlerCount() {
295    return callExecutor.getActiveScanHandlerCount();
296  }
297
298  @Override
299  public CallQueueInfo getCallQueueInfo() {
300    String queueName;
301
302    CallQueueInfo callQueueInfo = new CallQueueInfo();
303
304    if (null != callExecutor) {
305      queueName = "Call Queue";
306      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
307      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
308    }
309
310    if (null != priorityExecutor) {
311      queueName = "Priority Queue";
312      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
313      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
314    }
315
316    if (null != replicationExecutor) {
317      queueName = "Replication Queue";
318      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
319      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
320    }
321
322    if (null != metaTransitionExecutor) {
323      queueName = "Meta Transition Queue";
324      callQueueInfo.setCallMethodCount(queueName,
325          metaTransitionExecutor.getCallQueueCountsSummary());
326      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
327    }
328
329    return callQueueInfo;
330  }
331
332}
333