001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2;
020
021import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
022import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.compareOpFromThrift;
023import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
024import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
025import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
026import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
027import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
028import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
029import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
030import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
031import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
032import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
033import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
034import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
035
036import java.io.IOException;
037import java.lang.reflect.InvocationHandler;
038import java.lang.reflect.InvocationTargetException;
039import java.lang.reflect.Method;
040import java.lang.reflect.Proxy;
041import java.nio.ByteBuffer;
042import java.util.ArrayList;
043import java.util.Collections;
044import java.util.List;
045import java.util.Map;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.atomic.AtomicInteger;
048
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.HRegionLocation;
052import org.apache.hadoop.hbase.client.RegionLocator;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.thrift.ThriftMetrics;
057import org.apache.hadoop.hbase.thrift2.generated.TAppend;
058import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
059import org.apache.hadoop.hbase.thrift2.generated.TDelete;
060import org.apache.hadoop.hbase.thrift2.generated.TGet;
061import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
062import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation;
063import org.apache.hadoop.hbase.thrift2.generated.TIOError;
064import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
065import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
066import org.apache.hadoop.hbase.thrift2.generated.TPut;
067import org.apache.hadoop.hbase.thrift2.generated.TResult;
068import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
069import org.apache.hadoop.hbase.thrift2.generated.TScan;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.ConnectionCache;
072import org.apache.thrift.TException;
073import org.apache.yetus.audience.InterfaceAudience;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/**
078 * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
079 * defined in the Table interface.
080 */
081@InterfaceAudience.Private
082@SuppressWarnings("deprecation")
083public class ThriftHBaseServiceHandler implements THBaseService.Iface {
084
  // TODO: Make the size of the pool configurable
086  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
087
  // nextScannerId and scannerMap are used to manage scanner state.
  // TODO: Add a cleanup thread for scanners and handle scanner id wrap-around
090  private final AtomicInteger nextScannerId = new AtomicInteger(0);
091  private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
092
093  private final ConnectionCache connectionCache;
094
095  static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
096  static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
097
098  private static final IOException ioe
099      = new DoNotRetryIOException("Thrift Server is in Read-only mode.");
100  private boolean isReadOnly;
101
102  public static THBaseService.Iface newInstance(
103      THBaseService.Iface handler, ThriftMetrics metrics) {
104    return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
105      new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
106  }
107
108  private static final class THBaseServiceMetricsProxy implements InvocationHandler {
109    private final THBaseService.Iface handler;
110    private final ThriftMetrics metrics;
111
112    private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
113      this.handler = handler;
114      this.metrics = metrics;
115    }
116
117    @Override
118    public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
119      Object result;
120      long start = now();
121      try {
122        result = m.invoke(handler, args);
123      } catch (InvocationTargetException e) {
124        metrics.exception(e.getCause());
125        throw e.getTargetException();
126      } catch (Exception e) {
127        metrics.exception(e);
128        throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
129      } finally {
130        long processTime = now() - start;
131        metrics.incMethodTime(m.getName(), processTime);
132      }
133      return result;
134    }
135  }
136
137  private static class TIOErrorWithCause extends TIOError {
138    private Throwable cause;
139
140    public TIOErrorWithCause(Throwable cause) {
141      super();
142      this.cause = cause;
143    }
144
145    @Override
146    public synchronized Throwable getCause() {
147      return cause;
148    }
149
150    @Override
151    public boolean equals(Object other) {
152      if (super.equals(other) &&
153          other instanceof TIOErrorWithCause) {
154        Throwable otherCause = ((TIOErrorWithCause) other).getCause();
155        if (this.getCause() != null) {
156          return otherCause != null && this.getCause().equals(otherCause);
157        } else {
158          return otherCause == null;
159        }
160      }
161      return false;
162    }
163
164    @Override
165    public int hashCode() {
166      int result = super.hashCode();
167      result = 31 * result + (cause != null ? cause.hashCode() : 0);
168      return result;
169    }
170  }
171
172  private static long now() {
173    return System.nanoTime();
174  }
175
176  ThriftHBaseServiceHandler(final Configuration conf,
177      final UserProvider userProvider) throws IOException {
178    int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
179    int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
180    connectionCache = new ConnectionCache(
181      conf, userProvider, cleanInterval, maxIdleTime);
182    isReadOnly = conf.getBoolean("hbase.thrift.readonly", false);
183  }
184
185  private Table getTable(ByteBuffer tableName) {
186    try {
187      return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
188    } catch (IOException ie) {
189      throw new RuntimeException(ie);
190    }
191  }
192
193  private RegionLocator getLocator(ByteBuffer tableName) {
194    try {
195      return connectionCache.getRegionLocator(byteBufferToByteArray(tableName));
196    } catch (IOException ie) {
197      throw new RuntimeException(ie);
198    }
199  }
200
201  private void closeTable(Table table) throws TIOError {
202    try {
203      table.close();
204    } catch (IOException e) {
205      throw getTIOError(e);
206    }
207  }
208
209  private TIOError getTIOError(IOException e) {
210    TIOError err = new TIOErrorWithCause(e);
211    err.setMessage(e.getMessage());
212    return err;
213  }
214
215  /**
216   * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
217   * @param scanner to add
218   * @return Id for this Scanner
219   */
220  private int addScanner(ResultScanner scanner) {
221    int id = nextScannerId.getAndIncrement();
222    scannerMap.put(id, scanner);
223    return id;
224  }
225
226  /**
227   * Returns the Scanner associated with the specified Id.
228   * @param id of the Scanner to get
229   * @return a Scanner, or null if the Id is invalid
230   */
231  private ResultScanner getScanner(int id) {
232    return scannerMap.get(id);
233  }
234
235  void setEffectiveUser(String effectiveUser) {
236    connectionCache.setEffectiveUser(effectiveUser);
237  }
238
239  /**
240   * Removes the scanner associated with the specified ID from the internal HashMap.
241   * @param id of the Scanner to remove
242   * @return the removed Scanner, or <code>null</code> if the Id is invalid
243   */
244  protected ResultScanner removeScanner(int id) {
245    return scannerMap.remove(id);
246  }
247
248  @Override
249  public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
250    Table htable = getTable(table);
251    try {
252      return htable.exists(getFromThrift(get));
253    } catch (IOException e) {
254      throw getTIOError(e);
255    } finally {
256      closeTable(htable);
257    }
258  }
259
260  @Override
261  public List<Boolean> existsAll(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
262    Table htable = getTable(table);
263    try {
264      boolean[] exists = htable.existsAll(getsFromThrift(gets));
265      List<Boolean> result = new ArrayList<>(exists.length);
266      for (boolean exist : exists) {
267        result.add(exist);
268      }
269      return result;
270    } catch (IOException e) {
271      throw getTIOError(e);
272    } finally {
273      closeTable(htable);
274    }
275  }
276
277  @Override
278  public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
279    Table htable = getTable(table);
280    try {
281      return resultFromHBase(htable.get(getFromThrift(get)));
282    } catch (IOException e) {
283      throw getTIOError(e);
284    } finally {
285      closeTable(htable);
286    }
287  }
288
289  @Override
290  public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
291    Table htable = getTable(table);
292    try {
293      return resultsFromHBase(htable.get(getsFromThrift(gets)));
294    } catch (IOException e) {
295      throw getTIOError(e);
296    } finally {
297      closeTable(htable);
298    }
299  }
300
301  @Override
302  public void put(ByteBuffer table, TPut put) throws TIOError, TException {
303    checkReadOnlyMode();
304    Table htable = getTable(table);
305    try {
306      htable.put(putFromThrift(put));
307    } catch (IOException e) {
308      throw getTIOError(e);
309    } finally {
310      closeTable(htable);
311    }
312  }
313
314  @Override
315  public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
316      ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
317    checkReadOnlyMode();
318    Table htable = getTable(table);
319    try {
320      Table.CheckAndMutateBuilder builder = htable.checkAndMutate(byteBufferToByteArray(row),
321          byteBufferToByteArray(family)).qualifier(byteBufferToByteArray(qualifier));
322      if (value == null) {
323        return builder.ifNotExists().thenPut(putFromThrift(put));
324      } else {
325        return builder.ifEquals(byteBufferToByteArray(value)).thenPut(putFromThrift(put));
326      }
327    } catch (IOException e) {
328      throw getTIOError(e);
329    } finally {
330      closeTable(htable);
331    }
332  }
333
334  @Override
335  public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
336    checkReadOnlyMode();
337    Table htable = getTable(table);
338    try {
339      htable.put(putsFromThrift(puts));
340    } catch (IOException e) {
341      throw getTIOError(e);
342    } finally {
343      closeTable(htable);
344    }
345  }
346
347  @Override
348  public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
349    checkReadOnlyMode();
350    Table htable = getTable(table);
351    try {
352      htable.delete(deleteFromThrift(deleteSingle));
353    } catch (IOException e) {
354      throw getTIOError(e);
355    } finally {
356      closeTable(htable);
357    }
358  }
359
360  @Override
361  public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
362      TException {
363    checkReadOnlyMode();
364    Table htable = getTable(table);
365    try {
366      htable.delete(deletesFromThrift(deletes));
367    } catch (IOException e) {
368      throw getTIOError(e);
369    } finally {
370      closeTable(htable);
371    }
372    return Collections.emptyList();
373  }
374  
375  @Override
376  public boolean checkAndMutate(ByteBuffer table, ByteBuffer row, ByteBuffer family,
377      ByteBuffer qualifier, TCompareOp compareOp, ByteBuffer value, TRowMutations rowMutations)
378          throws TIOError, TException {
379    checkReadOnlyMode();
380    try (final Table htable = getTable(table)) {
381      return htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family))
382          .qualifier(byteBufferToByteArray(qualifier))
383          .ifMatches(compareOpFromThrift(compareOp), byteBufferToByteArray(value))
384          .thenMutate(rowMutationsFromThrift(rowMutations));
385    } catch (IOException e) {
386      throw getTIOError(e);
387    }
388  }
389
390  @Override
391  public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
392      ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
393    checkReadOnlyMode();
394    Table htable = getTable(table);
395    try {
396      Table.CheckAndMutateBuilder mutateBuilder =
397          htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family))
398              .qualifier(byteBufferToByteArray(qualifier));
399      if (value == null) {
400        return mutateBuilder.ifNotExists().thenDelete(deleteFromThrift(deleteSingle));
401      } else {
402        return mutateBuilder.ifEquals(byteBufferToByteArray(value))
403            .thenDelete(deleteFromThrift(deleteSingle));
404      }
405    } catch (IOException e) {
406      throw getTIOError(e);
407    } finally {
408      closeTable(htable);
409    }
410  }
411
412  @Override
413  public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
414    checkReadOnlyMode();
415    Table htable = getTable(table);
416    try {
417      return resultFromHBase(htable.increment(incrementFromThrift(increment)));
418    } catch (IOException e) {
419      throw getTIOError(e);
420    } finally {
421      closeTable(htable);
422    }
423  }
424
425  @Override
426  public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
427    checkReadOnlyMode();
428    Table htable = getTable(table);
429    try {
430      return resultFromHBase(htable.append(appendFromThrift(append)));
431    } catch (IOException e) {
432      throw getTIOError(e);
433    } finally {
434      closeTable(htable);
435    }
436  }
437
438  @Override
439  public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
440    Table htable = getTable(table);
441    ResultScanner resultScanner = null;
442    try {
443      resultScanner = htable.getScanner(scanFromThrift(scan));
444    } catch (IOException e) {
445      throw getTIOError(e);
446    } finally {
447      closeTable(htable);
448    }
449    return addScanner(resultScanner);
450  }
451
452  @Override
453  public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
454      TIllegalArgument, TException {
455    ResultScanner scanner = getScanner(scannerId);
456    if (scanner == null) {
457      TIllegalArgument ex = new TIllegalArgument();
458      ex.setMessage("Invalid scanner Id");
459      throw ex;
460    }
461    try {
462      connectionCache.updateConnectionAccessTime();
463      return resultsFromHBase(scanner.next(numRows));
464    } catch (IOException e) {
465      throw getTIOError(e);
466    }
467  }
468
469  @Override
470  public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
471      throws TIOError, TException {
472    Table htable = getTable(table);
473    List<TResult> results = null;
474    ResultScanner scanner = null;
475    try {
476      scanner = htable.getScanner(scanFromThrift(scan));
477      results = resultsFromHBase(scanner.next(numRows));
478    } catch (IOException e) {
479      throw getTIOError(e);
480    } finally {
481      if (scanner != null) {
482        scanner.close();
483      }
484      closeTable(htable);
485    }
486    return results;
487  }
488
489
490
491  @Override
492  public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
493    LOG.debug("scannerClose: id=" + scannerId);
494    ResultScanner scanner = getScanner(scannerId);
495    if (scanner == null) {
496      String message = "scanner ID is invalid";
497      LOG.warn(message);
498      TIllegalArgument ex = new TIllegalArgument();
499      ex.setMessage("Invalid scanner Id");
500      throw ex;
501    }
502    scanner.close();
503    removeScanner(scannerId);
504  }
505
506  @Override
507  public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
508    checkReadOnlyMode();
509    Table htable = getTable(table);
510    try {
511      htable.mutateRow(rowMutationsFromThrift(rowMutations));
512    } catch (IOException e) {
513      throw getTIOError(e);
514    } finally {
515      closeTable(htable);
516    }
517  }
518
519  @Override
520  public List<THRegionLocation> getAllRegionLocations(ByteBuffer table)
521      throws TIOError, TException {
522    RegionLocator locator = null;
523    try {
524      locator = getLocator(table);
525      return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations());
526
527    } catch (IOException e) {
528      throw getTIOError(e);
529    } finally {
530      if (locator != null) {
531        try {
532          locator.close();
533        } catch (IOException e) {
534          LOG.warn("Couldn't close the locator.", e);
535        }
536      }
537    }
538  }
539
540  @Override
541  public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload)
542      throws TIOError, TException {
543
544    RegionLocator locator = null;
545    try {
546      locator = getLocator(table);
547      byte[] rowBytes = byteBufferToByteArray(row);
548      HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload);
549      return ThriftUtilities.regionLocationFromHBase(hrl);
550
551    } catch (IOException e) {
552      throw getTIOError(e);
553    } finally {
554      if (locator != null) {
555        try {
556          locator.close();
557        } catch (IOException e) {
558          LOG.warn("Couldn't close the locator.", e);
559        }
560      }
561    }
562  }
563
564  private void checkReadOnlyMode() throws TIOError {
565    if (isReadOnly()) {
566      throw getTIOError(ioe);
567    }
568  }
569
570  private boolean isReadOnly() {
571    return isReadOnly;
572  }
573}