View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.thrift2;
21  
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromHBase;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
34  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
35  
36  import java.io.IOException;
37  import java.lang.reflect.InvocationHandler;
38  import java.lang.reflect.InvocationTargetException;
39  import java.lang.reflect.Method;
40  import java.lang.reflect.Proxy;
41  import java.nio.ByteBuffer;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.HTableInterface;
52  import org.apache.hadoop.hbase.client.HTablePool;
53  import org.apache.hadoop.hbase.client.ResultScanner;
54  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
55  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
56  import org.apache.hadoop.hbase.thrift2.generated.TGet;
57  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
58  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
59  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
60  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
61  import org.apache.hadoop.hbase.thrift2.generated.TPut;
62  import org.apache.hadoop.hbase.thrift2.generated.TResult;
63  import org.apache.hadoop.hbase.thrift2.generated.TScan;
64  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
65  import org.apache.thrift.TException;
66  
67  /**
68   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
69   * defined in the HTableInterface.
70   */
71  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
72  
73    // TODO: Size of pool configuraple
74    private final HTablePool htablePool;
75    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
76  
77    // nextScannerId and scannerMap are used to manage scanner state
78    // TODO: Cleanup thread for Scanners, Scanner id wrap
79    private final AtomicInteger nextScannerId = new AtomicInteger(0);
80    private final Map<Integer, ResultScanner> scannerMap =
81        new ConcurrentHashMap<Integer, ResultScanner>();
82  
83    public static THBaseService.Iface newInstance(Configuration conf, ThriftMetrics metrics) {
84      THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
85      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
86        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
87    }
88  
89    private static class THBaseServiceMetricsProxy implements InvocationHandler {
90      private final THBaseService.Iface handler;
91      private final ThriftMetrics metrics;
92  
93      private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
94        this.handler = handler;
95        this.metrics = metrics;
96      }
97  
98      @Override
99      public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
100       Object result;
101       try {
102         long start = now();
103         result = m.invoke(handler, args);
104         int processTime = (int) (now() - start);
105         metrics.incMethodTime(m.getName(), processTime);
106       } catch (InvocationTargetException e) {
107         throw e.getTargetException();
108       } catch (Exception e) {
109         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
110       }
111       return result;
112     }
113   }
114 
115   private static long now() {
116     return System.nanoTime();
117   }
118 
119   ThriftHBaseServiceHandler(Configuration conf) {
120     int maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
121     htablePool = new HTablePool(conf, maxPoolSize);
122   }
123 
124   private HTableInterface getTable(ByteBuffer tableName) {
125     return htablePool.getTable(byteBufferToByteArray(tableName));
126   }
127 
128   private void closeTable(HTableInterface table) throws TIOError {
129     try {
130       table.close();
131     } catch (IOException e) {
132       throw getTIOError(e);
133     }
134   }
135 
136   private TIOError getTIOError(IOException e) {
137     TIOError err = new TIOError();
138     err.setMessage(e.getMessage());
139     return err;
140   }
141 
142   /**
143    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
144    * @param scanner to add
145    * @return Id for this Scanner
146    */
147   private int addScanner(ResultScanner scanner) {
148     int id = nextScannerId.getAndIncrement();
149     scannerMap.put(id, scanner);
150     return id;
151   }
152 
153   /**
154    * Returns the Scanner associated with the specified Id.
155    * @param id of the Scanner to get
156    * @return a Scanner, or null if the Id is invalid
157    */
158   private ResultScanner getScanner(int id) {
159     return scannerMap.get(id);
160   }
161 
162   /**
163    * Removes the scanner associated with the specified ID from the internal HashMap.
164    * @param id of the Scanner to remove
165    * @return the removed Scanner, or <code>null</code> if the Id is invalid
166    */
167   protected ResultScanner removeScanner(int id) {
168     return scannerMap.remove(id);
169   }
170 
171   @Override
172   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
173     HTableInterface htable = getTable(table);
174     try {
175       return htable.exists(getFromThrift(get));
176     } catch (IOException e) {
177       throw getTIOError(e);
178     } finally {
179       closeTable(htable);
180     }
181   }
182 
183   @Override
184   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
185     HTableInterface htable = getTable(table);
186     try {
187       return resultFromHBase(htable.get(getFromThrift(get)));
188     } catch (IOException e) {
189       throw getTIOError(e);
190     } finally {
191       closeTable(htable);
192     }
193   }
194 
195   @Override
196   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
197     HTableInterface htable = getTable(table);
198     try {
199       return resultsFromHBase(htable.get(getsFromThrift(gets)));
200     } catch (IOException e) {
201       throw getTIOError(e);
202     } finally {
203       closeTable(htable);
204     }
205   }
206 
207   @Override
208   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
209     HTableInterface htable = getTable(table);
210     try {
211       htable.put(putFromThrift(put));
212     } catch (IOException e) {
213       throw getTIOError(e);
214     } finally {
215       closeTable(htable);
216     }
217   }
218 
219   @Override
220   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
221       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
222     HTableInterface htable = getTable(table);
223     try {
224       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
225         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
226         putFromThrift(put));
227     } catch (IOException e) {
228       throw getTIOError(e);
229     } finally {
230       closeTable(htable);
231     }
232   }
233 
234   @Override
235   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
236     HTableInterface htable = getTable(table);
237     try {
238       htable.put(putsFromThrift(puts));
239     } catch (IOException e) {
240       throw getTIOError(e);
241     } finally {
242       closeTable(htable);
243     }
244   }
245 
246   @Override
247   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
248     HTableInterface htable = getTable(table);
249     try {
250       htable.delete(deleteFromThrift(deleteSingle));
251     } catch (IOException e) {
252       throw getTIOError(e);
253     } finally {
254       closeTable(htable);
255     }
256   }
257 
258   @Override
259   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
260       TException {
261     HTableInterface htable = getTable(table);
262     List<Delete> tempDeletes = deletesFromThrift(deletes);
263     try {
264       htable.delete(tempDeletes);
265     } catch (IOException e) {
266       throw getTIOError(e);
267     } finally {
268       closeTable(htable);
269     }
270     return deletesFromHBase(tempDeletes);
271   }
272 
273   @Override
274   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
275       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
276     HTableInterface htable = getTable(table);
277 
278     try {
279       if (value == null) {
280         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
281           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
282       } else {
283         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
284           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
285           deleteFromThrift(deleteSingle));
286       }
287     } catch (IOException e) {
288       throw getTIOError(e);
289     } finally {
290       closeTable(htable);
291     }
292   }
293 
294   @Override
295   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
296     HTableInterface htable = getTable(table);
297     try {
298       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
299     } catch (IOException e) {
300       throw getTIOError(e);
301     } finally {
302       closeTable(htable);
303     }
304   }
305 
306   @Override
307   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
308     HTableInterface htable = getTable(table);
309     ResultScanner resultScanner = null;
310     try {
311       resultScanner = htable.getScanner(scanFromThrift(scan));
312     } catch (IOException e) {
313       throw getTIOError(e);
314     } finally {
315       closeTable(htable);
316     }
317     return addScanner(resultScanner);
318   }
319 
320   @Override
321   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
322       TIllegalArgument, TException {
323     ResultScanner scanner = getScanner(scannerId);
324     if (scanner == null) {
325       TIllegalArgument ex = new TIllegalArgument();
326       ex.setMessage("Invalid scanner Id");
327       throw ex;
328     }
329 
330     try {
331       return resultsFromHBase(scanner.next(numRows));
332     } catch (IOException e) {
333       throw getTIOError(e);
334     }
335   }
336 
337   @Override
338   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
339     LOG.debug("scannerClose: id=" + scannerId);
340     ResultScanner scanner = getScanner(scannerId);
341     if (scanner == null) {
342       String message = "scanner ID is invalid";
343       LOG.warn(message);
344       TIllegalArgument ex = new TIllegalArgument();
345       ex.setMessage("Invalid scanner Id");
346       throw ex;
347     }
348     scanner.close();
349     removeScanner(scannerId);
350   }
351 
352   @Override
353   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
354       throws TIOError, TException {
355     HTableInterface htable = getTable(table);
356     List<TResult> results = null;
357     ResultScanner scanner = null;
358     try {
359       scanner = htable.getScanner(scanFromThrift(scan));
360       results = resultsFromHBase(scanner.next(numRows));
361     } catch (IOException e) {
362       throw getTIOError(e);
363     } finally {
364       if (scanner != null) {
365         scanner.close();
366       }
367       closeTable(htable);
368     }
369     return results;
370   }
371 
372   @Override
373   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
374     HTableInterface htable = getTable(table);
375     try {
376       htable.mutateRow(rowMutationsFromThrift(rowMutations));
377     } catch (IOException e) {
378       throw getTIOError(e);
379     } finally {
380       closeTable(htable);
381     }
382   }
383 }