View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
22  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
23  
24  import java.io.IOException;
25  import java.lang.reflect.InvocationHandler;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.lang.reflect.Proxy;
29  import java.nio.ByteBuffer;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.classification.InterfaceAudience;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.client.HTableFactory;
44  import org.apache.hadoop.hbase.client.HTableInterface;
45  import org.apache.hadoop.hbase.client.ResultScanner;
46  import org.apache.hadoop.hbase.client.Table;
47  import org.apache.hadoop.hbase.security.UserProvider;
48  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
49  import org.apache.hadoop.hbase.thrift2.generated.*;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.ConnectionCache;
52  import org.apache.thrift.TException;
53  
54  import com.google.common.cache.Cache;
55  import com.google.common.cache.CacheBuilder;
56  
57  /**
58   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
59   * defined in the HTableInterface.
60   */
61  @InterfaceAudience.Private
62  @SuppressWarnings("deprecation")
63  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
64  
65    // TODO: Size of pool configuraple
66    private final Cache<String, HTablePool> htablePools;
67    private final Callable<? extends HTablePool> htablePoolCreater;
68    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
69  
70    // nextScannerId and scannerMap are used to manage scanner state
71    // TODO: Cleanup thread for Scanners, Scanner id wrap
72    private final AtomicInteger nextScannerId = new AtomicInteger(0);
73    private final Map<Integer, ResultScanner> scannerMap =
74        new ConcurrentHashMap<Integer, ResultScanner>();
75  
76    private final ConnectionCache connectionCache;
77    private final HTableFactory tableFactory;
78    private final int maxPoolSize;
79  
80    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
81    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
82  
83    public static THBaseService.Iface newInstance(
84        THBaseService.Iface handler, ThriftMetrics metrics) {
85      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
86        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
87    }
88  
89    private static class THBaseServiceMetricsProxy implements InvocationHandler {
90      private final THBaseService.Iface handler;
91      private final ThriftMetrics metrics;
92  
93      private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
94        this.handler = handler;
95        this.metrics = metrics;
96      }
97  
98      @Override
99      public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
100       Object result;
101       try {
102         long start = now();
103         result = m.invoke(handler, args);
104         int processTime = (int) (now() - start);
105         metrics.incMethodTime(m.getName(), processTime);
106       } catch (InvocationTargetException e) {
107         throw e.getTargetException();
108       } catch (Exception e) {
109         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
110       }
111       return result;
112     }
113   }
114 
115   private static long now() {
116     return System.nanoTime();
117   }
118 
119   ThriftHBaseServiceHandler(final Configuration conf,
120       final UserProvider userProvider) throws IOException {
121     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
122     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
123     connectionCache = new ConnectionCache(
124       conf, userProvider, cleanInterval, maxIdleTime);
125     tableFactory = new HTableFactory() {
126       @Override
127       public HTableInterface createHTableInterface(Configuration config,
128           byte[] tableName) {
129         try {
130           return connectionCache.getTable(Bytes.toString(tableName));
131         } catch (IOException ioe) {
132           throw new RuntimeException(ioe);
133         }
134       }
135     };
136     htablePools = CacheBuilder.newBuilder().expireAfterAccess(
137       maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build();
138     maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
139     htablePoolCreater = new Callable<HTablePool>() {
140       public HTablePool call() {
141         return new HTablePool(conf, maxPoolSize, tableFactory);
142       }
143     };
144   }
145 
146   private Table getTable(ByteBuffer tableName) {
147     String currentUser = connectionCache.getEffectiveUser();
148     try {
149       HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
150       return htablePool.getTable(byteBufferToByteArray(tableName));
151     } catch (ExecutionException ee) {
152       throw new RuntimeException(ee);
153     }
154   }
155 
156   private void closeTable(Table table) throws TIOError {
157     try {
158       table.close();
159     } catch (IOException e) {
160       throw getTIOError(e);
161     }
162   }
163 
164   private TIOError getTIOError(IOException e) {
165     TIOError err = new TIOError();
166     err.setMessage(e.getMessage());
167     return err;
168   }
169 
170   /**
171    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
172    * @param scanner to add
173    * @return Id for this Scanner
174    */
175   private int addScanner(ResultScanner scanner) {
176     int id = nextScannerId.getAndIncrement();
177     scannerMap.put(id, scanner);
178     return id;
179   }
180 
181   /**
182    * Returns the Scanner associated with the specified Id.
183    * @param id of the Scanner to get
184    * @return a Scanner, or null if the Id is invalid
185    */
186   private ResultScanner getScanner(int id) {
187     return scannerMap.get(id);
188   }
189 
190   void setEffectiveUser(String effectiveUser) {
191     connectionCache.setEffectiveUser(effectiveUser);
192   }
193 
194   /**
195    * Removes the scanner associated with the specified ID from the internal HashMap.
196    * @param id of the Scanner to remove
197    * @return the removed Scanner, or <code>null</code> if the Id is invalid
198    */
199   protected ResultScanner removeScanner(int id) {
200     return scannerMap.remove(id);
201   }
202 
203   @Override
204   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
205     Table htable = getTable(table);
206     try {
207       return htable.exists(getFromThrift(get));
208     } catch (IOException e) {
209       throw getTIOError(e);
210     } finally {
211       closeTable(htable);
212     }
213   }
214 
215   @Override
216   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
217     Table htable = getTable(table);
218     try {
219       return resultFromHBase(htable.get(getFromThrift(get)));
220     } catch (IOException e) {
221       throw getTIOError(e);
222     } finally {
223       closeTable(htable);
224     }
225   }
226 
227   @Override
228   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
229     Table htable = getTable(table);
230     try {
231       return resultsFromHBase(htable.get(getsFromThrift(gets)));
232     } catch (IOException e) {
233       throw getTIOError(e);
234     } finally {
235       closeTable(htable);
236     }
237   }
238 
239   @Override
240   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
241     Table htable = getTable(table);
242     try {
243       htable.put(putFromThrift(put));
244     } catch (IOException e) {
245       throw getTIOError(e);
246     } finally {
247       closeTable(htable);
248     }
249   }
250 
251   @Override
252   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
253       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
254     Table htable = getTable(table);
255     try {
256       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
257         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
258         putFromThrift(put));
259     } catch (IOException e) {
260       throw getTIOError(e);
261     } finally {
262       closeTable(htable);
263     }
264   }
265 
266   @Override
267   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
268     Table htable = getTable(table);
269     try {
270       htable.put(putsFromThrift(puts));
271     } catch (IOException e) {
272       throw getTIOError(e);
273     } finally {
274       closeTable(htable);
275     }
276   }
277 
278   @Override
279   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
280     Table htable = getTable(table);
281     try {
282       htable.delete(deleteFromThrift(deleteSingle));
283     } catch (IOException e) {
284       throw getTIOError(e);
285     } finally {
286       closeTable(htable);
287     }
288   }
289 
290   @Override
291   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
292       TException {
293     Table htable = getTable(table);
294     try {
295       htable.delete(deletesFromThrift(deletes));
296     } catch (IOException e) {
297       throw getTIOError(e);
298     } finally {
299       closeTable(htable);
300     }
301     return Collections.emptyList();
302   }
303 
304   @Override
305   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
306       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
307     Table htable = getTable(table);
308 
309     try {
310       if (value == null) {
311         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
312           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
313       } else {
314         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
315           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
316           deleteFromThrift(deleteSingle));
317       }
318     } catch (IOException e) {
319       throw getTIOError(e);
320     } finally {
321       closeTable(htable);
322     }
323   }
324 
325   @Override
326   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
327     Table htable = getTable(table);
328     try {
329       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
330     } catch (IOException e) {
331       throw getTIOError(e);
332     } finally {
333       closeTable(htable);
334     }
335   }
336 
337   @Override
338   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
339     Table htable = getTable(table);
340     try {
341       return resultFromHBase(htable.append(appendFromThrift(append)));
342     } catch (IOException e) {
343       throw getTIOError(e);
344     } finally {
345       closeTable(htable);
346     }
347   }
348 
349   @Override
350   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
351     Table htable = getTable(table);
352     ResultScanner resultScanner = null;
353     try {
354       resultScanner = htable.getScanner(scanFromThrift(scan));
355     } catch (IOException e) {
356       throw getTIOError(e);
357     } finally {
358       closeTable(htable);
359     }
360     return addScanner(resultScanner);
361   }
362 
363   @Override
364   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
365       TIllegalArgument, TException {
366     ResultScanner scanner = getScanner(scannerId);
367     if (scanner == null) {
368       TIllegalArgument ex = new TIllegalArgument();
369       ex.setMessage("Invalid scanner Id");
370       throw ex;
371     }
372 
373     try {
374       return resultsFromHBase(scanner.next(numRows));
375     } catch (IOException e) {
376       throw getTIOError(e);
377     }
378   }
379 
380   @Override
381   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
382       throws TIOError, TException {
383     Table htable = getTable(table);
384     List<TResult> results = null;
385     ResultScanner scanner = null;
386     try {
387       scanner = htable.getScanner(scanFromThrift(scan));
388       results = resultsFromHBase(scanner.next(numRows));
389     } catch (IOException e) {
390       throw getTIOError(e);
391     } finally {
392       if (scanner != null) {
393         scanner.close();
394       }
395       closeTable(htable);
396     }
397     return results;
398   }
399 
400   @Override
401   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
402     LOG.debug("scannerClose: id=" + scannerId);
403     ResultScanner scanner = getScanner(scannerId);
404     if (scanner == null) {
405       String message = "scanner ID is invalid";
406       LOG.warn(message);
407       TIllegalArgument ex = new TIllegalArgument();
408       ex.setMessage("Invalid scanner Id");
409       throw ex;
410     }
411     scanner.close();
412     removeScanner(scannerId);
413   }
414 
415   @Override
416   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
417     Table htable = getTable(table);
418     try {
419       htable.mutateRow(rowMutationsFromThrift(rowMutations));
420     } catch (IOException e) {
421       throw getTIOError(e);
422     } finally {
423       closeTable(htable);
424     }
425   }
426 
427 }