1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.hbase.DaemonThreadFactory;
22
23 import java.io.IOException;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31
32
33 public class FifoRpcScheduler extends RpcScheduler {
34
35 private final int handlerCount;
36 private final int maxQueueLength;
37 private ThreadPoolExecutor executor;
38
39 public FifoRpcScheduler(Configuration conf, int handlerCount) {
40 this.handlerCount = handlerCount;
41 this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
42 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
43 }
44
45 @Override
46 public void init(Context context) {
47
48 }
49
50 @Override
51 public void start() {
52 this.executor = new ThreadPoolExecutor(
53 handlerCount,
54 handlerCount,
55 60,
56 TimeUnit.SECONDS,
57 new ArrayBlockingQueue<Runnable>(maxQueueLength),
58 new DaemonThreadFactory("FifoRpcScheduler.handler"),
59 new ThreadPoolExecutor.CallerRunsPolicy());
60 }
61
62 @Override
63 public void stop() {
64 this.executor.shutdown();
65 }
66
67 @Override
68 public void dispatch(final CallRunner task) throws IOException, InterruptedException {
69 executor.submit(new Runnable() {
70 @Override
71 public void run() {
72 task.setStatus(RpcServer.getStatus());
73 task.run();
74 }
75 });
76 }
77
78 @Override
79 public int getGeneralQueueLength() {
80 return executor.getQueue().size();
81 }
82
83 @Override
84 public int getPriorityQueueLength() {
85 return 0;
86 }
87
88 @Override
89 public int getReplicationQueueLength() {
90 return 0;
91 }
92
93 @Override
94 public int getActiveRpcHandlerCount() {
95 return executor.getActiveCount();
96 }
97 }