View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.compareOpFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
33  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
34  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
35  
36  import java.io.IOException;
37  import java.lang.reflect.InvocationHandler;
38  import java.lang.reflect.InvocationTargetException;
39  import java.lang.reflect.Method;
40  import java.lang.reflect.Proxy;
41  import java.nio.ByteBuffer;
42  import java.util.Collections;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.concurrent.ConcurrentHashMap;
46  import java.util.concurrent.atomic.AtomicInteger;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.conf.Configuration;
51  import org.apache.hadoop.hbase.HRegionLocation;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.RegionLocator;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Table;
56  import org.apache.hadoop.hbase.security.UserProvider;
57  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
58  import org.apache.hadoop.hbase.thrift2.generated.TAppend;
59  import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
60  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
61  import org.apache.hadoop.hbase.thrift2.generated.TGet;
62  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63  import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation;
64  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
65  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
66  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
67  import org.apache.hadoop.hbase.thrift2.generated.TPut;
68  import org.apache.hadoop.hbase.thrift2.generated.TResult;
69  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
70  import org.apache.hadoop.hbase.thrift2.generated.TScan;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.ConnectionCache;
73  import org.apache.thrift.TException;
74  
75  /**
76   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
77   * defined in the Table interface.
78   */
79  @InterfaceAudience.Private
80  @SuppressWarnings("deprecation")
81  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
82  
83    // TODO: Size of pool configuraple
84    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
85  
86    // nextScannerId and scannerMap are used to manage scanner state
87    // TODO: Cleanup thread for Scanners, Scanner id wrap
88    private final AtomicInteger nextScannerId = new AtomicInteger(0);
89    private final Map<Integer, ResultScanner> scannerMap =
90        new ConcurrentHashMap<Integer, ResultScanner>();
91  
92    private final ConnectionCache connectionCache;
93  
94    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
95    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
96  
97    public static THBaseService.Iface newInstance(
98        THBaseService.Iface handler, ThriftMetrics metrics) {
99      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
100       new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
101   }
102 
103   private static final class THBaseServiceMetricsProxy implements InvocationHandler {
104     private final THBaseService.Iface handler;
105     private final ThriftMetrics metrics;
106 
107     private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
108       this.handler = handler;
109       this.metrics = metrics;
110     }
111 
112     @Override
113     public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
114       Object result;
115       try {
116         long start = now();
117         result = m.invoke(handler, args);
118         int processTime = (int) (now() - start);
119         metrics.incMethodTime(m.getName(), processTime);
120       } catch (InvocationTargetException e) {
121         throw e.getTargetException();
122       } catch (Exception e) {
123         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
124       }
125       return result;
126     }
127   }
128 
129   private static long now() {
130     return System.nanoTime();
131   }
132 
133   ThriftHBaseServiceHandler(final Configuration conf,
134       final UserProvider userProvider) throws IOException {
135     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
136     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
137     connectionCache = new ConnectionCache(
138       conf, userProvider, cleanInterval, maxIdleTime);
139   }
140 
141   private Table getTable(ByteBuffer tableName) {
142     try {
143       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
144     } catch (IOException ie) {
145       throw new RuntimeException(ie);
146     }
147   }
148 
149   private RegionLocator getLocator(ByteBuffer tableName) {
150     try {
151       return connectionCache.getRegionLocator(byteBufferToByteArray(tableName));
152     } catch (IOException ie) {
153       throw new RuntimeException(ie);
154     }
155   }
156 
157   private void closeTable(Table table) throws TIOError {
158     try {
159       table.close();
160     } catch (IOException e) {
161       throw getTIOError(e);
162     }
163   }
164 
165   private TIOError getTIOError(IOException e) {
166     TIOError err = new TIOError();
167     err.setMessage(e.getMessage());
168     return err;
169   }
170 
171   /**
172    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
173    * @param scanner to add
174    * @return Id for this Scanner
175    */
176   private int addScanner(ResultScanner scanner) {
177     int id = nextScannerId.getAndIncrement();
178     scannerMap.put(id, scanner);
179     return id;
180   }
181 
182   /**
183    * Returns the Scanner associated with the specified Id.
184    * @param id of the Scanner to get
185    * @return a Scanner, or null if the Id is invalid
186    */
187   private ResultScanner getScanner(int id) {
188     return scannerMap.get(id);
189   }
190 
191   void setEffectiveUser(String effectiveUser) {
192     connectionCache.setEffectiveUser(effectiveUser);
193   }
194 
195   /**
196    * Removes the scanner associated with the specified ID from the internal HashMap.
197    * @param id of the Scanner to remove
198    * @return the removed Scanner, or <code>null</code> if the Id is invalid
199    */
200   protected ResultScanner removeScanner(int id) {
201     return scannerMap.remove(id);
202   }
203 
204   @Override
205   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
206     Table htable = getTable(table);
207     try {
208       return htable.exists(getFromThrift(get));
209     } catch (IOException e) {
210       throw getTIOError(e);
211     } finally {
212       closeTable(htable);
213     }
214   }
215 
216   @Override
217   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
218     Table htable = getTable(table);
219     try {
220       return resultFromHBase(htable.get(getFromThrift(get)));
221     } catch (IOException e) {
222       throw getTIOError(e);
223     } finally {
224       closeTable(htable);
225     }
226   }
227 
228   @Override
229   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
230     Table htable = getTable(table);
231     try {
232       return resultsFromHBase(htable.get(getsFromThrift(gets)));
233     } catch (IOException e) {
234       throw getTIOError(e);
235     } finally {
236       closeTable(htable);
237     }
238   }
239 
240   @Override
241   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
242     Table htable = getTable(table);
243     try {
244       htable.put(putFromThrift(put));
245     } catch (IOException e) {
246       throw getTIOError(e);
247     } finally {
248       closeTable(htable);
249     }
250   }
251 
252   @Override
253   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
254       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
255     Table htable = getTable(table);
256     try {
257       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
258         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
259         putFromThrift(put));
260     } catch (IOException e) {
261       throw getTIOError(e);
262     } finally {
263       closeTable(htable);
264     }
265   }
266 
267   @Override
268   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
269     Table htable = getTable(table);
270     try {
271       htable.put(putsFromThrift(puts));
272     } catch (IOException e) {
273       throw getTIOError(e);
274     } finally {
275       closeTable(htable);
276     }
277   }
278 
279   @Override
280   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
281     Table htable = getTable(table);
282     try {
283       htable.delete(deleteFromThrift(deleteSingle));
284     } catch (IOException e) {
285       throw getTIOError(e);
286     } finally {
287       closeTable(htable);
288     }
289   }
290 
291   @Override
292   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
293       TException {
294     Table htable = getTable(table);
295     try {
296       htable.delete(deletesFromThrift(deletes));
297     } catch (IOException e) {
298       throw getTIOError(e);
299     } finally {
300       closeTable(htable);
301     }
302     return Collections.emptyList();
303   }
304   
305   @Override
306   public boolean checkAndMutate(ByteBuffer table, ByteBuffer row, ByteBuffer family,
307       ByteBuffer qualifier, TCompareOp compareOp, ByteBuffer value, TRowMutations rowMutations)
308           throws TIOError, TException {
309     try (final Table htable = getTable(table)) {
310       return htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family),
311           byteBufferToByteArray(qualifier), compareOpFromThrift(compareOp),
312           byteBufferToByteArray(value), rowMutationsFromThrift(rowMutations));
313     } catch (IOException e) {
314       throw getTIOError(e);
315     }
316   }
317 
318   @Override
319   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
320       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
321     Table htable = getTable(table);
322 
323     try {
324       if (value == null) {
325         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
326           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
327       } else {
328         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
329           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
330           deleteFromThrift(deleteSingle));
331       }
332     } catch (IOException e) {
333       throw getTIOError(e);
334     } finally {
335       closeTable(htable);
336     }
337   }
338 
339   @Override
340   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
341     Table htable = getTable(table);
342     try {
343       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
344     } catch (IOException e) {
345       throw getTIOError(e);
346     } finally {
347       closeTable(htable);
348     }
349   }
350 
351   @Override
352   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
353     Table htable = getTable(table);
354     try {
355       return resultFromHBase(htable.append(appendFromThrift(append)));
356     } catch (IOException e) {
357       throw getTIOError(e);
358     } finally {
359       closeTable(htable);
360     }
361   }
362 
363   @Override
364   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
365     Table htable = getTable(table);
366     ResultScanner resultScanner = null;
367     try {
368       resultScanner = htable.getScanner(scanFromThrift(scan));
369     } catch (IOException e) {
370       throw getTIOError(e);
371     } finally {
372       closeTable(htable);
373     }
374     return addScanner(resultScanner);
375   }
376 
377   @Override
378   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
379       TIllegalArgument, TException {
380     ResultScanner scanner = getScanner(scannerId);
381     if (scanner == null) {
382       TIllegalArgument ex = new TIllegalArgument();
383       ex.setMessage("Invalid scanner Id");
384       throw ex;
385     }
386 
387     try {
388       return resultsFromHBase(scanner.next(numRows));
389     } catch (IOException e) {
390       throw getTIOError(e);
391     }
392   }
393 
394   @Override
395   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
396       throws TIOError, TException {
397     Table htable = getTable(table);
398     List<TResult> results = null;
399     ResultScanner scanner = null;
400     try {
401       scanner = htable.getScanner(scanFromThrift(scan));
402       results = resultsFromHBase(scanner.next(numRows));
403     } catch (IOException e) {
404       throw getTIOError(e);
405     } finally {
406       if (scanner != null) {
407         scanner.close();
408       }
409       closeTable(htable);
410     }
411     return results;
412   }
413 
414 
415 
416   @Override
417   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
418     LOG.debug("scannerClose: id=" + scannerId);
419     ResultScanner scanner = getScanner(scannerId);
420     if (scanner == null) {
421       String message = "scanner ID is invalid";
422       LOG.warn(message);
423       TIllegalArgument ex = new TIllegalArgument();
424       ex.setMessage("Invalid scanner Id");
425       throw ex;
426     }
427     scanner.close();
428     removeScanner(scannerId);
429   }
430 
431   @Override
432   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
433     Table htable = getTable(table);
434     try {
435       htable.mutateRow(rowMutationsFromThrift(rowMutations));
436     } catch (IOException e) {
437       throw getTIOError(e);
438     } finally {
439       closeTable(htable);
440     }
441   }
442 
443   @Override
444   public List<THRegionLocation> getAllRegionLocations(ByteBuffer table)
445       throws TIOError, TException {
446     RegionLocator locator = null;
447     try {
448       locator = getLocator(table);
449       return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations());
450 
451     } catch (IOException e) {
452       throw getTIOError(e);
453     } finally {
454       if (locator != null) {
455         try {
456           locator.close();
457         } catch (IOException e) {
458           LOG.warn("Couldn't close the locator.", e);
459         }
460       }
461     }
462   }
463 
464   @Override
465   public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload)
466       throws TIOError, TException {
467 
468     RegionLocator locator = null;
469     try {
470       locator = getLocator(table);
471       byte[] rowBytes = byteBufferToByteArray(row);
472       HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload);
473       return ThriftUtilities.regionLocationFromHBase(hrl);
474 
475     } catch (IOException e) {
476       throw getTIOError(e);
477     } finally {
478       if (locator != null) {
479         try {
480           locator.close();
481         } catch (IOException e) {
482           LOG.warn("Couldn't close the locator.", e);
483         }
484       }
485     }
486   }
487 }