001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.CheckAndMutate;
040import org.apache.hadoop.hbase.client.CheckAndMutateResult;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Increment;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionLocator;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Row;
049import org.apache.hadoop.hbase.client.RowMutations;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.coprocessor.Batch;
054import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
055import org.apache.hadoop.hbase.filter.Filter;
056import org.apache.hadoop.hbase.io.TimeRange;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
058import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
059import org.apache.hadoop.hbase.thrift2.generated.TAppend;
060import org.apache.hadoop.hbase.thrift2.generated.TDelete;
061import org.apache.hadoop.hbase.thrift2.generated.TGet;
062import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
063import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
064import org.apache.hadoop.hbase.thrift2.generated.TPut;
065import org.apache.hadoop.hbase.thrift2.generated.TResult;
066import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
067import org.apache.hadoop.hbase.thrift2.generated.TScan;
068import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.thrift.TException;
071import org.apache.thrift.transport.TTransport;
072import org.apache.yetus.audience.InterfaceAudience;
073
074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
075import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
076
077@InterfaceAudience.Private
078public class ThriftTable implements Table {
079
080  private TableName tableName;
081  private Configuration conf;
082  private TTransport tTransport;
083  private THBaseService.Client client;
084  private ByteBuffer tableNameInBytes;
085  private int operationTimeout;
086
087  private final int scannerCaching;
088
089  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
090      Configuration conf) {
091    this.tableName = tableName;
092    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
093    this.conf = conf;
094    this.tTransport = tTransport;
095    this.client = client;
096    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
097        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
098    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
099        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
100
101
102  }
103
104  @Override
105  public TableName getName() {
106    return tableName;
107  }
108
109  @Override
110  public Configuration getConfiguration() {
111    return conf;
112  }
113
114  @Override
115  public TableDescriptor getDescriptor() throws IOException {
116    try {
117      TTableDescriptor tableDescriptor = client
118          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
119      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
120    } catch (TException e) {
121      throw new IOException(e);
122    }
123  }
124
125  @Override
126  public boolean exists(Get get) throws IOException {
127    TGet tGet = ThriftUtilities.getFromHBase(get);
128    try {
129      return client.exists(tableNameInBytes, tGet);
130    }  catch (TException e) {
131      throw new IOException(e);
132    }
133  }
134
135  @Override
136  public boolean[] exists(List<Get> gets) throws IOException {
137    List<TGet> tGets = new ArrayList<>();
138    for (Get get: gets) {
139      tGets.add(ThriftUtilities.getFromHBase(get));
140    }
141    try {
142      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
143      return Booleans.toArray(results);
144    }  catch (TException e) {
145      throw new IOException(e);
146    }
147  }
148
149  @Override
150  public void batch(List<? extends Row> actions, Object[] results)
151      throws IOException {
152    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
153        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
154
155
156  }
157
158  @Override
159  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
160      Batch.Callback<R> callback) throws IOException {
161    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
162        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
163  }
164
165  @Override
166  public Result get(Get get) throws IOException {
167    TGet tGet = ThriftUtilities.getFromHBase(get);
168    try {
169      TResult tResult = client.get(tableNameInBytes, tGet);
170      return ThriftUtilities.resultFromThrift(tResult);
171    }  catch (TException e) {
172      throw new IOException(e);
173    }
174  }
175
176  @Override
177  public Result[] get(List<Get> gets) throws IOException {
178    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
179    try {
180      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
181      return ThriftUtilities.resultsFromThrift(results);
182    }  catch (TException e) {
183      throw new IOException(e);
184    }
185  }
186
187  /**
188   * A scanner to perform scan from thrift server
189   * getScannerResults is used in this scanner
190   */
191  private class Scanner implements ResultScanner {
192    protected TScan scan;
193    protected Result lastResult = null;
194    protected final Queue<Result> cache = new ArrayDeque<>();;
195
196
197    public Scanner(Scan scan) throws IOException {
198      if (scan.getBatch() > 0) {
199        throw new IOException("Batch is not supported in Scanner");
200      }
201      if (scan.getCaching() <= 0) {
202        scan.setCaching(scannerCaching);
203      } else if (scan.getCaching() == 1 && scan.isReversed()){
204        // for reverse scan, we need to pass the last row to the next scanner
205        // we need caching number bigger than 1
206        scan.setCaching(scan.getCaching() + 1);
207      }
208      this.scan = ThriftUtilities.scanFromHBase(scan);
209    }
210
211
212    @Override
213    public Result next() throws IOException {
214      if (cache.size() == 0) {
215        setupNextScanner();
216        try {
217          List<TResult> tResults = client
218              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
219          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
220          boolean firstKey = true;
221          for (Result result : results) {
222            // If it is a reverse scan, we use the last result's key as the startkey, since there is
223            // no way to construct a closet rowkey smaller than the last result
224            // So when the results return, we must rule out the first result, since it has already
225            // returned to user.
226            if (firstKey) {
227              firstKey = false;
228              if (scan.isReversed() && lastResult != null) {
229                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
230                  continue;
231                }
232              }
233            }
234            cache.add(result);
235            lastResult = result;
236          }
237        } catch (TException e) {
238          throw new IOException(e);
239        }
240      }
241
242      if (cache.size() > 0) {
243        return cache.poll();
244      } else {
245        //scan finished
246        return null;
247      }
248    }
249
250    @Override
251    public void close() {
252    }
253
254    @Override
255    public boolean renewLease() {
256      throw new RuntimeException("renewLease() not supported");
257    }
258
259    @Override
260    public ScanMetrics getScanMetrics() {
261      throw new RuntimeException("getScanMetrics() not supported");
262    }
263
264    private void setupNextScanner() {
265      //if lastResult is null null, it means it is not the fist scan
266      if (lastResult!= null) {
267        byte[] lastRow = lastResult.getRow();
268        if (scan.isReversed()) {
269          //for reverse scan, we can't find the closet row before this row
270          scan.setStartRow(lastRow);
271        } else {
272          scan.setStartRow(createClosestRowAfter(lastRow));
273        }
274      }
275    }
276
277
278    /**
279     * Create the closest row after the specified row
280     */
281    protected byte[] createClosestRowAfter(byte[] row) {
282      if (row == null) {
283        throw new RuntimeException("The passed row is null");
284      }
285      return Arrays.copyOf(row, row.length + 1);
286    }
287  }
288
289  @Override
290  public ResultScanner getScanner(Scan scan) throws IOException {
291    return new Scanner(scan);
292  }
293
294  @Override
295  public ResultScanner getScanner(byte[] family) throws IOException {
296    Scan scan = new Scan();
297    scan.addFamily(family);
298    return getScanner(scan);
299  }
300
301  @Override
302  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
303    Scan scan = new Scan();
304    scan.addColumn(family, qualifier);
305    return getScanner(scan);
306  }
307
308  @Override
309  public void put(Put put) throws IOException {
310    TPut tPut = ThriftUtilities.putFromHBase(put);
311    try {
312      client.put(tableNameInBytes, tPut);
313    }  catch (TException e) {
314      throw new IOException(e);
315    }
316  }
317
318  @Override
319  public void put(List<Put> puts) throws IOException {
320    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
321    try {
322      client.putMultiple(tableNameInBytes, tPuts);
323    }  catch (TException e) {
324      throw new IOException(e);
325    }
326  }
327
328  @Override
329  public void delete(Delete delete) throws IOException {
330    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
331    try {
332      client.deleteSingle(tableNameInBytes, tDelete);
333    }  catch (TException e) {
334      throw new IOException(e);
335    }
336  }
337
338  @Override
339  public void delete(List<Delete> deletes) throws IOException {
340    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
341    try {
342      client.deleteMultiple(tableNameInBytes, tDeletes);
343    }  catch (TException e) {
344      throw new IOException(e);
345    }
346  }
347
348  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
349
350    private final byte[] row;
351    private final byte[] family;
352    private byte[] qualifier;
353    private CompareOperator op;
354    private byte[] value;
355
356    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
357      this.row = Preconditions.checkNotNull(row, "row is null");
358      this.family = Preconditions.checkNotNull(family, "family is null");
359    }
360
361    @Override
362    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
363      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
364          " an empty byte array, or just do not call this method if you want a null qualifier");
365      return this;
366    }
367
368    @Override
369    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
370      throw new NotImplementedException("timeRange not supported in ThriftTable");
371    }
372
373    @Override
374    public CheckAndMutateBuilder ifNotExists() {
375      this.op = CompareOperator.EQUAL;
376      this.value = null;
377      return this;
378    }
379
380    @Override
381    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
382      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
383      this.value = Preconditions.checkNotNull(value, "value is null");
384      return this;
385    }
386
387    private void preCheck() {
388      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
389          " calling ifNotExists/ifEquals/ifMatches before executing the request");
390    }
391
392    @Override
393    public boolean thenPut(Put put) throws IOException {
394      preCheck();
395      RowMutations rowMutations = new RowMutations(put.getRow());
396      rowMutations.add(put);
397      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
398    }
399
400    @Override
401    public boolean thenDelete(Delete delete) throws IOException {
402      preCheck();
403      RowMutations rowMutations = new RowMutations(delete.getRow());
404      rowMutations.add(delete);
405      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
406    }
407
408    @Override
409    public boolean thenMutate(RowMutations mutation) throws IOException {
410      preCheck();
411      return checkAndMutate(row, family, qualifier, op, value, mutation);
412    }
413  }
414
415
416  @Override
417  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
418      byte[] value, RowMutations mutation) throws IOException {
419    try {
420      ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value);
421      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
422          ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
423          ThriftUtilities.rowMutationsFromHBase(mutation));
424    } catch (TException e) {
425      throw new IOException(e);
426    }
427  }
428
429  @Override
430  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
431    return new CheckAndMutateBuilderImpl(row, family);
432  }
433
434  @Override
435  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
436    throw new NotImplementedException("Implement later");
437  }
438
439  @Override
440  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
441    throw new NotImplementedException("Implement later");
442  }
443
444  @Override
445  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
446    throw new NotImplementedException("Implement later");
447  }
448
449  @Override
450  public Result mutateRow(RowMutations rm) throws IOException {
451    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
452    try {
453      client.mutateRow(tableNameInBytes, tRowMutations);
454      return Result.EMPTY_RESULT;
455    }  catch (TException e) {
456      throw new IOException(e);
457    }
458  }
459
460  @Override
461  public Result append(Append append) throws IOException {
462    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
463    try {
464      TResult tResult = client.append(tableNameInBytes, tAppend);
465      return ThriftUtilities.resultFromThrift(tResult);
466    }  catch (TException e) {
467      throw new IOException(e);
468    }
469  }
470
471  @Override
472  public Result increment(Increment increment) throws IOException {
473    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
474    try {
475      TResult tResult = client.increment(tableNameInBytes, tIncrement);
476      return ThriftUtilities.resultFromThrift(tResult);
477    }  catch (TException e) {
478      throw new IOException(e);
479    }
480  }
481
482  @Override
483  public void close() throws IOException {
484    tTransport.close();
485  }
486
487  @Override
488  public long getRpcTimeout(TimeUnit unit) {
489    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
490  }
491
492  @Override
493  public long getReadRpcTimeout(TimeUnit unit) {
494    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
495  }
496
497  @Override
498  public long getWriteRpcTimeout(TimeUnit unit) {
499    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
500  }
501
502  @Override
503  public long getOperationTimeout(TimeUnit unit) {
504    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
505  }
506
507  @Override
508  public CoprocessorRpcChannel coprocessorService(byte[] row) {
509    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
510  }
511
512  @Override
513  public RegionLocator getRegionLocator() throws IOException {
514    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
515  }
516}