001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Increment;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Row;
047import org.apache.hadoop.hbase.client.RowMutations;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.coprocessor.Batch;
052import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
053import org.apache.hadoop.hbase.filter.Filter;
054import org.apache.hadoop.hbase.io.TimeRange;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
056import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
057import org.apache.hadoop.hbase.thrift2.generated.TAppend;
058import org.apache.hadoop.hbase.thrift2.generated.TDelete;
059import org.apache.hadoop.hbase.thrift2.generated.TGet;
060import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
061import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
062import org.apache.hadoop.hbase.thrift2.generated.TPut;
063import org.apache.hadoop.hbase.thrift2.generated.TResult;
064import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
065import org.apache.hadoop.hbase.thrift2.generated.TScan;
066import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.thrift.TException;
069import org.apache.thrift.transport.TTransport;
070import org.apache.yetus.audience.InterfaceAudience;
071
072import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
073import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
074
075@InterfaceAudience.Private
076public class ThriftTable implements Table {
077
078  private TableName tableName;
079  private Configuration conf;
080  private TTransport tTransport;
081  private THBaseService.Client client;
082  private ByteBuffer tableNameInBytes;
083  private int operationTimeout;
084
085  private final int scannerCaching;
086
087  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
088      Configuration conf) {
089    this.tableName = tableName;
090    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
091    this.conf = conf;
092    this.tTransport = tTransport;
093    this.client = client;
094    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
095        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
096    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
097        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
098
099
100  }
101
102  @Override
103  public TableName getName() {
104    return tableName;
105  }
106
107  @Override
108  public Configuration getConfiguration() {
109    return conf;
110  }
111
112  @Override
113  public TableDescriptor getDescriptor() throws IOException {
114    try {
115      TTableDescriptor tableDescriptor = client
116          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
117      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
118    } catch (TException e) {
119      throw new IOException(e);
120    }
121  }
122
123  @Override
124  public boolean exists(Get get) throws IOException {
125    TGet tGet = ThriftUtilities.getFromHBase(get);
126    try {
127      return client.exists(tableNameInBytes, tGet);
128    }  catch (TException e) {
129      throw new IOException(e);
130    }
131  }
132
133  @Override
134  public boolean[] exists(List<Get> gets) throws IOException {
135    List<TGet> tGets = new ArrayList<>();
136    for (Get get: gets) {
137      tGets.add(ThriftUtilities.getFromHBase(get));
138    }
139    try {
140      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
141      return Booleans.toArray(results);
142    }  catch (TException e) {
143      throw new IOException(e);
144    }
145  }
146
147  @Override
148  public void batch(List<? extends Row> actions, Object[] results)
149      throws IOException {
150    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
151        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
152
153
154  }
155
156  @Override
157  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
158      Batch.Callback<R> callback) throws IOException {
159    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
160        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
161  }
162
163  @Override
164  public Result get(Get get) throws IOException {
165    TGet tGet = ThriftUtilities.getFromHBase(get);
166    try {
167      TResult tResult = client.get(tableNameInBytes, tGet);
168      return ThriftUtilities.resultFromThrift(tResult);
169    }  catch (TException e) {
170      throw new IOException(e);
171    }
172  }
173
174  @Override
175  public Result[] get(List<Get> gets) throws IOException {
176    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
177    try {
178      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
179      return ThriftUtilities.resultsFromThrift(results);
180    }  catch (TException e) {
181      throw new IOException(e);
182    }
183  }
184
185  /**
186   * A scanner to perform scan from thrift server
187   * getScannerResults is used in this scanner
188   */
189  private class Scanner implements ResultScanner {
190    protected TScan scan;
191    protected Result lastResult = null;
192    protected final Queue<Result> cache = new ArrayDeque<>();;
193
194
195    public Scanner(Scan scan) throws IOException {
196      if (scan.getBatch() > 0) {
197        throw new IOException("Batch is not supported in Scanner");
198      }
199      if (scan.getCaching() <= 0) {
200        scan.setCaching(scannerCaching);
201      } else if (scan.getCaching() == 1 && scan.isReversed()){
202        // for reverse scan, we need to pass the last row to the next scanner
203        // we need caching number bigger than 1
204        scan.setCaching(scan.getCaching() + 1);
205      }
206      this.scan = ThriftUtilities.scanFromHBase(scan);
207    }
208
209
210    @Override
211    public Result next() throws IOException {
212      if (cache.size() == 0) {
213        setupNextScanner();
214        try {
215          List<TResult> tResults = client
216              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
217          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
218          boolean firstKey = true;
219          for (Result result : results) {
220            // If it is a reverse scan, we use the last result's key as the startkey, since there is
221            // no way to construct a closet rowkey smaller than the last result
222            // So when the results return, we must rule out the first result, since it has already
223            // returned to user.
224            if (firstKey) {
225              firstKey = false;
226              if (scan.isReversed() && lastResult != null) {
227                if (Bytes.equals(lastResult.getRow(), result.getRow())) {
228                  continue;
229                }
230              }
231            }
232            cache.add(result);
233            lastResult = result;
234          }
235        } catch (TException e) {
236          throw new IOException(e);
237        }
238      }
239
240      if (cache.size() > 0) {
241        return cache.poll();
242      } else {
243        //scan finished
244        return null;
245      }
246    }
247
248    @Override
249    public void close() {
250    }
251
252    @Override
253    public boolean renewLease() {
254      throw new RuntimeException("renewLease() not supported");
255    }
256
257    @Override
258    public ScanMetrics getScanMetrics() {
259      throw new RuntimeException("getScanMetrics() not supported");
260    }
261
262    private void setupNextScanner() {
263      //if lastResult is null null, it means it is not the fist scan
264      if (lastResult!= null) {
265        byte[] lastRow = lastResult.getRow();
266        if (scan.isReversed()) {
267          //for reverse scan, we can't find the closet row before this row
268          scan.setStartRow(lastRow);
269        } else {
270          scan.setStartRow(createClosestRowAfter(lastRow));
271        }
272      }
273    }
274
275
276    /**
277     * Create the closest row after the specified row
278     */
279    protected byte[] createClosestRowAfter(byte[] row) {
280      if (row == null) {
281        throw new RuntimeException("The passed row is null");
282      }
283      return Arrays.copyOf(row, row.length + 1);
284    }
285  }
286
287  @Override
288  public ResultScanner getScanner(Scan scan) throws IOException {
289    return new Scanner(scan);
290  }
291
292  @Override
293  public ResultScanner getScanner(byte[] family) throws IOException {
294    Scan scan = new Scan();
295    scan.addFamily(family);
296    return getScanner(scan);
297  }
298
299  @Override
300  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
301    Scan scan = new Scan();
302    scan.addColumn(family, qualifier);
303    return getScanner(scan);
304  }
305
306  @Override
307  public void put(Put put) throws IOException {
308    TPut tPut = ThriftUtilities.putFromHBase(put);
309    try {
310      client.put(tableNameInBytes, tPut);
311    }  catch (TException e) {
312      throw new IOException(e);
313    }
314  }
315
316  @Override
317  public void put(List<Put> puts) throws IOException {
318    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
319    try {
320      client.putMultiple(tableNameInBytes, tPuts);
321    }  catch (TException e) {
322      throw new IOException(e);
323    }
324  }
325
326  @Override
327  public void delete(Delete delete) throws IOException {
328    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
329    try {
330      client.deleteSingle(tableNameInBytes, tDelete);
331    }  catch (TException e) {
332      throw new IOException(e);
333    }
334  }
335
336  @Override
337  public void delete(List<Delete> deletes) throws IOException {
338    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
339    try {
340      client.deleteMultiple(tableNameInBytes, tDeletes);
341    }  catch (TException e) {
342      throw new IOException(e);
343    }
344  }
345
346  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
347
348    private final byte[] row;
349    private final byte[] family;
350    private byte[] qualifier;
351    private CompareOperator op;
352    private byte[] value;
353
354    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
355      this.row = Preconditions.checkNotNull(row, "row is null");
356      this.family = Preconditions.checkNotNull(family, "family is null");
357    }
358
359    @Override
360    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
361      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
362          " an empty byte array, or just do not call this method if you want a null qualifier");
363      return this;
364    }
365
366    @Override
367    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
368      throw new NotImplementedException("timeRange not supported in ThriftTable");
369    }
370
371    @Override
372    public CheckAndMutateBuilder ifNotExists() {
373      this.op = CompareOperator.EQUAL;
374      this.value = null;
375      return this;
376    }
377
378    @Override
379    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
380      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
381      this.value = Preconditions.checkNotNull(value, "value is null");
382      return this;
383    }
384
385    private void preCheck() {
386      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
387          " calling ifNotExists/ifEquals/ifMatches before executing the request");
388    }
389
390    @Override
391    public boolean thenPut(Put put) throws IOException {
392      preCheck();
393      RowMutations rowMutations = new RowMutations(put.getRow());
394      rowMutations.add(put);
395      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
396    }
397
398    @Override
399    public boolean thenDelete(Delete delete) throws IOException {
400      preCheck();
401      RowMutations rowMutations = new RowMutations(delete.getRow());
402      rowMutations.add(delete);
403      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
404    }
405
406    @Override
407    public boolean thenMutate(RowMutations mutation) throws IOException {
408      preCheck();
409      return checkAndMutate(row, family, qualifier, op, value, mutation);
410    }
411  }
412
413
414  @Override
415  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
416      byte[] value, RowMutations mutation) throws IOException {
417    try {
418      ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value);
419      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
420          ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer,
421          ThriftUtilities.rowMutationsFromHBase(mutation));
422    } catch (TException e) {
423      throw new IOException(e);
424    }
425  }
426
427  @Override
428  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
429    return new CheckAndMutateBuilderImpl(row, family);
430  }
431
432  @Override
433  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
434    throw new NotImplementedException("Implement later");
435  }
436
437  @Override
438  public void mutateRow(RowMutations rm) throws IOException {
439    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
440    try {
441      client.mutateRow(tableNameInBytes, tRowMutations);
442    }  catch (TException e) {
443      throw new IOException(e);
444    }
445  }
446
447  @Override
448  public Result append(Append append) throws IOException {
449    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
450    try {
451      TResult tResult = client.append(tableNameInBytes, tAppend);
452      return ThriftUtilities.resultFromThrift(tResult);
453    }  catch (TException e) {
454      throw new IOException(e);
455    }
456  }
457
458  @Override
459  public Result increment(Increment increment) throws IOException {
460    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
461    try {
462      TResult tResult = client.increment(tableNameInBytes, tIncrement);
463      return ThriftUtilities.resultFromThrift(tResult);
464    }  catch (TException e) {
465      throw new IOException(e);
466    }
467  }
468
469  @Override
470  public void close() throws IOException {
471    tTransport.close();
472  }
473
474  @Override
475  public long getRpcTimeout(TimeUnit unit) {
476    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
477  }
478
479  @Override
480  public long getReadRpcTimeout(TimeUnit unit) {
481    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
482  }
483
484  @Override
485  public long getWriteRpcTimeout(TimeUnit unit) {
486    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
487  }
488
489  @Override
490  public long getOperationTimeout(TimeUnit unit) {
491    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
492  }
493
494  @Override
495  public CoprocessorRpcChannel coprocessorService(byte[] row) {
496    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
497  }
498
499  @Override
500  public RegionLocator getRegionLocator() throws IOException {
501    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
502  }
503}