View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34  
35  import java.io.IOException;
36  import java.lang.reflect.InvocationHandler;
37  import java.lang.reflect.InvocationTargetException;
38  import java.lang.reflect.Method;
39  import java.lang.reflect.Proxy;
40  import java.nio.ByteBuffer;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Table;
53  import org.apache.hadoop.hbase.security.UserProvider;
54  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
55  import org.apache.hadoop.hbase.thrift2.generated.TAppend;
56  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
57  import org.apache.hadoop.hbase.thrift2.generated.TGet;
58  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
59  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
60  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
61  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
62  import org.apache.hadoop.hbase.thrift2.generated.TPut;
63  import org.apache.hadoop.hbase.thrift2.generated.TResult;
64  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
65  import org.apache.hadoop.hbase.thrift2.generated.TScan;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.ConnectionCache;
68  import org.apache.thrift.TException;
69  
70  /**
71   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
72   * defined in the HTableInterface.
73   */
74  @InterfaceAudience.Private
75  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
76  
77    // TODO: Size of pool configuraple
78    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
79  
80    // nextScannerId and scannerMap are used to manage scanner state
81    // TODO: Cleanup thread for Scanners, Scanner id wrap
82    private final AtomicInteger nextScannerId = new AtomicInteger(0);
83    private final Map<Integer, ResultScanner> scannerMap =
84        new ConcurrentHashMap<Integer, ResultScanner>();
85  
86    private final ConnectionCache connectionCache;
87  
88    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
89    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
90  
91    public static THBaseService.Iface newInstance(
92        THBaseService.Iface handler, ThriftMetrics metrics) {
93      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
94        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
95    }
96  
97    private static class THBaseServiceMetricsProxy implements InvocationHandler {
98      private final THBaseService.Iface handler;
99      private final ThriftMetrics metrics;
100 
101     private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
102       this.handler = handler;
103       this.metrics = metrics;
104     }
105 
106     @Override
107     public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
108       Object result;
109       try {
110         long start = now();
111         result = m.invoke(handler, args);
112         int processTime = (int) (now() - start);
113         metrics.incMethodTime(m.getName(), processTime);
114       } catch (InvocationTargetException e) {
115         throw e.getTargetException();
116       } catch (Exception e) {
117         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
118       }
119       return result;
120     }
121   }
122 
123   private static long now() {
124     return System.nanoTime();
125   }
126 
127   ThriftHBaseServiceHandler(final Configuration conf,
128       final UserProvider userProvider) throws IOException {
129     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
130     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
131     connectionCache = new ConnectionCache(
132       conf, userProvider, cleanInterval, maxIdleTime);
133   }
134 
135   private Table getTable(ByteBuffer tableName) {
136     try {
137       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
138     } catch (IOException e) {
139       throw new RuntimeException(e);
140     }
141   }
142 
143   private void closeTable(Table table) throws TIOError {
144     try {
145       table.close();
146     } catch (IOException e) {
147       throw getTIOError(e);
148     }
149   }
150 
151   private TIOError getTIOError(IOException e) {
152     TIOError err = new TIOError();
153     err.setMessage(e.getMessage());
154     return err;
155   }
156 
157   /**
158    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
159    * @param scanner to add
160    * @return Id for this Scanner
161    */
162   private int addScanner(ResultScanner scanner) {
163     int id = nextScannerId.getAndIncrement();
164     scannerMap.put(id, scanner);
165     return id;
166   }
167 
168   /**
169    * Returns the Scanner associated with the specified Id.
170    * @param id of the Scanner to get
171    * @return a Scanner, or null if the Id is invalid
172    */
173   private ResultScanner getScanner(int id) {
174     return scannerMap.get(id);
175   }
176 
177   void setEffectiveUser(String effectiveUser) {
178     connectionCache.setEffectiveUser(effectiveUser);
179   }
180 
181   /**
182    * Removes the scanner associated with the specified ID from the internal HashMap.
183    * @param id of the Scanner to remove
184    * @return the removed Scanner, or <code>null</code> if the Id is invalid
185    */
186   protected ResultScanner removeScanner(int id) {
187     return scannerMap.remove(id);
188   }
189 
190   @Override
191   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
192     Table htable = getTable(table);
193     try {
194       return htable.exists(getFromThrift(get));
195     } catch (IOException e) {
196       throw getTIOError(e);
197     } finally {
198       closeTable(htable);
199     }
200   }
201 
202   @Override
203   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
204     Table htable = getTable(table);
205     try {
206       return resultFromHBase(htable.get(getFromThrift(get)));
207     } catch (IOException e) {
208       throw getTIOError(e);
209     } finally {
210       closeTable(htable);
211     }
212   }
213 
214   @Override
215   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
216     Table htable = getTable(table);
217     try {
218       return resultsFromHBase(htable.get(getsFromThrift(gets)));
219     } catch (IOException e) {
220       throw getTIOError(e);
221     } finally {
222       closeTable(htable);
223     }
224   }
225 
226   @Override
227   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
228     Table htable = getTable(table);
229     try {
230       htable.put(putFromThrift(put));
231     } catch (IOException e) {
232       throw getTIOError(e);
233     } finally {
234       closeTable(htable);
235     }
236   }
237 
238   @Override
239   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
240       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
241     Table htable = getTable(table);
242     try {
243       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
244         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
245         putFromThrift(put));
246     } catch (IOException e) {
247       throw getTIOError(e);
248     } finally {
249       closeTable(htable);
250     }
251   }
252 
253   @Override
254   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
255     Table htable = getTable(table);
256     try {
257       htable.put(putsFromThrift(puts));
258     } catch (IOException e) {
259       throw getTIOError(e);
260     } finally {
261       closeTable(htable);
262     }
263   }
264 
265   @Override
266   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
267     Table htable = getTable(table);
268     try {
269       htable.delete(deleteFromThrift(deleteSingle));
270     } catch (IOException e) {
271       throw getTIOError(e);
272     } finally {
273       closeTable(htable);
274     }
275   }
276 
277   @Override
278   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
279       TException {
280     Table htable = getTable(table);
281     try {
282       htable.delete(deletesFromThrift(deletes));
283     } catch (IOException e) {
284       throw getTIOError(e);
285     } finally {
286       closeTable(htable);
287     }
288     return Collections.emptyList();
289   }
290 
291   @Override
292   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
293       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
294     Table htable = getTable(table);
295 
296     try {
297       if (value == null) {
298         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
299           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
300       } else {
301         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
302           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
303           deleteFromThrift(deleteSingle));
304       }
305     } catch (IOException e) {
306       throw getTIOError(e);
307     } finally {
308       closeTable(htable);
309     }
310   }
311 
312   @Override
313   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
314     Table htable = getTable(table);
315     try {
316       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
317     } catch (IOException e) {
318       throw getTIOError(e);
319     } finally {
320       closeTable(htable);
321     }
322   }
323 
324   @Override
325   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
326     Table htable = getTable(table);
327     try {
328       return resultFromHBase(htable.append(appendFromThrift(append)));
329     } catch (IOException e) {
330       throw getTIOError(e);
331     } finally {
332       closeTable(htable);
333     }
334   }
335 
336   @Override
337   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
338     Table htable = getTable(table);
339     ResultScanner resultScanner = null;
340     try {
341       resultScanner = htable.getScanner(scanFromThrift(scan));
342     } catch (IOException e) {
343       throw getTIOError(e);
344     } finally {
345       closeTable(htable);
346     }
347     return addScanner(resultScanner);
348   }
349 
350   @Override
351   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
352       TIllegalArgument, TException {
353     ResultScanner scanner = getScanner(scannerId);
354     if (scanner == null) {
355       TIllegalArgument ex = new TIllegalArgument();
356       ex.setMessage("Invalid scanner Id");
357       throw ex;
358     }
359 
360     try {
361       return resultsFromHBase(scanner.next(numRows));
362     } catch (IOException e) {
363       throw getTIOError(e);
364     }
365   }
366 
367   @Override
368   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
369       throws TIOError, TException {
370     Table htable = getTable(table);
371     List<TResult> results = null;
372     ResultScanner scanner = null;
373     try {
374       scanner = htable.getScanner(scanFromThrift(scan));
375       results = resultsFromHBase(scanner.next(numRows));
376     } catch (IOException e) {
377       throw getTIOError(e);
378     } finally {
379       if (scanner != null) {
380         scanner.close();
381       }
382       closeTable(htable);
383     }
384     return results;
385   }
386 
387   @Override
388   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
389     LOG.debug("scannerClose: id=" + scannerId);
390     ResultScanner scanner = getScanner(scannerId);
391     if (scanner == null) {
392       String message = "scanner ID is invalid";
393       LOG.warn(message);
394       TIllegalArgument ex = new TIllegalArgument();
395       ex.setMessage("Invalid scanner Id");
396       throw ex;
397     }
398     scanner.close();
399     removeScanner(scannerId);
400   }
401 
402   @Override
403   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
404     Table htable = getTable(table);
405     try {
406       htable.mutateRow(rowMutationsFromThrift(rowMutations));
407     } catch (IOException e) {
408       throw getTIOError(e);
409     } finally {
410       closeTable(htable);
411     }
412   }
413 }