001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicLong;
042import java.util.function.Supplier;
043import java.util.stream.IntStream;
044import org.apache.hadoop.hbase.CompareOperator;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.TableNotEnabledException;
049import org.apache.hadoop.hbase.filter.BinaryComparator;
050import org.apache.hadoop.hbase.filter.FamilyFilter;
051import org.apache.hadoop.hbase.filter.FilterList;
052import org.apache.hadoop.hbase.filter.QualifierFilter;
053import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
054import org.apache.hadoop.hbase.filter.TimestampsFilter;
055import org.apache.hadoop.hbase.io.TimeRange;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.Pair;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068import org.junit.runner.RunWith;
069import org.junit.runners.Parameterized;
070import org.junit.runners.Parameterized.Parameter;
071import org.junit.runners.Parameterized.Parameters;
072
073import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
074
075@RunWith(Parameterized.class)
076@Category({ MediumTests.class, ClientTests.class })
077public class TestAsyncTable {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestAsyncTable.class);
082
083  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084
085  private static TableName TABLE_NAME = TableName.valueOf("async");
086
087  private static byte[] FAMILY = Bytes.toBytes("cf");
088
089  private static byte[] QUALIFIER = Bytes.toBytes("cq");
090
091  private static byte[] VALUE = Bytes.toBytes("value");
092
093  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
094
095  private static AsyncConnection ASYNC_CONN;
096
097  @Rule
098  public TestName testName = new TestName();
099
100  private byte[] row;
101
102  @Parameter
103  public Supplier<AsyncTable<?>> getTable;
104
105  private static AsyncTable<?> getRawTable() {
106    return ASYNC_CONN.getTable(TABLE_NAME);
107  }
108
109  private static AsyncTable<?> getTable() {
110    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
111  }
112
113  @Parameters
114  public static List<Object[]> params() {
115    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
116      new Supplier<?>[] { TestAsyncTable::getTable });
117  }
118
119  @BeforeClass
120  public static void setUpBeforeClass() throws Exception {
121    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
122      MAX_KEY_VALUE_SIZE);
123    TEST_UTIL.startMiniCluster(1);
124    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
125    TEST_UTIL.waitTableAvailable(TABLE_NAME);
126    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
127    assertFalse(ASYNC_CONN.isClosed());
128  }
129
130  @AfterClass
131  public static void tearDownAfterClass() throws Exception {
132    Closeables.close(ASYNC_CONN, true);
133    assertTrue(ASYNC_CONN.isClosed());
134    TEST_UTIL.shutdownMiniCluster();
135  }
136
137  @Before
138  public void setUp() throws IOException, InterruptedException, ExecutionException {
139    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
140    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
141      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
142    }
143  }
144
145  @Test
146  public void testSimple() throws Exception {
147    AsyncTable<?> table = getTable.get();
148    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
149    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
150    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
151    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
152    table.delete(new Delete(row)).get();
153    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
154    assertTrue(result.isEmpty());
155    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
156  }
157
158  private byte[] concat(byte[] base, int index) {
159    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
160  }
161
162  @SuppressWarnings("FutureReturnValueIgnored")
163  @Test
164  public void testSimpleMultiple() throws Exception {
165    AsyncTable<?> table = getTable.get();
166    int count = 100;
167    CountDownLatch putLatch = new CountDownLatch(count);
168    IntStream.range(0, count).forEach(
169      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
170        .thenAccept(x -> putLatch.countDown()));
171    putLatch.await();
172    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
173    IntStream.range(0, count)
174      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
175        .thenAccept(x -> existsResp.add(x)));
176    for (int i = 0; i < count; i++) {
177      assertTrue(existsResp.take());
178    }
179    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
180    IntStream.range(0, count)
181      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
182        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
183    for (int i = 0; i < count; i++) {
184      Pair<Integer, Result> pair = getResp.take();
185      assertArrayEquals(concat(VALUE, pair.getFirst()),
186        pair.getSecond().getValue(FAMILY, QUALIFIER));
187    }
188    CountDownLatch deleteLatch = new CountDownLatch(count);
189    IntStream.range(0, count).forEach(
190      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
191    deleteLatch.await();
192    IntStream.range(0, count)
193      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
194        .thenAccept(x -> existsResp.add(x)));
195    for (int i = 0; i < count; i++) {
196      assertFalse(existsResp.take());
197    }
198    IntStream.range(0, count)
199      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
200        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
201    for (int i = 0; i < count; i++) {
202      Pair<Integer, Result> pair = getResp.take();
203      assertTrue(pair.getSecond().isEmpty());
204    }
205  }
206
207  @SuppressWarnings("FutureReturnValueIgnored")
208  @Test
209  public void testIncrement() throws InterruptedException, ExecutionException {
210    AsyncTable<?> table = getTable.get();
211    int count = 100;
212    CountDownLatch latch = new CountDownLatch(count);
213    AtomicLong sum = new AtomicLong(0L);
214    IntStream.range(0, count)
215      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
216        sum.addAndGet(x);
217        latch.countDown();
218      }));
219    latch.await();
220    assertEquals(count, Bytes.toLong(
221      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
222    assertEquals((1 + count) * count / 2, sum.get());
223  }
224
225  @SuppressWarnings("FutureReturnValueIgnored")
226  @Test
227  public void testAppend() throws InterruptedException, ExecutionException {
228    AsyncTable<?> table = getTable.get();
229    int count = 10;
230    CountDownLatch latch = new CountDownLatch(count);
231    char suffix = ':';
232    AtomicLong suffixCount = new AtomicLong(0L);
233    IntStream.range(0, count)
234      .forEachOrdered(i -> table
235        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
236        .thenAccept(r -> {
237          suffixCount.addAndGet(
238            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
239          latch.countDown();
240        }));
241    latch.await();
242    assertEquals((1 + count) * count / 2, suffixCount.get());
243    String value = Bytes.toString(
244      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
245    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
246      .sorted().toArray();
247    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
248  }
249
250  @Test
251  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
252    AsyncTable<?> table = getTable.get();
253    RowMutations mutation = new RowMutations(row);
254    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
255    Result result = table.mutateRow(mutation).get();
256    assertTrue(result.getExists());
257    assertTrue(result.isEmpty());
258
259    result = table.get(new Get(row)).get();
260    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
261
262    mutation = new RowMutations(row);
263    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
264    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
265    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
266    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
267    result = table.mutateRow(mutation).get();
268    assertTrue(result.getExists());
269    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
270    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
271
272    result = table.get(new Get(row)).get();
273    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
274    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
275    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
276    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
277  }
278
279  // Tests for old checkAndMutate API
280
281  @SuppressWarnings("FutureReturnValueIgnored")
282  @Test
283  @Deprecated
284  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
285    AsyncTable<?> table = getTable.get();
286    AtomicInteger successCount = new AtomicInteger(0);
287    AtomicInteger successIndex = new AtomicInteger(-1);
288    int count = 10;
289    CountDownLatch latch = new CountDownLatch(count);
290    IntStream.range(0, count)
291      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
292        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
293          if (x) {
294            successCount.incrementAndGet();
295            successIndex.set(i);
296          }
297          latch.countDown();
298        }));
299    latch.await();
300    assertEquals(1, successCount.get());
301    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
302    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
303  }
304
305  @SuppressWarnings("FutureReturnValueIgnored")
306  @Test
307  @Deprecated
308  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
309    AsyncTable<?> table = getTable.get();
310    int count = 10;
311    CountDownLatch putLatch = new CountDownLatch(count + 1);
312    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
313    IntStream.range(0, count)
314      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
315        .thenRun(() -> putLatch.countDown()));
316    putLatch.await();
317
318    AtomicInteger successCount = new AtomicInteger(0);
319    AtomicInteger successIndex = new AtomicInteger(-1);
320    CountDownLatch deleteLatch = new CountDownLatch(count);
321    IntStream.range(0, count)
322      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
323        .thenDelete(
324          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
325        .thenAccept(x -> {
326          if (x) {
327            successCount.incrementAndGet();
328            successIndex.set(i);
329          }
330          deleteLatch.countDown();
331        }));
332    deleteLatch.await();
333    assertEquals(1, successCount.get());
334    Result result = table.get(new Get(row)).get();
335    IntStream.range(0, count).forEach(i -> {
336      if (i == successIndex.get()) {
337        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
338      } else {
339        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
340      }
341    });
342  }
343
344  @SuppressWarnings("FutureReturnValueIgnored")
345  @Test
346  @Deprecated
347  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
348    AsyncTable<?> table = getTable.get();
349    int count = 10;
350    CountDownLatch putLatch = new CountDownLatch(count + 1);
351    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
352    IntStream.range(0, count)
353      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
354        .thenRun(() -> putLatch.countDown()));
355    putLatch.await();
356
357    AtomicInteger successCount = new AtomicInteger(0);
358    AtomicInteger successIndex = new AtomicInteger(-1);
359    CountDownLatch mutateLatch = new CountDownLatch(count);
360    IntStream.range(0, count).forEach(i -> {
361      RowMutations mutation = new RowMutations(row);
362      try {
363        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
364        mutation
365          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
366      } catch (IOException e) {
367        throw new UncheckedIOException(e);
368      }
369      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
370        .thenAccept(x -> {
371          if (x) {
372            successCount.incrementAndGet();
373            successIndex.set(i);
374          }
375          mutateLatch.countDown();
376        });
377    });
378    mutateLatch.await();
379    assertEquals(1, successCount.get());
380    Result result = table.get(new Get(row)).get();
381    IntStream.range(0, count).forEach(i -> {
382      if (i == successIndex.get()) {
383        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
384      } else {
385        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
386      }
387    });
388  }
389
390  @Test
391  @Deprecated
392  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
393    AsyncTable<?> table = getTable.get();
394    final long ts = System.currentTimeMillis() / 2;
395    Put put = new Put(row);
396    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
397
398    boolean ok =
399      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
400    assertTrue(ok);
401
402    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
403      .ifEquals(VALUE).thenPut(put).get();
404    assertFalse(ok);
405
406    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
407      .ifEquals(VALUE).thenPut(put).get();
408    assertTrue(ok);
409
410    RowMutations rm = new RowMutations(row).add((Mutation) put);
411
412    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
413      .ifEquals(VALUE).thenMutate(rm).get();
414    assertFalse(ok);
415
416    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
417      .ifEquals(VALUE).thenMutate(rm).get();
418    assertTrue(ok);
419
420    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
421
422    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
423      .ifEquals(VALUE).thenDelete(delete).get();
424    assertFalse(ok);
425
426    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
427      .ifEquals(VALUE).thenDelete(delete).get();
428    assertTrue(ok);
429  }
430
431  @Test
432  @Deprecated
433  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
434    AsyncTable<?> table = getTable.get();
435
436    // Put one row
437    Put put = new Put(row);
438    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
439    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
440    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
441    table.put(put).get();
442
443    // Put with success
444    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
445        CompareOperator.EQUAL, Bytes.toBytes("a")))
446      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
447      .get();
448    assertTrue(ok);
449
450    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
451    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
452
453    // Put with failure
454    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
455        CompareOperator.EQUAL, Bytes.toBytes("b")))
456      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
457      .get();
458    assertFalse(ok);
459
460    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
461
462    // Delete with success
463    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
464        CompareOperator.EQUAL, Bytes.toBytes("a")))
465      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
466      .get();
467    assertTrue(ok);
468
469    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
470
471    // Mutate with success
472    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
473        CompareOperator.EQUAL, Bytes.toBytes("b")))
474      .thenMutate(new RowMutations(row)
475        .add((Mutation) new Put(row)
476          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
477        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
478      .get();
479    assertTrue(ok);
480
481    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
482    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
483
484    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
485  }
486
487  @Test
488  @Deprecated
489  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
490    AsyncTable<?> table = getTable.get();
491
492    // Put one row
493    Put put = new Put(row);
494    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
495    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
496    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
497    table.put(put).get();
498
499    // Put with success
500    boolean ok = table.checkAndMutate(row, new FilterList(
501        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
502          Bytes.toBytes("a")),
503        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
504          Bytes.toBytes("b"))
505      ))
506      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
507      .get();
508    assertTrue(ok);
509
510    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
511    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
512
513    // Put with failure
514    ok = table.checkAndMutate(row, new FilterList(
515        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
516          Bytes.toBytes("a")),
517        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
518          Bytes.toBytes("c"))
519      ))
520      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
521      .get();
522    assertFalse(ok);
523
524    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
525
526    // Delete with success
527    ok = table.checkAndMutate(row, new FilterList(
528        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
529          Bytes.toBytes("a")),
530        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
531          Bytes.toBytes("b"))
532      ))
533      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
534      .get();
535    assertTrue(ok);
536
537    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
538
539    // Mutate with success
540    ok = table.checkAndMutate(row, new FilterList(
541        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
542          Bytes.toBytes("a")),
543        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
544          Bytes.toBytes("b"))
545      ))
546      .thenMutate(new RowMutations(row)
547        .add((Mutation) new Put(row)
548          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
549        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
550      .get();
551    assertTrue(ok);
552
553    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
554    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
555
556    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
557  }
558
559  @Test
560  @Deprecated
561  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
562    AsyncTable<?> table = getTable.get();
563
564    // Put with specifying the timestamp
565    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
566
567    // Put with success
568    boolean ok = table.checkAndMutate(row, new FilterList(
569        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
570        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
571        new TimestampsFilter(Collections.singletonList(100L))
572      ))
573      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
574      .get();
575    assertTrue(ok);
576
577    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
578    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
579
580    // Put with failure
581    ok = table.checkAndMutate(row, new FilterList(
582        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
583        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
584        new TimestampsFilter(Collections.singletonList(101L))
585      ))
586      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
587      .get();
588    assertFalse(ok);
589
590    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
591  }
592
593  @Test
594  @Deprecated
595  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
596    AsyncTable<?> table = getTable.get();
597
598    // Put with specifying the timestamp
599    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
600      .get();
601
602    // Put with success
603    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
604        CompareOperator.EQUAL, Bytes.toBytes("a")))
605      .timeRange(TimeRange.between(0, 101))
606      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
607      .get();
608    assertTrue(ok);
609
610    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
611    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
612
613    // Put with failure
614    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
615        CompareOperator.EQUAL, Bytes.toBytes("a")))
616      .timeRange(TimeRange.between(0, 100))
617      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
618      .get();
619    assertFalse(ok);
620
621    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
622  }
623
624  @Test(expected = NullPointerException.class)
625  @Deprecated
626  public void testCheckAndMutateWithoutConditionForOldApi() {
627    getTable.get().checkAndMutate(row, FAMILY)
628      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
629  }
630
631  // Tests for new CheckAndMutate API
632
633  @SuppressWarnings("FutureReturnValueIgnored")
634  @Test
635  public void testCheckAndPut() throws InterruptedException, ExecutionException {
636    AsyncTable<?> table = getTable.get();
637    AtomicInteger successCount = new AtomicInteger(0);
638    AtomicInteger successIndex = new AtomicInteger(-1);
639    int count = 10;
640    CountDownLatch latch = new CountDownLatch(count);
641
642    IntStream.range(0, count)
643      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
644          .ifNotExists(FAMILY, QUALIFIER)
645          .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
646        .thenAccept(x -> {
647          if (x.isSuccess()) {
648            successCount.incrementAndGet();
649            successIndex.set(i);
650          }
651          assertNull(x.getResult());
652          latch.countDown();
653        }));
654    latch.await();
655    assertEquals(1, successCount.get());
656    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
657    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
658  }
659
660  @SuppressWarnings("FutureReturnValueIgnored")
661  @Test
662  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
663    AsyncTable<?> table = getTable.get();
664    int count = 10;
665    CountDownLatch putLatch = new CountDownLatch(count + 1);
666    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
667    IntStream.range(0, count)
668      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
669        .thenRun(() -> putLatch.countDown()));
670    putLatch.await();
671
672    AtomicInteger successCount = new AtomicInteger(0);
673    AtomicInteger successIndex = new AtomicInteger(-1);
674    CountDownLatch deleteLatch = new CountDownLatch(count);
675
676    IntStream.range(0, count)
677      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
678          .ifEquals(FAMILY, QUALIFIER, VALUE)
679          .build(
680            new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
681        .thenAccept(x -> {
682          if (x.isSuccess()) {
683            successCount.incrementAndGet();
684            successIndex.set(i);
685          }
686          assertNull(x.getResult());
687          deleteLatch.countDown();
688        }));
689    deleteLatch.await();
690    assertEquals(1, successCount.get());
691    Result result = table.get(new Get(row)).get();
692    IntStream.range(0, count).forEach(i -> {
693      if (i == successIndex.get()) {
694        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
695      } else {
696        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
697      }
698    });
699  }
700
701  @SuppressWarnings("FutureReturnValueIgnored")
702  @Test
703  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
704    AsyncTable<?> table = getTable.get();
705    int count = 10;
706    CountDownLatch putLatch = new CountDownLatch(count + 1);
707    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
708    IntStream.range(0, count)
709      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
710        .thenRun(() -> putLatch.countDown()));
711    putLatch.await();
712
713    AtomicInteger successCount = new AtomicInteger(0);
714    AtomicInteger successIndex = new AtomicInteger(-1);
715    CountDownLatch mutateLatch = new CountDownLatch(count);
716    IntStream.range(0, count).forEach(i -> {
717      RowMutations mutation = new RowMutations(row);
718      try {
719        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
720        mutation
721          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
722      } catch (IOException e) {
723        throw new UncheckedIOException(e);
724      }
725
726      table.checkAndMutate(CheckAndMutate.newBuilder(row)
727          .ifEquals(FAMILY, QUALIFIER, VALUE)
728          .build(mutation))
729        .thenAccept(x -> {
730          if (x.isSuccess()) {
731            successCount.incrementAndGet();
732            successIndex.set(i);
733          }
734          assertNull(x.getResult());
735          mutateLatch.countDown();
736        });
737    });
738    mutateLatch.await();
739    assertEquals(1, successCount.get());
740    Result result = table.get(new Get(row)).get();
741    IntStream.range(0, count).forEach(i -> {
742      if (i == successIndex.get()) {
743        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
744      } else {
745        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
746      }
747    });
748  }
749
750  @Test
751  public void testCheckAndMutateWithTimeRange() throws Exception {
752    AsyncTable<?> table = getTable.get();
753    final long ts = System.currentTimeMillis() / 2;
754    Put put = new Put(row);
755    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
756
757    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
758      .ifNotExists(FAMILY, QUALIFIER)
759      .build(put)).get();
760    assertTrue(result.isSuccess());
761    assertNull(result.getResult());
762
763    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
764      .ifEquals(FAMILY, QUALIFIER, VALUE)
765      .timeRange(TimeRange.at(ts + 10000))
766      .build(put)).get();
767    assertFalse(result.isSuccess());
768    assertNull(result.getResult());
769
770    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
771      .ifEquals(FAMILY, QUALIFIER, VALUE)
772      .timeRange(TimeRange.at(ts))
773      .build(put)).get();
774    assertTrue(result.isSuccess());
775    assertNull(result.getResult());
776
777    RowMutations rm = new RowMutations(row).add((Mutation) put);
778
779    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
780      .ifEquals(FAMILY, QUALIFIER, VALUE)
781      .timeRange(TimeRange.at(ts + 10000))
782      .build(rm)).get();
783    assertFalse(result.isSuccess());
784    assertNull(result.getResult());
785
786    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
787      .ifEquals(FAMILY, QUALIFIER, VALUE)
788      .timeRange(TimeRange.at(ts))
789      .build(rm)).get();
790    assertTrue(result.isSuccess());
791    assertNull(result.getResult());
792
793    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
794
795    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
796      .ifEquals(FAMILY, QUALIFIER, VALUE)
797      .timeRange(TimeRange.at(ts + 10000))
798      .build(delete)).get();
799    assertFalse(result.isSuccess());
800    assertNull(result.getResult());
801
802    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
803      .ifEquals(FAMILY, QUALIFIER, VALUE)
804      .timeRange(TimeRange.at(ts))
805      .build(delete)).get();
806    assertTrue(result.isSuccess());
807    assertNull(result.getResult());
808  }
809
810  @Test
811  public void testCheckAndMutateWithSingleFilter() throws Throwable {
812    AsyncTable<?> table = getTable.get();
813
814    // Put one row
815    Put put = new Put(row);
816    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
817    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
818    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
819    table.put(put).get();
820
821    // Put with success
822    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
823      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
824        CompareOperator.EQUAL, Bytes.toBytes("a")))
825      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
826    assertTrue(result.isSuccess());
827    assertNull(result.getResult());
828
829    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
830    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
831
832    // Put with failure
833    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
834      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
835        CompareOperator.EQUAL, Bytes.toBytes("b")))
836      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
837    assertFalse(result.isSuccess());
838    assertNull(result.getResult());
839
840    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
841
842    // Delete with success
843    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
844      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
845        CompareOperator.EQUAL, Bytes.toBytes("a")))
846      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
847    assertTrue(result.isSuccess());
848    assertNull(result.getResult());
849
850    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
851
852    // Mutate with success
853    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
854      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
855        CompareOperator.EQUAL, Bytes.toBytes("b")))
856      .build(new RowMutations(row)
857        .add((Mutation) new Put(row)
858          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
859        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
860    assertTrue(result.isSuccess());
861    assertNull(result.getResult());
862
863    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
864    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
865
866    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
867  }
868
869  @Test
870  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
871    AsyncTable<?> table = getTable.get();
872
873    // Put one row
874    Put put = new Put(row);
875    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
876    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
877    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
878    table.put(put).get();
879
880    // Put with success
881    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
882      .ifMatches(new FilterList(
883        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
884          Bytes.toBytes("a")),
885        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
886          Bytes.toBytes("b"))))
887      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
888    assertTrue(result.isSuccess());
889    assertNull(result.getResult());
890
891    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
892    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
893
894    // Put with failure
895    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
896      .ifMatches(new FilterList(
897        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
898          Bytes.toBytes("a")),
899        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
900          Bytes.toBytes("c"))))
901      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
902    assertFalse(result.isSuccess());
903    assertNull(result.getResult());
904
905    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
906
907    // Delete with success
908    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
909      .ifMatches(new FilterList(
910        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
911          Bytes.toBytes("a")),
912        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
913          Bytes.toBytes("b"))))
914      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
915    assertTrue(result.isSuccess());
916    assertNull(result.getResult());
917
918    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
919
920    // Mutate with success
921    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
922      .ifMatches(new FilterList(
923        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
924          Bytes.toBytes("a")),
925        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
926          Bytes.toBytes("b"))))
927      .build(new RowMutations(row)
928        .add((Mutation) new Put(row)
929          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
930        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
931    assertTrue(result.isSuccess());
932    assertNull(result.getResult());
933
934    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
935    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
936
937    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
938  }
939
940  @Test
941  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
942    AsyncTable<?> table = getTable.get();
943
944    // Put with specifying the timestamp
945    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
946
947    // Put with success
948    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
949      .ifMatches(new FilterList(
950        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
951        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
952        new TimestampsFilter(Collections.singletonList(100L))))
953      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
954    assertTrue(result.isSuccess());
955    assertNull(result.getResult());
956
957    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
958    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
959
960    // Put with failure
961    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
962      .ifMatches(new FilterList(
963        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
964        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
965        new TimestampsFilter(Collections.singletonList(101L))))
966      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
967    assertFalse(result.isSuccess());
968    assertNull(result.getResult());
969
970    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
971  }
972
973  @Test
974  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
975    AsyncTable<?> table = getTable.get();
976
977    // Put with specifying the timestamp
978    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
979      .get();
980
981    // Put with success
982    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
983      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
984        CompareOperator.EQUAL, Bytes.toBytes("a")))
985      .timeRange(TimeRange.between(0, 101))
986      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
987    assertTrue(result.isSuccess());
988    assertNull(result.getResult());
989
990    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
991    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
992
993    // Put with failure
994    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
995      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
996        CompareOperator.EQUAL, Bytes.toBytes("a")))
997      .timeRange(TimeRange.between(0, 100))
998      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
999      .get();
1000    assertFalse(result.isSuccess());
1001    assertNull(result.getResult());
1002
1003    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1004  }
1005
1006  @Test
1007  public void testCheckAndIncrement() throws Throwable {
1008    AsyncTable<?> table = getTable.get();
1009
1010    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1011
1012    // CheckAndIncrement with correct value
1013    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1014      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1015      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
1016    assertTrue(res.isSuccess());
1017    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1018
1019    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1020    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1021
1022    // CheckAndIncrement with wrong value
1023    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1024      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1025      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
1026    assertFalse(res.isSuccess());
1027    assertNull(res.getResult());
1028
1029    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1030    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1031
1032    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1033
1034    // CheckAndIncrement with a filter and correct value
1035    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1036      .ifMatches(new FilterList(
1037        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1038          Bytes.toBytes("a")),
1039        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1040          Bytes.toBytes("c"))))
1041      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1042    assertTrue(res.isSuccess());
1043    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1044
1045    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1046    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1047
1048    // CheckAndIncrement with a filter and correct value
1049    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1050      .ifMatches(new FilterList(
1051        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1052          Bytes.toBytes("b")),
1053        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1054          Bytes.toBytes("d"))))
1055      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1056    assertFalse(res.isSuccess());
1057    assertNull(res.getResult());
1058
1059    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1060    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1061  }
1062
1063  @Test
1064  public void testCheckAndAppend() throws Throwable {
1065    AsyncTable<?> table = getTable.get();
1066
1067    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1068
1069    // CheckAndAppend with correct value
1070    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1071      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1072      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
1073    assertTrue(res.isSuccess());
1074    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1075
1076    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1077    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1078
1079    // CheckAndAppend with correct value
1080    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1081      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1082      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
1083    assertFalse(res.isSuccess());
1084    assertNull(res.getResult());
1085
1086    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1087    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1088
1089    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1090
1091    // CheckAndAppend with a filter and correct value
1092    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1093      .ifMatches(new FilterList(
1094        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1095          Bytes.toBytes("a")),
1096        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1097          Bytes.toBytes("c"))))
1098      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1099    assertTrue(res.isSuccess());
1100    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1101
1102    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1103    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1104
1105    // CheckAndAppend with a filter and wrong value
1106    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1107      .ifMatches(new FilterList(
1108        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1109          Bytes.toBytes("b")),
1110        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1111          Bytes.toBytes("d"))))
1112      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1113    assertFalse(res.isSuccess());
1114    assertNull(res.getResult());
1115
1116    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1117    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1118  }
1119
1120  @Test
1121  public void testCheckAndRowMutations() throws Throwable {
1122    final byte[] q1 = Bytes.toBytes("q1");
1123    final byte[] q2 = Bytes.toBytes("q2");
1124    final byte[] q3 = Bytes.toBytes("q3");
1125    final byte[] q4 = Bytes.toBytes("q4");
1126    final String v1 = "v1";
1127
1128    AsyncTable<?> table = getTable.get();
1129
1130    // Initial values
1131    table.putAll(Arrays.asList(
1132      new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
1133      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
1134      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
1135
1136    // Do CheckAndRowMutations
1137    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
1138      .ifNotExists(FAMILY, q1)
1139      .build(new RowMutations(row).add(Arrays.asList(
1140        new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
1141        new Delete(row).addColumns(FAMILY, q2),
1142        new Increment(row).addColumn(FAMILY, q3, 1),
1143        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
1144      );
1145
1146    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
1147    assertTrue(result.isSuccess());
1148    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
1149    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
1150
1151    // Verify the value
1152    Result r = table.get(new Get(row)).get();
1153    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1154    assertNull(r.getValue(FAMILY, q2));
1155    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1156    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1157
1158    // Do CheckAndRowMutations again
1159    checkAndMutate = CheckAndMutate.newBuilder(row)
1160      .ifNotExists(FAMILY, q1)
1161      .build(new RowMutations(row).add(Arrays.asList(
1162        new Delete(row).addColumns(FAMILY, q1),
1163        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
1164        new Increment(row).addColumn(FAMILY, q3, 1),
1165        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
1166      );
1167
1168    result = table.checkAndMutate(checkAndMutate).get();
1169    assertFalse(result.isSuccess());
1170    assertNull(result.getResult());
1171
1172    // Verify the value
1173    r = table.get(new Get(row)).get();
1174    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1175    assertNull(r.getValue(FAMILY, q2));
1176    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1177    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1178  }
1179
1180  // Tests for batch version of checkAndMutate
1181
1182  @Test
1183  public void testCheckAndMutateBatch() throws Throwable {
1184    AsyncTable<?> table = getTable.get();
1185    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1186    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1187    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1188
1189    table.putAll(Arrays.asList(
1190      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1191      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1192      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1193      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
1194
1195    // Test for Put
1196    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1197      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1198      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1199
1200    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1201      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1202      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1203
1204    List<CheckAndMutateResult> results =
1205      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1206
1207    assertTrue(results.get(0).isSuccess());
1208    assertNull(results.get(0).getResult());
1209    assertFalse(results.get(1).isSuccess());
1210    assertNull(results.get(1).getResult());
1211
1212    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1213    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1214
1215    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1216    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1217
1218    // Test for Delete
1219    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1220      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
1221      .build(new Delete(row));
1222
1223    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1224      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1225      .build(new Delete(row2));
1226
1227    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1228
1229    assertTrue(results.get(0).isSuccess());
1230    assertNull(results.get(0).getResult());
1231    assertFalse(results.get(1).isSuccess());
1232    assertNull(results.get(1).getResult());
1233
1234    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
1235
1236    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1237    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1238
1239    // Test for RowMutations
1240    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1241      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1242      .build(new RowMutations(row3)
1243        .add((Mutation) new Put(row3)
1244          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1245        .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
1246
1247    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1248      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
1249      .build(new RowMutations(row4)
1250        .add((Mutation) new Put(row4)
1251          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1252        .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
1253
1254    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1255
1256    assertTrue(results.get(0).isSuccess());
1257    assertNull(results.get(0).getResult());
1258    assertFalse(results.get(1).isSuccess());
1259    assertNull(results.get(1).getResult());
1260
1261    result = table.get(new Get(row3)).get();
1262    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1263    assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
1264
1265    result = table.get(new Get(row4)).get();
1266    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
1267    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1268  }
1269
1270  @Test
1271  public void testCheckAndMutateBatch2() throws Throwable {
1272    AsyncTable<?> table = getTable.get();
1273    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1274    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1275    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1276
1277    table.putAll(Arrays.asList(
1278      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1279      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1280      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
1281      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get();
1282
1283    // Test for ifNotExists()
1284    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1285      .ifNotExists(FAMILY, Bytes.toBytes("B"))
1286      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1287
1288    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1289      .ifNotExists(FAMILY, Bytes.toBytes("B"))
1290      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1291
1292    List<CheckAndMutateResult> results =
1293      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1294
1295    assertTrue(results.get(0).isSuccess());
1296    assertNull(results.get(0).getResult());
1297    assertFalse(results.get(1).isSuccess());
1298    assertNull(results.get(1).getResult());
1299
1300    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1301    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1302
1303    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1304    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1305
1306    // Test for ifMatches()
1307    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1308      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
1309      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
1310
1311    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1312      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
1313      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1314
1315    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1316
1317    assertTrue(results.get(0).isSuccess());
1318    assertNull(results.get(0).getResult());
1319    assertFalse(results.get(1).isSuccess());
1320    assertNull(results.get(1).getResult());
1321
1322    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1323    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1324
1325    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1326    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1327
1328    // Test for timeRange()
1329    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1330      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1331      .timeRange(TimeRange.between(0, 101))
1332      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
1333
1334    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1335      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1336      .timeRange(TimeRange.between(0, 100))
1337      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
1338
1339    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1340
1341    assertTrue(results.get(0).isSuccess());
1342    assertNull(results.get(0).getResult());
1343    assertFalse(results.get(1).isSuccess());
1344    assertNull(results.get(1).getResult());
1345
1346    result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1347    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1348
1349    result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1350    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1351  }
1352
1353  @Test
1354  public void testCheckAndMutateBatchWithFilter() throws Throwable {
1355    AsyncTable<?> table = getTable.get();
1356    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1357
1358    table.putAll(Arrays.asList(
1359      new Put(row)
1360        .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1361        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1362        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1363      new Put(row2)
1364        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1365        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
1366        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
1367
1368    // Test for Put
1369    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1370      .ifMatches(new FilterList(
1371        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1372          Bytes.toBytes("a")),
1373        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1374          Bytes.toBytes("b"))))
1375      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1376
1377    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1378      .ifMatches(new FilterList(
1379        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1380          Bytes.toBytes("a")),
1381        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1382          Bytes.toBytes("b"))))
1383      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1384
1385    List<CheckAndMutateResult> results =
1386      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1387
1388    assertTrue(results.get(0).isSuccess());
1389    assertNull(results.get(0).getResult());
1390    assertFalse(results.get(1).isSuccess());
1391    assertNull(results.get(1).getResult());
1392
1393    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1394    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1395
1396    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1397    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1398
1399    // Test for Delete
1400    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1401      .ifMatches(new FilterList(
1402        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1403          Bytes.toBytes("a")),
1404        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1405          Bytes.toBytes("b"))))
1406      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
1407
1408    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1409      .ifMatches(new FilterList(
1410        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1411          Bytes.toBytes("a")),
1412        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1413          Bytes.toBytes("b"))))
1414      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
1415
1416    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1417
1418    assertTrue(results.get(0).isSuccess());
1419    assertNull(results.get(0).getResult());
1420    assertFalse(results.get(1).isSuccess());
1421    assertNull(results.get(1).getResult());
1422
1423    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1424
1425    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1426    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1427
1428    // Test for RowMutations
1429    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1430      .ifMatches(new FilterList(
1431        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1432          Bytes.toBytes("a")),
1433        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1434          Bytes.toBytes("b"))))
1435      .build(new RowMutations(row)
1436        .add((Mutation) new Put(row)
1437          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
1438        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
1439
1440    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1441      .ifMatches(new FilterList(
1442        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1443          Bytes.toBytes("a")),
1444        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1445          Bytes.toBytes("b"))))
1446      .build(new RowMutations(row2)
1447        .add((Mutation) new Put(row2)
1448          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
1449        .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
1450
1451    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1452
1453    assertTrue(results.get(0).isSuccess());
1454    assertNull(results.get(0).getResult());
1455    assertFalse(results.get(1).isSuccess());
1456    assertNull(results.get(1).getResult());
1457
1458    result = table.get(new Get(row)).get();
1459    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
1460    assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1461
1462    result = table.get(new Get(row2)).get();
1463    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1464    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1465  }
1466
1467  @Test
1468  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
1469    AsyncTable<?> table = getTable.get();
1470    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1471
1472    table.putAll(Arrays.asList(
1473      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
1474        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
1475        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1476      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
1477        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
1478        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
1479
1480    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1481      .ifMatches(new FilterList(
1482        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1483          Bytes.toBytes("a")),
1484        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1485          Bytes.toBytes("b"))))
1486      .timeRange(TimeRange.between(0, 101))
1487      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1488
1489    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1490      .ifMatches(new FilterList(
1491        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1492          Bytes.toBytes("d")),
1493        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1494          Bytes.toBytes("e"))))
1495      .timeRange(TimeRange.between(0, 100))
1496      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1497
1498    List<CheckAndMutateResult> results =
1499      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1500
1501    assertTrue(results.get(0).isSuccess());
1502    assertNull(results.get(0).getResult());
1503    assertFalse(results.get(1).isSuccess());
1504    assertNull(results.get(1).getResult());
1505
1506    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1507    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1508
1509    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1510    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1511  }
1512
1513  @Test
1514  public void testCheckAndIncrementBatch() throws Throwable {
1515    AsyncTable<?> table = getTable.get();
1516    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1517
1518    table.putAll(Arrays.asList(
1519      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1520        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
1521      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1522        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
1523
1524    // CheckAndIncrement with correct value
1525    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1526      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1527      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
1528
1529    // CheckAndIncrement with wrong value
1530    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1531      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1532      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
1533
1534    List<CheckAndMutateResult> results =
1535      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1536
1537    assertTrue(results.get(0).isSuccess());
1538    assertEquals(1, Bytes.toLong(results.get(0).getResult()
1539      .getValue(FAMILY, Bytes.toBytes("B"))));
1540    assertFalse(results.get(1).isSuccess());
1541    assertNull(results.get(1).getResult());
1542
1543    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1544    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1545
1546    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1547    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
1548  }
1549
1550  @Test
1551  public void testCheckAndAppendBatch() throws Throwable {
1552    AsyncTable<?> table = getTable.get();
1553    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1554
1555    table.putAll(Arrays.asList(
1556      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1557        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1558      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1559        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
1560
1561    // CheckAndAppend with correct value
1562    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1563      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1564      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
1565
1566    // CheckAndAppend with wrong value
1567    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1568      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1569      .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
1570
1571    List<CheckAndMutateResult> results =
1572      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1573
1574    assertTrue(results.get(0).isSuccess());
1575    assertEquals("bb", Bytes.toString(results.get(0).getResult()
1576      .getValue(FAMILY, Bytes.toBytes("B"))));
1577    assertFalse(results.get(1).isSuccess());
1578    assertNull(results.get(1).getResult());
1579
1580    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1581    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1582
1583    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1584    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1585  }
1586
1587  @Test
1588  public void testCheckAndRowMutationsBatch() throws Throwable {
1589    AsyncTable<?> table = getTable.get();
1590    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1591
1592    table.putAll(Arrays.asList(
1593      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1594        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
1595        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
1596      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
1597        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
1598        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
1599    ).get();
1600
1601    // CheckAndIncrement with correct value
1602    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1603      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1604      .build(new RowMutations(row).add(Arrays.asList(
1605        new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1606        new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
1607        new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
1608        new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1609      )));
1610
1611    // CheckAndIncrement with wrong value
1612    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1613      .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
1614      .build(new RowMutations(row2).add(Arrays.asList(
1615        new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
1616        new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
1617        new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
1618        new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))
1619      )));
1620
1621    List<CheckAndMutateResult> results =
1622      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1623
1624    assertTrue(results.get(0).isSuccess());
1625    assertEquals(2, Bytes.toLong(results.get(0).getResult()
1626      .getValue(FAMILY, Bytes.toBytes("C"))));
1627    assertEquals("dd", Bytes.toString(results.get(0).getResult()
1628      .getValue(FAMILY, Bytes.toBytes("D"))));
1629
1630    assertFalse(results.get(1).isSuccess());
1631    assertNull(results.get(1).getResult());
1632
1633    Result result = table.get(new Get(row)).get();
1634    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1635    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
1636    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
1637    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1638
1639    result = table.get(new Get(row2)).get();
1640    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
1641    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1642    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
1643    assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
1644  }
1645
1646  @Test
1647  public void testDisabled() throws InterruptedException, ExecutionException {
1648    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
1649    try {
1650      getTable.get().get(new Get(row)).get();
1651      fail("Should fail since table has been disabled");
1652    } catch (ExecutionException e) {
1653      Throwable cause = e.getCause();
1654      assertThat(cause, instanceOf(TableNotEnabledException.class));
1655      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
1656    }
1657  }
1658
1659  @Test
1660  public void testInvalidPut() {
1661    try {
1662      getTable.get().put(new Put(Bytes.toBytes(0)));
1663      fail("Should fail since the put does not contain any cells");
1664    } catch (IllegalArgumentException e) {
1665      assertThat(e.getMessage(), containsString("No columns to insert"));
1666    }
1667
1668    try {
1669      getTable.get()
1670        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
1671      fail("Should fail since the put exceeds the max key value size");
1672    } catch (IllegalArgumentException e) {
1673      assertThat(e.getMessage(), containsString("KeyValue size too large"));
1674    }
1675  }
1676}