View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
22  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
23  
24  import java.io.IOException;
25  import java.lang.reflect.InvocationHandler;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.lang.reflect.Proxy;
29  import java.nio.ByteBuffer;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.client.HTableInterface;
41  import org.apache.hadoop.hbase.client.HTablePool;
42  import org.apache.hadoop.hbase.client.ResultScanner;
43  import org.apache.hadoop.hbase.client.RowMutations;
44  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
45  import org.apache.hadoop.hbase.thrift2.generated.*;
46  import org.apache.thrift.TException;
47  
48  /**
49   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
50   * defined in the HTableInterface.
51   */
52  @InterfaceAudience.Private
53  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
54  
55    // TODO: Size of pool configuraple
56    private final HTablePool htablePool;
57    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
58  
59    // nextScannerId and scannerMap are used to manage scanner state
60    // TODO: Cleanup thread for Scanners, Scanner id wrap
61    private final AtomicInteger nextScannerId = new AtomicInteger(0);
62    private final Map<Integer, ResultScanner> scannerMap =
63        new ConcurrentHashMap<Integer, ResultScanner>();
64  
65    public static THBaseService.Iface newInstance(Configuration conf, ThriftMetrics metrics) {
66      THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
67      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
68        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
69    }
70  
71    private static class THBaseServiceMetricsProxy implements InvocationHandler {
72      private final THBaseService.Iface handler;
73      private final ThriftMetrics metrics;
74  
75      private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
76        this.handler = handler;
77        this.metrics = metrics;
78      }
79  
80      @Override
81      public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
82        Object result;
83        try {
84          long start = now();
85          result = m.invoke(handler, args);
86          int processTime = (int) (now() - start);
87          metrics.incMethodTime(m.getName(), processTime);
88        } catch (InvocationTargetException e) {
89          throw e.getTargetException();
90        } catch (Exception e) {
91          throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
92        }
93        return result;
94      }
95    }
96  
97    private static long now() {
98      return System.nanoTime();
99    }
100 
101   ThriftHBaseServiceHandler(Configuration conf) {
102     int maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
103     htablePool = new HTablePool(conf, maxPoolSize);
104   }
105 
106   private HTableInterface getTable(ByteBuffer tableName) {
107     return htablePool.getTable(byteBufferToByteArray(tableName));
108   }
109 
110   private void closeTable(HTableInterface table) throws TIOError {
111     try {
112       table.close();
113     } catch (IOException e) {
114       throw getTIOError(e);
115     }
116   }
117 
118   private TIOError getTIOError(IOException e) {
119     TIOError err = new TIOError();
120     err.setMessage(e.getMessage());
121     return err;
122   }
123 
124   /**
125    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
126    * @param scanner to add
127    * @return Id for this Scanner
128    */
129   private int addScanner(ResultScanner scanner) {
130     int id = nextScannerId.getAndIncrement();
131     scannerMap.put(id, scanner);
132     return id;
133   }
134 
135   /**
136    * Returns the Scanner associated with the specified Id.
137    * @param id of the Scanner to get
138    * @return a Scanner, or null if the Id is invalid
139    */
140   private ResultScanner getScanner(int id) {
141     return scannerMap.get(id);
142   }
143 
144   /**
145    * Removes the scanner associated with the specified ID from the internal HashMap.
146    * @param id of the Scanner to remove
147    * @return the removed Scanner, or <code>null</code> if the Id is invalid
148    */
149   protected ResultScanner removeScanner(int id) {
150     return scannerMap.remove(id);
151   }
152 
153   @Override
154   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
155     HTableInterface htable = getTable(table);
156     try {
157       return htable.exists(getFromThrift(get));
158     } catch (IOException e) {
159       throw getTIOError(e);
160     } finally {
161       closeTable(htable);
162     }
163   }
164 
165   @Override
166   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
167     HTableInterface htable = getTable(table);
168     try {
169       return resultFromHBase(htable.get(getFromThrift(get)));
170     } catch (IOException e) {
171       throw getTIOError(e);
172     } finally {
173       closeTable(htable);
174     }
175   }
176 
177   @Override
178   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
179     HTableInterface htable = getTable(table);
180     try {
181       return resultsFromHBase(htable.get(getsFromThrift(gets)));
182     } catch (IOException e) {
183       throw getTIOError(e);
184     } finally {
185       closeTable(htable);
186     }
187   }
188 
189   @Override
190   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
191     HTableInterface htable = getTable(table);
192     try {
193       htable.put(putFromThrift(put));
194     } catch (IOException e) {
195       throw getTIOError(e);
196     } finally {
197       closeTable(htable);
198     }
199   }
200 
201   @Override
202   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
203       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
204     HTableInterface htable = getTable(table);
205     try {
206       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
207         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
208         putFromThrift(put));
209     } catch (IOException e) {
210       throw getTIOError(e);
211     } finally {
212       closeTable(htable);
213     }
214   }
215 
216   @Override
217   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
218     HTableInterface htable = getTable(table);
219     try {
220       htable.put(putsFromThrift(puts));
221     } catch (IOException e) {
222       throw getTIOError(e);
223     } finally {
224       closeTable(htable);
225     }
226   }
227 
228   @Override
229   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
230     HTableInterface htable = getTable(table);
231     try {
232       htable.delete(deleteFromThrift(deleteSingle));
233     } catch (IOException e) {
234       throw getTIOError(e);
235     } finally {
236       closeTable(htable);
237     }
238   }
239 
240   @Override
241   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
242       TException {
243     HTableInterface htable = getTable(table);
244     try {
245       htable.delete(deletesFromThrift(deletes));
246     } catch (IOException e) {
247       throw getTIOError(e);
248     } finally {
249       closeTable(htable);
250     }
251     return Collections.emptyList();
252   }
253 
254   @Override
255   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
256       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
257     HTableInterface htable = getTable(table);
258 
259     try {
260       if (value == null) {
261         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
262           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
263       } else {
264         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
265           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
266           deleteFromThrift(deleteSingle));
267       }
268     } catch (IOException e) {
269       throw getTIOError(e);
270     } finally {
271       closeTable(htable);
272     }
273   }
274 
275   @Override
276   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
277     HTableInterface htable = getTable(table);
278     try {
279       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
280     } catch (IOException e) {
281       throw getTIOError(e);
282     } finally {
283       closeTable(htable);
284     }
285   }
286 
287   @Override
288   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
289     HTableInterface htable = getTable(table);
290     try {
291       return resultFromHBase(htable.append(appendFromThrift(append)));
292     } catch (IOException e) {
293       throw getTIOError(e);
294     } finally {
295       closeTable(htable);
296     }
297   }
298 
299   @Override
300   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
301     HTableInterface htable = getTable(table);
302     ResultScanner resultScanner = null;
303     try {
304       resultScanner = htable.getScanner(scanFromThrift(scan));
305     } catch (IOException e) {
306       throw getTIOError(e);
307     } finally {
308       closeTable(htable);
309     }
310     return addScanner(resultScanner);
311   }
312 
313   @Override
314   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
315       TIllegalArgument, TException {
316     ResultScanner scanner = getScanner(scannerId);
317     if (scanner == null) {
318       TIllegalArgument ex = new TIllegalArgument();
319       ex.setMessage("Invalid scanner Id");
320       throw ex;
321     }
322 
323     try {
324       return resultsFromHBase(scanner.next(numRows));
325     } catch (IOException e) {
326       throw getTIOError(e);
327     }
328   }
329 
330   @Override
331   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
332       throws TIOError, TException {
333     HTableInterface htable = getTable(table);
334     List<TResult> results = null;
335     ResultScanner scanner = null;
336     try {
337       scanner = htable.getScanner(scanFromThrift(scan));
338       results = resultsFromHBase(scanner.next(numRows));
339     } catch (IOException e) {
340       throw getTIOError(e);
341     } finally {
342       if (scanner != null) {
343         scanner.close();
344       }
345       closeTable(htable);
346     }
347     return results;
348   }
349 
350   @Override
351   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
352     LOG.debug("scannerClose: id=" + scannerId);
353     ResultScanner scanner = getScanner(scannerId);
354     if (scanner == null) {
355       String message = "scanner ID is invalid";
356       LOG.warn(message);
357       TIllegalArgument ex = new TIllegalArgument();
358       ex.setMessage("Invalid scanner Id");
359       throw ex;
360     }
361     scanner.close();
362     removeScanner(scannerId);
363   }
364 
365   @Override
366   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
367     HTableInterface htable = getTable(table);
368     try {
369       htable.mutateRow(rowMutationsFromThrift(rowMutations));
370     } catch (IOException e) {
371       throw getTIOError(e);
372     } finally {
373       closeTable(htable);
374     }
375   }
376 
377 }