1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.hbase.util.ReflectionUtils;
31
32
33
34
35
36 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
37 @InterfaceStability.Evolving
38 public class BalancedQueueRpcExecutor extends RpcExecutor {
39
40 protected final List<BlockingQueue<CallRunner>> queues;
41 private final QueueBalancer balancer;
42
43 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
44 final int maxQueueLength) {
45 this(name, handlerCount, numQueues, maxQueueLength, null, null);
46 }
47
48 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
49 final int maxQueueLength, final Configuration conf, final Abortable abortable) {
50 this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
51 }
52
53 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
54 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
55 this(name, handlerCount, numQueues, null, null, queueClass, initargs);
56 }
57
58 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
59 final Configuration conf, final Abortable abortable,
60 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
61 super(name, Math.max(handlerCount, numQueues), conf, abortable);
62 queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
63 this.balancer = getBalancer(numQueues);
64 initializeQueues(numQueues, queueClass, initargs);
65 }
66
67 protected void initializeQueues(final int numQueues,
68 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
69 for (int i = 0; i < numQueues; ++i) {
70 queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
71 }
72 }
73
74 @Override
75 public void dispatch(final CallRunner callTask) throws InterruptedException {
76 int queueIndex = balancer.getNextQueue();
77 queues.get(queueIndex).put(callTask);
78 }
79
80 @Override
81 public int getQueueLength() {
82 int length = 0;
83 for (final BlockingQueue<CallRunner> queue : queues) {
84 length += queue.size();
85 }
86 return length;
87 }
88
89 @Override
90 public List<BlockingQueue<CallRunner>> getQueues() {
91 return queues;
92 }
93 }