001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.concurrent.TimeUnit;
032import org.apache.commons.lang3.NotImplementedException;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Append;
038import org.apache.hadoop.hbase.client.CheckAndMutate;
039import org.apache.hadoop.hbase.client.CheckAndMutateResult;
040import org.apache.hadoop.hbase.client.Delete;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Increment;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionLocator;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Row;
048import org.apache.hadoop.hbase.client.RowMutations;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.coprocessor.Batch;
053import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
054import org.apache.hadoop.hbase.filter.Filter;
055import org.apache.hadoop.hbase.io.TimeRange;
056import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
057import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
058import org.apache.hadoop.hbase.thrift2.generated.TAppend;
059import org.apache.hadoop.hbase.thrift2.generated.TDelete;
060import org.apache.hadoop.hbase.thrift2.generated.TGet;
061import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
062import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
063import org.apache.hadoop.hbase.thrift2.generated.TPut;
064import org.apache.hadoop.hbase.thrift2.generated.TResult;
065import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
066import org.apache.hadoop.hbase.thrift2.generated.TScan;
067import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.thrift.TException;
070import org.apache.thrift.transport.TTransport;
071import org.apache.yetus.audience.InterfaceAudience;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
074import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
075
076@InterfaceAudience.Private
077public class ThriftTable implements Table {
078
079  private TableName tableName;
080  private Configuration conf;
081  private TTransport tTransport;
082  private THBaseService.Client client;
083  private ByteBuffer tableNameInBytes;
084  private int operationTimeout;
085
086  private final int scannerCaching;
087
088  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
089      Configuration conf) {
090    this.tableName = tableName;
091    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
092    this.conf = conf;
093    this.tTransport = tTransport;
094    this.client = client;
095    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
096        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
097    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
098        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
099
100
101  }
102
103  @Override
104  public TableName getName() {
105    return tableName;
106  }
107
108  @Override
109  public Configuration getConfiguration() {
110    return conf;
111  }
112
113  @Override
114  public TableDescriptor getDescriptor() throws IOException {
115    try {
116      TTableDescriptor tableDescriptor = client
117          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
118      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
119    } catch (TException e) {
120      throw new IOException(e);
121    }
122  }
123
124  @Override
125  public boolean exists(Get get) throws IOException {
126    TGet tGet = ThriftUtilities.getFromHBase(get);
127    try {
128      return client.exists(tableNameInBytes, tGet);
129    }  catch (TException e) {
130      throw new IOException(e);
131    }
132  }
133
134  @Override
135  public boolean[] exists(List<Get> gets) throws IOException {
136    List<TGet> tGets = new ArrayList<>();
137    for (Get get: gets) {
138      tGets.add(ThriftUtilities.getFromHBase(get));
139    }
140    try {
141      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
142      return Booleans.toArray(results);
143    }  catch (TException e) {
144      throw new IOException(e);
145    }
146  }
147
148  @Override
149  public void batch(List<? extends Row> actions, Object[] results)
150      throws IOException {
151    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
152        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
153
154
155  }
156
157  @Override
158  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
159      Batch.Callback<R> callback) throws IOException {
160    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
161        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
162  }
163
164  @Override
165  public Result get(Get get) throws IOException {
166    TGet tGet = ThriftUtilities.getFromHBase(get);
167    try {
168      TResult tResult = client.get(tableNameInBytes, tGet);
169      return ThriftUtilities.resultFromThrift(tResult);
170    }  catch (TException e) {
171      throw new IOException(e);
172    }
173  }
174
175  @Override
176  public Result[] get(List<Get> gets) throws IOException {
177    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
178    try {
179      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
180      return ThriftUtilities.resultsFromThrift(results);
181    }  catch (TException e) {
182      throw new IOException(e);
183    }
184  }
185
186  /**
187   * A scanner to perform scan from thrift server
188   * getScannerResults is used in this scanner
189   */
190  private class Scanner implements ResultScanner {
191    protected TScan scan;
192    protected Result lastResult = null;
193    protected final Queue<Result> cache = new ArrayDeque<>();;
194
195
196    public Scanner(Scan scan) throws IOException {
197      if (scan.getBatch() > 0) {
198        throw new IOException("Batch is not supported in Scanner");
199      }
200      if (scan.getCaching() <= 0) {
201        scan.setCaching(scannerCaching);
202      } else if (scan.getCaching() == 1 && scan.isReversed()){
203        // for reverse scan, we need to pass the last row to the next scanner
204        // we need caching number bigger than 1
205        scan.setCaching(scan.getCaching() + 1);
206      }
207      this.scan = ThriftUtilities.scanFromHBase(scan);
208    }
209
210
211    @Override
212    public Result next() throws IOException {
213      if (cache.size() == 0) {
214        setupNextScanner();
215        try {
216          List<TResult> tResults = client
217              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
218          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
219          boolean firstKey = true;
220          for (Result result : results) {
221            // If it is a reverse scan, we use the last result's key as the startkey, since there is
222            // no way to construct a closet rowkey smaller than the last result
223            // So when the results return, we must rule out the first result, since it has already
224            // returned to user.
225            if (firstKey) {
226              firstKey = false;
227              if (scan.isReversed() && lastResult != null) {
228                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
229                  continue;
230                }
231              }
232            }
233            cache.add(result);
234            lastResult = result;
235          }
236        } catch (TException e) {
237          throw new IOException(e);
238        }
239      }
240
241      if (cache.size() > 0) {
242        return cache.poll();
243      } else {
244        //scan finished
245        return null;
246      }
247    }
248
249    @Override
250    public void close() {
251    }
252
253    @Override
254    public boolean renewLease() {
255      throw new RuntimeException("renewLease() not supported");
256    }
257
258    @Override
259    public ScanMetrics getScanMetrics() {
260      throw new RuntimeException("getScanMetrics() not supported");
261    }
262
263    private void setupNextScanner() {
264      //if lastResult is null null, it means it is not the fist scan
265      if (lastResult!= null) {
266        byte[] lastRow = lastResult.getRow();
267        if (scan.isReversed()) {
268          //for reverse scan, we can't find the closet row before this row
269          scan.setStartRow(lastRow);
270        } else {
271          scan.setStartRow(createClosestRowAfter(lastRow));
272        }
273      }
274    }
275
276
277    /**
278     * Create the closest row after the specified row
279     */
280    protected byte[] createClosestRowAfter(byte[] row) {
281      if (row == null) {
282        throw new RuntimeException("The passed row is null");
283      }
284      return Arrays.copyOf(row, row.length + 1);
285    }
286  }
287
288  @Override
289  public ResultScanner getScanner(Scan scan) throws IOException {
290    return new Scanner(scan);
291  }
292
293  @Override
294  public ResultScanner getScanner(byte[] family) throws IOException {
295    Scan scan = new Scan();
296    scan.addFamily(family);
297    return getScanner(scan);
298  }
299
300  @Override
301  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
302    Scan scan = new Scan();
303    scan.addColumn(family, qualifier);
304    return getScanner(scan);
305  }
306
307  @Override
308  public void put(Put put) throws IOException {
309    TPut tPut = ThriftUtilities.putFromHBase(put);
310    try {
311      client.put(tableNameInBytes, tPut);
312    }  catch (TException e) {
313      throw new IOException(e);
314    }
315  }
316
317  @Override
318  public void put(List<Put> puts) throws IOException {
319    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
320    try {
321      client.putMultiple(tableNameInBytes, tPuts);
322    }  catch (TException e) {
323      throw new IOException(e);
324    }
325  }
326
327  @Override
328  public void delete(Delete delete) throws IOException {
329    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
330    try {
331      client.deleteSingle(tableNameInBytes, tDelete);
332    }  catch (TException e) {
333      throw new IOException(e);
334    }
335  }
336
337  @Override
338  public void delete(List<Delete> deletes) throws IOException {
339    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
340    try {
341      client.deleteMultiple(tableNameInBytes, tDeletes);
342    }  catch (TException e) {
343      throw new IOException(e);
344    }
345  }
346
347  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
348
349    private final byte[] row;
350    private final byte[] family;
351    private byte[] qualifier;
352    private CompareOperator op;
353    private byte[] value;
354
355    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
356      this.row = Preconditions.checkNotNull(row, "row is null");
357      this.family = Preconditions.checkNotNull(family, "family is null");
358    }
359
360    @Override
361    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
362      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
363          " an empty byte array, or just do not call this method if you want a null qualifier");
364      return this;
365    }
366
367    @Override
368    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
369      throw new NotImplementedException("timeRange not supported in ThriftTable");
370    }
371
372    @Override
373    public CheckAndMutateBuilder ifNotExists() {
374      this.op = CompareOperator.EQUAL;
375      this.value = null;
376      return this;
377    }
378
379    @Override
380    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
381      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
382      this.value = Preconditions.checkNotNull(value, "value is null");
383      return this;
384    }
385
386    private void preCheck() {
387      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
388          " calling ifNotExists/ifEquals/ifMatches before executing the request");
389    }
390
391    @Override
392    public boolean thenPut(Put put) throws IOException {
393      preCheck();
394      RowMutations rowMutations = new RowMutations(put.getRow());
395      rowMutations.add(put);
396      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
397    }
398
399    @Override
400    public boolean thenDelete(Delete delete) throws IOException {
401      preCheck();
402      RowMutations rowMutations = new RowMutations(delete.getRow());
403      rowMutations.add(delete);
404      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
405    }
406
407    @Override
408    public boolean thenMutate(RowMutations mutation) throws IOException {
409      preCheck();
410      return checkAndMutate(row, family, qualifier, op, value, mutation);
411    }
412  }
413
414  private boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
415      byte[] value, RowMutations mutation) throws IOException {
416    try {
417      ByteBuffer valueBuffer = value == null ? null : ByteBuffer.wrap(value);
418      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
419        ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
420        ThriftUtilities.rowMutationsFromHBase(mutation));
421    } catch (TException e) {
422      throw new IOException(e);
423    }
424  }
425
426  @Override
427  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
428    return new CheckAndMutateBuilderImpl(row, family);
429  }
430
431  @Override
432  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
433    throw new NotImplementedException("Implement later");
434  }
435
436  @Override
437  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
438    throw new NotImplementedException("Implement later");
439  }
440
441  @Override
442  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
443    throw new NotImplementedException("Implement later");
444  }
445
446  @Override
447  public Result mutateRow(RowMutations rm) throws IOException {
448    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
449    try {
450      client.mutateRow(tableNameInBytes, tRowMutations);
451      return Result.EMPTY_RESULT;
452    }  catch (TException e) {
453      throw new IOException(e);
454    }
455  }
456
457  @Override
458  public Result append(Append append) throws IOException {
459    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
460    try {
461      TResult tResult = client.append(tableNameInBytes, tAppend);
462      return ThriftUtilities.resultFromThrift(tResult);
463    }  catch (TException e) {
464      throw new IOException(e);
465    }
466  }
467
468  @Override
469  public Result increment(Increment increment) throws IOException {
470    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
471    try {
472      TResult tResult = client.increment(tableNameInBytes, tIncrement);
473      return ThriftUtilities.resultFromThrift(tResult);
474    }  catch (TException e) {
475      throw new IOException(e);
476    }
477  }
478
479  @Override
480  public void close() throws IOException {
481    tTransport.close();
482  }
483
484  @Override
485  public long getRpcTimeout(TimeUnit unit) {
486    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
487  }
488
489  @Override
490  public long getReadRpcTimeout(TimeUnit unit) {
491    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
492  }
493
494  @Override
495  public long getWriteRpcTimeout(TimeUnit unit) {
496    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
497  }
498
499  @Override
500  public long getOperationTimeout(TimeUnit unit) {
501    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
502  }
503
504  @Override
505  public CoprocessorRpcChannel coprocessorService(byte[] row) {
506    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
507  }
508
509  @Override
510  public RegionLocator getRegionLocator() throws IOException {
511    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
512  }
513}