View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34  
35  import java.io.IOException;
36  import java.lang.reflect.InvocationHandler;
37  import java.lang.reflect.InvocationTargetException;
38  import java.lang.reflect.Method;
39  import java.lang.reflect.Proxy;
40  import java.nio.ByteBuffer;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.client.RegionLocator;
53  import org.apache.hadoop.hbase.client.ResultScanner;
54  import org.apache.hadoop.hbase.client.Table;
55  import org.apache.hadoop.hbase.security.UserProvider;
56  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
57  import org.apache.hadoop.hbase.thrift2.generated.TAppend;
58  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
59  import org.apache.hadoop.hbase.thrift2.generated.TGet;
60  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
61  import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation;
62  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
63  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
64  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
65  import org.apache.hadoop.hbase.thrift2.generated.TPut;
66  import org.apache.hadoop.hbase.thrift2.generated.TResult;
67  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
68  import org.apache.hadoop.hbase.thrift2.generated.TScan;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.ConnectionCache;
71  import org.apache.thrift.TException;
72  
73  /**
74   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
75   * defined in the HTableInterface.
76   */
77  @InterfaceAudience.Private
78  @SuppressWarnings("deprecation")
79  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
80  
81    // TODO: Make the size of the pool configurable
82    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
83  
84    // nextScannerId and scannerMap are used to manage scanner state
85    // TODO: Cleanup thread for Scanners, Scanner id wrap
86    private final AtomicInteger nextScannerId = new AtomicInteger(0);
87    private final Map<Integer, ResultScanner> scannerMap =
88        new ConcurrentHashMap<Integer, ResultScanner>();
89  
90    private final ConnectionCache connectionCache;
91  
92    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
93    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
94  
95    public static THBaseService.Iface newInstance(
96        THBaseService.Iface handler, ThriftMetrics metrics) {
97      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
98        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
99    }
100 
101   private static final class THBaseServiceMetricsProxy implements InvocationHandler {
102     private final THBaseService.Iface handler;
103     private final ThriftMetrics metrics;
104 
105     private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
106       this.handler = handler;
107       this.metrics = metrics;
108     }
109 
110     @Override
111     public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
112       Object result;
113       try {
114         long start = now();
115         result = m.invoke(handler, args);
116         int processTime = (int) (now() - start);
117         metrics.incMethodTime(m.getName(), processTime);
118       } catch (InvocationTargetException e) {
119         throw e.getTargetException();
120       } catch (Exception e) {
121         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
122       }
123       return result;
124     }
125   }
126 
127   private static long now() {
128     return System.nanoTime();
129   }
130 
131   ThriftHBaseServiceHandler(final Configuration conf,
132       final UserProvider userProvider) throws IOException {
133     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
134     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
135     connectionCache = new ConnectionCache(
136       conf, userProvider, cleanInterval, maxIdleTime);
137   }
138 
139   private Table getTable(ByteBuffer tableName) {
140     try {
141       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
142     } catch (IOException ie) {
143       throw new RuntimeException(ie);
144     }
145   }
146 
147   private RegionLocator getLocator(ByteBuffer tableName) {
148     try {
149       return connectionCache.getRegionLocator(byteBufferToByteArray(tableName));
150     } catch (IOException ie) {
151       throw new RuntimeException(ie);
152     }
153   }
154 
155   private void closeTable(Table table) throws TIOError {
156     try {
157       table.close();
158     } catch (IOException e) {
159       throw getTIOError(e);
160     }
161   }
162 
163   private TIOError getTIOError(IOException e) {
164     TIOError err = new TIOError();
165     err.setMessage(e.getMessage());
166     return err;
167   }
168 
169   /**
170    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
171    * @param scanner to add
172    * @return Id for this Scanner
173    */
174   private int addScanner(ResultScanner scanner) {
175     int id = nextScannerId.getAndIncrement();
176     scannerMap.put(id, scanner);
177     return id;
178   }
179 
180   /**
181    * Returns the Scanner associated with the specified Id.
182    * @param id of the Scanner to get
183    * @return a Scanner, or null if the Id is invalid
184    */
185   private ResultScanner getScanner(int id) {
186     return scannerMap.get(id);
187   }
188 
189   void setEffectiveUser(String effectiveUser) {
190     connectionCache.setEffectiveUser(effectiveUser);
191   }
192 
193   /**
194    * Removes the scanner associated with the specified ID from the internal HashMap.
195    * @param id of the Scanner to remove
196    * @return the removed Scanner, or <code>null</code> if the Id is invalid
197    */
198   protected ResultScanner removeScanner(int id) {
199     return scannerMap.remove(id);
200   }
201 
202   @Override
203   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
204     Table htable = getTable(table);
205     try {
206       return htable.exists(getFromThrift(get));
207     } catch (IOException e) {
208       throw getTIOError(e);
209     } finally {
210       closeTable(htable);
211     }
212   }
213 
214   @Override
215   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
216     Table htable = getTable(table);
217     try {
218       return resultFromHBase(htable.get(getFromThrift(get)));
219     } catch (IOException e) {
220       throw getTIOError(e);
221     } finally {
222       closeTable(htable);
223     }
224   }
225 
226   @Override
227   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
228     Table htable = getTable(table);
229     try {
230       return resultsFromHBase(htable.get(getsFromThrift(gets)));
231     } catch (IOException e) {
232       throw getTIOError(e);
233     } finally {
234       closeTable(htable);
235     }
236   }
237 
238   @Override
239   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
240     Table htable = getTable(table);
241     try {
242       htable.put(putFromThrift(put));
243     } catch (IOException e) {
244       throw getTIOError(e);
245     } finally {
246       closeTable(htable);
247     }
248   }
249 
250   @Override
251   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
252       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
253     Table htable = getTable(table);
254     try {
255       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
256         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
257         putFromThrift(put));
258     } catch (IOException e) {
259       throw getTIOError(e);
260     } finally {
261       closeTable(htable);
262     }
263   }
264 
265   @Override
266   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
267     Table htable = getTable(table);
268     try {
269       htable.put(putsFromThrift(puts));
270     } catch (IOException e) {
271       throw getTIOError(e);
272     } finally {
273       closeTable(htable);
274     }
275   }
276 
277   @Override
278   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
279     Table htable = getTable(table);
280     try {
281       htable.delete(deleteFromThrift(deleteSingle));
282     } catch (IOException e) {
283       throw getTIOError(e);
284     } finally {
285       closeTable(htable);
286     }
287   }
288 
289   @Override
290   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
291       TException {
292     Table htable = getTable(table);
293     try {
294       htable.delete(deletesFromThrift(deletes));
295     } catch (IOException e) {
296       throw getTIOError(e);
297     } finally {
298       closeTable(htable);
299     }
300     return Collections.emptyList();
301   }
302 
303   @Override
304   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
305       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
306     Table htable = getTable(table);
307 
308     try {
309       if (value == null) {
310         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
311           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
312       } else {
313         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
314           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
315           deleteFromThrift(deleteSingle));
316       }
317     } catch (IOException e) {
318       throw getTIOError(e);
319     } finally {
320       closeTable(htable);
321     }
322   }
323 
324   @Override
325   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
326     Table htable = getTable(table);
327     try {
328       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
329     } catch (IOException e) {
330       throw getTIOError(e);
331     } finally {
332       closeTable(htable);
333     }
334   }
335 
336   @Override
337   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
338     Table htable = getTable(table);
339     try {
340       return resultFromHBase(htable.append(appendFromThrift(append)));
341     } catch (IOException e) {
342       throw getTIOError(e);
343     } finally {
344       closeTable(htable);
345     }
346   }
347 
348   @Override
349   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
350     Table htable = getTable(table);
351     ResultScanner resultScanner = null;
352     try {
353       resultScanner = htable.getScanner(scanFromThrift(scan));
354     } catch (IOException e) {
355       throw getTIOError(e);
356     } finally {
357       closeTable(htable);
358     }
359     return addScanner(resultScanner);
360   }
361 
362   @Override
363   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
364       TIllegalArgument, TException {
365     ResultScanner scanner = getScanner(scannerId);
366     if (scanner == null) {
367       TIllegalArgument ex = new TIllegalArgument();
368       ex.setMessage("Invalid scanner Id");
369       throw ex;
370     }
371 
372     try {
373       return resultsFromHBase(scanner.next(numRows));
374     } catch (IOException e) {
375       throw getTIOError(e);
376     }
377   }
378 
379   @Override
380   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
381       throws TIOError, TException {
382     Table htable = getTable(table);
383     List<TResult> results = null;
384     ResultScanner scanner = null;
385     try {
386       scanner = htable.getScanner(scanFromThrift(scan));
387       results = resultsFromHBase(scanner.next(numRows));
388     } catch (IOException e) {
389       throw getTIOError(e);
390     } finally {
391       if (scanner != null) {
392         scanner.close();
393       }
394       closeTable(htable);
395     }
396     return results;
397   }
398 
399 
400 
401   @Override
402   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
403     LOG.debug("scannerClose: id=" + scannerId);
404     ResultScanner scanner = getScanner(scannerId);
405     if (scanner == null) {
406       String message = "scanner ID is invalid";
407       LOG.warn(message);
408       TIllegalArgument ex = new TIllegalArgument();
409       ex.setMessage("Invalid scanner Id");
410       throw ex;
411     }
412     scanner.close();
413     removeScanner(scannerId);
414   }
415 
416   @Override
417   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
418     Table htable = getTable(table);
419     try {
420       htable.mutateRow(rowMutationsFromThrift(rowMutations));
421     } catch (IOException e) {
422       throw getTIOError(e);
423     } finally {
424       closeTable(htable);
425     }
426   }
427 
428   @Override
429   public List<THRegionLocation> getAllRegionLocations(ByteBuffer table)
430       throws TIOError, TException {
431     RegionLocator locator = null;
432     try {
433       locator = getLocator(table);
434       return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations());
435 
436     } catch (IOException e) {
437       throw getTIOError(e);
438     } finally {
439       if (locator != null) {
440         try {
441           locator.close();
442         } catch (IOException e) {
443           LOG.warn("Couldn't close the locator.", e);
444         }
445       }
446     }
447   }
448 
449   @Override
450   public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload)
451       throws TIOError, TException {
452 
453     RegionLocator locator = null;
454     try {
455       locator = getLocator(table);
456       byte[] rowBytes = byteBufferToByteArray(row);
457       HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload);
458       return ThriftUtilities.regionLocationFromHBase(hrl);
459 
460     } catch (IOException e) {
461       throw getTIOError(e);
462     } finally {
463       if (locator != null) {
464         try {
465           locator.close();
466         } catch (IOException e) {
467           LOG.warn("Couldn't close the locator.", e);
468         }
469       }
470     }
471   }
472 }