001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.conf.ConfigurationObserver;
025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028
029/**
030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See
033 * below article for explanation of options.
034 * @see <a href=
035 *      "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview
036 *      on Request Queuing</a>
037 */
038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
039@InterfaceStability.Evolving
040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
041  private int port;
042  private final PriorityFunction priority;
043  private final RpcExecutor callExecutor;
044  private final RpcExecutor priorityExecutor;
045  private final RpcExecutor replicationExecutor;
046
047  /**
048   * This executor is only for meta transition
049   */
050  private final RpcExecutor metaTransitionExecutor;
051
052  /** What level a high priority call is at. */
053  private final int highPriorityLevel;
054
055  private Abortable abortable = null;
056
057  /**
058   * n * @param handlerCount the number of handler threads that will be used to process calls
059   * @param priorityHandlerCount    How many threads for priority handling.
060   * @param replicationHandlerCount How many threads for replication handling. n * @param priority
061   *                                Function to extract request priority.
062   */
063  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
064    int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
065    Abortable server, int highPriorityLevel) {
066
067    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
068      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
069    int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
070      priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
071    int maxReplicationQueueLength =
072      conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
073        replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
074
075    this.priority = priority;
076    this.highPriorityLevel = highPriorityLevel;
077    this.abortable = server;
078
079    String callQueueType =
080      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
081    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
082
083    if (callqReadShare > 0) {
084      // at least 1 read handler and 1 write handler
085      callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
086        maxQueueLength, priority, conf, server);
087    } else {
088      if (
089        RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)
090          || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)
091      ) {
092        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
093          maxQueueLength, priority, conf, server);
094      } else {
095        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
096          priority, conf, server);
097      }
098    }
099
100    float metaCallqReadShare =
101      conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
102        MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
103    if (metaCallqReadShare > 0) {
104      // different read/write handler for meta, at least 1 read handler and 1 write handler
105      this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ",
106        Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server);
107    } else {
108      // Create 2 queues to help priorityExecutor be more scalable.
109      this.priorityExecutor = priorityHandlerCount > 0
110        ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
111          RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
112          abortable)
113        : null;
114    }
115    this.replicationExecutor = replicationHandlerCount > 0
116      ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
117        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf,
118        abortable)
119      : null;
120
121    this.metaTransitionExecutor = metaTransitionHandler > 0
122      ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
123        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
124        abortable)
125      : null;
126  }
127
128  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
129    int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
130    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
131      highPriorityLevel);
132  }
133
134  /**
135   * Resize call queues;
136   * @param conf new configuration
137   */
138  @Override
139  public void onConfigurationChange(Configuration conf) {
140    callExecutor.resizeQueues(conf);
141    if (priorityExecutor != null) {
142      priorityExecutor.resizeQueues(conf);
143    }
144    if (replicationExecutor != null) {
145      replicationExecutor.resizeQueues(conf);
146    }
147    if (metaTransitionExecutor != null) {
148      metaTransitionExecutor.resizeQueues(conf);
149    }
150
151    String callQueueType =
152      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
153    if (
154      RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType)
155    ) {
156      callExecutor.onConfigurationChange(conf);
157    }
158  }
159
160  @Override
161  public void init(Context context) {
162    this.port = context.getListenerAddress().getPort();
163  }
164
165  @Override
166  public void start() {
167    callExecutor.start(port);
168    if (priorityExecutor != null) {
169      priorityExecutor.start(port);
170    }
171    if (replicationExecutor != null) {
172      replicationExecutor.start(port);
173    }
174    if (metaTransitionExecutor != null) {
175      metaTransitionExecutor.start(port);
176    }
177
178  }
179
180  @Override
181  public void stop() {
182    callExecutor.stop();
183    if (priorityExecutor != null) {
184      priorityExecutor.stop();
185    }
186    if (replicationExecutor != null) {
187      replicationExecutor.stop();
188    }
189    if (metaTransitionExecutor != null) {
190      metaTransitionExecutor.stop();
191    }
192
193  }
194
195  @Override
196  public boolean dispatch(CallRunner callTask) {
197    RpcCall call = callTask.getRpcCall();
198    int level =
199      priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null));
200    if (level == HConstants.PRIORITY_UNSET) {
201      level = HConstants.NORMAL_QOS;
202    }
203    if (
204      metaTransitionExecutor != null
205        && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS
206    ) {
207      return metaTransitionExecutor.dispatch(callTask);
208    } else if (priorityExecutor != null && level > highPriorityLevel) {
209      return priorityExecutor.dispatch(callTask);
210    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
211      return replicationExecutor.dispatch(callTask);
212    } else {
213      return callExecutor.dispatch(callTask);
214    }
215  }
216
217  @Override
218  public int getMetaPriorityQueueLength() {
219    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
220  }
221
222  @Override
223  public int getGeneralQueueLength() {
224    return callExecutor.getQueueLength();
225  }
226
227  @Override
228  public int getPriorityQueueLength() {
229    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
230  }
231
232  @Override
233  public int getReplicationQueueLength() {
234    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
235  }
236
237  @Override
238  public int getActiveRpcHandlerCount() {
239    return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
240      + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
241  }
242
243  @Override
244  public int getActiveMetaPriorityRpcHandlerCount() {
245    return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
246  }
247
248  @Override
249  public int getActiveGeneralRpcHandlerCount() {
250    return callExecutor.getActiveHandlerCount();
251  }
252
253  @Override
254  public int getActivePriorityRpcHandlerCount() {
255    return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount());
256  }
257
258  @Override
259  public int getActiveReplicationRpcHandlerCount() {
260    return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
261  }
262
263  @Override
264  public long getNumGeneralCallsDropped() {
265    return callExecutor.getNumGeneralCallsDropped();
266  }
267
268  @Override
269  public long getNumLifoModeSwitches() {
270    return callExecutor.getNumLifoModeSwitches();
271  }
272
273  @Override
274  public int getWriteQueueLength() {
275    return callExecutor.getWriteQueueLength();
276  }
277
278  @Override
279  public int getReadQueueLength() {
280    return callExecutor.getReadQueueLength();
281  }
282
283  @Override
284  public int getScanQueueLength() {
285    return callExecutor.getScanQueueLength();
286  }
287
288  @Override
289  public int getActiveWriteRpcHandlerCount() {
290    return callExecutor.getActiveWriteHandlerCount();
291  }
292
293  @Override
294  public int getActiveReadRpcHandlerCount() {
295    return callExecutor.getActiveReadHandlerCount();
296  }
297
298  @Override
299  public int getActiveScanRpcHandlerCount() {
300    return callExecutor.getActiveScanHandlerCount();
301  }
302
303  @Override
304  public CallQueueInfo getCallQueueInfo() {
305    String queueName;
306
307    CallQueueInfo callQueueInfo = new CallQueueInfo();
308
309    if (null != callExecutor) {
310      queueName = "Call Queue";
311      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
312      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
313    }
314
315    if (null != priorityExecutor) {
316      queueName = "Priority Queue";
317      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
318      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
319    }
320
321    if (null != replicationExecutor) {
322      queueName = "Replication Queue";
323      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
324      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
325    }
326
327    if (null != metaTransitionExecutor) {
328      queueName = "Meta Transition Queue";
329      callQueueInfo.setCallMethodCount(queueName,
330        metaTransitionExecutor.getCallQueueCountsSummary());
331      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
332    }
333
334    return callQueueInfo;
335  }
336
337}