1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.hbase.DaemonThreadFactory;
22
23 import java.io.IOException;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29
30
31
32
33
34 public class FifoRpcScheduler extends RpcScheduler {
35
36 private final int handlerCount;
37 private final int maxQueueLength;
38 private final AtomicInteger queueSize = new AtomicInteger(0);
39 private ThreadPoolExecutor executor;
40
41 public FifoRpcScheduler(Configuration conf, int handlerCount) {
42 this.handlerCount = handlerCount;
43 this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
44 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
45 }
46
47 @Override
48 public void init(Context context) {
49
50 }
51
52 @Override
53 public void start() {
54 this.executor = new ThreadPoolExecutor(
55 handlerCount,
56 handlerCount,
57 60,
58 TimeUnit.SECONDS,
59 new ArrayBlockingQueue<Runnable>(maxQueueLength),
60 new DaemonThreadFactory("FifoRpcScheduler.handler"),
61 new ThreadPoolExecutor.CallerRunsPolicy());
62 }
63
64 @Override
65 public void stop() {
66 this.executor.shutdown();
67 }
68
69 @Override
70 public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
71
72 int queued = queueSize.getAndIncrement();
73 if (maxQueueLength > 0 && queued >= maxQueueLength) {
74 queueSize.decrementAndGet();
75 return false;
76 }
77 executor.submit(new Runnable() {
78 @Override
79 public void run() {
80 task.setStatus(RpcServer.getStatus());
81 task.run();
82 queueSize.decrementAndGet();
83 }
84 });
85 return true;
86 }
87
88 @Override
89 public int getGeneralQueueLength() {
90 return executor.getQueue().size();
91 }
92
93 @Override
94 public int getPriorityQueueLength() {
95 return 0;
96 }
97
98 @Override
99 public int getReplicationQueueLength() {
100 return 0;
101 }
102
103 @Override
104 public int getActiveRpcHandlerCount() {
105 return executor.getActiveCount();
106 }
107 }