View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34  
35  import java.io.IOException;
36  import java.lang.reflect.InvocationHandler;
37  import java.lang.reflect.InvocationTargetException;
38  import java.lang.reflect.Method;
39  import java.lang.reflect.Proxy;
40  import java.nio.ByteBuffer;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.client.RegionLocator;
53  import org.apache.hadoop.hbase.client.ResultScanner;
54  import org.apache.hadoop.hbase.client.Table;
55  import org.apache.hadoop.hbase.security.UserProvider;
56  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
57  import org.apache.hadoop.hbase.thrift2.generated.TAppend;
58  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
59  import org.apache.hadoop.hbase.thrift2.generated.TGet;
60  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
61  import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation;
62  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
63  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
64  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
65  import org.apache.hadoop.hbase.thrift2.generated.TPut;
66  import org.apache.hadoop.hbase.thrift2.generated.TResult;
67  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
68  import org.apache.hadoop.hbase.thrift2.generated.TScan;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.ConnectionCache;
71  import org.apache.thrift.TException;
72  
73  /**
74   * This class is a glue object that connects Thrift RPC calls to the HBase client API, primarily
75   * the {@link Table} and {@link RegionLocator} interfaces.
76   */
77  @InterfaceAudience.Private
78  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
79  
80    // TODO: Make the size of the pool configurable
81    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
82  
83    // nextScannerId and scannerMap are used to manage scanner state:
      // nextScannerId supplies the ID for each newly registered scanner, and scannerMap maps every
      // registered ID to its ResultScanner until removeScanner(int) drops the entry.
84    // TODO: Add a cleanup thread for abandoned scanners; handle scanner ID wrap-around
85    private final AtomicInteger nextScannerId = new AtomicInteger(0);
86    private final Map<Integer, ResultScanner> scannerMap =
87        new ConcurrentHashMap<Integer, ResultScanner>();
88  
      // Source of the Table and RegionLocator instances used by the RPC methods of this handler.
89    private final ConnectionCache connectionCache;
90  
      // Configuration keys read by the constructor and handed to the ConnectionCache. The defaults
      // are 10 * 1000 and 10 * 60 * 1000 respectively (presumably milliseconds -- TODO confirm).
91    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
92    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
93  
94    public static THBaseService.Iface newInstance(
95        THBaseService.Iface handler, ThriftMetrics metrics) {
96      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
97        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
98    }
99  
100   private static class THBaseServiceMetricsProxy implements InvocationHandler {
101     private final THBaseService.Iface handler;
102     private final ThriftMetrics metrics;
103 
104     private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
105       this.handler = handler;
106       this.metrics = metrics;
107     }
108 
109     @Override
110     public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
111       Object result;
112       try {
113         long start = now();
114         result = m.invoke(handler, args);
115         int processTime = (int) (now() - start);
116         metrics.incMethodTime(m.getName(), processTime);
117       } catch (InvocationTargetException e) {
118         throw e.getTargetException();
119       } catch (Exception e) {
120         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
121       }
122       return result;
123     }
124   }
125 
126   private static long now() {
127     return System.nanoTime();
128   }
129 
130   ThriftHBaseServiceHandler(final Configuration conf,
131       final UserProvider userProvider) throws IOException {
132     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
133     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
134     connectionCache = new ConnectionCache(
135       conf, userProvider, cleanInterval, maxIdleTime);
136   }
137 
138   private Table getTable(ByteBuffer tableName) {
139     try {
140       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
141     } catch (IOException e) {
142       throw new RuntimeException(e);
143     }
144   }
145 
146   private RegionLocator getLocator(ByteBuffer tableName) {
147     try {
148       return connectionCache.getRegionLocator(byteBufferToByteArray(tableName));
149     } catch (IOException ie) {
150       throw new RuntimeException(ie);
151     }
152   }
153 
154   private void closeTable(Table table) throws TIOError {
155     try {
156       table.close();
157     } catch (IOException e) {
158       throw getTIOError(e);
159     }
160   }
161 
162   private TIOError getTIOError(IOException e) {
163     TIOError err = new TIOError();
164     err.setMessage(e.getMessage());
165     return err;
166   }
167 
168   /**
169    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
170    * @param scanner to add
171    * @return Id for this Scanner
172    */
173   private int addScanner(ResultScanner scanner) {
174     int id = nextScannerId.getAndIncrement();
175     scannerMap.put(id, scanner);
176     return id;
177   }
178 
179   /**
180    * Returns the Scanner associated with the specified Id.
181    * @param id of the Scanner to get
182    * @return a Scanner, or null if the Id is invalid
183    */
184   private ResultScanner getScanner(int id) {
185     return scannerMap.get(id);
186   }
187 
188   void setEffectiveUser(String effectiveUser) {
189     connectionCache.setEffectiveUser(effectiveUser);
190   }
191 
192   /**
193    * Removes the scanner associated with the specified ID from the internal HashMap.
194    * @param id of the Scanner to remove
195    * @return the removed Scanner, or <code>null</code> if the Id is invalid
196    */
197   protected ResultScanner removeScanner(int id) {
198     return scannerMap.remove(id);
199   }
200 
201   @Override
202   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
203     Table htable = getTable(table);
204     try {
205       return htable.exists(getFromThrift(get));
206     } catch (IOException e) {
207       throw getTIOError(e);
208     } finally {
209       closeTable(htable);
210     }
211   }
212 
213   @Override
214   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
215     Table htable = getTable(table);
216     try {
217       return resultFromHBase(htable.get(getFromThrift(get)));
218     } catch (IOException e) {
219       throw getTIOError(e);
220     } finally {
221       closeTable(htable);
222     }
223   }
224 
225   @Override
226   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
227     Table htable = getTable(table);
228     try {
229       return resultsFromHBase(htable.get(getsFromThrift(gets)));
230     } catch (IOException e) {
231       throw getTIOError(e);
232     } finally {
233       closeTable(htable);
234     }
235   }
236 
237   @Override
238   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
239     Table htable = getTable(table);
240     try {
241       htable.put(putFromThrift(put));
242     } catch (IOException e) {
243       throw getTIOError(e);
244     } finally {
245       closeTable(htable);
246     }
247   }
248 
249   @Override
250   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
251       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
252     Table htable = getTable(table);
253     try {
254       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
255         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
256         putFromThrift(put));
257     } catch (IOException e) {
258       throw getTIOError(e);
259     } finally {
260       closeTable(htable);
261     }
262   }
263 
264   @Override
265   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
266     Table htable = getTable(table);
267     try {
268       htable.put(putsFromThrift(puts));
269     } catch (IOException e) {
270       throw getTIOError(e);
271     } finally {
272       closeTable(htable);
273     }
274   }
275 
276   @Override
277   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
278     Table htable = getTable(table);
279     try {
280       htable.delete(deleteFromThrift(deleteSingle));
281     } catch (IOException e) {
282       throw getTIOError(e);
283     } finally {
284       closeTable(htable);
285     }
286   }
287 
288   @Override
289   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
290       TException {
291     Table htable = getTable(table);
292     try {
293       htable.delete(deletesFromThrift(deletes));
294     } catch (IOException e) {
295       throw getTIOError(e);
296     } finally {
297       closeTable(htable);
298     }
299     return Collections.emptyList();
300   }
301 
302   @Override
303   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
304       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
305     Table htable = getTable(table);
306 
307     try {
308       if (value == null) {
309         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
310           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
311       } else {
312         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
313           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
314           deleteFromThrift(deleteSingle));
315       }
316     } catch (IOException e) {
317       throw getTIOError(e);
318     } finally {
319       closeTable(htable);
320     }
321   }
322 
323   @Override
324   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
325     Table htable = getTable(table);
326     try {
327       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
328     } catch (IOException e) {
329       throw getTIOError(e);
330     } finally {
331       closeTable(htable);
332     }
333   }
334 
335   @Override
336   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
337     Table htable = getTable(table);
338     try {
339       return resultFromHBase(htable.append(appendFromThrift(append)));
340     } catch (IOException e) {
341       throw getTIOError(e);
342     } finally {
343       closeTable(htable);
344     }
345   }
346 
347   @Override
348   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
349     Table htable = getTable(table);
350     ResultScanner resultScanner = null;
351     try {
352       resultScanner = htable.getScanner(scanFromThrift(scan));
353     } catch (IOException e) {
354       throw getTIOError(e);
355     } finally {
356       closeTable(htable);
357     }
358     return addScanner(resultScanner);
359   }
360 
361   @Override
362   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
363       TIllegalArgument, TException {
364     ResultScanner scanner = getScanner(scannerId);
365     if (scanner == null) {
366       TIllegalArgument ex = new TIllegalArgument();
367       ex.setMessage("Invalid scanner Id");
368       throw ex;
369     }
370 
371     try {
372       return resultsFromHBase(scanner.next(numRows));
373     } catch (IOException e) {
374       throw getTIOError(e);
375     }
376   }
377 
378   @Override
379   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
380       throws TIOError, TException {
381     Table htable = getTable(table);
382     List<TResult> results = null;
383     ResultScanner scanner = null;
384     try {
385       scanner = htable.getScanner(scanFromThrift(scan));
386       results = resultsFromHBase(scanner.next(numRows));
387     } catch (IOException e) {
388       throw getTIOError(e);
389     } finally {
390       if (scanner != null) {
391         scanner.close();
392       }
393       closeTable(htable);
394     }
395     return results;
396   }
397 
398 
399 
400   @Override
401   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
402     LOG.debug("scannerClose: id=" + scannerId);
403     ResultScanner scanner = getScanner(scannerId);
404     if (scanner == null) {
405       String message = "scanner ID is invalid";
406       LOG.warn(message);
407       TIllegalArgument ex = new TIllegalArgument();
408       ex.setMessage("Invalid scanner Id");
409       throw ex;
410     }
411     scanner.close();
412     removeScanner(scannerId);
413   }
414 
415   @Override
416   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
417     Table htable = getTable(table);
418     try {
419       htable.mutateRow(rowMutationsFromThrift(rowMutations));
420     } catch (IOException e) {
421       throw getTIOError(e);
422     } finally {
423       closeTable(htable);
424     }
425   }
426 
427   @Override
428   public List<THRegionLocation> getAllRegionLocations(ByteBuffer table)
429       throws TIOError, TException {
430     RegionLocator locator = null;
431     try {
432       locator = getLocator(table);
433       return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations());
434 
435     } catch (IOException e) {
436       throw getTIOError(e);
437     } finally {
438       if (locator != null) {
439         try {
440           locator.close();
441         } catch (IOException e) {
442           LOG.warn("Couldn't close the locator.", e);
443         }
444       }
445     }
446   }
447 
448   @Override
449   public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload)
450       throws TIOError, TException {
451 
452     RegionLocator locator = null;
453     try {
454       locator = getLocator(table);
455       byte[] rowBytes = byteBufferToByteArray(row);
456       HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload);
457       return ThriftUtilities.regionLocationFromHBase(hrl);
458 
459     } catch (IOException e) {
460       throw getTIOError(e);
461     } finally {
462       if (locator != null) {
463         try {
464           locator.close();
465         } catch (IOException e) {
466           LOG.warn("Couldn't close the locator.", e);
467         }
468       }
469     }
470   }
471 }