View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  
21  import java.util.Comparator;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Abortable;
27  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
32  
33  /**
34   * A scheduler that maintains isolated handler pools for general,
35   * high-priority, and replication requests.
36   */
37  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
38  @InterfaceStability.Evolving
39  public class SimpleRpcScheduler extends RpcScheduler {
40    public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
41  
42    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
43        "hbase.ipc.server.callqueue.read.ratio";
44    public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
45        "hbase.ipc.server.callqueue.scan.ratio";
46    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
47        "hbase.ipc.server.callqueue.handler.factor";
48  
49    /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
50    public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
51    public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
52    public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
53  
54    /** max delay in msec used to bound the deprioritized requests */
55    public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
56        = "hbase.ipc.server.queue.max.call.delay";
57  
58    /**
59     * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
60     * It uses the calculated "deadline" e.g. to deprioritize long-running job
61     *
62     * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
63     * FIFO (first-in-first-out) manner.
64     */
65    private static class CallPriorityComparator implements Comparator<CallRunner> {
66      private final static int DEFAULT_MAX_CALL_DELAY = 5000;
67  
68      private final PriorityFunction priority;
69      private final int maxDelay;
70  
71      public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
72        this.priority = priority;
73        this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
74      }
75  
76      @Override
77      public int compare(CallRunner a, CallRunner b) {
78        RpcServer.Call callA = a.getCall();
79        RpcServer.Call callB = b.getCall();
80        long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
81        long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
82        deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
83        deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
84        return Long.compare(deadlineA, deadlineB);
85      }
86    }
87  
88    private int port;
89    private final PriorityFunction priority;
90    private final RpcExecutor callExecutor;
91    private final RpcExecutor priorityExecutor;
92    private final RpcExecutor replicationExecutor;
93  
94    /** What level a high priority call is at. */
95    private final int highPriorityLevel;
96  
97    private Abortable abortable = null;
98  
99    /**
100    * @param conf
101    * @param handlerCount the number of handler threads that will be used to process calls
102    * @param priorityHandlerCount How many threads for priority handling.
103    * @param replicationHandlerCount How many threads for replication handling.
104    * @param highPriorityLevel
105    * @param priority Function to extract request priority.
106    */
107   public SimpleRpcScheduler(
108       Configuration conf,
109       int handlerCount,
110       int priorityHandlerCount,
111       int replicationHandlerCount,
112       PriorityFunction priority,
113       Abortable server,
114       int highPriorityLevel) {
115     int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
116         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
117     this.priority = priority;
118     this.highPriorityLevel = highPriorityLevel;
119     this.abortable = server;
120 
121     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
122     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
123     float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
124 
125     float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
126     int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
127 
128     LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
129 
130     if (numCallQueues > 1 && callqReadShare > 0) {
131       // multiple read/write queues
132       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
133         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
134         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
135             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
136             BoundedPriorityBlockingQueue.class, callPriority);
137       } else {
138         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
139           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
140       }
141     } else {
142       // multiple queues
143       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
144         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
145         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
146           conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
147       } else {
148         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
149             numCallQueues, maxQueueLength, conf, abortable);
150       }
151     }
152 
153     // Create 2 queues to help priorityExecutor be more scalable.
154     this.priorityExecutor = priorityHandlerCount > 0 ?
155         new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;
156 
157    this.replicationExecutor =
158      replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
159        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
160   }
161 
162   public SimpleRpcScheduler(
163 	      Configuration conf,
164 	      int handlerCount,
165 	      int priorityHandlerCount,
166 	      int replicationHandlerCount,
167 	      PriorityFunction priority,
168 	      int highPriorityLevel) {
169 	  this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
170 	    null, highPriorityLevel);
171   }
172 
173   @Override
174   public void init(Context context) {
175     this.port = context.getListenerAddress().getPort();
176   }
177 
178   @Override
179   public void start() {
180     callExecutor.start(port);
181     if (priorityExecutor != null) priorityExecutor.start(port);
182     if (replicationExecutor != null) replicationExecutor.start(port);
183   }
184 
185   @Override
186   public void stop() {
187     callExecutor.stop();
188     if (priorityExecutor != null) priorityExecutor.stop();
189     if (replicationExecutor != null) replicationExecutor.stop();
190   }
191 
192   @Override
193   public void dispatch(CallRunner callTask) throws InterruptedException {
194     RpcServer.Call call = callTask.getCall();
195     int level = priority.getPriority(call.getHeader(), call.param);
196     if (priorityExecutor != null && level > highPriorityLevel) {
197       priorityExecutor.dispatch(callTask);
198     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
199       replicationExecutor.dispatch(callTask);
200     } else {
201       callExecutor.dispatch(callTask);
202     }
203   }
204 
205   @Override
206   public int getGeneralQueueLength() {
207     return callExecutor.getQueueLength();
208   }
209 
210   @Override
211   public int getPriorityQueueLength() {
212     return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
213   }
214 
215   @Override
216   public int getReplicationQueueLength() {
217     return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
218   }
219 
220   @Override
221   public int getActiveRpcHandlerCount() {
222     return callExecutor.getActiveHandlerCount() +
223            (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
224            (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
225   }
226 }
227