001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.apache.hadoop.hbase.conf.ConfigurationObserver;
027import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
028
029/**
030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc.
033 * See below article for explanation of options.
034 * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a>
035 */
036@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
037@InterfaceStability.Evolving
038public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
039  private int port;
040  private final PriorityFunction priority;
041  private final RpcExecutor callExecutor;
042  private final RpcExecutor priorityExecutor;
043  private final RpcExecutor replicationExecutor;
044
045  /**
046   * This executor is only for meta transition
047   */
048  private final RpcExecutor metaTransitionExecutor;
049
050  /** What level a high priority call is at. */
051  private final int highPriorityLevel;
052
053  private Abortable abortable = null;
054
055  /**
056   * @param conf
057   * @param handlerCount the number of handler threads that will be used to process calls
058   * @param priorityHandlerCount How many threads for priority handling.
059   * @param replicationHandlerCount How many threads for replication handling.
060   * @param highPriorityLevel
061   * @param priority Function to extract request priority.
062   */
063  public SimpleRpcScheduler(
064      Configuration conf,
065      int handlerCount,
066      int priorityHandlerCount,
067      int replicationHandlerCount,
068      int metaTransitionHandler,
069      PriorityFunction priority,
070      Abortable server,
071      int highPriorityLevel) {
072
073    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
074        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
075    int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
076      priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
077    int maxReplicationQueueLength =
078        conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
079          replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
080
081    this.priority = priority;
082    this.highPriorityLevel = highPriorityLevel;
083    this.abortable = server;
084
085    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
086      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
087    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
088
089    if (callqReadShare > 0) {
090      // at least 1 read handler and 1 write handler
091      callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
092        maxQueueLength, priority, conf, server);
093    } else {
094      if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
095        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
096            maxQueueLength, priority, conf, server);
097      } else {
098        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
099            priority, conf, server);
100      }
101    }
102
103    float metaCallqReadShare =
104        conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
105            MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
106    if (metaCallqReadShare > 0) {
107      // different read/write handler for meta, at least 1 read handler and 1 write handler
108      this.priorityExecutor =
109          new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount),
110              maxPriorityQueueLength, priority, conf, server);
111    } else {
112      // Create 2 queues to help priorityExecutor be more scalable.
113      this.priorityExecutor = priorityHandlerCount > 0 ?
114          new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
115              RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
116              abortable) :
117          null;
118    }
119    this.replicationExecutor =
120        replicationHandlerCount > 0
121            ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
122                RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority,
123                conf, abortable)
124            : null;
125    this.metaTransitionExecutor = metaTransitionHandler > 0 ?
126        new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
127            RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
128            abortable) :
129        null;
130  }
131
132  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
133      int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
134    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
135        highPriorityLevel);
136  }
137
138  /**
139   * Resize call queues;
140   * @param conf new configuration
141   */
142  @Override
143  public void onConfigurationChange(Configuration conf) {
144    callExecutor.resizeQueues(conf);
145    if (priorityExecutor != null) {
146      priorityExecutor.resizeQueues(conf);
147    }
148    if (replicationExecutor != null) {
149      replicationExecutor.resizeQueues(conf);
150    }
151    if (metaTransitionExecutor != null) {
152      metaTransitionExecutor.resizeQueues(conf);
153    }
154
155    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
156      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
157    if (RpcExecutor.isCodelQueueType(callQueueType)) {
158      callExecutor.onConfigurationChange(conf);
159    }
160  }
161
162  @Override
163  public void init(Context context) {
164    this.port = context.getListenerAddress().getPort();
165  }
166
167  @Override
168  public void start() {
169    callExecutor.start(port);
170    if (priorityExecutor != null) {
171      priorityExecutor.start(port);
172    }
173    if (replicationExecutor != null) {
174      replicationExecutor.start(port);
175    }
176    if (metaTransitionExecutor != null) {
177      metaTransitionExecutor.start(port);
178    }
179
180  }
181
182  @Override
183  public void stop() {
184    callExecutor.stop();
185    if (priorityExecutor != null) {
186      priorityExecutor.stop();
187    }
188    if (replicationExecutor != null) {
189      replicationExecutor.stop();
190    }
191    if (metaTransitionExecutor != null) {
192      metaTransitionExecutor.stop();
193    }
194
195  }
196
197  @Override
198  public boolean dispatch(CallRunner callTask) throws InterruptedException {
199    RpcCall call = callTask.getRpcCall();
200    int level = priority.getPriority(call.getHeader(), call.getParam(),
201        call.getRequestUser().orElse(null));
202    if (level == HConstants.PRIORITY_UNSET) {
203      level = HConstants.NORMAL_QOS;
204    }
205    if (metaTransitionExecutor != null &&
206      level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS) {
207      return metaTransitionExecutor.dispatch(callTask);
208    } else if (priorityExecutor != null && level > highPriorityLevel) {
209      return priorityExecutor.dispatch(callTask);
210    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
211      return replicationExecutor.dispatch(callTask);
212    } else {
213      return callExecutor.dispatch(callTask);
214    }
215  }
216
217  @Override
218  public int getMetaPriorityQueueLength() {
219    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
220  }
221
222  @Override
223  public int getGeneralQueueLength() {
224    return callExecutor.getQueueLength();
225  }
226
227  @Override
228  public int getPriorityQueueLength() {
229    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
230  }
231
232  @Override
233  public int getReplicationQueueLength() {
234    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
235  }
236
237  @Override
238  public int getActiveRpcHandlerCount() {
239    return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
240        + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
241  }
242
243  @Override
244  public int getActiveMetaPriorityRpcHandlerCount() {
245    return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
246  }
247
248  @Override
249  public int getActiveGeneralRpcHandlerCount() {
250    return callExecutor.getActiveHandlerCount();
251  }
252
253  @Override
254  public int getActivePriorityRpcHandlerCount() {
255    return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount());
256  }
257
258  @Override
259  public int getActiveReplicationRpcHandlerCount() {
260    return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
261  }
262
263  @Override
264  public long getNumGeneralCallsDropped() {
265    return callExecutor.getNumGeneralCallsDropped();
266  }
267
268  @Override
269  public long getNumLifoModeSwitches() {
270    return callExecutor.getNumLifoModeSwitches();
271  }
272
273  @Override
274  public int getWriteQueueLength() {
275    return callExecutor.getWriteQueueLength();
276  }
277
278  @Override
279  public int getReadQueueLength() {
280    return callExecutor.getReadQueueLength();
281  }
282
283  @Override
284  public int getScanQueueLength() {
285    return callExecutor.getScanQueueLength();
286  }
287
288  @Override
289  public int getActiveWriteRpcHandlerCount() {
290    return callExecutor.getActiveWriteHandlerCount();
291  }
292
293  @Override
294  public int getActiveReadRpcHandlerCount() {
295    return callExecutor.getActiveReadHandlerCount();
296  }
297
298  @Override
299  public int getActiveScanRpcHandlerCount() {
300    return callExecutor.getActiveScanHandlerCount();
301  }
302
303  @Override
304  public CallQueueInfo getCallQueueInfo() {
305    String queueName;
306
307    CallQueueInfo callQueueInfo = new CallQueueInfo();
308
309    if (null != callExecutor) {
310      queueName = "Call Queue";
311      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
312      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
313    }
314
315    if (null != priorityExecutor) {
316      queueName = "Priority Queue";
317      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
318      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
319    }
320
321    if (null != replicationExecutor) {
322      queueName = "Replication Queue";
323      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
324      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
325    }
326
327    if (null != metaTransitionExecutor) {
328      queueName = "Meta Transition Queue";
329      callQueueInfo.setCallMethodCount(queueName,
330          metaTransitionExecutor.getCallQueueCountsSummary());
331      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
332    }
333
334    return callQueueInfo;
335  }
336
337}
338