1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33 import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34
35 import java.io.IOException;
36 import java.lang.reflect.InvocationHandler;
37 import java.lang.reflect.InvocationTargetException;
38 import java.lang.reflect.Method;
39 import java.lang.reflect.Proxy;
40 import java.nio.ByteBuffer;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.atomic.AtomicInteger;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.hbase.HRegionLocation;
51 import org.apache.hadoop.hbase.classification.InterfaceAudience;
52 import org.apache.hadoop.hbase.client.RegionLocator;
53 import org.apache.hadoop.hbase.client.ResultScanner;
54 import org.apache.hadoop.hbase.client.Table;
55 import org.apache.hadoop.hbase.security.UserProvider;
56 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
57 import org.apache.hadoop.hbase.thrift2.generated.TAppend;
58 import org.apache.hadoop.hbase.thrift2.generated.TDelete;
59 import org.apache.hadoop.hbase.thrift2.generated.TGet;
60 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
61 import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation;
62 import org.apache.hadoop.hbase.thrift2.generated.TIOError;
63 import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
64 import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
65 import org.apache.hadoop.hbase.thrift2.generated.TPut;
66 import org.apache.hadoop.hbase.thrift2.generated.TResult;
67 import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
68 import org.apache.hadoop.hbase.thrift2.generated.TScan;
69 import org.apache.hadoop.hbase.util.Bytes;
70 import org.apache.hadoop.hbase.util.ConnectionCache;
71 import org.apache.thrift.TException;
72
73
74
75
76
77 @InterfaceAudience.Private
78 public class ThriftHBaseServiceHandler implements THBaseService.Iface {
79
80
81 private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
82
83
84
85 private final AtomicInteger nextScannerId = new AtomicInteger(0);
86 private final Map<Integer, ResultScanner> scannerMap =
87 new ConcurrentHashMap<Integer, ResultScanner>();
88
89 private final ConnectionCache connectionCache;
90
91 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
92 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
93
94 public static THBaseService.Iface newInstance(
95 THBaseService.Iface handler, ThriftMetrics metrics) {
96 return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
97 new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
98 }
99
100 private static class THBaseServiceMetricsProxy implements InvocationHandler {
101 private final THBaseService.Iface handler;
102 private final ThriftMetrics metrics;
103
104 private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
105 this.handler = handler;
106 this.metrics = metrics;
107 }
108
109 @Override
110 public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
111 Object result;
112 try {
113 long start = now();
114 result = m.invoke(handler, args);
115 int processTime = (int) (now() - start);
116 metrics.incMethodTime(m.getName(), processTime);
117 } catch (InvocationTargetException e) {
118 throw e.getTargetException();
119 } catch (Exception e) {
120 throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
121 }
122 return result;
123 }
124 }
125
126 private static long now() {
127 return System.nanoTime();
128 }
129
130 ThriftHBaseServiceHandler(final Configuration conf,
131 final UserProvider userProvider) throws IOException {
132 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
133 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
134 connectionCache = new ConnectionCache(
135 conf, userProvider, cleanInterval, maxIdleTime);
136 }
137
138 private Table getTable(ByteBuffer tableName) {
139 try {
140 return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
141 } catch (IOException e) {
142 throw new RuntimeException(e);
143 }
144 }
145
146 private RegionLocator getLocator(ByteBuffer tableName) {
147 try {
148 return connectionCache.getRegionLocator(byteBufferToByteArray(tableName));
149 } catch (IOException ie) {
150 throw new RuntimeException(ie);
151 }
152 }
153
154 private void closeTable(Table table) throws TIOError {
155 try {
156 table.close();
157 } catch (IOException e) {
158 throw getTIOError(e);
159 }
160 }
161
162 private TIOError getTIOError(IOException e) {
163 TIOError err = new TIOError();
164 err.setMessage(e.getMessage());
165 return err;
166 }
167
168
169
170
171
172
173 private int addScanner(ResultScanner scanner) {
174 int id = nextScannerId.getAndIncrement();
175 scannerMap.put(id, scanner);
176 return id;
177 }
178
179
180
181
182
183
184 private ResultScanner getScanner(int id) {
185 return scannerMap.get(id);
186 }
187
188 void setEffectiveUser(String effectiveUser) {
189 connectionCache.setEffectiveUser(effectiveUser);
190 }
191
192
193
194
195
196
197 protected ResultScanner removeScanner(int id) {
198 return scannerMap.remove(id);
199 }
200
201 @Override
202 public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
203 Table htable = getTable(table);
204 try {
205 return htable.exists(getFromThrift(get));
206 } catch (IOException e) {
207 throw getTIOError(e);
208 } finally {
209 closeTable(htable);
210 }
211 }
212
213 @Override
214 public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
215 Table htable = getTable(table);
216 try {
217 return resultFromHBase(htable.get(getFromThrift(get)));
218 } catch (IOException e) {
219 throw getTIOError(e);
220 } finally {
221 closeTable(htable);
222 }
223 }
224
225 @Override
226 public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
227 Table htable = getTable(table);
228 try {
229 return resultsFromHBase(htable.get(getsFromThrift(gets)));
230 } catch (IOException e) {
231 throw getTIOError(e);
232 } finally {
233 closeTable(htable);
234 }
235 }
236
237 @Override
238 public void put(ByteBuffer table, TPut put) throws TIOError, TException {
239 Table htable = getTable(table);
240 try {
241 htable.put(putFromThrift(put));
242 } catch (IOException e) {
243 throw getTIOError(e);
244 } finally {
245 closeTable(htable);
246 }
247 }
248
249 @Override
250 public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
251 ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
252 Table htable = getTable(table);
253 try {
254 return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
255 byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
256 putFromThrift(put));
257 } catch (IOException e) {
258 throw getTIOError(e);
259 } finally {
260 closeTable(htable);
261 }
262 }
263
264 @Override
265 public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
266 Table htable = getTable(table);
267 try {
268 htable.put(putsFromThrift(puts));
269 } catch (IOException e) {
270 throw getTIOError(e);
271 } finally {
272 closeTable(htable);
273 }
274 }
275
276 @Override
277 public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
278 Table htable = getTable(table);
279 try {
280 htable.delete(deleteFromThrift(deleteSingle));
281 } catch (IOException e) {
282 throw getTIOError(e);
283 } finally {
284 closeTable(htable);
285 }
286 }
287
288 @Override
289 public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
290 TException {
291 Table htable = getTable(table);
292 try {
293 htable.delete(deletesFromThrift(deletes));
294 } catch (IOException e) {
295 throw getTIOError(e);
296 } finally {
297 closeTable(htable);
298 }
299 return Collections.emptyList();
300 }
301
302 @Override
303 public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
304 ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
305 Table htable = getTable(table);
306
307 try {
308 if (value == null) {
309 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
310 byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
311 } else {
312 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
313 byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
314 deleteFromThrift(deleteSingle));
315 }
316 } catch (IOException e) {
317 throw getTIOError(e);
318 } finally {
319 closeTable(htable);
320 }
321 }
322
323 @Override
324 public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
325 Table htable = getTable(table);
326 try {
327 return resultFromHBase(htable.increment(incrementFromThrift(increment)));
328 } catch (IOException e) {
329 throw getTIOError(e);
330 } finally {
331 closeTable(htable);
332 }
333 }
334
335 @Override
336 public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
337 Table htable = getTable(table);
338 try {
339 return resultFromHBase(htable.append(appendFromThrift(append)));
340 } catch (IOException e) {
341 throw getTIOError(e);
342 } finally {
343 closeTable(htable);
344 }
345 }
346
347 @Override
348 public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
349 Table htable = getTable(table);
350 ResultScanner resultScanner = null;
351 try {
352 resultScanner = htable.getScanner(scanFromThrift(scan));
353 } catch (IOException e) {
354 throw getTIOError(e);
355 } finally {
356 closeTable(htable);
357 }
358 return addScanner(resultScanner);
359 }
360
361 @Override
362 public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
363 TIllegalArgument, TException {
364 ResultScanner scanner = getScanner(scannerId);
365 if (scanner == null) {
366 TIllegalArgument ex = new TIllegalArgument();
367 ex.setMessage("Invalid scanner Id");
368 throw ex;
369 }
370
371 try {
372 return resultsFromHBase(scanner.next(numRows));
373 } catch (IOException e) {
374 throw getTIOError(e);
375 }
376 }
377
378 @Override
379 public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
380 throws TIOError, TException {
381 Table htable = getTable(table);
382 List<TResult> results = null;
383 ResultScanner scanner = null;
384 try {
385 scanner = htable.getScanner(scanFromThrift(scan));
386 results = resultsFromHBase(scanner.next(numRows));
387 } catch (IOException e) {
388 throw getTIOError(e);
389 } finally {
390 if (scanner != null) {
391 scanner.close();
392 }
393 closeTable(htable);
394 }
395 return results;
396 }
397
398
399
400 @Override
401 public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
402 LOG.debug("scannerClose: id=" + scannerId);
403 ResultScanner scanner = getScanner(scannerId);
404 if (scanner == null) {
405 String message = "scanner ID is invalid";
406 LOG.warn(message);
407 TIllegalArgument ex = new TIllegalArgument();
408 ex.setMessage("Invalid scanner Id");
409 throw ex;
410 }
411 scanner.close();
412 removeScanner(scannerId);
413 }
414
415 @Override
416 public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
417 Table htable = getTable(table);
418 try {
419 htable.mutateRow(rowMutationsFromThrift(rowMutations));
420 } catch (IOException e) {
421 throw getTIOError(e);
422 } finally {
423 closeTable(htable);
424 }
425 }
426
427 @Override
428 public List<THRegionLocation> getAllRegionLocations(ByteBuffer table)
429 throws TIOError, TException {
430 RegionLocator locator = null;
431 try {
432 locator = getLocator(table);
433 return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations());
434
435 } catch (IOException e) {
436 throw getTIOError(e);
437 } finally {
438 if (locator != null) {
439 try {
440 locator.close();
441 } catch (IOException e) {
442 LOG.warn("Couldn't close the locator.", e);
443 }
444 }
445 }
446 }
447
448 @Override
449 public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload)
450 throws TIOError, TException {
451
452 RegionLocator locator = null;
453 try {
454 locator = getLocator(table);
455 byte[] rowBytes = byteBufferToByteArray(row);
456 HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload);
457 return ThriftUtilities.regionLocationFromHBase(hrl);
458
459 } catch (IOException e) {
460 throw getTIOError(e);
461 } finally {
462 if (locator != null) {
463 try {
464 locator.close();
465 } catch (IOException e) {
466 LOG.warn("Couldn't close the locator.", e);
467 }
468 }
469 }
470 }
471 }