001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.apache.hadoop.hbase.conf.ConfigurationObserver;
027
028/**
029 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
030 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
031 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc.
032 * See below article for explanation of options.
033 * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a>
034 */
035@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
036@InterfaceStability.Evolving
037public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
038  private int port;
039  private final PriorityFunction priority;
040  private final RpcExecutor callExecutor;
041  private final RpcExecutor priorityExecutor;
042  private final RpcExecutor replicationExecutor;
043
044  /**
045   * This executor is only for meta transition
046   */
047  private final RpcExecutor metaTransitionExecutor;
048
049  /** What level a high priority call is at. */
050  private final int highPriorityLevel;
051
052  private Abortable abortable = null;
053
054  /**
055   * @param conf
056   * @param handlerCount the number of handler threads that will be used to process calls
057   * @param priorityHandlerCount How many threads for priority handling.
058   * @param replicationHandlerCount How many threads for replication handling.
059   * @param highPriorityLevel
060   * @param priority Function to extract request priority.
061   */
062  public SimpleRpcScheduler(
063      Configuration conf,
064      int handlerCount,
065      int priorityHandlerCount,
066      int replicationHandlerCount,
067      int metaTransitionHandler,
068      PriorityFunction priority,
069      Abortable server,
070      int highPriorityLevel) {
071
072    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
073        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
074    int maxPriorityQueueLength =
075        conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
076
077    this.priority = priority;
078    this.highPriorityLevel = highPriorityLevel;
079    this.abortable = server;
080
081    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
082      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
083    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
084
085    if (callqReadShare > 0) {
086      // at least 1 read handler and 1 write handler
087      callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
088        maxQueueLength, priority, conf, server);
089    } else {
090      if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
091        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
092            maxQueueLength, priority, conf, server);
093      } else {
094        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
095            priority, conf, server);
096      }
097    }
098
099    // Create 2 queues to help priorityExecutor be more scalable.
100    this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
101        "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
102        maxPriorityQueueLength, priority, conf, abortable) : null;
103    this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
104        "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
105        maxQueueLength, priority, conf, abortable) : null;
106
107    this.metaTransitionExecutor = metaTransitionHandler > 0 ?
108        new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
109            RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
110            abortable) :
111        null;
112  }
113
114  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
115      int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
116    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
117        highPriorityLevel);
118  }
119
120  /**
121   * Resize call queues;
122   * @param conf new configuration
123   */
124  @Override
125  public void onConfigurationChange(Configuration conf) {
126    callExecutor.resizeQueues(conf);
127    if (priorityExecutor != null) {
128      priorityExecutor.resizeQueues(conf);
129    }
130    if (replicationExecutor != null) {
131      replicationExecutor.resizeQueues(conf);
132    }
133    if (metaTransitionExecutor != null) {
134      metaTransitionExecutor.resizeQueues(conf);
135    }
136
137    String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
138      RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
139    if (RpcExecutor.isCodelQueueType(callQueueType)) {
140      callExecutor.onConfigurationChange(conf);
141    }
142  }
143
144  @Override
145  public void init(Context context) {
146    this.port = context.getListenerAddress().getPort();
147  }
148
149  @Override
150  public void start() {
151    callExecutor.start(port);
152    if (priorityExecutor != null) {
153      priorityExecutor.start(port);
154    }
155    if (replicationExecutor != null) {
156      replicationExecutor.start(port);
157    }
158    if (metaTransitionExecutor != null) {
159      metaTransitionExecutor.start(port);
160    }
161
162  }
163
164  @Override
165  public void stop() {
166    callExecutor.stop();
167    if (priorityExecutor != null) {
168      priorityExecutor.stop();
169    }
170    if (replicationExecutor != null) {
171      replicationExecutor.stop();
172    }
173    if (metaTransitionExecutor != null) {
174      metaTransitionExecutor.stop();
175    }
176
177  }
178
179  @Override
180  public boolean dispatch(CallRunner callTask) throws InterruptedException {
181    RpcCall call = callTask.getRpcCall();
182    int level = priority.getPriority(call.getHeader(), call.getParam(),
183        call.getRequestUser().orElse(null));
184    if (level == HConstants.PRIORITY_UNSET) {
185      level = HConstants.NORMAL_QOS;
186    }
187    if (metaTransitionExecutor != null && level == HConstants.META_QOS) {
188      return metaTransitionExecutor.dispatch(callTask);
189    } else if (priorityExecutor != null && level > highPriorityLevel) {
190      return priorityExecutor.dispatch(callTask);
191    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
192      return replicationExecutor.dispatch(callTask);
193    } else {
194      return callExecutor.dispatch(callTask);
195    }
196  }
197
198  @Override
199  public int getMetaPriorityQueueLength() {
200    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
201  }
202
203  @Override
204  public int getGeneralQueueLength() {
205    return callExecutor.getQueueLength();
206  }
207
208  @Override
209  public int getPriorityQueueLength() {
210    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
211  }
212
213  @Override
214  public int getReplicationQueueLength() {
215    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
216  }
217
218  @Override
219  public int getActiveRpcHandlerCount() {
220    return callExecutor.getActiveHandlerCount() +
221           (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
222           (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()) +
223           (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
224  }
225
226  @Override
227  public long getNumGeneralCallsDropped() {
228    return callExecutor.getNumGeneralCallsDropped();
229  }
230
231  @Override
232  public long getNumLifoModeSwitches() {
233    return callExecutor.getNumLifoModeSwitches();
234  }
235
236  @Override
237  public int getWriteQueueLength() {
238    return callExecutor.getWriteQueueLength();
239  }
240
241  @Override
242  public int getReadQueueLength() {
243    return callExecutor.getReadQueueLength();
244  }
245
246  @Override
247  public int getScanQueueLength() {
248    return callExecutor.getScanQueueLength();
249  }
250
251  @Override
252  public int getActiveWriteRpcHandlerCount() {
253    return callExecutor.getActiveWriteHandlerCount();
254  }
255
256  @Override
257  public int getActiveReadRpcHandlerCount() {
258    return callExecutor.getActiveReadHandlerCount();
259  }
260
261  @Override
262  public int getActiveScanRpcHandlerCount() {
263    return callExecutor.getActiveScanHandlerCount();
264  }
265
266  @Override
267  public CallQueueInfo getCallQueueInfo() {
268    String queueName;
269
270    CallQueueInfo callQueueInfo = new CallQueueInfo();
271
272    if (null != callExecutor) {
273      queueName = "Call Queue";
274      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
275      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
276    }
277
278    if (null != priorityExecutor) {
279      queueName = "Priority Queue";
280      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
281      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
282    }
283
284    if (null != replicationExecutor) {
285      queueName = "Replication Queue";
286      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
287      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
288    }
289
290    if (null != metaTransitionExecutor) {
291      queueName = "Meta Transition Queue";
292      callQueueInfo.setCallMethodCount(queueName,
293          metaTransitionExecutor.getCallQueueCountsSummary());
294      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
295    }
296
297    return callQueueInfo;
298  }
299
300}
301