001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Increment;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Row;
047import org.apache.hadoop.hbase.client.RowMutations;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.coprocessor.Batch;
052import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
053import org.apache.hadoop.hbase.io.TimeRange;
054import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
055import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
056import org.apache.hadoop.hbase.thrift2.generated.TAppend;
057import org.apache.hadoop.hbase.thrift2.generated.TDelete;
058import org.apache.hadoop.hbase.thrift2.generated.TGet;
059import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
060import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
061import org.apache.hadoop.hbase.thrift2.generated.TPut;
062import org.apache.hadoop.hbase.thrift2.generated.TResult;
063import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
064import org.apache.hadoop.hbase.thrift2.generated.TScan;
065import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.thrift.TException;
068import org.apache.thrift.transport.TTransport;
069import org.apache.yetus.audience.InterfaceAudience;
070
071import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
072import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
073
074@InterfaceAudience.Private
075public class ThriftTable implements Table {
076
077  private TableName tableName;
078  private Configuration conf;
079  private TTransport tTransport;
080  private THBaseService.Client client;
081  private ByteBuffer tableNameInBytes;
082  private int operationTimeout;
083
084  private final int scannerCaching;
085
086  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
087      Configuration conf) {
088    this.tableName = tableName;
089    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
090    this.conf = conf;
091    this.tTransport = tTransport;
092    this.client = client;
093    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
094        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
095    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
096        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
097
098
099  }
100
101  @Override
102  public TableName getName() {
103    return tableName;
104  }
105
106  @Override
107  public Configuration getConfiguration() {
108    return conf;
109  }
110
111  @Override
112  public TableDescriptor getDescriptor() throws IOException {
113    try {
114      TTableDescriptor tableDescriptor = client
115          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
116      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
117    } catch (TException e) {
118      throw new IOException(e);
119    }
120  }
121
122  @Override
123  public boolean exists(Get get) throws IOException {
124    TGet tGet = ThriftUtilities.getFromHBase(get);
125    try {
126      return client.exists(tableNameInBytes, tGet);
127    }  catch (TException e) {
128      throw new IOException(e);
129    }
130  }
131
132  @Override
133  public boolean[] exists(List<Get> gets) throws IOException {
134    List<TGet> tGets = new ArrayList<>();
135    for (Get get: gets) {
136      tGets.add(ThriftUtilities.getFromHBase(get));
137    }
138    try {
139      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
140      return Booleans.toArray(results);
141    }  catch (TException e) {
142      throw new IOException(e);
143    }
144  }
145
146  @Override
147  public void batch(List<? extends Row> actions, Object[] results)
148      throws IOException {
149    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
150        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
151
152
153  }
154
155  @Override
156  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
157      Batch.Callback<R> callback) throws IOException {
158    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
159        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
160  }
161
162  @Override
163  public Result get(Get get) throws IOException {
164    TGet tGet = ThriftUtilities.getFromHBase(get);
165    try {
166      TResult tResult = client.get(tableNameInBytes, tGet);
167      return ThriftUtilities.resultFromThrift(tResult);
168    }  catch (TException e) {
169      throw new IOException(e);
170    }
171  }
172
173  @Override
174  public Result[] get(List<Get> gets) throws IOException {
175    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
176    try {
177      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
178      return ThriftUtilities.resultsFromThrift(results);
179    }  catch (TException e) {
180      throw new IOException(e);
181    }
182  }
183
184  /**
185   * A scanner to perform scan from thrift server
186   * getScannerResults is used in this scanner
187   */
188  private class Scanner implements ResultScanner {
189    protected TScan scan;
190    protected Result lastResult = null;
191    protected final Queue<Result> cache = new ArrayDeque<>();;
192
193
194    public Scanner(Scan scan) throws IOException {
195      if (scan.getBatch() > 0) {
196        throw new IOException("Batch is not supported in Scanner");
197      }
198      if (scan.getCaching() <= 0) {
199        scan.setCaching(scannerCaching);
200      } else if (scan.getCaching() == 1 && scan.isReversed()){
201        // for reverse scan, we need to pass the last row to the next scanner
202        // we need caching number bigger than 1
203        scan.setCaching(scan.getCaching() + 1);
204      }
205      this.scan = ThriftUtilities.scanFromHBase(scan);
206    }
207
208
209    @Override
210    public Result next() throws IOException {
211      if (cache.size() == 0) {
212        setupNextScanner();
213        try {
214          List<TResult> tResults = client
215              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
216          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
217          boolean firstKey = true;
218          for (Result result : results) {
219            // If it is a reverse scan, we use the last result's key as the startkey, since there is
220            // no way to construct a closet rowkey smaller than the last result
221            // So when the results return, we must rule out the first result, since it has already
222            // returned to user.
223            if (firstKey) {
224              firstKey = false;
225              if (scan.isReversed() && lastResult != null) {
226                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
227                  continue;
228                }
229              }
230            }
231            cache.add(result);
232            lastResult = result;
233          }
234        } catch (TException e) {
235          throw new IOException(e);
236        }
237      }
238
239      if (cache.size() > 0) {
240        return cache.poll();
241      } else {
242        //scan finished
243        return null;
244      }
245    }
246
247    @Override
248    public void close() {
249    }
250
251    @Override
252    public boolean renewLease() {
253      throw new RuntimeException("renewLease() not supported");
254    }
255
256    @Override
257    public ScanMetrics getScanMetrics() {
258      throw new RuntimeException("getScanMetrics() not supported");
259    }
260
261    private void setupNextScanner() {
262      //if lastResult is null null, it means it is not the fist scan
263      if (lastResult!= null) {
264        byte[] lastRow = lastResult.getRow();
265        if (scan.isReversed()) {
266          //for reverse scan, we can't find the closet row before this row
267          scan.setStartRow(lastRow);
268        } else {
269          scan.setStartRow(createClosestRowAfter(lastRow));
270        }
271      }
272    }
273
274
275    /**
276     * Create the closest row after the specified row
277     */
278    protected byte[] createClosestRowAfter(byte[] row) {
279      if (row == null) {
280        throw new RuntimeException("The passed row is null");
281      }
282      return Arrays.copyOf(row, row.length + 1);
283    }
284  }
285
286  @Override
287  public ResultScanner getScanner(Scan scan) throws IOException {
288    return new Scanner(scan);
289  }
290
291  @Override
292  public ResultScanner getScanner(byte[] family) throws IOException {
293    Scan scan = new Scan();
294    scan.addFamily(family);
295    return getScanner(scan);
296  }
297
298  @Override
299  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
300    Scan scan = new Scan();
301    scan.addColumn(family, qualifier);
302    return getScanner(scan);
303  }
304
305  @Override
306  public void put(Put put) throws IOException {
307    TPut tPut = ThriftUtilities.putFromHBase(put);
308    try {
309      client.put(tableNameInBytes, tPut);
310    }  catch (TException e) {
311      throw new IOException(e);
312    }
313  }
314
315  @Override
316  public void put(List<Put> puts) throws IOException {
317    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
318    try {
319      client.putMultiple(tableNameInBytes, tPuts);
320    }  catch (TException e) {
321      throw new IOException(e);
322    }
323  }
324
325  @Override
326  public void delete(Delete delete) throws IOException {
327    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
328    try {
329      client.deleteSingle(tableNameInBytes, tDelete);
330    }  catch (TException e) {
331      throw new IOException(e);
332    }
333  }
334
335  @Override
336  public void delete(List<Delete> deletes) throws IOException {
337    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
338    try {
339      client.deleteMultiple(tableNameInBytes, tDeletes);
340    }  catch (TException e) {
341      throw new IOException(e);
342    }
343  }
344
345  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
346
347    private final byte[] row;
348    private final byte[] family;
349    private byte[] qualifier;
350    private CompareOperator op;
351    private byte[] value;
352
353    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
354      this.row = Preconditions.checkNotNull(row, "row is null");
355      this.family = Preconditions.checkNotNull(family, "family is null");
356    }
357
358    @Override
359    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
360      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
361          " an empty byte array, or just do not call this method if you want a null qualifier");
362      return this;
363    }
364
365    @Override
366    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
367      throw new NotImplementedException("timeRange not supported in ThriftTable");
368    }
369
370    @Override
371    public CheckAndMutateBuilder ifNotExists() {
372      this.op = CompareOperator.EQUAL;
373      this.value = null;
374      return this;
375    }
376
377    @Override
378    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
379      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
380      this.value = Preconditions.checkNotNull(value, "value is null");
381      return this;
382    }
383
384    private void preCheck() {
385      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
386          " calling ifNotExists/ifEquals/ifMatches before executing the request");
387    }
388
389    @Override
390    public boolean thenPut(Put put) throws IOException {
391      preCheck();
392      RowMutations rowMutations = new RowMutations(put.getRow());
393      rowMutations.add(put);
394      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
395    }
396
397    @Override
398    public boolean thenDelete(Delete delete) throws IOException {
399      preCheck();
400      RowMutations rowMutations = new RowMutations(delete.getRow());
401      rowMutations.add(delete);
402      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
403    }
404
405    @Override
406    public boolean thenMutate(RowMutations mutation) throws IOException {
407      preCheck();
408      return checkAndMutate(row, family, qualifier, op, value, mutation);
409    }
410  }
411
412
413  @Override
414  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
415      byte[] value, RowMutations mutation) throws IOException {
416    try {
417      ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value);
418      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
419          ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
420          ThriftUtilities.rowMutationsFromHBase(mutation));
421    } catch (TException e) {
422      throw new IOException(e);
423    }
424  }
425
426  @Override
427  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
428    return new CheckAndMutateBuilderImpl(row, family);
429  }
430
431  @Override
432  public void mutateRow(RowMutations rm) throws IOException {
433    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
434    try {
435      client.mutateRow(tableNameInBytes, tRowMutations);
436    }  catch (TException e) {
437      throw new IOException(e);
438    }
439  }
440
441  @Override
442  public Result append(Append append) throws IOException {
443    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
444    try {
445      TResult tResult = client.append(tableNameInBytes, tAppend);
446      return ThriftUtilities.resultFromThrift(tResult);
447    }  catch (TException e) {
448      throw new IOException(e);
449    }
450  }
451
452  @Override
453  public Result increment(Increment increment) throws IOException {
454    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
455    try {
456      TResult tResult = client.increment(tableNameInBytes, tIncrement);
457      return ThriftUtilities.resultFromThrift(tResult);
458    }  catch (TException e) {
459      throw new IOException(e);
460    }
461  }
462
463  @Override
464  public void close() throws IOException {
465    tTransport.close();
466  }
467
468  @Override
469  public long getRpcTimeout(TimeUnit unit) {
470    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
471  }
472
473  @Override
474  public long getReadRpcTimeout(TimeUnit unit) {
475    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
476  }
477
478  @Override
479  public long getWriteRpcTimeout(TimeUnit unit) {
480    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
481  }
482
483  @Override
484  public long getOperationTimeout(TimeUnit unit) {
485    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
486  }
487
488  @Override
489  public CoprocessorRpcChannel coprocessorService(byte[] row) {
490    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
491  }
492
493  @Override
494  public RegionLocator getRegionLocator() throws IOException {
495    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
496  }
497}