001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift2;
019
020import static java.nio.ByteBuffer.wrap;
021import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
022import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
023import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
024import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
025import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
026import static org.junit.Assert.assertArrayEquals;
027import static org.junit.Assert.assertEquals;
028import static org.junit.Assert.assertFalse;
029import static org.junit.Assert.assertNull;
030import static org.junit.Assert.assertTrue;
031import static org.junit.Assert.fail;
032
033import java.io.IOException;
034import java.io.InterruptedIOException;
035import java.nio.ByteBuffer;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Collections;
039import java.util.Comparator;
040import java.util.HashMap;
041import java.util.List;
042import java.util.Map;
043import java.util.Optional;
044import java.util.concurrent.TimeUnit;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.hbase.Cell;
047import org.apache.hadoop.hbase.CompatibilityFactory;
048import org.apache.hadoop.hbase.CoprocessorEnvironment;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HColumnDescriptor;
052import org.apache.hadoop.hbase.HTableDescriptor;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Delete;
056import org.apache.hadoop.hbase.client.Durability;
057import org.apache.hadoop.hbase.client.Get;
058import org.apache.hadoop.hbase.client.Increment;
059import org.apache.hadoop.hbase.client.Put;
060import org.apache.hadoop.hbase.client.Scan;
061import org.apache.hadoop.hbase.client.Table;
062import org.apache.hadoop.hbase.coprocessor.ObserverContext;
063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
065import org.apache.hadoop.hbase.coprocessor.RegionObserver;
066import org.apache.hadoop.hbase.filter.ParseFilter;
067import org.apache.hadoop.hbase.security.UserProvider;
068import org.apache.hadoop.hbase.test.MetricsAssertHelper;
069import org.apache.hadoop.hbase.testclassification.ClientTests;
070import org.apache.hadoop.hbase.testclassification.MediumTests;
071import org.apache.hadoop.hbase.thrift.ErrorThrowingGetObserver;
072import org.apache.hadoop.hbase.thrift.ThriftMetrics;
073import org.apache.hadoop.hbase.thrift2.generated.TAppend;
074import org.apache.hadoop.hbase.thrift2.generated.TColumn;
075import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement;
076import org.apache.hadoop.hbase.thrift2.generated.TColumnValue;
077import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
078import org.apache.hadoop.hbase.thrift2.generated.TDelete;
079import org.apache.hadoop.hbase.thrift2.generated.TDeleteType;
080import org.apache.hadoop.hbase.thrift2.generated.TDurability;
081import org.apache.hadoop.hbase.thrift2.generated.TGet;
082import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
083import org.apache.hadoop.hbase.thrift2.generated.TIOError;
084import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
085import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
086import org.apache.hadoop.hbase.thrift2.generated.TMutation;
087import org.apache.hadoop.hbase.thrift2.generated.TPut;
088import org.apache.hadoop.hbase.thrift2.generated.TReadType;
089import org.apache.hadoop.hbase.thrift2.generated.TResult;
090import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
091import org.apache.hadoop.hbase.thrift2.generated.TScan;
092import org.apache.hadoop.hbase.thrift2.generated.TTimeRange;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.thrift.TException;
095import org.junit.AfterClass;
096import org.junit.Before;
097import org.junit.BeforeClass;
098import org.junit.ClassRule;
099import org.junit.Rule;
100import org.junit.Test;
101import org.junit.experimental.categories.Category;
102import org.junit.rules.TestName;
103import org.slf4j.Logger;
104import org.slf4j.LoggerFactory;
105
106import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
108
109/**
110 * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2
111 * package.
112 */
113@Category({ClientTests.class, MediumTests.class})
114public class TestThriftHBaseServiceHandler {
115
116  @ClassRule
117  public static final HBaseClassTestRule CLASS_RULE =
118      HBaseClassTestRule.forClass(TestThriftHBaseServiceHandler.class);
119
120  private static final Logger LOG = LoggerFactory.getLogger(TestThriftHBaseServiceHandler.class);
121  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
122
123  // Static names for tables, columns, rows, and values
124  private static byte[] tableAname = Bytes.toBytes("tableA");
125  private static byte[] familyAname = Bytes.toBytes("familyA");
126  private static byte[] familyBname = Bytes.toBytes("familyB");
127  private static byte[] qualifierAname = Bytes.toBytes("qualifierA");
128  private static byte[] qualifierBname = Bytes.toBytes("qualifierB");
129  private static byte[] valueAname = Bytes.toBytes("valueA");
130  private static byte[] valueBname = Bytes.toBytes("valueB");
131  private static HColumnDescriptor[] families = new HColumnDescriptor[] {
132      new HColumnDescriptor(familyAname).setMaxVersions(3),
133      new HColumnDescriptor(familyBname).setMaxVersions(2)
134  };
135
136
137  private static final MetricsAssertHelper metricsHelper =
138      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
139
140  @Rule
141  public TestName name = new TestName();
142
143
144  public void assertTColumnValuesEqual(List<TColumnValue> columnValuesA,
145      List<TColumnValue> columnValuesB) {
146    assertEquals(columnValuesA.size(), columnValuesB.size());
147    Comparator<TColumnValue> comparator = new Comparator<TColumnValue>() {
148      @Override
149      public int compare(TColumnValue o1, TColumnValue o2) {
150        return Bytes.compareTo(Bytes.add(o1.getFamily(), o1.getQualifier()),
151            Bytes.add(o2.getFamily(), o2.getQualifier()));
152      }
153    };
154    Collections.sort(columnValuesA, comparator);
155    Collections.sort(columnValuesB, comparator);
156
157    for (int i = 0; i < columnValuesA.size(); i++) {
158      TColumnValue a = columnValuesA.get(i);
159      TColumnValue b = columnValuesB.get(i);
160      assertTColumnValueEqual(a, b);
161    }
162  }
163
164  public void assertTColumnValueEqual(TColumnValue a, TColumnValue b) {
165    assertArrayEquals(a.getFamily(), b.getFamily());
166    assertArrayEquals(a.getQualifier(), b.getQualifier());
167    assertArrayEquals(a.getValue(), b.getValue());
168  }
169
170  @BeforeClass
171  public static void beforeClass() throws Exception {
172    UTIL.getConfiguration().set("hbase.client.retries.number", "3");
173    UTIL.startMiniCluster();
174    Admin admin = UTIL.getAdmin();
175    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname));
176    for (HColumnDescriptor family : families) {
177      tableDescriptor.addFamily(family);
178    }
179    admin.createTable(tableDescriptor);
180    admin.close();
181  }
182
183  @AfterClass
184  public static void afterClass() throws Exception {
185    UTIL.shutdownMiniCluster();
186  }
187
188  @Before
189  public void setup() throws Exception {
190
191  }
192
193  private ThriftHBaseServiceHandler createHandler() throws TException {
194    try {
195      Configuration conf = UTIL.getConfiguration();
196      return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
197    } catch (IOException ie) {
198      throw new TException(ie);
199    }
200  }
201
202  @Test
203  public void testExists() throws TIOError, TException {
204    ThriftHBaseServiceHandler handler = createHandler();
205    byte[] rowName = Bytes.toBytes("testExists");
206    ByteBuffer table = wrap(tableAname);
207
208    TGet get = new TGet(wrap(rowName));
209    assertFalse(handler.exists(table, get));
210
211    List<TColumnValue> columnValues = new ArrayList<>(2);
212    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
213    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
214    TPut put = new TPut(wrap(rowName), columnValues);
215    put.setColumnValues(columnValues);
216
217    handler.put(table, put);
218
219    assertTrue(handler.exists(table, get));
220  }
221
222  @Test
223  public void testExistsAll() throws TIOError, TException {
224    ThriftHBaseServiceHandler handler = createHandler();
225    byte[] rowName1 = Bytes.toBytes("testExistsAll1");
226    byte[] rowName2 = Bytes.toBytes("testExistsAll2");
227    ByteBuffer table = wrap(tableAname);
228
229    List<TGet> gets = new ArrayList<>();
230    gets.add(new TGet(wrap(rowName2)));
231    gets.add(new TGet(wrap(rowName2)));
232    List<Boolean> existsResult1 = handler.existsAll(table, gets);
233    assertFalse(existsResult1.get(0));
234    assertFalse(existsResult1.get(1));
235
236    List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
237    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
238    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
239    List<TPut> puts = new ArrayList<TPut>();
240    puts.add(new TPut(wrap(rowName1), columnValues));
241    puts.add(new TPut(wrap(rowName2), columnValues));
242
243    handler.putMultiple(table, puts);
244    List<Boolean> existsResult2 = handler.existsAll(table,gets );
245
246    assertTrue(existsResult2.get(0));
247    assertTrue(existsResult2.get(1));
248  }
249
250  @Test
251  public void testPutGet() throws Exception {
252    ThriftHBaseServiceHandler handler = createHandler();
253    byte[] rowName = Bytes.toBytes("testPutGet");
254    ByteBuffer table = wrap(tableAname);
255
256    List<TColumnValue> columnValues = new ArrayList<>(2);
257    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
258    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
259    TPut put = new TPut(wrap(rowName), columnValues);
260
261    put.setColumnValues(columnValues);
262
263    handler.put(table, put);
264
265    TGet get = new TGet(wrap(rowName));
266
267    TResult result = handler.get(table, get);
268    assertArrayEquals(rowName, result.getRow());
269    List<TColumnValue> returnedColumnValues = result.getColumnValues();
270    assertTColumnValuesEqual(columnValues, returnedColumnValues);
271  }
272
273  @Test
274  public void testPutGetMultiple() throws Exception {
275    ThriftHBaseServiceHandler handler = createHandler();
276    ByteBuffer table = wrap(tableAname);
277    byte[] rowName1 = Bytes.toBytes("testPutGetMultiple1");
278    byte[] rowName2 = Bytes.toBytes("testPutGetMultiple2");
279
280    List<TColumnValue> columnValues = new ArrayList<>(2);
281    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
282    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
283    List<TPut> puts = new ArrayList<>(2);
284    puts.add(new TPut(wrap(rowName1), columnValues));
285    puts.add(new TPut(wrap(rowName2), columnValues));
286
287    handler.putMultiple(table, puts);
288
289    List<TGet> gets = new ArrayList<>(2);
290    gets.add(new TGet(wrap(rowName1)));
291    gets.add(new TGet(wrap(rowName2)));
292
293    List<TResult> results = handler.getMultiple(table, gets);
294    assertEquals(2, results.size());
295
296    assertArrayEquals(rowName1, results.get(0).getRow());
297    assertTColumnValuesEqual(columnValues, results.get(0).getColumnValues());
298
299    assertArrayEquals(rowName2, results.get(1).getRow());
300    assertTColumnValuesEqual(columnValues, results.get(1).getColumnValues());
301  }
302
303  @Test
304  public void testDeleteMultiple() throws Exception {
305    ThriftHBaseServiceHandler handler = createHandler();
306    ByteBuffer table = wrap(tableAname);
307    byte[] rowName1 = Bytes.toBytes("testDeleteMultiple1");
308    byte[] rowName2 = Bytes.toBytes("testDeleteMultiple2");
309
310    List<TColumnValue> columnValues = new ArrayList<>(2);
311    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
312    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
313    List<TPut> puts = new ArrayList<>(2);
314    puts.add(new TPut(wrap(rowName1), columnValues));
315    puts.add(new TPut(wrap(rowName2), columnValues));
316
317    handler.putMultiple(table, puts);
318
319    List<TDelete> deletes = new ArrayList<>(2);
320    deletes.add(new TDelete(wrap(rowName1)));
321    deletes.add(new TDelete(wrap(rowName2)));
322
323    List<TDelete> deleteResults = handler.deleteMultiple(table, deletes);
324    // 0 means they were all successfully applies
325    assertEquals(0, deleteResults.size());
326
327    assertFalse(handler.exists(table, new TGet(wrap(rowName1))));
328    assertFalse(handler.exists(table, new TGet(wrap(rowName2))));
329  }
330
331  @Test
332  public void testDelete() throws Exception {
333    ThriftHBaseServiceHandler handler = createHandler();
334    byte[] rowName = Bytes.toBytes("testDelete");
335    ByteBuffer table = wrap(tableAname);
336
337    List<TColumnValue> columnValues = new ArrayList<>(2);
338    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
339      wrap(valueAname));
340    TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname),
341      wrap(valueBname));
342    columnValues.add(columnValueA);
343    columnValues.add(columnValueB);
344    TPut put = new TPut(wrap(rowName), columnValues);
345
346    put.setColumnValues(columnValues);
347
348    handler.put(table, put);
349
350    TDelete delete = new TDelete(wrap(rowName));
351    List<TColumn> deleteColumns = new ArrayList<>(1);
352    TColumn deleteColumn = new TColumn(wrap(familyAname));
353    deleteColumn.setQualifier(qualifierAname);
354    deleteColumns.add(deleteColumn);
355    delete.setColumns(deleteColumns);
356
357    handler.deleteSingle(table, delete);
358
359    TGet get = new TGet(wrap(rowName));
360    TResult result = handler.get(table, get);
361    assertArrayEquals(rowName, result.getRow());
362    List<TColumnValue> returnedColumnValues = result.getColumnValues();
363    List<TColumnValue> expectedColumnValues = new ArrayList<>(1);
364    expectedColumnValues.add(columnValueB);
365    assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
366  }
367
368  @Test
369  public void testDeleteAllTimestamps() throws Exception {
370    ThriftHBaseServiceHandler handler = createHandler();
371    byte[] rowName = Bytes.toBytes("testDeleteAllTimestamps");
372    ByteBuffer table = wrap(tableAname);
373
374    List<TColumnValue> columnValues = new ArrayList<>(1);
375    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
376      wrap(valueAname));
377    columnValueA.setTimestamp(System.currentTimeMillis() - 10);
378    columnValues.add(columnValueA);
379    TPut put = new TPut(wrap(rowName), columnValues);
380
381    put.setColumnValues(columnValues);
382
383    handler.put(table, put);
384    columnValueA.setTimestamp(System.currentTimeMillis());
385    handler.put(table, put);
386
387    TGet get = new TGet(wrap(rowName));
388    get.setMaxVersions(2);
389    TResult result = handler.get(table, get);
390    assertEquals(2, result.getColumnValuesSize());
391
392    TDelete delete = new TDelete(wrap(rowName));
393    List<TColumn> deleteColumns = new ArrayList<>(1);
394    TColumn deleteColumn = new TColumn(wrap(familyAname));
395    deleteColumn.setQualifier(qualifierAname);
396    deleteColumns.add(deleteColumn);
397    delete.setColumns(deleteColumns);
398    delete.setDeleteType(TDeleteType.DELETE_COLUMNS); // This is the default anyway.
399
400    handler.deleteSingle(table, delete);
401
402    get = new TGet(wrap(rowName));
403    result = handler.get(table, get);
404    assertNull(result.getRow());
405    assertEquals(0, result.getColumnValuesSize());
406  }
407
408  @Test
409  public void testDeleteSingleTimestamp() throws Exception {
410    ThriftHBaseServiceHandler handler = createHandler();
411    byte[] rowName = Bytes.toBytes("testDeleteSingleTimestamp");
412    ByteBuffer table = wrap(tableAname);
413
414    long timestamp1 = System.currentTimeMillis() - 10;
415    long timestamp2 = System.currentTimeMillis();
416
417    List<TColumnValue> columnValues = new ArrayList<>(1);
418    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
419      wrap(valueAname));
420    columnValueA.setTimestamp(timestamp1);
421    columnValues.add(columnValueA);
422    TPut put = new TPut(wrap(rowName), columnValues);
423
424    put.setColumnValues(columnValues);
425
426    handler.put(table, put);
427    columnValueA.setTimestamp(timestamp2);
428    handler.put(table, put);
429
430    TGet get = new TGet(wrap(rowName));
431    get.setMaxVersions(2);
432    TResult result = handler.get(table, get);
433    assertEquals(2, result.getColumnValuesSize());
434
435    TDelete delete = new TDelete(wrap(rowName));
436    List<TColumn> deleteColumns = new ArrayList<>(1);
437    TColumn deleteColumn = new TColumn(wrap(familyAname));
438    deleteColumn.setQualifier(qualifierAname);
439    deleteColumns.add(deleteColumn);
440    delete.setColumns(deleteColumns);
441    delete.setDeleteType(TDeleteType.DELETE_COLUMN);
442
443    handler.deleteSingle(table, delete);
444
445    get = new TGet(wrap(rowName));
446    result = handler.get(table, get);
447    assertArrayEquals(rowName, result.getRow());
448    assertEquals(1, result.getColumnValuesSize());
449    // the older timestamp should remain.
450    assertEquals(timestamp1, result.getColumnValues().get(0).getTimestamp());
451  }
452
453  @Test
454  public void testDeleteFamily() throws Exception {
455    ThriftHBaseServiceHandler handler = createHandler();
456    byte[] rowName = Bytes.toBytes("testDeleteFamily");
457    ByteBuffer table = wrap(tableAname);
458
459    long timestamp1 = System.currentTimeMillis() - 10;
460    long timestamp2 = System.currentTimeMillis();
461
462    List<TColumnValue> columnValues = new ArrayList<>();
463    TColumnValue columnValueA =
464        new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname));
465    columnValueA.setTimestamp(timestamp1);
466    columnValues.add(columnValueA);
467    TPut put = new TPut(wrap(rowName), columnValues);
468
469    put.setColumnValues(columnValues);
470
471    handler.put(table, put);
472    columnValueA.setTimestamp(timestamp2);
473    handler.put(table, put);
474
475    TGet get = new TGet(wrap(rowName));
476    get.setMaxVersions(2);
477    TResult result = handler.get(table, get);
478    assertEquals(2, result.getColumnValuesSize());
479
480    TDelete delete = new TDelete(wrap(rowName));
481    List<TColumn> deleteColumns = new ArrayList<>();
482    TColumn deleteColumn = new TColumn(wrap(familyAname));
483    deleteColumns.add(deleteColumn);
484    delete.setColumns(deleteColumns);
485    delete.setDeleteType(TDeleteType.DELETE_FAMILY);
486
487    handler.deleteSingle(table, delete);
488
489    get = new TGet(wrap(rowName));
490    result = handler.get(table, get);
491    assertArrayEquals(null, result.getRow());
492    assertEquals(0, result.getColumnValuesSize());
493  }
494
495  @Test
496  public void testDeleteFamilyVersion() throws Exception {
497    ThriftHBaseServiceHandler handler = createHandler();
498    byte[] rowName = Bytes.toBytes("testDeleteFamilyVersion");
499    ByteBuffer table = wrap(tableAname);
500
501    long timestamp1 = System.currentTimeMillis() - 10;
502    long timestamp2 = System.currentTimeMillis();
503
504    List<TColumnValue> columnValues = new ArrayList<>();
505    TColumnValue columnValueA =
506        new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname));
507    columnValueA.setTimestamp(timestamp1);
508    columnValues.add(columnValueA);
509    TPut put = new TPut(wrap(rowName), columnValues);
510
511    put.setColumnValues(columnValues);
512
513    handler.put(table, put);
514    columnValueA.setTimestamp(timestamp2);
515    handler.put(table, put);
516
517    TGet get = new TGet(wrap(rowName));
518    get.setMaxVersions(2);
519    TResult result = handler.get(table, get);
520    assertEquals(2, result.getColumnValuesSize());
521
522    TDelete delete = new TDelete(wrap(rowName));
523    List<TColumn> deleteColumns = new ArrayList<>();
524    TColumn deleteColumn = new TColumn(wrap(familyAname));
525    deleteColumn.setTimestamp(timestamp1);
526    deleteColumns.add(deleteColumn);
527    delete.setColumns(deleteColumns);
528    delete.setDeleteType(TDeleteType.DELETE_FAMILY_VERSION);
529
530    handler.deleteSingle(table, delete);
531
532    get = new TGet(wrap(rowName));
533    result = handler.get(table, get);
534    assertArrayEquals(rowName, result.getRow());
535    assertEquals(1, result.getColumnValuesSize());
536    assertEquals(timestamp2, result.getColumnValues().get(0).getTimestamp());
537  }
538
539  @Test
540  public void testIncrement() throws Exception {
541    ThriftHBaseServiceHandler handler = createHandler();
542    byte[] rowName = Bytes.toBytes("testIncrement");
543    ByteBuffer table = wrap(tableAname);
544
545    List<TColumnValue> columnValues = new ArrayList<>(1);
546    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
547      wrap(Bytes.toBytes(1L))));
548    TPut put = new TPut(wrap(rowName), columnValues);
549    put.setColumnValues(columnValues);
550    handler.put(table, put);
551
552    List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
553    incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname)));
554    TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
555    handler.increment(table, increment);
556
557    TGet get = new TGet(wrap(rowName));
558    TResult result = handler.get(table, get);
559
560    assertArrayEquals(rowName, result.getRow());
561    assertEquals(1, result.getColumnValuesSize());
562    TColumnValue columnValue = result.getColumnValues().get(0);
563    assertArrayEquals(Bytes.toBytes(2L), columnValue.getValue());
564  }
565
566  @Test
567  public void testAppend() throws Exception {
568    ThriftHBaseServiceHandler handler = createHandler();
569    byte[] rowName = Bytes.toBytes("testAppend");
570    ByteBuffer table = wrap(tableAname);
571    byte[] v1 = Bytes.toBytes("42");
572    byte[] v2 = Bytes.toBytes("23");
573    List<TColumnValue> columnValues = new ArrayList<>(1);
574    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(v1)));
575    TPut put = new TPut(wrap(rowName), columnValues);
576    put.setColumnValues(columnValues);
577    handler.put(table, put);
578
579    List<TColumnValue> appendColumns = new ArrayList<>(1);
580    appendColumns.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(v2)));
581    TAppend append = new TAppend(wrap(rowName), appendColumns);
582    handler.append(table, append);
583
584    TGet get = new TGet(wrap(rowName));
585    TResult result = handler.get(table, get);
586
587    assertArrayEquals(rowName, result.getRow());
588    assertEquals(1, result.getColumnValuesSize());
589    TColumnValue columnValue = result.getColumnValues().get(0);
590    assertArrayEquals(Bytes.add(v1, v2), columnValue.getValue());
591  }
592
593  /**
594   * check that checkAndPut fails if the cell does not exist, then put in the cell, then check
595   * that the checkAndPut succeeds.
596   */
597  @Test
598  public void testCheckAndPut() throws Exception {
599    ThriftHBaseServiceHandler handler = createHandler();
600    byte[] rowName = Bytes.toBytes("testCheckAndPut");
601    ByteBuffer table = wrap(tableAname);
602
603    List<TColumnValue> columnValuesA = new ArrayList<>(1);
604    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
605      wrap(valueAname));
606    columnValuesA.add(columnValueA);
607    TPut putA = new TPut(wrap(rowName), columnValuesA);
608    putA.setColumnValues(columnValuesA);
609
610    List<TColumnValue> columnValuesB = new ArrayList<>(1);
611    TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname),
612      wrap(valueBname));
613    columnValuesB.add(columnValueB);
614    TPut putB = new TPut(wrap(rowName), columnValuesB);
615    putB.setColumnValues(columnValuesB);
616
617    assertFalse(handler.checkAndPut(table, wrap(rowName), wrap(familyAname),
618      wrap(qualifierAname), wrap(valueAname), putB));
619
620    TGet get = new TGet(wrap(rowName));
621    TResult result = handler.get(table, get);
622    assertEquals(0, result.getColumnValuesSize());
623
624    handler.put(table, putA);
625
626    assertTrue(handler.checkAndPut(table, wrap(rowName), wrap(familyAname),
627      wrap(qualifierAname), wrap(valueAname), putB));
628
629    result = handler.get(table, get);
630    assertArrayEquals(rowName, result.getRow());
631    List<TColumnValue> returnedColumnValues = result.getColumnValues();
632    List<TColumnValue> expectedColumnValues = new ArrayList<>(2);
633    expectedColumnValues.add(columnValueA);
634    expectedColumnValues.add(columnValueB);
635    assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
636  }
637
638  /**
639   * check that checkAndDelete fails if the cell does not exist, then put in the cell, then
640   * check that the checkAndDelete succeeds.
641   */
642  @Test
643  public void testCheckAndDelete() throws Exception {
644    ThriftHBaseServiceHandler handler = createHandler();
645    byte[] rowName = Bytes.toBytes("testCheckAndDelete");
646    ByteBuffer table = wrap(tableAname);
647
648    List<TColumnValue> columnValuesA = new ArrayList<>(1);
649    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
650      wrap(valueAname));
651    columnValuesA.add(columnValueA);
652    TPut putA = new TPut(wrap(rowName), columnValuesA);
653    putA.setColumnValues(columnValuesA);
654
655    List<TColumnValue> columnValuesB = new ArrayList<>(1);
656    TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname),
657      wrap(valueBname));
658    columnValuesB.add(columnValueB);
659    TPut putB = new TPut(wrap(rowName), columnValuesB);
660    putB.setColumnValues(columnValuesB);
661
662    // put putB so that we know whether the row has been deleted or not
663    handler.put(table, putB);
664
665    TDelete delete = new TDelete(wrap(rowName));
666
667    assertFalse(handler.checkAndDelete(table, wrap(rowName), wrap(familyAname),
668        wrap(qualifierAname), wrap(valueAname), delete));
669
670    TGet get = new TGet(wrap(rowName));
671    TResult result = handler.get(table, get);
672    assertArrayEquals(rowName, result.getRow());
673    assertTColumnValuesEqual(columnValuesB, result.getColumnValues());
674
675    handler.put(table, putA);
676
677    assertTrue(handler.checkAndDelete(table, wrap(rowName), wrap(familyAname),
678      wrap(qualifierAname), wrap(valueAname), delete));
679
680    result = handler.get(table, get);
681    assertFalse(result.isSetRow());
682    assertEquals(0, result.getColumnValuesSize());
683  }
684
685  @Test
686  public void testScan() throws Exception {
687    ThriftHBaseServiceHandler handler = createHandler();
688    ByteBuffer table = wrap(tableAname);
689
690    // insert data
691    TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
692      wrap(valueAname));
693    List<TColumnValue> columnValues = new ArrayList<>(1);
694    columnValues.add(columnValue);
695    for (int i = 0; i < 10; i++) {
696      TPut put = new TPut(wrap(Bytes.toBytes("testScan" + i)), columnValues);
697      handler.put(table, put);
698    }
699
700    // create scan instance
701    TScan scan = new TScan();
702    List<TColumn> columns = new ArrayList<>(1);
703    TColumn column = new TColumn();
704    column.setFamily(familyAname);
705    column.setQualifier(qualifierAname);
706    columns.add(column);
707    scan.setColumns(columns);
708    scan.setStartRow(Bytes.toBytes("testScan"));
709    scan.setStopRow(Bytes.toBytes("testScan\uffff"));
710
711    // get scanner and rows
712    int scanId = handler.openScanner(table, scan);
713    List<TResult> results = handler.getScannerRows(scanId, 10);
714    assertEquals(10, results.size());
715    for (int i = 0; i < 10; i++) {
716      // check if the rows are returned and in order
717      assertArrayEquals(Bytes.toBytes("testScan" + i), results.get(i).getRow());
718    }
719
720    // check that we are at the end of the scan
721    results = handler.getScannerRows(scanId, 10);
722    assertEquals(0, results.size());
723
724    // close scanner and check that it was indeed closed
725    handler.closeScanner(scanId);
726    try {
727      handler.getScannerRows(scanId, 10);
728      fail("Scanner id should be invalid");
729    } catch (TIllegalArgument e) {
730    }
731  }
732
733  /**
734   * Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow()
735   * should reset the ConnectionCache timeout for the scanner's connection
736   */
737  @Test
738  public void testLongLivedScan() throws Exception {
739    int numTrials = 6;
740    int trialPause = 1000;
741    int cleanUpInterval = 100;
742    Configuration conf = new Configuration(UTIL.getConfiguration());
743    // Set the ConnectionCache timeout to trigger halfway through the trials
744    conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause);
745    conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval);
746    ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf,
747        UserProvider.instantiate(conf));
748
749    ByteBuffer table = wrap(tableAname);
750    // insert data
751    TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
752        wrap(valueAname));
753    List<TColumnValue> columnValues = new ArrayList<>(1);
754    columnValues.add(columnValue);
755    for (int i = 0; i < numTrials; i++) {
756      TPut put = new TPut(wrap(Bytes.toBytes("testScan" + i)), columnValues);
757      handler.put(table, put);
758    }
759
760    // create scan instance
761    TScan scan = new TScan();
762    List<TColumn> columns = new ArrayList<>(1);
763    TColumn column = new TColumn();
764    column.setFamily(familyAname);
765    column.setQualifier(qualifierAname);
766    columns.add(column);
767    scan.setColumns(columns);
768    scan.setStartRow(Bytes.toBytes("testScan"));
769    scan.setStopRow(Bytes.toBytes("testScan\uffff"));
770    // Prevent the scanner from caching results
771    scan.setCaching(1);
772
773    // get scanner and rows
774    int scanId = handler.openScanner(table, scan);
775    for (int i = 0; i < numTrials; i++) {
776      // Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout
777      List<TResult> results = handler.getScannerRows(scanId, 1);
778      assertArrayEquals(Bytes.toBytes("testScan" + i), results.get(0).getRow());
779      Thread.sleep(trialPause);
780    }
781  }
782
783  @Test
784  public void testReverseScan() throws Exception {
785    ThriftHBaseServiceHandler handler = createHandler();
786    ByteBuffer table = wrap(tableAname);
787
788    // insert data
789    TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
790      wrap(valueAname));
791    List<TColumnValue> columnValues = new ArrayList<>(1);
792    columnValues.add(columnValue);
793    for (int i = 0; i < 10; i++) {
794      TPut put = new TPut(wrap(Bytes.toBytes("testReverseScan" + i)), columnValues);
795      handler.put(table, put);
796    }
797
798    // create reverse scan instance
799    TScan scan = new TScan();
800    scan.setReversed(true);
801    List<TColumn> columns = new ArrayList<>(1);
802    TColumn column = new TColumn();
803    column.setFamily(familyAname);
804    column.setQualifier(qualifierAname);
805    columns.add(column);
806    scan.setColumns(columns);
807    scan.setStartRow(Bytes.toBytes("testReverseScan\uffff"));
808    scan.setStopRow(Bytes.toBytes("testReverseScan"));
809
810    // get scanner and rows
811    int scanId = handler.openScanner(table, scan);
812    List<TResult> results = handler.getScannerRows(scanId, 10);
813    assertEquals(10, results.size());
814    for (int i = 0; i < 10; i++) {
815      // check if the rows are returned and in order
816      assertArrayEquals(Bytes.toBytes("testReverseScan" + (9 - i)), results.get(i).getRow());
817    }
818
819    // check that we are at the end of the scan
820    results = handler.getScannerRows(scanId, 10);
821    assertEquals(0, results.size());
822
823    // close scanner and check that it was indeed closed
824    handler.closeScanner(scanId);
825    try {
826      handler.getScannerRows(scanId, 10);
827      fail("Scanner id should be invalid");
828    } catch (TIllegalArgument e) {
829    }
830  }
831
832  @Test
833  public void testScanWithFilter() throws Exception {
834    ThriftHBaseServiceHandler handler = createHandler();
835    ByteBuffer table = wrap(tableAname);
836
837    // insert data
838    TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
839      wrap(valueAname));
840    List<TColumnValue> columnValues = new ArrayList<>(1);
841    columnValues.add(columnValue);
842    for (int i = 0; i < 10; i++) {
843      TPut put = new TPut(wrap(Bytes.toBytes("testScanWithFilter" + i)), columnValues);
844      handler.put(table, put);
845    }
846
847    // create scan instance with filter
848    TScan scan = new TScan();
849    List<TColumn> columns = new ArrayList<>(1);
850    TColumn column = new TColumn();
851    column.setFamily(familyAname);
852    column.setQualifier(qualifierAname);
853    columns.add(column);
854    scan.setColumns(columns);
855    scan.setStartRow(Bytes.toBytes("testScanWithFilter"));
856    scan.setStopRow(Bytes.toBytes("testScanWithFilter\uffff"));
857    // only get the key part
858    scan.setFilterString(wrap(Bytes.toBytes("KeyOnlyFilter()")));
859
860    // get scanner and rows
861    int scanId = handler.openScanner(table, scan);
862    List<TResult> results = handler.getScannerRows(scanId, 10);
863    assertEquals(10, results.size());
864    for (int i = 0; i < 10; i++) {
865      // check if the rows are returned and in order
866      assertArrayEquals(Bytes.toBytes("testScanWithFilter" + i), results.get(i).getRow());
867      // check that the value is indeed stripped by the filter
868      assertEquals(0, results.get(i).getColumnValues().get(0).getValue().length);
869    }
870
871    // check that we are at the end of the scan
872    results = handler.getScannerRows(scanId, 10);
873    assertEquals(0, results.size());
874
875    // close scanner and check that it was indeed closed
876    handler.closeScanner(scanId);
877    try {
878      handler.getScannerRows(scanId, 10);
879      fail("Scanner id should be invalid");
880    } catch (TIllegalArgument e) {
881    }
882  }
883
884  @Test
885  public void testScanWithColumnFamilyTimeRange() throws Exception {
886    ThriftHBaseServiceHandler handler = createHandler();
887    ByteBuffer table = wrap(tableAname);
888
889    // insert data
890    TColumnValue familyAColumnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
891        wrap(valueAname));
892    TColumnValue familyBColumnValue = new TColumnValue(wrap(familyBname), wrap(qualifierBname),
893        wrap(valueBname));
894    long minTimestamp = System.currentTimeMillis();
895    for (int i = 0; i < 10; i++) {
896      familyAColumnValue.setTimestamp(minTimestamp + i);
897      familyBColumnValue.setTimestamp(minTimestamp + i);
898      List<TColumnValue> columnValues = new ArrayList<>(2);
899      columnValues.add(familyAColumnValue);
900      columnValues.add(familyBColumnValue);
901      TPut put = new TPut(wrap(Bytes.toBytes("testScanWithColumnFamilyTimeRange" + i)),
902          columnValues);
903      handler.put(table, put);
904    }
905
906    // create scan instance with column family time range
907    TScan scan = new TScan();
908    Map<ByteBuffer,TTimeRange> colFamTimeRangeMap = new HashMap<>(2);
909    colFamTimeRangeMap.put(wrap(familyAname), new TTimeRange(minTimestamp + 3, minTimestamp + 5));
910    colFamTimeRangeMap.put(wrap(familyBname), new TTimeRange(minTimestamp + 6, minTimestamp + 9));
911    scan.setColFamTimeRangeMap(colFamTimeRangeMap);
912
913    // get scanner and rows
914    int scanId = handler.openScanner(table, scan);
915    List<TResult> results = handler.getScannerRows(scanId, 5);
916    assertEquals(5, results.size());
917    int familyACount = 0;
918    int familyBCount = 0;
919    for (TResult result : results) {
920      List<TColumnValue> columnValues = result.getColumnValues();
921      if (CollectionUtils.isNotEmpty(columnValues)) {
922        if (Bytes.equals(familyAname, columnValues.get(0).getFamily())) {
923          familyACount++;
924        } else if (Bytes.equals(familyBname, columnValues.get(0).getFamily())) {
925          familyBCount++;
926        }
927      }
928    }
929    assertEquals(2, familyACount);
930    assertEquals(3, familyBCount);
931
932    // check that we are at the end of the scan
933    results = handler.getScannerRows(scanId, 1);
934    assertEquals(0, results.size());
935
936    // close scanner and check that it was indeed closed
937    handler.closeScanner(scanId);
938    try {
939      handler.getScannerRows(scanId, 1);
940      fail("Scanner id should be invalid");
941    } catch (TIllegalArgument e) {
942    }
943  }
944
945  @Test
946  public void testSmallScan() throws Exception {
947    ThriftHBaseServiceHandler handler = createHandler();
948    ByteBuffer table = wrap(tableAname);
949
950    // insert data
951    TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
952            wrap(valueAname));
953    List<TColumnValue> columnValues = new ArrayList<>();
954    columnValues.add(columnValue);
955    for (int i = 0; i < 10; i++) {
956      TPut put = new TPut(wrap(Bytes.toBytes("testSmallScan" + i)), columnValues);
957      handler.put(table, put);
958    }
959
960    // small scan instance
961    TScan scan = new TScan();
962    scan.setStartRow(Bytes.toBytes("testSmallScan"));
963    scan.setStopRow(Bytes.toBytes("testSmallScan\uffff"));
964    scan.setReadType(TReadType.PREAD);
965    scan.setCaching(2);
966
967    // get scanner and rows
968    int scanId = handler.openScanner(table, scan);
969    List<TResult> results = handler.getScannerRows(scanId, 10);
970    assertEquals(10, results.size());
971    for (int i = 0; i < 10; i++) {
972      // check if the rows are returned and in order
973      assertArrayEquals(Bytes.toBytes("testSmallScan" + i), results.get(i).getRow());
974    }
975
976    // check that we are at the end of the scan
977    results = handler.getScannerRows(scanId, 10);
978    assertEquals(0, results.size());
979
980    // close scanner and check that it was indeed closed
981    handler.closeScanner(scanId);
982    try {
983      handler.getScannerRows(scanId, 10);
984      fail("Scanner id should be invalid");
985    } catch (TIllegalArgument e) {
986    }
987  }
988
989  @Test
990  public void testPutTTL() throws Exception {
991    ThriftHBaseServiceHandler handler = createHandler();
992    byte[] rowName = Bytes.toBytes("testPutTTL");
993    ByteBuffer table = wrap(tableAname);
994    List<TColumnValue> columnValues = new ArrayList<>(1);
995
996    // Add some dummy data
997    columnValues.add(
998        new TColumnValue(
999            wrap(familyAname),
1000            wrap(qualifierAname),
1001            wrap(Bytes.toBytes(1L))));
1002
1003
1004    TPut put = new TPut(wrap(rowName), columnValues);
1005    put.setColumnValues(columnValues);
1006
1007    Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>();
1008
1009    // Time in ms for the kv's to live.
1010    long ttlTimeMs = 2000L;
1011
1012    // the _ttl attribute is a number of ms ttl for key values in this put.
1013    attributes.put(wrap(Bytes.toBytes("_ttl")), wrap(Bytes.toBytes(ttlTimeMs)));
1014    // Attach the attributes
1015    put.setAttributes(attributes);
1016    // Send it.
1017    handler.put(table, put);
1018
1019    // Now get the data back
1020    TGet getOne = new TGet(wrap(rowName));
1021    TResult resultOne = handler.get(table, getOne);
1022
1023    // It's there.
1024    assertArrayEquals(rowName, resultOne.getRow());
1025    assertEquals(1, resultOne.getColumnValuesSize());
1026
1027    // Sleep 30 seconds just to make 100% sure that the key value should be expired.
1028    Thread.sleep(ttlTimeMs * 15);
1029
1030    TGet getTwo = new TGet(wrap(rowName));
1031    TResult resultTwo = handler.get(table, getTwo);
1032
1033
1034    // Nothing should be there since it's ttl'd out.
1035    assertNull(resultTwo.getRow());
1036    assertEquals(0, resultTwo.getColumnValuesSize());
1037  }
1038
1039  /**
1040   * Padding numbers to make comparison of sort order easier in a for loop
1041   *
1042   * @param n  The number to pad.
1043   * @param pad  The length to pad up to.
1044   * @return The padded number as a string.
1045   */
1046  private String pad(int n, byte pad) {
1047    String res = Integer.toString(n);
1048    while (res.length() < pad) res = "0" + res;
1049    return res;
1050  }
1051
1052  @Test
1053  public void testScanWithBatchSize() throws Exception {
1054    ThriftHBaseServiceHandler handler = createHandler();
1055    ByteBuffer table = wrap(tableAname);
1056
1057    // insert data
1058    List<TColumnValue> columnValues = new ArrayList<>(100);
1059    for (int i = 0; i < 100; i++) {
1060      String colNum = pad(i, (byte) 3);
1061      TColumnValue columnValue = new TColumnValue(wrap(familyAname),
1062        wrap(Bytes.toBytes("col" + colNum)), wrap(Bytes.toBytes("val" + colNum)));
1063      columnValues.add(columnValue);
1064    }
1065    TPut put = new TPut(wrap(Bytes.toBytes("testScanWithBatchSize")), columnValues);
1066    handler.put(table, put);
1067
1068    // create scan instance
1069    TScan scan = new TScan();
1070    List<TColumn> columns = new ArrayList<>(1);
1071    TColumn column = new TColumn();
1072    column.setFamily(familyAname);
1073    columns.add(column);
1074    scan.setColumns(columns);
1075    scan.setStartRow(Bytes.toBytes("testScanWithBatchSize"));
1076    scan.setStopRow(Bytes.toBytes("testScanWithBatchSize\uffff"));
1077    // set batch size to 10 columns per call
1078    scan.setBatchSize(10);
1079
1080    // get scanner
1081    int scanId = handler.openScanner(table, scan);
1082    List<TResult> results = null;
1083    for (int i = 0; i < 10; i++) {
1084      // get batch for single row (10x10 is what we expect)
1085      results = handler.getScannerRows(scanId, 1);
1086      assertEquals(1, results.size());
1087      // check length of batch
1088      List<TColumnValue> cols = results.get(0).getColumnValues();
1089      assertEquals(10, cols.size());
1090      // check if the columns are returned and in order
1091      for (int y = 0; y < 10; y++) {
1092        int colNum = y + (10 * i);
1093        String colNumPad = pad(colNum, (byte) 3);
1094        assertArrayEquals(Bytes.toBytes("col" + colNumPad), cols.get(y).getQualifier());
1095      }
1096    }
1097
1098    // check that we are at the end of the scan
1099    results = handler.getScannerRows(scanId, 1);
1100    assertEquals(0, results.size());
1101
1102    // close scanner and check that it was indeed closed
1103    handler.closeScanner(scanId);
1104    try {
1105      handler.getScannerRows(scanId, 1);
1106      fail("Scanner id should be invalid");
1107    } catch (TIllegalArgument e) {
1108    }
1109  }
1110
1111  @Test
1112  public void testGetScannerResults() throws Exception {
1113    ThriftHBaseServiceHandler handler = createHandler();
1114    ByteBuffer table = wrap(tableAname);
1115
1116    // insert data
1117    TColumnValue columnValue =
1118        new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname));
1119    List<TColumnValue> columnValues = new ArrayList<>(1);
1120    columnValues.add(columnValue);
1121    for (int i = 0; i < 20; i++) {
1122      TPut put =
1123          new TPut(wrap(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2))), columnValues);
1124      handler.put(table, put);
1125    }
1126
1127    // create scan instance
1128    TScan scan = new TScan();
1129    List<TColumn> columns = new ArrayList<>(1);
1130    TColumn column = new TColumn();
1131    column.setFamily(familyAname);
1132    column.setQualifier(qualifierAname);
1133    columns.add(column);
1134    scan.setColumns(columns);
1135    scan.setStartRow(Bytes.toBytes("testGetScannerResults"));
1136
1137    // get 5 rows and check the returned results
1138    scan.setStopRow(Bytes.toBytes("testGetScannerResults05"));
1139    List<TResult> results = handler.getScannerResults(table, scan, 5);
1140    assertEquals(5, results.size());
1141    for (int i = 0; i < 5; i++) {
1142      // check if the rows are returned and in order
1143      assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i)
1144          .getRow());
1145    }
1146
1147    // get 10 rows and check the returned results
1148    scan.setStopRow(Bytes.toBytes("testGetScannerResults10"));
1149    results = handler.getScannerResults(table, scan, 10);
1150    assertEquals(10, results.size());
1151    for (int i = 0; i < 10; i++) {
1152      // check if the rows are returned and in order
1153      assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i)
1154          .getRow());
1155    }
1156
1157    // get 20 rows and check the returned results
1158    scan.setStopRow(Bytes.toBytes("testGetScannerResults20"));
1159    results = handler.getScannerResults(table, scan, 20);
1160    assertEquals(20, results.size());
1161    for (int i = 0; i < 20; i++) {
1162      // check if the rows are returned and in order
1163      assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i)
1164          .getRow());
1165    }
1166
1167    // reverse scan
1168    scan = new TScan();
1169    scan.setColumns(columns);
1170    scan.setReversed(true);
1171    scan.setStartRow(Bytes.toBytes("testGetScannerResults20"));
1172    scan.setStopRow(Bytes.toBytes("testGetScannerResults"));
1173    results = handler.getScannerResults(table, scan, 20);
1174    assertEquals(20, results.size());
1175    for (int i = 0; i < 20; i++) {
1176      // check if the rows are returned and in order
1177      assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(19 - i, (byte) 2)),
1178          results.get(i).getRow());
1179    }
1180 }
1181
1182  @Test
1183  public void testFilterRegistration() throws Exception {
1184    Configuration conf = UTIL.getConfiguration();
1185    conf.set("hbase.thrift.filters", "MyFilter:filterclass");
1186    ThriftServer.registerFilters(conf);
1187    Map<String, String> registeredFilters = ParseFilter.getAllFilters();
1188    assertEquals("filterclass", registeredFilters.get("MyFilter"));
1189  }
1190
1191  @Test
1192  public void testMetrics() throws Exception {
1193    Configuration conf = UTIL.getConfiguration();
1194    ThriftMetrics metrics = getMetrics(conf);
1195    ThriftHBaseServiceHandler hbaseHandler = createHandler();
1196    THBaseService.Iface handler =
1197        ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
1198    byte[] rowName = Bytes.toBytes("testMetrics");
1199    ByteBuffer table = wrap(tableAname);
1200
1201    TGet get = new TGet(wrap(rowName));
1202    assertFalse(handler.exists(table, get));
1203
1204    List<TColumnValue> columnValues = new ArrayList<>(2);
1205    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
1206    columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname),  wrap(valueBname)));
1207    TPut put = new TPut(wrap(rowName), columnValues);
1208    put.setColumnValues(columnValues);
1209
1210    handler.put(table, put);
1211
1212    assertTrue(handler.exists(table, get));
1213    metricsHelper.assertCounter("put_num_ops", 1, metrics.getSource());
1214    metricsHelper.assertCounter( "exists_num_ops", 2, metrics.getSource());
1215  }
1216
1217  private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
1218    ThriftMetrics m = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
1219    m.getSource().init(); //Clear all the metrics
1220    return m;
1221  }
1222
1223  @Test
1224  public void testMetricsWithException() throws Exception {
1225    byte[] rowkey = Bytes.toBytes("row1");
1226    byte[] family = Bytes.toBytes("f");
1227    byte[] col = Bytes.toBytes("c");
1228    // create a table which will throw exceptions for requests
1229    TableName tableName = TableName.valueOf(name.getMethodName());
1230    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
1231    tableDesc.addCoprocessor(ErrorThrowingGetObserver.class.getName());
1232    tableDesc.addFamily(new HColumnDescriptor(family));
1233
1234    Table table = UTIL.createTable(tableDesc, null);
1235    table.put(new Put(rowkey).addColumn(family, col, Bytes.toBytes("val1")));
1236
1237    ThriftHBaseServiceHandler hbaseHandler = createHandler();
1238    ThriftMetrics metrics = getMetrics(UTIL.getConfiguration());
1239    THBaseService.Iface handler =
1240        ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
1241    ByteBuffer tTableName = wrap(tableName.getName());
1242
1243    // check metrics increment with a successful get
1244    long preGetCounter = metricsHelper.checkCounterExists("get_num_ops", metrics.getSource()) ?
1245        metricsHelper.getCounter("get_num_ops", metrics.getSource()) :
1246        0;
1247    TGet tGet = new TGet(wrap(rowkey));
1248    TResult tResult = handler.get(tTableName, tGet);
1249
1250    List<TColumnValue> expectedColumnValues = Lists.newArrayList(
1251        new TColumnValue(wrap(family), wrap(col), wrap(Bytes.toBytes("val1")))
1252    );
1253    assertArrayEquals(rowkey, tResult.getRow());
1254    List<TColumnValue> returnedColumnValues = tResult.getColumnValues();
1255    assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
1256
1257    metricsHelper.assertCounter("get_num_ops", preGetCounter + 1, metrics.getSource());
1258
1259    // check metrics increment when the get throws each exception type
1260    for (ErrorThrowingGetObserver.ErrorType type : ErrorThrowingGetObserver.ErrorType.values()) {
1261      testExceptionType(handler, metrics, tTableName, rowkey, type);
1262    }
1263  }
1264
1265  private void testExceptionType(THBaseService.Iface handler, ThriftMetrics metrics,
1266                                 ByteBuffer tTableName, byte[] rowkey, ErrorThrowingGetObserver.ErrorType errorType) {
1267    long preGetCounter = metricsHelper.getCounter("get_num_ops", metrics.getSource());
1268    String exceptionKey = errorType.getMetricName();
1269    long preExceptionCounter = metricsHelper.checkCounterExists(exceptionKey, metrics.getSource()) ?
1270        metricsHelper.getCounter(exceptionKey, metrics.getSource()) :
1271        0;
1272    TGet tGet = new TGet(wrap(rowkey));
1273    Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>();
1274    attributes.put(wrap(Bytes.toBytes(ErrorThrowingGetObserver.SHOULD_ERROR_ATTRIBUTE)),
1275        wrap(Bytes.toBytes(errorType.name())));
1276    tGet.setAttributes(attributes);
1277    try {
1278      TResult tResult = handler.get(tTableName, tGet);
1279      fail("Get with error attribute should have thrown an exception");
1280    } catch (TException e) {
1281      LOG.info("Received exception: ", e);
1282      metricsHelper.assertCounter("get_num_ops", preGetCounter + 1, metrics.getSource());
1283      metricsHelper.assertCounter(exceptionKey, preExceptionCounter + 1, metrics.getSource());
1284    }
1285
1286  }
1287
1288  /**
1289   * See HBASE-17611
1290   *
1291   * Latency metrics were capped at ~ 2 seconds due to the use of an int variable to capture the
1292   * duration.
1293   */
1294  @Test
1295  public void testMetricsPrecision() throws Exception {
1296    byte[] rowkey = Bytes.toBytes("row1");
1297    byte[] family = Bytes.toBytes("f");
1298    byte[] col = Bytes.toBytes("c");
1299    // create a table which will throw exceptions for requests
1300    TableName tableName = TableName.valueOf("testMetricsPrecision");
1301    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
1302    tableDesc.addCoprocessor(DelayingRegionObserver.class.getName());
1303    tableDesc.addFamily(new HColumnDescriptor(family));
1304
1305    Table table = null;
1306    try {
1307      table = UTIL.createTable(tableDesc, null);
1308
1309      table.put(new Put(rowkey).addColumn(family, col, Bytes.toBytes("val1")));
1310
1311      ThriftHBaseServiceHandler hbaseHandler = createHandler();
1312      ThriftMetrics metrics = getMetrics(UTIL.getConfiguration());
1313      THBaseService.Iface handler =
1314          ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
1315      ByteBuffer tTableName = wrap(tableName.getName());
1316
1317      // check metrics latency with a successful get
1318      TGet tGet = new TGet(wrap(rowkey));
1319      TResult tResult = handler.get(tTableName, tGet);
1320
1321      List<TColumnValue> expectedColumnValues = Lists.newArrayList(
1322          new TColumnValue(wrap(family), wrap(col), wrap(Bytes.toBytes("val1")))
1323      );
1324      assertArrayEquals(rowkey, tResult.getRow());
1325      List<TColumnValue> returnedColumnValues = tResult.getColumnValues();
1326      assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
1327
1328      metricsHelper.assertGaugeGt("get_max", 3000L, metrics.getSource());
1329    } finally {
1330      if (table != null) {
1331        try {
1332          table.close();
1333        } catch (IOException ignored) {
1334        }
1335        UTIL.deleteTable(tableName);
1336      }
1337    }
1338  }
1339
1340
1341  @Test
1342  public void testAttribute() throws Exception {
1343    byte[] rowName = Bytes.toBytes("testAttribute");
1344    byte[] attributeKey = Bytes.toBytes("attribute1");
1345    byte[] attributeValue = Bytes.toBytes("value1");
1346    Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>();
1347    attributes.put(wrap(attributeKey), wrap(attributeValue));
1348
1349    TGet tGet = new TGet(wrap(rowName));
1350    tGet.setAttributes(attributes);
1351    Get get = getFromThrift(tGet);
1352    assertArrayEquals(get.getAttribute("attribute1"), attributeValue);
1353
1354    List<TColumnValue> columnValues = new ArrayList<>(1);
1355    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
1356    TPut tPut = new TPut(wrap(rowName) , columnValues);
1357    tPut.setAttributes(attributes);
1358    Put put = putFromThrift(tPut);
1359    assertArrayEquals(put.getAttribute("attribute1"), attributeValue);
1360
1361    TScan tScan = new TScan();
1362    tScan.setAttributes(attributes);
1363    Scan scan = scanFromThrift(tScan);
1364    assertArrayEquals(scan.getAttribute("attribute1"), attributeValue);
1365
1366    List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
1367    incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname)));
1368    TIncrement tIncrement = new TIncrement(wrap(rowName), incrementColumns);
1369    tIncrement.setAttributes(attributes);
1370    Increment increment = incrementFromThrift(tIncrement);
1371    assertArrayEquals(increment.getAttribute("attribute1"), attributeValue);
1372
1373    TDelete tDelete = new TDelete(wrap(rowName));
1374    tDelete.setAttributes(attributes);
1375    Delete delete = deleteFromThrift(tDelete);
1376    assertArrayEquals(delete.getAttribute("attribute1"), attributeValue);
1377  }
1378
1379  /**
1380   * Put valueA to a row, make sure put has happened, then create a mutation object to put valueB
1381   * and delete ValueA, then check that the row value is only valueB.
1382   */
1383  @Test
1384  public void testMutateRow() throws Exception {
1385    ThriftHBaseServiceHandler handler = createHandler();
1386    byte[] rowName = Bytes.toBytes("testMutateRow");
1387    ByteBuffer table = wrap(tableAname);
1388
1389    List<TColumnValue> columnValuesA = new ArrayList<>(1);
1390    TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
1391        wrap(valueAname));
1392    columnValuesA.add(columnValueA);
1393    TPut putA = new TPut(wrap(rowName), columnValuesA);
1394    putA.setColumnValues(columnValuesA);
1395
1396    handler.put(table,putA);
1397
1398    TGet get = new TGet(wrap(rowName));
1399    TResult result = handler.get(table, get);
1400    assertArrayEquals(rowName, result.getRow());
1401    List<TColumnValue> returnedColumnValues = result.getColumnValues();
1402
1403    List<TColumnValue> expectedColumnValues = new ArrayList<>(1);
1404    expectedColumnValues.add(columnValueA);
1405    assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
1406
1407    List<TColumnValue> columnValuesB = new ArrayList<>(1);
1408    TColumnValue columnValueB = new TColumnValue(wrap(familyAname), wrap(qualifierBname),
1409        wrap(valueBname));
1410    columnValuesB.add(columnValueB);
1411    TPut putB = new TPut(wrap(rowName), columnValuesB);
1412    putB.setColumnValues(columnValuesB);
1413
1414    TDelete delete = new TDelete(wrap(rowName));
1415    List<TColumn> deleteColumns = new ArrayList<>(1);
1416    TColumn deleteColumn = new TColumn(wrap(familyAname));
1417    deleteColumn.setQualifier(qualifierAname);
1418    deleteColumns.add(deleteColumn);
1419    delete.setColumns(deleteColumns);
1420
1421    List<TMutation> mutations = new ArrayList<>(2);
1422    TMutation mutationA = TMutation.put(putB);
1423    mutations.add(mutationA);
1424
1425    TMutation mutationB = TMutation.deleteSingle(delete);
1426    mutations.add(mutationB);
1427
1428    TRowMutations tRowMutations = new TRowMutations(wrap(rowName),mutations);
1429    handler.mutateRow(table,tRowMutations);
1430
1431    result = handler.get(table, get);
1432    assertArrayEquals(rowName, result.getRow());
1433    returnedColumnValues = result.getColumnValues();
1434
1435    expectedColumnValues = new ArrayList<>(1);
1436    expectedColumnValues.add(columnValueB);
1437    assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues);
1438  }
1439
1440  /**
1441   * Create TPut, TDelete , TIncrement objects, set durability then call ThriftUtility
1442   * functions to get Put , Delete and Increment respectively. Use getDurability to make sure
1443   * the returned objects have the appropriate durability setting.
1444   */
1445  @Test
1446  public void testDurability() throws Exception {
1447    byte[] rowName = Bytes.toBytes("testDurability");
1448    List<TColumnValue> columnValues = new ArrayList<>(1);
1449    columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
1450
1451    List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
1452    incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname)));
1453
1454    TDelete tDelete = new TDelete(wrap(rowName));
1455    tDelete.setDurability(TDurability.SKIP_WAL);
1456    Delete delete = deleteFromThrift(tDelete);
1457    assertEquals(Durability.SKIP_WAL, delete.getDurability());
1458
1459    tDelete.setDurability(TDurability.ASYNC_WAL);
1460    delete = deleteFromThrift(tDelete);
1461    assertEquals(Durability.ASYNC_WAL, delete.getDurability());
1462
1463    tDelete.setDurability(TDurability.SYNC_WAL);
1464    delete = deleteFromThrift(tDelete);
1465    assertEquals(Durability.SYNC_WAL, delete.getDurability());
1466
1467    tDelete.setDurability(TDurability.FSYNC_WAL);
1468    delete = deleteFromThrift(tDelete);
1469    assertEquals(Durability.FSYNC_WAL, delete.getDurability());
1470
1471    TPut tPut = new TPut(wrap(rowName), columnValues);
1472    tPut.setDurability(TDurability.SKIP_WAL);
1473    Put put = putFromThrift(tPut);
1474    assertEquals(Durability.SKIP_WAL, put.getDurability());
1475
1476    tPut.setDurability(TDurability.ASYNC_WAL);
1477    put = putFromThrift(tPut);
1478    assertEquals(Durability.ASYNC_WAL, put.getDurability());
1479
1480    tPut.setDurability(TDurability.SYNC_WAL);
1481    put = putFromThrift(tPut);
1482    assertEquals(Durability.SYNC_WAL, put.getDurability());
1483
1484    tPut.setDurability(TDurability.FSYNC_WAL);
1485    put = putFromThrift(tPut);
1486    assertEquals(Durability.FSYNC_WAL, put.getDurability());
1487
1488    TIncrement tIncrement = new TIncrement(wrap(rowName), incrementColumns);
1489
1490    tIncrement.setDurability(TDurability.SKIP_WAL);
1491    Increment increment = incrementFromThrift(tIncrement);
1492    assertEquals(Durability.SKIP_WAL, increment.getDurability());
1493
1494    tIncrement.setDurability(TDurability.ASYNC_WAL);
1495    increment = incrementFromThrift(tIncrement);
1496    assertEquals(Durability.ASYNC_WAL, increment.getDurability());
1497
1498    tIncrement.setDurability(TDurability.SYNC_WAL);
1499    increment = incrementFromThrift(tIncrement);
1500    assertEquals(Durability.SYNC_WAL, increment.getDurability());
1501
1502    tIncrement.setDurability(TDurability.FSYNC_WAL);
1503    increment = incrementFromThrift(tIncrement);
1504    assertEquals(Durability.FSYNC_WAL, increment.getDurability());
1505  }
1506
1507  @Test
1508  public void testCheckAndMutate() throws Exception {
1509    ThriftHBaseServiceHandler handler = createHandler();
1510    ByteBuffer table = wrap(tableAname);
1511    ByteBuffer row = wrap(Bytes.toBytes("row"));
1512    ByteBuffer family = wrap(familyAname);
1513    ByteBuffer qualifier = wrap(qualifierAname);
1514    ByteBuffer value = wrap(valueAname);
1515
1516    // Create a mutation to write to 'B', our "mutate" of "checkAndMutate"
1517    List<TColumnValue> columnValuesB = new ArrayList<>(1);
1518    TColumnValue columnValueB = new TColumnValue(family, wrap(qualifierBname), wrap(valueBname));
1519    columnValuesB.add(columnValueB);
1520    TPut putB = new TPut(row, columnValuesB);
1521    putB.setColumnValues(columnValuesB);
1522
1523    TRowMutations tRowMutations = new TRowMutations(row,
1524        Arrays.<TMutation> asList(TMutation.put(putB)));
1525
1526    // Empty table when we begin
1527    TResult result = handler.get(table, new TGet(row));
1528    assertEquals(0, result.getColumnValuesSize());
1529
1530    // checkAndMutate -- condition should fail because the value doesn't exist.
1531    assertFalse("Expected condition to not pass",
1532        handler.checkAndMutate(table, row, family, qualifier, TCompareOp.EQUAL, value,
1533            tRowMutations));
1534
1535    List<TColumnValue> columnValuesA = new ArrayList<>(1);
1536    TColumnValue columnValueA = new TColumnValue(family, qualifier, value);
1537    columnValuesA.add(columnValueA);
1538
1539    // Put an update 'A'
1540    handler.put(table, new TPut(row, columnValuesA));
1541
1542    // Verify that the update is there
1543    result = handler.get(table, new TGet(row));
1544    assertEquals(1, result.getColumnValuesSize());
1545    assertTColumnValueEqual(columnValueA, result.getColumnValues().get(0));
1546
1547    // checkAndMutate -- condition should pass since we added the value
1548    assertTrue("Expected condition to pass",
1549        handler.checkAndMutate(table, row, family, qualifier, TCompareOp.EQUAL, value,
1550            tRowMutations));
1551
1552    result = handler.get(table, new TGet(row));
1553    assertEquals(2, result.getColumnValuesSize());
1554    assertTColumnValueEqual(columnValueA, result.getColumnValues().get(0));
1555    assertTColumnValueEqual(columnValueB, result.getColumnValues().get(1));
1556  }
1557
1558  public static class DelayingRegionObserver implements RegionCoprocessor, RegionObserver {
1559    private static final Logger LOG = LoggerFactory.getLogger(DelayingRegionObserver.class);
1560    // sleep time in msec
1561    private long delayMillis;
1562
1563    @Override
1564    public Optional<RegionObserver> getRegionObserver() {
1565      return Optional.of(this);
1566    }
1567
1568    @Override
1569    public void start(CoprocessorEnvironment e) throws IOException {
1570      this.delayMillis = e.getConfiguration()
1571          .getLong("delayingregionobserver.delay", 3000);
1572    }
1573
1574    @Override
1575    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
1576                         List<Cell> results) throws IOException {
1577      try {
1578        long start = System.currentTimeMillis();
1579        TimeUnit.MILLISECONDS.sleep(delayMillis);
1580        if (LOG.isTraceEnabled()) {
1581          LOG.trace("Slept for " + (System.currentTimeMillis() - start) + " msec");
1582        }
1583      } catch (InterruptedException ie) {
1584        throw new InterruptedIOException("Interrupted while sleeping");
1585      }
1586    }
1587  }
1588}
1589