001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Increment;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Row;
046import org.apache.hadoop.hbase.client.RowMutations;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.coprocessor.Batch;
051import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
052import org.apache.hadoop.hbase.io.TimeRange;
053import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
054import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
055import org.apache.hadoop.hbase.thrift2.generated.TAppend;
056import org.apache.hadoop.hbase.thrift2.generated.TDelete;
057import org.apache.hadoop.hbase.thrift2.generated.TGet;
058import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
059import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
060import org.apache.hadoop.hbase.thrift2.generated.TPut;
061import org.apache.hadoop.hbase.thrift2.generated.TResult;
062import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
063import org.apache.hadoop.hbase.thrift2.generated.TScan;
064import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.thrift.TException;
067import org.apache.thrift.transport.TTransport;
068import org.apache.yetus.audience.InterfaceAudience;
069
070import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
071import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
072
073@InterfaceAudience.Private
074public class ThriftTable implements Table {
075
076  private TableName tableName;
077  private Configuration conf;
078  private TTransport tTransport;
079  private THBaseService.Client client;
080  private ByteBuffer tableNameInBytes;
081  private int operationTimeout;
082
083  private final int scannerCaching;
084
085  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
086      Configuration conf) {
087    this.tableName = tableName;
088    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
089    this.conf = conf;
090    this.tTransport = tTransport;
091    this.client = client;
092    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
093        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
094    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
095        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
096
097
098  }
099
100  @Override
101  public TableName getName() {
102    return tableName;
103  }
104
105  @Override
106  public Configuration getConfiguration() {
107    return conf;
108  }
109
110  @Override
111  public TableDescriptor getDescriptor() throws IOException {
112    try {
113      TTableDescriptor tableDescriptor = client
114          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
115      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
116    } catch (TException e) {
117      throw new IOException(e);
118    }
119  }
120
121  @Override
122  public boolean exists(Get get) throws IOException {
123    TGet tGet = ThriftUtilities.getFromHBase(get);
124    try {
125      return client.exists(tableNameInBytes, tGet);
126    }  catch (TException e) {
127      throw new IOException(e);
128    }
129  }
130
131  @Override
132  public boolean[] exists(List<Get> gets) throws IOException {
133    List<TGet> tGets = new ArrayList<>();
134    for (Get get: gets) {
135      tGets.add(ThriftUtilities.getFromHBase(get));
136    }
137    try {
138      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
139      return Booleans.toArray(results);
140    }  catch (TException e) {
141      throw new IOException(e);
142    }
143  }
144
145  @Override
146  public void batch(List<? extends Row> actions, Object[] results)
147      throws IOException {
148    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
149        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
150
151
152  }
153
154  @Override
155  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
156      Batch.Callback<R> callback) throws IOException {
157    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
158        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
159  }
160
161  @Override
162  public Result get(Get get) throws IOException {
163    TGet tGet = ThriftUtilities.getFromHBase(get);
164    try {
165      TResult tResult = client.get(tableNameInBytes, tGet);
166      return ThriftUtilities.resultFromThrift(tResult);
167    }  catch (TException e) {
168      throw new IOException(e);
169    }
170  }
171
172  @Override
173  public Result[] get(List<Get> gets) throws IOException {
174    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
175    try {
176      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
177      return ThriftUtilities.resultsFromThrift(results);
178    }  catch (TException e) {
179      throw new IOException(e);
180    }
181  }
182
183  /**
184   * A scanner to perform scan from thrift server
185   * getScannerResults is used in this scanner
186   */
187  private class Scanner implements ResultScanner {
188    protected TScan scan;
189    protected Result lastResult = null;
190    protected final Queue<Result> cache = new ArrayDeque<>();;
191
192
193    public Scanner(Scan scan) throws IOException {
194      if (scan.getBatch() > 0) {
195        throw new IOException("Batch is not supported in Scanner");
196      }
197      if (scan.getCaching() <= 0) {
198        scan.setCaching(scannerCaching);
199      } else if (scan.getCaching() == 1 && scan.isReversed()){
200        // for reverse scan, we need to pass the last row to the next scanner
201        // we need caching number bigger than 1
202        scan.setCaching(scan.getCaching() + 1);
203      }
204      this.scan = ThriftUtilities.scanFromHBase(scan);
205    }
206
207
208    @Override
209    public Result next() throws IOException {
210      if (cache.size() == 0) {
211        setupNextScanner();
212        try {
213          List<TResult> tResults = client
214              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
215          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
216          boolean firstKey = true;
217          for (Result result : results) {
218            // If it is a reverse scan, we use the last result's key as the startkey, since there is
219            // no way to construct a closet rowkey smaller than the last result
220            // So when the results return, we must rule out the first result, since it has already
221            // returned to user.
222            if (firstKey) {
223              firstKey = false;
224              if (scan.isReversed() && lastResult != null) {
225                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
226                  continue;
227                }
228              }
229            }
230            cache.add(result);
231            lastResult = result;
232          }
233        } catch (TException e) {
234          throw new IOException(e);
235        }
236      }
237
238      if (cache.size() > 0) {
239        return cache.poll();
240      } else {
241        //scan finished
242        return null;
243      }
244    }
245
246    @Override
247    public void close() {
248    }
249
250    @Override
251    public boolean renewLease() {
252      throw new RuntimeException("renewLease() not supported");
253    }
254
255    @Override
256    public ScanMetrics getScanMetrics() {
257      throw new RuntimeException("getScanMetrics() not supported");
258    }
259
260    private void setupNextScanner() {
261      //if lastResult is null null, it means it is not the fist scan
262      if (lastResult!= null) {
263        byte[] lastRow = lastResult.getRow();
264        if (scan.isReversed()) {
265          //for reverse scan, we can't find the closet row before this row
266          scan.setStartRow(lastRow);
267        } else {
268          scan.setStartRow(createClosestRowAfter(lastRow));
269        }
270      }
271    }
272
273
274    /**
275     * Create the closest row after the specified row
276     */
277    protected byte[] createClosestRowAfter(byte[] row) {
278      if (row == null) {
279        throw new RuntimeException("The passed row is null");
280      }
281      return Arrays.copyOf(row, row.length + 1);
282    }
283  }
284
285  @Override
286  public ResultScanner getScanner(Scan scan) throws IOException {
287    return new Scanner(scan);
288  }
289
290  @Override
291  public ResultScanner getScanner(byte[] family) throws IOException {
292    Scan scan = new Scan();
293    scan.addFamily(family);
294    return getScanner(scan);
295  }
296
297  @Override
298  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
299    Scan scan = new Scan();
300    scan.addColumn(family, qualifier);
301    return getScanner(scan);
302  }
303
304  @Override
305  public void put(Put put) throws IOException {
306    TPut tPut = ThriftUtilities.putFromHBase(put);
307    try {
308      client.put(tableNameInBytes, tPut);
309    }  catch (TException e) {
310      throw new IOException(e);
311    }
312  }
313
314  @Override
315  public void put(List<Put> puts) throws IOException {
316    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
317    try {
318      client.putMultiple(tableNameInBytes, tPuts);
319    }  catch (TException e) {
320      throw new IOException(e);
321    }
322  }
323
324  @Override
325  public void delete(Delete delete) throws IOException {
326    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
327    try {
328      client.deleteSingle(tableNameInBytes, tDelete);
329    }  catch (TException e) {
330      throw new IOException(e);
331    }
332  }
333
334  @Override
335  public void delete(List<Delete> deletes) throws IOException {
336    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
337    try {
338      client.deleteMultiple(tableNameInBytes, tDeletes);
339    }  catch (TException e) {
340      throw new IOException(e);
341    }
342  }
343
344  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
345
346    private final byte[] row;
347    private final byte[] family;
348    private byte[] qualifier;
349    private CompareOperator op;
350    private byte[] value;
351
352    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
353      this.row = Preconditions.checkNotNull(row, "row is null");
354      this.family = Preconditions.checkNotNull(family, "family is null");
355    }
356
357    @Override
358    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
359      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
360          " an empty byte array, or just do not call this method if you want a null qualifier");
361      return this;
362    }
363
364    @Override
365    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
366      throw new NotImplementedException("timeRange not supported in ThriftTable");
367    }
368
369    @Override
370    public CheckAndMutateBuilder ifNotExists() {
371      this.op = CompareOperator.EQUAL;
372      this.value = null;
373      return this;
374    }
375
376    @Override
377    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
378      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
379      this.value = Preconditions.checkNotNull(value, "value is null");
380      return this;
381    }
382
383    private void preCheck() {
384      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
385          " calling ifNotExists/ifEquals/ifMatches before executing the request");
386    }
387
388    @Override
389    public boolean thenPut(Put put) throws IOException {
390      preCheck();
391      RowMutations rowMutations = new RowMutations(put.getRow());
392      rowMutations.add(put);
393      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
394    }
395
396    @Override
397    public boolean thenDelete(Delete delete) throws IOException {
398      preCheck();
399      RowMutations rowMutations = new RowMutations(delete.getRow());
400      rowMutations.add(delete);
401      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
402    }
403
404    @Override
405    public boolean thenMutate(RowMutations mutation) throws IOException {
406      preCheck();
407      return checkAndMutate(row, family, qualifier, op, value, mutation);
408    }
409  }
410
411
412  @Override
413  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
414      byte[] value, RowMutations mutation) throws IOException {
415    try {
416      ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value);
417      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
418          ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
419          ThriftUtilities.rowMutationsFromHBase(mutation));
420    } catch (TException e) {
421      throw new IOException(e);
422    }
423  }
424
425  @Override
426  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
427    return new CheckAndMutateBuilderImpl(row, family);
428  }
429
430  @Override
431  public void mutateRow(RowMutations rm) throws IOException {
432    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
433    try {
434      client.mutateRow(tableNameInBytes, tRowMutations);
435    }  catch (TException e) {
436      throw new IOException(e);
437    }
438  }
439
440  @Override
441  public Result append(Append append) throws IOException {
442    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
443    try {
444      TResult tResult = client.append(tableNameInBytes, tAppend);
445      return ThriftUtilities.resultFromThrift(tResult);
446    }  catch (TException e) {
447      throw new IOException(e);
448    }
449  }
450
451  @Override
452  public Result increment(Increment increment) throws IOException {
453    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
454    try {
455      TResult tResult = client.increment(tableNameInBytes, tIncrement);
456      return ThriftUtilities.resultFromThrift(tResult);
457    }  catch (TException e) {
458      throw new IOException(e);
459    }
460  }
461
462  @Override
463  public void close() throws IOException {
464    tTransport.close();
465  }
466
467  @Override
468  public long getRpcTimeout(TimeUnit unit) {
469    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
470  }
471
472  @Override
473  public long getReadRpcTimeout(TimeUnit unit) {
474    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
475  }
476
477  @Override
478  public long getWriteRpcTimeout(TimeUnit unit) {
479    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
480  }
481
482  @Override
483  public long getOperationTimeout(TimeUnit unit) {
484    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
485  }
486
487  @Override
488  public CoprocessorRpcChannel coprocessorService(byte[] row) {
489    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
490  }
491
492}