1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20
21 import java.util.Comparator;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
32
33
34
35
36
37 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
38 @InterfaceStability.Evolving
39 public class SimpleRpcScheduler extends RpcScheduler {
40 public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
41
42 public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
43 "hbase.ipc.server.callqueue.read.ratio";
44 public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
45 "hbase.ipc.server.callqueue.scan.ratio";
46 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
47 "hbase.ipc.server.callqueue.handler.factor";
48
49
50 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
51 public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
52 public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
53
54
55 public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
56 = "hbase.ipc.server.queue.max.call.delay";
57
58
59
60
61
62
63
64
65 private static class CallPriorityComparator implements Comparator<CallRunner> {
66 private final static int DEFAULT_MAX_CALL_DELAY = 5000;
67
68 private final PriorityFunction priority;
69 private final int maxDelay;
70
71 public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
72 this.priority = priority;
73 this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
74 }
75
76 @Override
77 public int compare(CallRunner a, CallRunner b) {
78 RpcServer.Call callA = a.getCall();
79 RpcServer.Call callB = b.getCall();
80 long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
81 long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
82 deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
83 deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
84 return Long.compare(deadlineA, deadlineB);
85 }
86 }
87
88 private int port;
89 private final PriorityFunction priority;
90 private final RpcExecutor callExecutor;
91 private final RpcExecutor priorityExecutor;
92 private final RpcExecutor replicationExecutor;
93
94
95 private final int highPriorityLevel;
96
97 private Abortable abortable = null;
98
99
100
101
102
103
104
105
106
107 public SimpleRpcScheduler(
108 Configuration conf,
109 int handlerCount,
110 int priorityHandlerCount,
111 int replicationHandlerCount,
112 PriorityFunction priority,
113 Abortable server,
114 int highPriorityLevel) {
115 int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
116 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
117 this.priority = priority;
118 this.highPriorityLevel = highPriorityLevel;
119 this.abortable = server;
120
121 String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
122 float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
123 float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
124
125 float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
126 int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
127
128 LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
129
130 if (numCallQueues > 1 && callqReadShare > 0) {
131
132 if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
133 CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
134 callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
135 callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
136 BoundedPriorityBlockingQueue.class, callPriority);
137 } else {
138 callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
139 callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
140 }
141 } else {
142
143 if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
144 CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
145 callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
146 conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
147 } else {
148 callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
149 numCallQueues, maxQueueLength, conf, abortable);
150 }
151 }
152
153
154 this.priorityExecutor = priorityHandlerCount > 0 ?
155 new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;
156
157 this.replicationExecutor =
158 replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
159 replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
160 }
161
162 public SimpleRpcScheduler(
163 Configuration conf,
164 int handlerCount,
165 int priorityHandlerCount,
166 int replicationHandlerCount,
167 PriorityFunction priority,
168 int highPriorityLevel) {
169 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
170 null, highPriorityLevel);
171 }
172
173 @Override
174 public void init(Context context) {
175 this.port = context.getListenerAddress().getPort();
176 }
177
178 @Override
179 public void start() {
180 callExecutor.start(port);
181 if (priorityExecutor != null) priorityExecutor.start(port);
182 if (replicationExecutor != null) replicationExecutor.start(port);
183 }
184
185 @Override
186 public void stop() {
187 callExecutor.stop();
188 if (priorityExecutor != null) priorityExecutor.stop();
189 if (replicationExecutor != null) replicationExecutor.stop();
190 }
191
192 @Override
193 public void dispatch(CallRunner callTask) throws InterruptedException {
194 RpcServer.Call call = callTask.getCall();
195 int level = priority.getPriority(call.getHeader(), call.param);
196 if (priorityExecutor != null && level > highPriorityLevel) {
197 priorityExecutor.dispatch(callTask);
198 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
199 replicationExecutor.dispatch(callTask);
200 } else {
201 callExecutor.dispatch(callTask);
202 }
203 }
204
205 @Override
206 public int getGeneralQueueLength() {
207 return callExecutor.getQueueLength();
208 }
209
210 @Override
211 public int getPriorityQueueLength() {
212 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
213 }
214
215 @Override
216 public int getReplicationQueueLength() {
217 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
218 }
219
220 @Override
221 public int getActiveRpcHandlerCount() {
222 return callExecutor.getActiveHandlerCount() +
223 (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
224 (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
225 }
226 }
227