View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
22  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
23  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
24  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
25  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
26  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
27  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
28  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
29  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
30  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
31  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
32  import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
33  import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
34  
35  import java.io.IOException;
36  import java.lang.reflect.InvocationHandler;
37  import java.lang.reflect.InvocationTargetException;
38  import java.lang.reflect.Method;
39  import java.lang.reflect.Proxy;
40  import java.nio.ByteBuffer;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Table;
53  import org.apache.hadoop.hbase.security.UserProvider;
54  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
55  import org.apache.hadoop.hbase.thrift2.generated.TAppend;
56  import org.apache.hadoop.hbase.thrift2.generated.TDelete;
57  import org.apache.hadoop.hbase.thrift2.generated.TGet;
58  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
59  import org.apache.hadoop.hbase.thrift2.generated.TIOError;
60  import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
61  import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
62  import org.apache.hadoop.hbase.thrift2.generated.TPut;
63  import org.apache.hadoop.hbase.thrift2.generated.TResult;
64  import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
65  import org.apache.hadoop.hbase.thrift2.generated.TScan;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.ConnectionCache;
68  import org.apache.thrift.TException;
69  
70  /**
71   * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
72   * defined in the HTableInterface.
73   */
74  @InterfaceAudience.Private
75  @SuppressWarnings("deprecation")
76  public class ThriftHBaseServiceHandler implements THBaseService.Iface {
77  
78    // TODO: Size of pool configuraple
79    private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
80  
81    // nextScannerId and scannerMap are used to manage scanner state
82    // TODO: Cleanup thread for Scanners, Scanner id wrap
83    private final AtomicInteger nextScannerId = new AtomicInteger(0);
84    private final Map<Integer, ResultScanner> scannerMap =
85        new ConcurrentHashMap<Integer, ResultScanner>();
86  
87    private final ConnectionCache connectionCache;
88  
89    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
90    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
91  
92    public static THBaseService.Iface newInstance(
93        THBaseService.Iface handler, ThriftMetrics metrics) {
94      return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
95        new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
96    }
97  
98    private static final class THBaseServiceMetricsProxy implements InvocationHandler {
99      private final THBaseService.Iface handler;
100     private final ThriftMetrics metrics;
101 
102     private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
103       this.handler = handler;
104       this.metrics = metrics;
105     }
106 
107     @Override
108     public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
109       Object result;
110       try {
111         long start = now();
112         result = m.invoke(handler, args);
113         int processTime = (int) (now() - start);
114         metrics.incMethodTime(m.getName(), processTime);
115       } catch (InvocationTargetException e) {
116         throw e.getTargetException();
117       } catch (Exception e) {
118         throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
119       }
120       return result;
121     }
122   }
123 
124   private static long now() {
125     return System.nanoTime();
126   }
127 
128   ThriftHBaseServiceHandler(final Configuration conf,
129       final UserProvider userProvider) throws IOException {
130     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
131     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
132     connectionCache = new ConnectionCache(
133       conf, userProvider, cleanInterval, maxIdleTime);
134   }
135 
136   private Table getTable(ByteBuffer tableName) {
137     try {
138       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
139     } catch (IOException ie) {
140       throw new RuntimeException(ie);
141     }
142   }
143 
144   private void closeTable(Table table) throws TIOError {
145     try {
146       table.close();
147     } catch (IOException e) {
148       throw getTIOError(e);
149     }
150   }
151 
152   private TIOError getTIOError(IOException e) {
153     TIOError err = new TIOError();
154     err.setMessage(e.getMessage());
155     return err;
156   }
157 
158   /**
159    * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
160    * @param scanner to add
161    * @return Id for this Scanner
162    */
163   private int addScanner(ResultScanner scanner) {
164     int id = nextScannerId.getAndIncrement();
165     scannerMap.put(id, scanner);
166     return id;
167   }
168 
169   /**
170    * Returns the Scanner associated with the specified Id.
171    * @param id of the Scanner to get
172    * @return a Scanner, or null if the Id is invalid
173    */
174   private ResultScanner getScanner(int id) {
175     return scannerMap.get(id);
176   }
177 
178   void setEffectiveUser(String effectiveUser) {
179     connectionCache.setEffectiveUser(effectiveUser);
180   }
181 
182   /**
183    * Removes the scanner associated with the specified ID from the internal HashMap.
184    * @param id of the Scanner to remove
185    * @return the removed Scanner, or <code>null</code> if the Id is invalid
186    */
187   protected ResultScanner removeScanner(int id) {
188     return scannerMap.remove(id);
189   }
190 
191   @Override
192   public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
193     Table htable = getTable(table);
194     try {
195       return htable.exists(getFromThrift(get));
196     } catch (IOException e) {
197       throw getTIOError(e);
198     } finally {
199       closeTable(htable);
200     }
201   }
202 
203   @Override
204   public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
205     Table htable = getTable(table);
206     try {
207       return resultFromHBase(htable.get(getFromThrift(get)));
208     } catch (IOException e) {
209       throw getTIOError(e);
210     } finally {
211       closeTable(htable);
212     }
213   }
214 
215   @Override
216   public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
217     Table htable = getTable(table);
218     try {
219       return resultsFromHBase(htable.get(getsFromThrift(gets)));
220     } catch (IOException e) {
221       throw getTIOError(e);
222     } finally {
223       closeTable(htable);
224     }
225   }
226 
227   @Override
228   public void put(ByteBuffer table, TPut put) throws TIOError, TException {
229     Table htable = getTable(table);
230     try {
231       htable.put(putFromThrift(put));
232     } catch (IOException e) {
233       throw getTIOError(e);
234     } finally {
235       closeTable(htable);
236     }
237   }
238 
239   @Override
240   public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
241       ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
242     Table htable = getTable(table);
243     try {
244       return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
245         byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
246         putFromThrift(put));
247     } catch (IOException e) {
248       throw getTIOError(e);
249     } finally {
250       closeTable(htable);
251     }
252   }
253 
254   @Override
255   public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
256     Table htable = getTable(table);
257     try {
258       htable.put(putsFromThrift(puts));
259     } catch (IOException e) {
260       throw getTIOError(e);
261     } finally {
262       closeTable(htable);
263     }
264   }
265 
266   @Override
267   public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
268     Table htable = getTable(table);
269     try {
270       htable.delete(deleteFromThrift(deleteSingle));
271     } catch (IOException e) {
272       throw getTIOError(e);
273     } finally {
274       closeTable(htable);
275     }
276   }
277 
278   @Override
279   public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
280       TException {
281     Table htable = getTable(table);
282     try {
283       htable.delete(deletesFromThrift(deletes));
284     } catch (IOException e) {
285       throw getTIOError(e);
286     } finally {
287       closeTable(htable);
288     }
289     return Collections.emptyList();
290   }
291 
292   @Override
293   public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
294       ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
295     Table htable = getTable(table);
296 
297     try {
298       if (value == null) {
299         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
300           byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
301       } else {
302         return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
303           byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
304           deleteFromThrift(deleteSingle));
305       }
306     } catch (IOException e) {
307       throw getTIOError(e);
308     } finally {
309       closeTable(htable);
310     }
311   }
312 
313   @Override
314   public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
315     Table htable = getTable(table);
316     try {
317       return resultFromHBase(htable.increment(incrementFromThrift(increment)));
318     } catch (IOException e) {
319       throw getTIOError(e);
320     } finally {
321       closeTable(htable);
322     }
323   }
324 
325   @Override
326   public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
327     Table htable = getTable(table);
328     try {
329       return resultFromHBase(htable.append(appendFromThrift(append)));
330     } catch (IOException e) {
331       throw getTIOError(e);
332     } finally {
333       closeTable(htable);
334     }
335   }
336 
337   @Override
338   public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
339     Table htable = getTable(table);
340     ResultScanner resultScanner = null;
341     try {
342       resultScanner = htable.getScanner(scanFromThrift(scan));
343     } catch (IOException e) {
344       throw getTIOError(e);
345     } finally {
346       closeTable(htable);
347     }
348     return addScanner(resultScanner);
349   }
350 
351   @Override
352   public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
353       TIllegalArgument, TException {
354     ResultScanner scanner = getScanner(scannerId);
355     if (scanner == null) {
356       TIllegalArgument ex = new TIllegalArgument();
357       ex.setMessage("Invalid scanner Id");
358       throw ex;
359     }
360 
361     try {
362       return resultsFromHBase(scanner.next(numRows));
363     } catch (IOException e) {
364       throw getTIOError(e);
365     }
366   }
367 
368   @Override
369   public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
370       throws TIOError, TException {
371     Table htable = getTable(table);
372     List<TResult> results = null;
373     ResultScanner scanner = null;
374     try {
375       scanner = htable.getScanner(scanFromThrift(scan));
376       results = resultsFromHBase(scanner.next(numRows));
377     } catch (IOException e) {
378       throw getTIOError(e);
379     } finally {
380       if (scanner != null) {
381         scanner.close();
382       }
383       closeTable(htable);
384     }
385     return results;
386   }
387 
388   @Override
389   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
390     LOG.debug("scannerClose: id=" + scannerId);
391     ResultScanner scanner = getScanner(scannerId);
392     if (scanner == null) {
393       String message = "scanner ID is invalid";
394       LOG.warn(message);
395       TIllegalArgument ex = new TIllegalArgument();
396       ex.setMessage("Invalid scanner Id");
397       throw ex;
398     }
399     scanner.close();
400     removeScanner(scannerId);
401   }
402 
403   @Override
404   public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
405     Table htable = getTable(table);
406     try {
407       htable.mutateRow(rowMutationsFromThrift(rowMutations));
408     } catch (IOException e) {
409       throw getTIOError(e);
410     } finally {
411       closeTable(htable);
412     }
413   }
414 
415 }