View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
22  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
23  
24  import java.io.IOException;
25  import java.lang.reflect.InvocationHandler;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.lang.reflect.Proxy;
29  import java.nio.ByteBuffer;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.classification.InterfaceAudience;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.client.HTableFactory;
44  import org.apache.hadoop.hbase.client.HTableInterface;
45  import org.apache.hadoop.hbase.client.ResultScanner;
46  import org.apache.hadoop.hbase.security.UserProvider;
47  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
48  import org.apache.hadoop.hbase.thrift2.generated.*;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ConnectionCache;
51  import org.apache.thrift.TException;
52  
53  import com.google.common.cache.Cache;
54  import com.google.common.cache.CacheBuilder;
55  
56  /**
57   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
58   * defined in the HTableInterface.
59   */
60  @InterfaceAudience.Private
61  @SuppressWarnings("deprecation")
62  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
63  
64    // TODO: Size of pool configuraple
65    private final Cache<String, HTablePool> htablePools;
66    private final Callable<? extends HTablePool> htablePoolCreater;
67    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
68  
69    // nextScannerId and scannerMap are used to manage scanner state
70    // TODO: Cleanup thread for Scanners, Scanner id wrap
71    private final AtomicInteger nextScannerId = new AtomicInteger(0);
72    private final Map<Integer, ResultScanner> scannerMap =
73        new ConcurrentHashMap<Integer, ResultScanner>();
74  
75    private final ConnectionCache connectionCache;
76    private final HTableFactory tableFactory;
77    private final int maxPoolSize;
78  
79    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
80    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
81  
82    public static THBaseService.Iface newInstance(
83        THBaseService.Iface handler, ThriftMetrics metrics) {
84      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
85        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
86    }
87  
88    private static class THBaseServiceMetricsProxy implements InvocationHandler {
89      private final THBaseService.Iface handler;
90      private final ThriftMetrics metrics;
91  
92      private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
93        this.handler = handler;
94        this.metrics = metrics;
95      }
96  
97      @Override
98      public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
99        Object result;
100       try {
101         long start = now();
102         result = m.invoke(handler, args);
103         int processTime = (int) (now() - start);
104         metrics.incMethodTime(m.getName(), processTime);
105       } catch (InvocationTargetException e) {
106         throw e.getTargetException();
107       } catch (Exception e) {
108         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
109       }
110       return result;
111     }
112   }
113 
114   private static long now() {
115     return System.nanoTime();
116   }
117 
118   ThriftHBaseServiceHandler(final Configuration conf,
119       final UserProvider userProvider) throws IOException {
120     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
121     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
122     connectionCache = new ConnectionCache(
123       conf, userProvider, cleanInterval, maxIdleTime);
124     tableFactory = new HTableFactory() {
125       @Override
126       public HTableInterface createHTableInterface(Configuration config,
127           byte[] tableName) {
128         try {
129           return connectionCache.getTable(Bytes.toString(tableName));
130         } catch (IOException ioe) {
131           throw new RuntimeException(ioe);
132         }
133       }
134     };
135     htablePools = CacheBuilder.newBuilder().expireAfterAccess(
136       maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build();
137     maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
138     htablePoolCreater = new Callable<HTablePool>() {
139       public HTablePool call() {
140         return new HTablePool(conf, maxPoolSize, tableFactory);
141       }
142     };
143   }
144 
145   private HTableInterface getTable(ByteBuffer tableName) {
146     String currentUser = connectionCache.getEffectiveUser();
147     try {
148       HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
149       return htablePool.getTable(byteBufferToByteArray(tableName));
150     } catch (ExecutionException ee) {
151       throw new RuntimeException(ee);
152     }
153   }
154 
155   private void closeTable(HTableInterface table) throws TIOError {
156     try {
157       table.close();
158     } catch (IOException e) {
159       throw getTIOError(e);
160     }
161   }
162 
163   private TIOError getTIOError(IOException e) {
164     TIOError err = new TIOError();
165     err.setMessage(e.getMessage());
166     return err;
167   }
168 
169   /**
170    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
171    * @param scanner to add
172    * @return Id for this Scanner
173    */
174   private int addScanner(ResultScanner scanner) {
175     int id = nextScannerId.getAndIncrement();
176     scannerMap.put(id, scanner);
177     return id;
178   }
179 
180   /**
181    * Returns the Scanner associated with the specified Id.
182    * @param id of the Scanner to get
183    * @return a Scanner, or null if the Id is invalid
184    */
185   private ResultScanner getScanner(int id) {
186     return scannerMap.get(id);
187   }
188 
189   void setEffectiveUser(String effectiveUser) {
190     connectionCache.setEffectiveUser(effectiveUser);
191   }
192 
193   /**
194    * Removes the scanner associated with the specified ID from the internal HashMap.
195    * @param id of the Scanner to remove
196    * @return the removed Scanner, or <code>null</code> if the Id is invalid
197    */
198   protected ResultScanner removeScanner(int id) {
199     return scannerMap.remove(id);
200   }
201 
202   @Override
203   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
204     HTableInterface htable = getTable(table);
205     try {
206       return htable.exists(getFromThrift(get));
207     } catch (IOException e) {
208       throw getTIOError(e);
209     } finally {
210       closeTable(htable);
211     }
212   }
213 
214   @Override
215   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
216     HTableInterface htable = getTable(table);
217     try {
218       return resultFromHBase(htable.get(getFromThrift(get)));
219     } catch (IOException e) {
220       throw getTIOError(e);
221     } finally {
222       closeTable(htable);
223     }
224   }
225 
226   @Override
227   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
228     HTableInterface htable = getTable(table);
229     try {
230       return resultsFromHBase(htable.get(getsFromThrift(gets)));
231     } catch (IOException e) {
232       throw getTIOError(e);
233     } finally {
234       closeTable(htable);
235     }
236   }
237 
238   @Override
239   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
240     HTableInterface htable = getTable(table);
241     try {
242       htable.put(putFromThrift(put));
243     } catch (IOException e) {
244       throw getTIOError(e);
245     } finally {
246       closeTable(htable);
247     }
248   }
249 
250   @Override
251   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
252       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
253     HTableInterface htable = getTable(table);
254     try {
255       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
256         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
257         putFromThrift(put));
258     } catch (IOException e) {
259       throw getTIOError(e);
260     } finally {
261       closeTable(htable);
262     }
263   }
264 
265   @Override
266   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
267     HTableInterface htable = getTable(table);
268     try {
269       htable.put(putsFromThrift(puts));
270     } catch (IOException e) {
271       throw getTIOError(e);
272     } finally {
273       closeTable(htable);
274     }
275   }
276 
277   @Override
278   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
279     HTableInterface htable = getTable(table);
280     try {
281       htable.delete(deleteFromThrift(deleteSingle));
282     } catch (IOException e) {
283       throw getTIOError(e);
284     } finally {
285       closeTable(htable);
286     }
287   }
288 
289   @Override
290   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
291       TException {
292     HTableInterface htable = getTable(table);
293     try {
294       htable.delete(deletesFromThrift(deletes));
295     } catch (IOException e) {
296       throw getTIOError(e);
297     } finally {
298       closeTable(htable);
299     }
300     return Collections.emptyList();
301   }
302 
303   @Override
304   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
305       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
306     HTableInterface htable = getTable(table);
307 
308     try {
309       if (value == null) {
310         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
311           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
312       } else {
313         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
314           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
315           deleteFromThrift(deleteSingle));
316       }
317     } catch (IOException e) {
318       throw getTIOError(e);
319     } finally {
320       closeTable(htable);
321     }
322   }
323 
324   @Override
325   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
326     HTableInterface htable = getTable(table);
327     try {
328       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
329     } catch (IOException e) {
330       throw getTIOError(e);
331     } finally {
332       closeTable(htable);
333     }
334   }
335 
336   @Override
337   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
338     HTableInterface htable = getTable(table);
339     try {
340       return resultFromHBase(htable.append(appendFromThrift(append)));
341     } catch (IOException e) {
342       throw getTIOError(e);
343     } finally {
344       closeTable(htable);
345     }
346   }
347 
348   @Override
349   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
350     HTableInterface htable = getTable(table);
351     ResultScanner resultScanner = null;
352     try {
353       resultScanner = htable.getScanner(scanFromThrift(scan));
354     } catch (IOException e) {
355       throw getTIOError(e);
356     } finally {
357       closeTable(htable);
358     }
359     return addScanner(resultScanner);
360   }
361 
362   @Override
363   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
364       TIllegalArgument, TException {
365     ResultScanner scanner = getScanner(scannerId);
366     if (scanner == null) {
367       TIllegalArgument ex = new TIllegalArgument();
368       ex.setMessage("Invalid scanner Id");
369       throw ex;
370     }
371 
372     try {
373       return resultsFromHBase(scanner.next(numRows));
374     } catch (IOException e) {
375       throw getTIOError(e);
376     }
377   }
378 
379   @Override
380   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
381       throws TIOError, TException {
382     HTableInterface htable = getTable(table);
383     List<TResult> results = null;
384     ResultScanner scanner = null;
385     try {
386       scanner = htable.getScanner(scanFromThrift(scan));
387       results = resultsFromHBase(scanner.next(numRows));
388     } catch (IOException e) {
389       throw getTIOError(e);
390     } finally {
391       if (scanner != null) {
392         scanner.close();
393       }
394       closeTable(htable);
395     }
396     return results;
397   }
398 
399   @Override
400   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
401     LOG.debug("scannerClose: id=" + scannerId);
402     ResultScanner scanner = getScanner(scannerId);
403     if (scanner == null) {
404       String message = "scanner ID is invalid";
405       LOG.warn(message);
406       TIllegalArgument ex = new TIllegalArgument();
407       ex.setMessage("Invalid scanner Id");
408       throw ex;
409     }
410     scanner.close();
411     removeScanner(scannerId);
412   }
413 
414   @Override
415   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
416     HTableInterface htable = getTable(table);
417     try {
418       htable.mutateRow(rowMutationsFromThrift(rowMutations));
419     } catch (IOException e) {
420       throw getTIOError(e);
421     } finally {
422       closeTable(htable);
423     }
424   }
425 
426 }