/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertThat;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicLong;
042import java.util.function.Supplier;
043import java.util.stream.IntStream;
044import org.apache.commons.io.IOUtils;
045import org.apache.hadoop.hbase.CompareOperator;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.TableNotEnabledException;
050import org.apache.hadoop.hbase.filter.BinaryComparator;
051import org.apache.hadoop.hbase.filter.FamilyFilter;
052import org.apache.hadoop.hbase.filter.FilterList;
053import org.apache.hadoop.hbase.filter.QualifierFilter;
054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
055import org.apache.hadoop.hbase.filter.TimestampsFilter;
056import org.apache.hadoop.hbase.io.TimeRange;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.Pair;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TestName;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.junit.runners.Parameterized.Parameter;
072import org.junit.runners.Parameterized.Parameters;
073
074@RunWith(Parameterized.class)
075@Category({ MediumTests.class, ClientTests.class })
076public class TestAsyncTable {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestAsyncTable.class);
081
082  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083
084  private static TableName TABLE_NAME = TableName.valueOf("async");
085
086  private static byte[] FAMILY = Bytes.toBytes("cf");
087
088  private static byte[] QUALIFIER = Bytes.toBytes("cq");
089
090  private static byte[] VALUE = Bytes.toBytes("value");
091
092  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
093
094  private static AsyncConnection ASYNC_CONN;
095
096  @Rule
097  public TestName testName = new TestName();
098
099  private byte[] row;
100
101  @Parameter
102  public Supplier<AsyncTable<?>> getTable;
103
104  private static AsyncTable<?> getRawTable() {
105    return ASYNC_CONN.getTable(TABLE_NAME);
106  }
107
108  private static AsyncTable<?> getTable() {
109    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
110  }
111
112  @Parameters
113  public static List<Object[]> params() {
114    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
115      new Supplier<?>[] { TestAsyncTable::getTable });
116  }
117
118  @BeforeClass
119  public static void setUpBeforeClass() throws Exception {
120    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
121      MAX_KEY_VALUE_SIZE);
122    TEST_UTIL.startMiniCluster(1);
123    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
124    TEST_UTIL.waitTableAvailable(TABLE_NAME);
125    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
126    assertFalse(ASYNC_CONN.isClosed());
127  }
128
129  @AfterClass
130  public static void tearDownAfterClass() throws Exception {
131    IOUtils.closeQuietly(ASYNC_CONN);
132    assertTrue(ASYNC_CONN.isClosed());
133    TEST_UTIL.shutdownMiniCluster();
134  }
135
136  @Before
137  public void setUp() throws IOException, InterruptedException, ExecutionException {
138    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
139    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
140      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
141    }
142  }
143
144  @Test
145  public void testSimple() throws Exception {
146    AsyncTable<?> table = getTable.get();
147    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
148    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
149    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
150    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
151    table.delete(new Delete(row)).get();
152    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
153    assertTrue(result.isEmpty());
154    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
155  }
156
157  private byte[] concat(byte[] base, int index) {
158    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
159  }
160
161  @SuppressWarnings("FutureReturnValueIgnored")
162  @Test
163  public void testSimpleMultiple() throws Exception {
164    AsyncTable<?> table = getTable.get();
165    int count = 100;
166    CountDownLatch putLatch = new CountDownLatch(count);
167    IntStream.range(0, count).forEach(
168      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
169        .thenAccept(x -> putLatch.countDown()));
170    putLatch.await();
171    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
172    IntStream.range(0, count)
173      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
174        .thenAccept(x -> existsResp.add(x)));
175    for (int i = 0; i < count; i++) {
176      assertTrue(existsResp.take());
177    }
178    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
179    IntStream.range(0, count)
180      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
181        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
182    for (int i = 0; i < count; i++) {
183      Pair<Integer, Result> pair = getResp.take();
184      assertArrayEquals(concat(VALUE, pair.getFirst()),
185        pair.getSecond().getValue(FAMILY, QUALIFIER));
186    }
187    CountDownLatch deleteLatch = new CountDownLatch(count);
188    IntStream.range(0, count).forEach(
189      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
190    deleteLatch.await();
191    IntStream.range(0, count)
192      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
193        .thenAccept(x -> existsResp.add(x)));
194    for (int i = 0; i < count; i++) {
195      assertFalse(existsResp.take());
196    }
197    IntStream.range(0, count)
198      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
199        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
200    for (int i = 0; i < count; i++) {
201      Pair<Integer, Result> pair = getResp.take();
202      assertTrue(pair.getSecond().isEmpty());
203    }
204  }
205
206  @SuppressWarnings("FutureReturnValueIgnored")
207  @Test
208  public void testIncrement() throws InterruptedException, ExecutionException {
209    AsyncTable<?> table = getTable.get();
210    int count = 100;
211    CountDownLatch latch = new CountDownLatch(count);
212    AtomicLong sum = new AtomicLong(0L);
213    IntStream.range(0, count)
214      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
215        sum.addAndGet(x);
216        latch.countDown();
217      }));
218    latch.await();
219    assertEquals(count, Bytes.toLong(
220      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
221    assertEquals((1 + count) * count / 2, sum.get());
222  }
223
224  @SuppressWarnings("FutureReturnValueIgnored")
225  @Test
226  public void testAppend() throws InterruptedException, ExecutionException {
227    AsyncTable<?> table = getTable.get();
228    int count = 10;
229    CountDownLatch latch = new CountDownLatch(count);
230    char suffix = ':';
231    AtomicLong suffixCount = new AtomicLong(0L);
232    IntStream.range(0, count)
233      .forEachOrdered(i -> table
234        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
235        .thenAccept(r -> {
236          suffixCount.addAndGet(
237            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
238          latch.countDown();
239        }));
240    latch.await();
241    assertEquals((1 + count) * count / 2, suffixCount.get());
242    String value = Bytes.toString(
243      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
244    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
245      .sorted().toArray();
246    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
247  }
248
249  @SuppressWarnings("FutureReturnValueIgnored")
250  @Test
251  public void testCheckAndPut() throws InterruptedException, ExecutionException {
252    AsyncTable<?> table = getTable.get();
253    AtomicInteger successCount = new AtomicInteger(0);
254    AtomicInteger successIndex = new AtomicInteger(-1);
255    int count = 10;
256    CountDownLatch latch = new CountDownLatch(count);
257    IntStream.range(0, count)
258      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
259        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
260          if (x) {
261            successCount.incrementAndGet();
262            successIndex.set(i);
263          }
264          latch.countDown();
265        }));
266    latch.await();
267    assertEquals(1, successCount.get());
268    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
269    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
270  }
271
272  @SuppressWarnings("FutureReturnValueIgnored")
273  @Test
274  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
275    AsyncTable<?> table = getTable.get();
276    int count = 10;
277    CountDownLatch putLatch = new CountDownLatch(count + 1);
278    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
279    IntStream.range(0, count)
280      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
281        .thenRun(() -> putLatch.countDown()));
282    putLatch.await();
283
284    AtomicInteger successCount = new AtomicInteger(0);
285    AtomicInteger successIndex = new AtomicInteger(-1);
286    CountDownLatch deleteLatch = new CountDownLatch(count);
287    IntStream.range(0, count)
288      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
289        .thenDelete(
290          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
291        .thenAccept(x -> {
292          if (x) {
293            successCount.incrementAndGet();
294            successIndex.set(i);
295          }
296          deleteLatch.countDown();
297        }));
298    deleteLatch.await();
299    assertEquals(1, successCount.get());
300    Result result = table.get(new Get(row)).get();
301    IntStream.range(0, count).forEach(i -> {
302      if (i == successIndex.get()) {
303        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
304      } else {
305        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
306      }
307    });
308  }
309
310  @Test
311  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
312    AsyncTable<?> table = getTable.get();
313    RowMutations mutation = new RowMutations(row);
314    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
315    table.mutateRow(mutation).get();
316    Result result = table.get(new Get(row)).get();
317    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
318
319    mutation = new RowMutations(row);
320    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
321    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
322    table.mutateRow(mutation).get();
323    result = table.get(new Get(row)).get();
324    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
325    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
326  }
327
328  @SuppressWarnings("FutureReturnValueIgnored")
329  @Test
330  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
331    AsyncTable<?> table = getTable.get();
332    int count = 10;
333    CountDownLatch putLatch = new CountDownLatch(count + 1);
334    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
335    IntStream.range(0, count)
336      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
337        .thenRun(() -> putLatch.countDown()));
338    putLatch.await();
339
340    AtomicInteger successCount = new AtomicInteger(0);
341    AtomicInteger successIndex = new AtomicInteger(-1);
342    CountDownLatch mutateLatch = new CountDownLatch(count);
343    IntStream.range(0, count).forEach(i -> {
344      RowMutations mutation = new RowMutations(row);
345      try {
346        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
347        mutation
348          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
349      } catch (IOException e) {
350        throw new UncheckedIOException(e);
351      }
352      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
353        .thenAccept(x -> {
354          if (x) {
355            successCount.incrementAndGet();
356            successIndex.set(i);
357          }
358          mutateLatch.countDown();
359        });
360    });
361    mutateLatch.await();
362    assertEquals(1, successCount.get());
363    Result result = table.get(new Get(row)).get();
364    IntStream.range(0, count).forEach(i -> {
365      if (i == successIndex.get()) {
366        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
367      } else {
368        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
369      }
370    });
371  }
372
373  @Test
374  public void testCheckAndMutateWithTimeRange() throws Exception {
375    AsyncTable<?> table = getTable.get();
376    final long ts = System.currentTimeMillis() / 2;
377    Put put = new Put(row);
378    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
379
380    boolean ok =
381      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
382    assertTrue(ok);
383
384    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
385      .ifEquals(VALUE).thenPut(put).get();
386    assertFalse(ok);
387
388    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
389      .ifEquals(VALUE).thenPut(put).get();
390    assertTrue(ok);
391
392    RowMutations rm = new RowMutations(row).add((Mutation) put);
393    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
394      .ifEquals(VALUE).thenMutate(rm).get();
395    assertFalse(ok);
396
397    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
398      .ifEquals(VALUE).thenMutate(rm).get();
399    assertTrue(ok);
400
401    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
402
403    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
404      .ifEquals(VALUE).thenDelete(delete).get();
405    assertFalse(ok);
406
407    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
408      .ifEquals(VALUE).thenDelete(delete).get();
409    assertTrue(ok);
410  }
411
412  @Test
413  public void testCheckAndMutateWithSingleFilter() throws Throwable {
414    AsyncTable<?> table = getTable.get();
415
416    // Put one row
417    Put put = new Put(row);
418    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
419    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
420    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
421    table.put(put).get();
422
423    // Put with success
424    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
425        CompareOperator.EQUAL, Bytes.toBytes("a")))
426      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
427      .get();
428    assertTrue(ok);
429
430    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
431    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
432
433    // Put with failure
434    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
435        CompareOperator.EQUAL, Bytes.toBytes("b")))
436      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
437      .get();
438    assertFalse(ok);
439
440    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
441
442    // Delete with success
443    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
444        CompareOperator.EQUAL, Bytes.toBytes("a")))
445      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
446      .get();
447    assertTrue(ok);
448
449    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
450
451    // Mutate with success
452    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
453        CompareOperator.EQUAL, Bytes.toBytes("b")))
454      .thenMutate(new RowMutations(row)
455        .add((Mutation) new Put(row)
456          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
457        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
458      .get();
459    assertTrue(ok);
460
461    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
462    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
463
464    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
465  }
466
467  @Test
468  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
469    AsyncTable<?> table = getTable.get();
470
471    // Put one row
472    Put put = new Put(row);
473    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
474    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
475    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
476    table.put(put).get();
477
478    // Put with success
479    boolean ok = table.checkAndMutate(row, new FilterList(
480        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
481          Bytes.toBytes("a")),
482        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
483          Bytes.toBytes("b"))
484      ))
485      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
486      .get();
487    assertTrue(ok);
488
489    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
490    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
491
492    // Put with failure
493    ok = table.checkAndMutate(row, new FilterList(
494        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
495          Bytes.toBytes("a")),
496        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
497          Bytes.toBytes("c"))
498      ))
499      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
500      .get();
501    assertFalse(ok);
502
503    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
504
505    // Delete with success
506    ok = table.checkAndMutate(row, new FilterList(
507        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
508          Bytes.toBytes("a")),
509        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
510          Bytes.toBytes("b"))
511      ))
512      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
513      .get();
514    assertTrue(ok);
515
516    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
517
518    // Mutate with success
519    ok = table.checkAndMutate(row, new FilterList(
520        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
521          Bytes.toBytes("a")),
522        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
523          Bytes.toBytes("b"))
524      ))
525      .thenMutate(new RowMutations(row)
526        .add((Mutation) new Put(row)
527          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
528        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
529      .get();
530    assertTrue(ok);
531
532    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
533    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
534
535    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
536  }
537
538  @Test
539  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
540    AsyncTable<?> table = getTable.get();
541
542    // Put with specifying the timestamp
543    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
544
545    // Put with success
546    boolean ok = table.checkAndMutate(row, new FilterList(
547        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
548        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
549        new TimestampsFilter(Collections.singletonList(100L))
550      ))
551      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
552      .get();
553    assertTrue(ok);
554
555    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
556    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
557
558    // Put with failure
559    ok = table.checkAndMutate(row, new FilterList(
560        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
561        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
562        new TimestampsFilter(Collections.singletonList(101L))
563      ))
564      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
565      .get();
566    assertFalse(ok);
567
568    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
569  }
570
571  @Test
572  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
573    AsyncTable<?> table = getTable.get();
574
575    // Put with specifying the timestamp
576    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
577      .get();
578
579    // Put with success
580    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
581        CompareOperator.EQUAL, Bytes.toBytes("a")))
582      .timeRange(TimeRange.between(0, 101))
583      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
584      .get();
585    assertTrue(ok);
586
587    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
588    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
589
590    // Put with failure
591    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
592        CompareOperator.EQUAL, Bytes.toBytes("a")))
593      .timeRange(TimeRange.between(0, 100))
594      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
595      .get();
596    assertFalse(ok);
597
598    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
599  }
600
601  @Test(expected = NullPointerException.class)
602  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
603    getTable.get().checkAndMutate(row, FAMILY)
604      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
605  }
606
607  @Test
608  public void testDisabled() throws InterruptedException, ExecutionException {
609    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
610    try {
611      getTable.get().get(new Get(row)).get();
612      fail("Should fail since table has been disabled");
613    } catch (ExecutionException e) {
614      Throwable cause = e.getCause();
615      assertThat(cause, instanceOf(TableNotEnabledException.class));
616      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
617    }
618  }
619
620  @Test
621  public void testInvalidPut() {
622    try {
623      getTable.get().put(new Put(Bytes.toBytes(0)));
624      fail("Should fail since the put does not contain any cells");
625    } catch (IllegalArgumentException e) {
626      assertThat(e.getMessage(), containsString("No columns to insert"));
627    }
628
629    try {
630      getTable.get()
631        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
632      fail("Should fail since the put exceeds the max key value size");
633    } catch (IllegalArgumentException e) {
634      assertThat(e.getMessage(), containsString("KeyValue size too large"));
635    }
636  }
637}