1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang.ArrayUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
36 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
41 import org.apache.hadoop.hbase.util.ReflectionUtils;
42
43 import com.google.protobuf.Message;
44
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
51 @InterfaceStability.Evolving
52 public class RWQueueRpcExecutor extends RpcExecutor {
53 private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
54
55 private final List<BlockingQueue<CallRunner>> queues;
56 private final QueueBalancer writeBalancer;
57 private final QueueBalancer readBalancer;
58 private final QueueBalancer scanBalancer;
59 private final int writeHandlersCount;
60 private final int readHandlersCount;
61 private final int scanHandlersCount;
62 private final int numWriteQueues;
63 private final int numReadQueues;
64 private final int numScanQueues;
65
66 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
67 final float readShare, final int maxQueueLength,
68 final Configuration conf, final Abortable abortable) {
69 this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
70 conf, abortable, LinkedBlockingQueue.class);
71 }
72
73 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
74 final float readShare, final float scanShare, final int maxQueueLength) {
75 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
76 }
77
78 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
79 final float readShare, final float scanShare, final int maxQueueLength,
80 final Configuration conf, final Abortable abortable) {
81 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
82 conf, abortable, LinkedBlockingQueue.class);
83 }
84
85 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
86 final float readShare, final int maxQueueLength,
87 final Configuration conf, final Abortable abortable,
88 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
89 this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
90 readQueueClass, readQueueInitArgs);
91 }
92
93 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
94 final float readShare, final float scanShare, final int maxQueueLength,
95 final Configuration conf, final Abortable abortable,
96 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
97 this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
98 calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
99 LinkedBlockingQueue.class, new Object[] {maxQueueLength},
100 readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
101 }
102
103 public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
104 final int numWriteQueues, final int numReadQueues,
105 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
106 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
107 this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
108 writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
109 }
110
111 public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
112 int numWriteQueues, int numReadQueues, float scanShare,
113 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
114 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
115 super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
116
117 int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
118 int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
119 if ((numReadQueues - numScanQueues) > 0) {
120 numReadQueues -= numScanQueues;
121 readHandlers -= scanHandlers;
122 } else {
123 numScanQueues = 0;
124 scanHandlers = 0;
125 }
126
127 this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
128 this.readHandlersCount = Math.max(readHandlers, numReadQueues);
129 this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
130 this.numWriteQueues = numWriteQueues;
131 this.numReadQueues = numReadQueues;
132 this.numScanQueues = numScanQueues;
133 this.writeBalancer = getBalancer(numWriteQueues);
134 this.readBalancer = getBalancer(numReadQueues);
135 this.scanBalancer = getBalancer(numScanQueues);
136
137 queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
138 LOG.info(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
139 + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
140 + numScanQueues + " scanHandlers=" + scanHandlersCount);
141
142 for (int i = 0; i < numWriteQueues; ++i) {
143 queues.add((BlockingQueue<CallRunner>)
144 ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
145 }
146
147 for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
148 queues.add((BlockingQueue<CallRunner>)
149 ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
150 }
151 }
152
153 @Override
154 protected void startHandlers(final int port) {
155 startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
156 startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
157 if (numScanQueues > 0) {
158 startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
159 numScanQueues, port);
160 }
161 }
162
163 @Override
164 public boolean dispatch(final CallRunner callTask) throws InterruptedException {
165 RpcServer.Call call = callTask.getCall();
166 int queueIndex;
167 if (isWriteRequest(call.getHeader(), call.param)) {
168 queueIndex = writeBalancer.getNextQueue();
169 } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
170 queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
171 } else {
172 queueIndex = numWriteQueues + readBalancer.getNextQueue();
173 }
174 return queues.get(queueIndex).offer(callTask);
175 }
176
177 private boolean isWriteRequest(final RequestHeader header, final Message param) {
178
179 if (param instanceof MultiRequest) {
180 MultiRequest multi = (MultiRequest)param;
181 for (RegionAction regionAction : multi.getRegionActionList()) {
182 for (Action action: regionAction.getActionList()) {
183 if (action.hasMutation()) {
184 return true;
185 }
186 }
187 }
188 }
189 if (param instanceof MutateRequest) {
190 return true;
191 }
192
193
194
195
196
197
198 if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
199 return true;
200 }
201 if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
202 return true;
203 }
204 if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
205 return true;
206 }
207 return false;
208 }
209
210 private boolean isScanRequest(final RequestHeader header, final Message param) {
211 if (param instanceof ScanRequest) {
212
213 ScanRequest request = (ScanRequest)param;
214 return request.hasScannerId();
215 }
216 return false;
217 }
218
219 @Override
220 public int getQueueLength() {
221 int length = 0;
222 for (final BlockingQueue<CallRunner> queue: queues) {
223 length += queue.size();
224 }
225 return length;
226 }
227
228 @Override
229 protected List<BlockingQueue<CallRunner>> getQueues() {
230 return queues;
231 }
232
233
234
235
236
237 private static int calcNumWriters(final int count, final float readShare) {
238 return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
239 }
240
241
242
243
244
245 private static int calcNumReaders(final int count, final float readShare) {
246 return count - calcNumWriters(count, readShare);
247 }
248 }