1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33 import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34
35 import java.io.IOException;
36 import java.lang.reflect.InvocationHandler;
37 import java.lang.reflect.InvocationTargetException;
38 import java.lang.reflect.Method;
39 import java.lang.reflect.Proxy;
40 import java.nio.ByteBuffer;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.atomic.AtomicInteger;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.client.ResultScanner;
52 import org.apache.hadoop.hbase.client.Table;
53 import org.apache.hadoop.hbase.security.UserProvider;
54 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
55 import org.apache.hadoop.hbase.thrift2.generated.TAppend;
56 import org.apache.hadoop.hbase.thrift2.generated.TDelete;
57 import org.apache.hadoop.hbase.thrift2.generated.TGet;
58 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
59 import org.apache.hadoop.hbase.thrift2.generated.TIOError;
60 import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
61 import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
62 import org.apache.hadoop.hbase.thrift2.generated.TPut;
63 import org.apache.hadoop.hbase.thrift2.generated.TResult;
64 import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
65 import org.apache.hadoop.hbase.thrift2.generated.TScan;
66 import org.apache.hadoop.hbase.util.Bytes;
67 import org.apache.hadoop.hbase.util.ConnectionCache;
68 import org.apache.thrift.TException;
69
70
71
72
73
74 @InterfaceAudience.Private
75 public class ThriftHBaseServiceHandler implements THBaseService.Iface {
76
77
78 private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
79
80
81
82 private final AtomicInteger nextScannerId = new AtomicInteger(0);
83 private final Map<Integer, ResultScanner> scannerMap =
84 new ConcurrentHashMap<Integer, ResultScanner>();
85
86 private final ConnectionCache connectionCache;
87
88 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
89 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
90
91 public static THBaseService.Iface newInstance(
92 THBaseService.Iface handler, ThriftMetrics metrics) {
93 return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
94 new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
95 }
96
97 private static class THBaseServiceMetricsProxy implements InvocationHandler {
98 private final THBaseService.Iface handler;
99 private final ThriftMetrics metrics;
100
101 private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
102 this.handler = handler;
103 this.metrics = metrics;
104 }
105
106 @Override
107 public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
108 Object result;
109 try {
110 long start = now();
111 result = m.invoke(handler, args);
112 int processTime = (int) (now() - start);
113 metrics.incMethodTime(m.getName(), processTime);
114 } catch (InvocationTargetException e) {
115 throw e.getTargetException();
116 } catch (Exception e) {
117 throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
118 }
119 return result;
120 }
121 }
122
123 private static long now() {
124 return System.nanoTime();
125 }
126
127 ThriftHBaseServiceHandler(final Configuration conf,
128 final UserProvider userProvider) throws IOException {
129 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
130 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
131 connectionCache = new ConnectionCache(
132 conf, userProvider, cleanInterval, maxIdleTime);
133 }
134
135 private Table getTable(ByteBuffer tableName) {
136 try {
137 return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
138 } catch (IOException e) {
139 throw new RuntimeException(e);
140 }
141 }
142
143 private void closeTable(Table table) throws TIOError {
144 try {
145 table.close();
146 } catch (IOException e) {
147 throw getTIOError(e);
148 }
149 }
150
151 private TIOError getTIOError(IOException e) {
152 TIOError err = new TIOError();
153 err.setMessage(e.getMessage());
154 return err;
155 }
156
157
158
159
160
161
162 private int addScanner(ResultScanner scanner) {
163 int id = nextScannerId.getAndIncrement();
164 scannerMap.put(id, scanner);
165 return id;
166 }
167
168
169
170
171
172
173 private ResultScanner getScanner(int id) {
174 return scannerMap.get(id);
175 }
176
177 void setEffectiveUser(String effectiveUser) {
178 connectionCache.setEffectiveUser(effectiveUser);
179 }
180
181
182
183
184
185
186 protected ResultScanner removeScanner(int id) {
187 return scannerMap.remove(id);
188 }
189
190 @Override
191 public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
192 Table htable = getTable(table);
193 try {
194 return htable.exists(getFromThrift(get));
195 } catch (IOException e) {
196 throw getTIOError(e);
197 } finally {
198 closeTable(htable);
199 }
200 }
201
202 @Override
203 public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
204 Table htable = getTable(table);
205 try {
206 return resultFromHBase(htable.get(getFromThrift(get)));
207 } catch (IOException e) {
208 throw getTIOError(e);
209 } finally {
210 closeTable(htable);
211 }
212 }
213
214 @Override
215 public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
216 Table htable = getTable(table);
217 try {
218 return resultsFromHBase(htable.get(getsFromThrift(gets)));
219 } catch (IOException e) {
220 throw getTIOError(e);
221 } finally {
222 closeTable(htable);
223 }
224 }
225
226 @Override
227 public void put(ByteBuffer table, TPut put) throws TIOError, TException {
228 Table htable = getTable(table);
229 try {
230 htable.put(putFromThrift(put));
231 } catch (IOException e) {
232 throw getTIOError(e);
233 } finally {
234 closeTable(htable);
235 }
236 }
237
238 @Override
239 public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
240 ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
241 Table htable = getTable(table);
242 try {
243 return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
244 byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
245 putFromThrift(put));
246 } catch (IOException e) {
247 throw getTIOError(e);
248 } finally {
249 closeTable(htable);
250 }
251 }
252
253 @Override
254 public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
255 Table htable = getTable(table);
256 try {
257 htable.put(putsFromThrift(puts));
258 } catch (IOException e) {
259 throw getTIOError(e);
260 } finally {
261 closeTable(htable);
262 }
263 }
264
265 @Override
266 public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
267 Table htable = getTable(table);
268 try {
269 htable.delete(deleteFromThrift(deleteSingle));
270 } catch (IOException e) {
271 throw getTIOError(e);
272 } finally {
273 closeTable(htable);
274 }
275 }
276
277 @Override
278 public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
279 TException {
280 Table htable = getTable(table);
281 try {
282 htable.delete(deletesFromThrift(deletes));
283 } catch (IOException e) {
284 throw getTIOError(e);
285 } finally {
286 closeTable(htable);
287 }
288 return Collections.emptyList();
289 }
290
291 @Override
292 public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
293 ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
294 Table htable = getTable(table);
295
296 try {
297 if (value == null) {
298 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
299 byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
300 } else {
301 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
302 byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
303 deleteFromThrift(deleteSingle));
304 }
305 } catch (IOException e) {
306 throw getTIOError(e);
307 } finally {
308 closeTable(htable);
309 }
310 }
311
312 @Override
313 public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
314 Table htable = getTable(table);
315 try {
316 return resultFromHBase(htable.increment(incrementFromThrift(increment)));
317 } catch (IOException e) {
318 throw getTIOError(e);
319 } finally {
320 closeTable(htable);
321 }
322 }
323
324 @Override
325 public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
326 Table htable = getTable(table);
327 try {
328 return resultFromHBase(htable.append(appendFromThrift(append)));
329 } catch (IOException e) {
330 throw getTIOError(e);
331 } finally {
332 closeTable(htable);
333 }
334 }
335
336 @Override
337 public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
338 Table htable = getTable(table);
339 ResultScanner resultScanner = null;
340 try {
341 resultScanner = htable.getScanner(scanFromThrift(scan));
342 } catch (IOException e) {
343 throw getTIOError(e);
344 } finally {
345 closeTable(htable);
346 }
347 return addScanner(resultScanner);
348 }
349
350 @Override
351 public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
352 TIllegalArgument, TException {
353 ResultScanner scanner = getScanner(scannerId);
354 if (scanner == null) {
355 TIllegalArgument ex = new TIllegalArgument();
356 ex.setMessage("Invalid scanner Id");
357 throw ex;
358 }
359
360 try {
361 return resultsFromHBase(scanner.next(numRows));
362 } catch (IOException e) {
363 throw getTIOError(e);
364 }
365 }
366
367 @Override
368 public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
369 throws TIOError, TException {
370 Table htable = getTable(table);
371 List<TResult> results = null;
372 ResultScanner scanner = null;
373 try {
374 scanner = htable.getScanner(scanFromThrift(scan));
375 results = resultsFromHBase(scanner.next(numRows));
376 } catch (IOException e) {
377 throw getTIOError(e);
378 } finally {
379 if (scanner != null) {
380 scanner.close();
381 }
382 closeTable(htable);
383 }
384 return results;
385 }
386
387 @Override
388 public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
389 LOG.debug("scannerClose: id=" + scannerId);
390 ResultScanner scanner = getScanner(scannerId);
391 if (scanner == null) {
392 String message = "scanner ID is invalid";
393 LOG.warn(message);
394 TIllegalArgument ex = new TIllegalArgument();
395 ex.setMessage("Invalid scanner Id");
396 throw ex;
397 }
398 scanner.close();
399 removeScanner(scannerId);
400 }
401
402 @Override
403 public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
404 Table htable = getTable(table);
405 try {
406 htable.mutateRow(rowMutationsFromThrift(rowMutations));
407 } catch (IOException e) {
408 throw getTIOError(e);
409 } finally {
410 closeTable(htable);
411 }
412 }
413 }