001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.conf.ConfigurationObserver;
025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028
029/**
030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See
033 * below article for explanation of options.
034 * @see <a href=
035 *      "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview
036 *      on Request Queuing</a>
037 */
038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
039@InterfaceStability.Evolving
040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
041  private int port;
042  private final PriorityFunction priority;
043  private final RpcExecutor callExecutor;
044  private final RpcExecutor priorityExecutor;
045  private final RpcExecutor replicationExecutor;
046
047  /**
048   * This executor is only for meta transition
049   */
050  private final RpcExecutor metaTransitionExecutor;
051
052  private final RpcExecutor bulkloadExecutor;
053
054  /** What level a high priority call is at. */
055  private final int highPriorityLevel;
056
057  private Abortable abortable = null;
058
059  /**
060   * @param handlerCount            the number of handler threads that will be used to process calls
061   * @param priorityHandlerCount    How many threads for priority handling.
062   * @param replicationHandlerCount How many threads for replication handling.
063   * @param priority                Function to extract request priority.
064   */
065  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
066    int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
067    Abortable server, int highPriorityLevel) {
068    int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
069      HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
070    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
071      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
072    int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
073      priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
074    int maxReplicationQueueLength =
075      conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
076        replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
077    int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
078      bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
079
080    this.priority = priority;
081    this.highPriorityLevel = highPriorityLevel;
082    this.abortable = server;
083
084    String callQueueType =
085      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
086    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
087
088    if (callqReadShare > 0) {
089      // at least 1 read handler and 1 write handler
090      callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
091        maxQueueLength, priority, conf, server);
092    } else {
093      if (
094        RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)
095          || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)
096      ) {
097        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
098          maxQueueLength, priority, conf, server);
099      } else {
100        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
101          priority, conf, server);
102      }
103    }
104
105    float metaCallqReadShare =
106      conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
107        MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
108    if (metaCallqReadShare > 0) {
109      // different read/write handler for meta, at least 1 read handler and 1 write handler
110      this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ",
111        Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server);
112    } else {
113      // Create 2 queues to help priorityExecutor be more scalable.
114      this.priorityExecutor = priorityHandlerCount > 0
115        ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
116          RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
117          abortable)
118        : null;
119    }
120    this.replicationExecutor = replicationHandlerCount > 0
121      ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
122        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf,
123        abortable)
124      : null;
125    this.metaTransitionExecutor = metaTransitionHandler > 0
126      ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
127        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
128        abortable)
129      : null;
130    this.bulkloadExecutor = bulkLoadHandlerCount > 0
131      ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount,
132        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf,
133        abortable)
134      : null;
135  }
136
137  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
138    int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
139    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
140      highPriorityLevel);
141  }
142
143  /**
144   * Resize call queues;
145   * @param conf new configuration
146   */
147  @Override
148  public void onConfigurationChange(Configuration conf) {
149    callExecutor.resizeQueues(conf);
150    if (priorityExecutor != null) {
151      priorityExecutor.resizeQueues(conf);
152    }
153    if (replicationExecutor != null) {
154      replicationExecutor.resizeQueues(conf);
155    }
156    if (metaTransitionExecutor != null) {
157      metaTransitionExecutor.resizeQueues(conf);
158    }
159    if (bulkloadExecutor != null) {
160      bulkloadExecutor.resizeQueues(conf);
161    }
162
163    String callQueueType =
164      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
165    if (
166      RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType)
167    ) {
168      callExecutor.onConfigurationChange(conf);
169    }
170  }
171
172  @Override
173  public void init(Context context) {
174    this.port = context.getListenerAddress().getPort();
175  }
176
177  @Override
178  public void start() {
179    callExecutor.start(port);
180    if (priorityExecutor != null) {
181      priorityExecutor.start(port);
182    }
183    if (replicationExecutor != null) {
184      replicationExecutor.start(port);
185    }
186    if (metaTransitionExecutor != null) {
187      metaTransitionExecutor.start(port);
188    }
189    if (bulkloadExecutor != null) {
190      bulkloadExecutor.start(port);
191    }
192
193  }
194
195  @Override
196  public void stop() {
197    callExecutor.stop();
198    if (priorityExecutor != null) {
199      priorityExecutor.stop();
200    }
201    if (replicationExecutor != null) {
202      replicationExecutor.stop();
203    }
204    if (metaTransitionExecutor != null) {
205      metaTransitionExecutor.stop();
206    }
207    if (bulkloadExecutor != null) {
208      bulkloadExecutor.stop();
209    }
210
211  }
212
213  @Override
214  public boolean dispatch(CallRunner callTask) {
215    RpcCall call = callTask.getRpcCall();
216    int level =
217      priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null));
218    if (level == HConstants.PRIORITY_UNSET) {
219      level = HConstants.NORMAL_QOS;
220    }
221    if (
222      metaTransitionExecutor != null
223        && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS
224    ) {
225      return metaTransitionExecutor.dispatch(callTask);
226    } else if (priorityExecutor != null && level > highPriorityLevel) {
227      return priorityExecutor.dispatch(callTask);
228    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
229      return replicationExecutor.dispatch(callTask);
230    } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
231      return bulkloadExecutor.dispatch(callTask);
232    } else {
233      return callExecutor.dispatch(callTask);
234    }
235  }
236
237  @Override
238  public int getMetaPriorityQueueLength() {
239    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
240  }
241
242  @Override
243  public int getGeneralQueueLength() {
244    return callExecutor.getQueueLength();
245  }
246
247  @Override
248  public int getPriorityQueueLength() {
249    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
250  }
251
252  @Override
253  public int getReplicationQueueLength() {
254    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
255  }
256
257  @Override
258  public int getBulkLoadQueueLength() {
259    return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
260  }
261
262  @Override
263  public int getActiveRpcHandlerCount() {
264    return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
265      + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount()
266      + getActiveBulkLoadRpcHandlerCount();
267  }
268
269  @Override
270  public int getActiveMetaPriorityRpcHandlerCount() {
271    return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
272  }
273
274  @Override
275  public int getActiveGeneralRpcHandlerCount() {
276    return callExecutor.getActiveHandlerCount();
277  }
278
279  @Override
280  public int getActivePriorityRpcHandlerCount() {
281    return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount());
282  }
283
284  @Override
285  public int getActiveReplicationRpcHandlerCount() {
286    return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
287  }
288
289  @Override
290  public int getActiveBulkLoadRpcHandlerCount() {
291    return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount();
292  }
293
294  @Override
295  public long getNumGeneralCallsDropped() {
296    return callExecutor.getNumGeneralCallsDropped();
297  }
298
299  @Override
300  public long getNumLifoModeSwitches() {
301    return callExecutor.getNumLifoModeSwitches();
302  }
303
304  @Override
305  public int getWriteQueueLength() {
306    return callExecutor.getWriteQueueLength();
307  }
308
309  @Override
310  public int getReadQueueLength() {
311    return callExecutor.getReadQueueLength();
312  }
313
314  @Override
315  public int getScanQueueLength() {
316    return callExecutor.getScanQueueLength();
317  }
318
319  @Override
320  public int getActiveWriteRpcHandlerCount() {
321    return callExecutor.getActiveWriteHandlerCount();
322  }
323
324  @Override
325  public int getActiveReadRpcHandlerCount() {
326    return callExecutor.getActiveReadHandlerCount();
327  }
328
329  @Override
330  public int getActiveScanRpcHandlerCount() {
331    return callExecutor.getActiveScanHandlerCount();
332  }
333
334  @Override
335  public CallQueueInfo getCallQueueInfo() {
336    String queueName;
337
338    CallQueueInfo callQueueInfo = new CallQueueInfo();
339
340    if (null != callExecutor) {
341      queueName = "Call Queue";
342      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
343      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
344    }
345
346    if (null != priorityExecutor) {
347      queueName = "Priority Queue";
348      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
349      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
350    }
351
352    if (null != replicationExecutor) {
353      queueName = "Replication Queue";
354      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
355      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
356    }
357
358    if (null != metaTransitionExecutor) {
359      queueName = "Meta Transition Queue";
360      callQueueInfo.setCallMethodCount(queueName,
361        metaTransitionExecutor.getCallQueueCountsSummary());
362      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
363    }
364
365    if (null != bulkloadExecutor) {
366      queueName = "BulkLoad Queue";
367      callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary());
368      callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary());
369    }
370
371    return callQueueInfo;
372  }
373
374}