001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift2.client;
019
020import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayDeque;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Queue;
030import java.util.concurrent.TimeUnit;
031import org.apache.commons.lang3.NotImplementedException;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Append;
037import org.apache.hadoop.hbase.client.CheckAndMutate;
038import org.apache.hadoop.hbase.client.CheckAndMutateResult;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Increment;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Row;
047import org.apache.hadoop.hbase.client.RowMutations;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.coprocessor.Batch;
052import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
053import org.apache.hadoop.hbase.filter.Filter;
054import org.apache.hadoop.hbase.io.TimeRange;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
056import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
057import org.apache.hadoop.hbase.thrift2.generated.TAppend;
058import org.apache.hadoop.hbase.thrift2.generated.TDelete;
059import org.apache.hadoop.hbase.thrift2.generated.TGet;
060import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
061import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
062import org.apache.hadoop.hbase.thrift2.generated.TPut;
063import org.apache.hadoop.hbase.thrift2.generated.TResult;
064import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
065import org.apache.hadoop.hbase.thrift2.generated.TScan;
066import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.thrift.TException;
069import org.apache.thrift.transport.TTransport;
070import org.apache.yetus.audience.InterfaceAudience;
071
072import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
073import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
074
075@InterfaceAudience.Private
076public class ThriftTable implements Table {
077
078  private TableName tableName;
079  private Configuration conf;
080  private TTransport tTransport;
081  private THBaseService.Client client;
082  private ByteBuffer tableNameInBytes;
083  private int operationTimeout;
084
085  private final int scannerCaching;
086
087  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
088    Configuration conf) {
089    this.tableName = tableName;
090    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
091    this.conf = conf;
092    this.tTransport = tTransport;
093    this.client = client;
094    this.scannerCaching =
095      conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING, HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
096    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
097      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
098
099  }
100
101  @Override
102  public TableName getName() {
103    return tableName;
104  }
105
106  @Override
107  public Configuration getConfiguration() {
108    return conf;
109  }
110
111  @Override
112  public TableDescriptor getDescriptor() throws IOException {
113    try {
114      TTableDescriptor tableDescriptor =
115        client.getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
116      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
117    } catch (TException e) {
118      throw new IOException(e);
119    }
120  }
121
122  @Override
123  public boolean exists(Get get) throws IOException {
124    TGet tGet = ThriftUtilities.getFromHBase(get);
125    try {
126      return client.exists(tableNameInBytes, tGet);
127    } catch (TException e) {
128      throw new IOException(e);
129    }
130  }
131
132  @Override
133  public boolean[] exists(List<Get> gets) throws IOException {
134    List<TGet> tGets = new ArrayList<>();
135    for (Get get : gets) {
136      tGets.add(ThriftUtilities.getFromHBase(get));
137    }
138    try {
139      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
140      return Booleans.toArray(results);
141    } catch (TException e) {
142      throw new IOException(e);
143    }
144  }
145
146  @Override
147  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
148    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
149      + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
150
151  }
152
153  @Override
154  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
155    Batch.Callback<R> callback) throws IOException {
156    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
157      + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
158  }
159
160  @Override
161  public Result get(Get get) throws IOException {
162    TGet tGet = ThriftUtilities.getFromHBase(get);
163    try {
164      TResult tResult = client.get(tableNameInBytes, tGet);
165      return ThriftUtilities.resultFromThrift(tResult);
166    } catch (TException e) {
167      throw new IOException(e);
168    }
169  }
170
171  @Override
172  public Result[] get(List<Get> gets) throws IOException {
173    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
174    try {
175      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
176      return ThriftUtilities.resultsFromThrift(results);
177    } catch (TException e) {
178      throw new IOException(e);
179    }
180  }
181
182  /**
183   * A scanner to perform scan from thrift server getScannerResults is used in this scanner
184   */
185  private class Scanner implements ResultScanner {
186    protected TScan scan;
187    protected Result lastResult = null;
188    protected final Queue<Result> cache = new ArrayDeque<>();;
189
190    public Scanner(Scan scan) throws IOException {
191      if (scan.getBatch() > 0) {
192        throw new IOException("Batch is not supported in Scanner");
193      }
194      if (scan.getCaching() <= 0) {
195        scan.setCaching(scannerCaching);
196      } else if (scan.getCaching() == 1 && scan.isReversed()) {
197        // for reverse scan, we need to pass the last row to the next scanner
198        // we need caching number bigger than 1
199        scan.setCaching(scan.getCaching() + 1);
200      }
201      this.scan = ThriftUtilities.scanFromHBase(scan);
202    }
203
204    @Override
205    public Result next() throws IOException {
206      if (cache.size() == 0) {
207        setupNextScanner();
208        try {
209          List<TResult> tResults =
210            client.getScannerResults(tableNameInBytes, scan, scan.getCaching());
211          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
212          boolean firstKey = true;
213          for (Result result : results) {
214            // If it is a reverse scan, we use the last result's key as the startkey, since there is
215            // no way to construct a closet rowkey smaller than the last result
216            // So when the results return, we must rule out the first result, since it has already
217            // returned to user.
218            if (firstKey) {
219              firstKey = false;
220              if (scan.isReversed() && lastResult != null) {
221                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
222                  continue;
223                }
224              }
225            }
226            cache.add(result);
227            lastResult = result;
228          }
229        } catch (TException e) {
230          throw new IOException(e);
231        }
232      }
233
234      if (cache.size() > 0) {
235        return cache.poll();
236      } else {
237        // scan finished
238        return null;
239      }
240    }
241
242    @Override
243    public void close() {
244    }
245
246    @Override
247    public boolean renewLease() {
248      throw new RuntimeException("renewLease() not supported");
249    }
250
251    @Override
252    public ScanMetrics getScanMetrics() {
253      throw new RuntimeException("getScanMetrics() not supported");
254    }
255
256    private void setupNextScanner() {
257      // if lastResult is null null, it means it is not the fist scan
258      if (lastResult != null) {
259        byte[] lastRow = lastResult.getRow();
260        if (scan.isReversed()) {
261          // for reverse scan, we can't find the closet row before this row
262          scan.setStartRow(lastRow);
263        } else {
264          scan.setStartRow(createClosestRowAfter(lastRow));
265        }
266      }
267    }
268
269    /**
270     * Create the closest row after the specified row
271     */
272    protected byte[] createClosestRowAfter(byte[] row) {
273      if (row == null) {
274        throw new RuntimeException("The passed row is null");
275      }
276      return Arrays.copyOf(row, row.length + 1);
277    }
278  }
279
280  @Override
281  public ResultScanner getScanner(Scan scan) throws IOException {
282    return new Scanner(scan);
283  }
284
285  @Override
286  public ResultScanner getScanner(byte[] family) throws IOException {
287    Scan scan = new Scan();
288    scan.addFamily(family);
289    return getScanner(scan);
290  }
291
292  @Override
293  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
294    Scan scan = new Scan();
295    scan.addColumn(family, qualifier);
296    return getScanner(scan);
297  }
298
299  @Override
300  public void put(Put put) throws IOException {
301    TPut tPut = ThriftUtilities.putFromHBase(put);
302    try {
303      client.put(tableNameInBytes, tPut);
304    } catch (TException e) {
305      throw new IOException(e);
306    }
307  }
308
309  @Override
310  public void put(List<Put> puts) throws IOException {
311    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
312    try {
313      client.putMultiple(tableNameInBytes, tPuts);
314    } catch (TException e) {
315      throw new IOException(e);
316    }
317  }
318
319  @Override
320  public void delete(Delete delete) throws IOException {
321    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
322    try {
323      client.deleteSingle(tableNameInBytes, tDelete);
324    } catch (TException e) {
325      throw new IOException(e);
326    }
327  }
328
329  @Override
330  public void delete(List<Delete> deletes) throws IOException {
331    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
332    try {
333      client.deleteMultiple(tableNameInBytes, tDeletes);
334    } catch (TException e) {
335      throw new IOException(e);
336    }
337  }
338
339  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
340
341    private final byte[] row;
342    private final byte[] family;
343    private byte[] qualifier;
344    private CompareOperator op;
345    private byte[] value;
346
347    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
348      this.row = Preconditions.checkNotNull(row, "row is null");
349      this.family = Preconditions.checkNotNull(family, "family is null");
350    }
351
352    @Override
353    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
354      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
355        + " an empty byte array, or just do not call this method if you want a null qualifier");
356      return this;
357    }
358
359    @Override
360    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
361      throw new NotImplementedException("timeRange not supported in ThriftTable");
362    }
363
364    @Override
365    public CheckAndMutateBuilder ifNotExists() {
366      this.op = CompareOperator.EQUAL;
367      this.value = null;
368      return this;
369    }
370
371    @Override
372    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
373      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
374      this.value = Preconditions.checkNotNull(value, "value is null");
375      return this;
376    }
377
378    private void preCheck() {
379      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
380        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
381    }
382
383    @Override
384    public boolean thenPut(Put put) throws IOException {
385      preCheck();
386      RowMutations rowMutations = new RowMutations(put.getRow());
387      rowMutations.add(put);
388      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
389    }
390
391    @Override
392    public boolean thenDelete(Delete delete) throws IOException {
393      preCheck();
394      RowMutations rowMutations = new RowMutations(delete.getRow());
395      rowMutations.add(delete);
396      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
397    }
398
399    @Override
400    public boolean thenMutate(RowMutations mutation) throws IOException {
401      preCheck();
402      return checkAndMutate(row, family, qualifier, op, value, mutation);
403    }
404  }
405
406  private boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
407    byte[] value, RowMutations mutation) throws IOException {
408    try {
409      ByteBuffer valueBuffer = value == null ? null : ByteBuffer.wrap(value);
410      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
411        ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
412        ThriftUtilities.rowMutationsFromHBase(mutation));
413    } catch (TException e) {
414      throw new IOException(e);
415    }
416  }
417
418  @Override
419  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
420    return new CheckAndMutateBuilderImpl(row, family);
421  }
422
423  @Override
424  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
425    throw new NotImplementedException("Implement later");
426  }
427
428  @Override
429  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
430    throw new NotImplementedException("Implement later");
431  }
432
433  @Override
434  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
435    throw new NotImplementedException("Implement later");
436  }
437
438  @Override
439  public Result mutateRow(RowMutations rm) throws IOException {
440    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
441    try {
442      client.mutateRow(tableNameInBytes, tRowMutations);
443      return Result.EMPTY_RESULT;
444    } catch (TException e) {
445      throw new IOException(e);
446    }
447  }
448
449  @Override
450  public Result append(Append append) throws IOException {
451    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
452    try {
453      TResult tResult = client.append(tableNameInBytes, tAppend);
454      return ThriftUtilities.resultFromThrift(tResult);
455    } catch (TException e) {
456      throw new IOException(e);
457    }
458  }
459
460  @Override
461  public Result increment(Increment increment) throws IOException {
462    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
463    try {
464      TResult tResult = client.increment(tableNameInBytes, tIncrement);
465      return ThriftUtilities.resultFromThrift(tResult);
466    } catch (TException e) {
467      throw new IOException(e);
468    }
469  }
470
471  @Override
472  public void close() throws IOException {
473    tTransport.close();
474  }
475
476  @Override
477  public long getRpcTimeout(TimeUnit unit) {
478    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
479  }
480
481  @Override
482  public long getReadRpcTimeout(TimeUnit unit) {
483    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
484  }
485
486  @Override
487  public long getWriteRpcTimeout(TimeUnit unit) {
488    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
489  }
490
491  @Override
492  public long getOperationTimeout(TimeUnit unit) {
493    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
494  }
495
496  @Override
497  public CoprocessorRpcChannel coprocessorService(byte[] row) {
498    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
499  }
500
501  @Override
502  public RegionLocator getRegionLocator() throws IOException {
503    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
504  }
505}