001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertThat;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicLong;
042import java.util.function.Supplier;
043import java.util.stream.IntStream;
044import org.apache.commons.io.IOUtils;
045import org.apache.hadoop.hbase.CompareOperator;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.TableNotEnabledException;
050import org.apache.hadoop.hbase.filter.BinaryComparator;
051import org.apache.hadoop.hbase.filter.FamilyFilter;
052import org.apache.hadoop.hbase.filter.FilterList;
053import org.apache.hadoop.hbase.filter.QualifierFilter;
054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
055import org.apache.hadoop.hbase.filter.TimestampsFilter;
056import org.apache.hadoop.hbase.io.TimeRange;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.Pair;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TestName;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.junit.runners.Parameterized.Parameter;
072import org.junit.runners.Parameterized.Parameters;
073
074@RunWith(Parameterized.class)
075@Category({ MediumTests.class, ClientTests.class })
076public class TestAsyncTable {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestAsyncTable.class);
081
082  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083
084  private static TableName TABLE_NAME = TableName.valueOf("async");
085
086  private static byte[] FAMILY = Bytes.toBytes("cf");
087
088  private static byte[] QUALIFIER = Bytes.toBytes("cq");
089
090  private static byte[] VALUE = Bytes.toBytes("value");
091
092  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
093
094  private static AsyncConnection ASYNC_CONN;
095
096  @Rule
097  public TestName testName = new TestName();
098
099  private byte[] row;
100
101  @Parameter
102  public Supplier<AsyncTable<?>> getTable;
103
104  private static AsyncTable<?> getRawTable() {
105    return ASYNC_CONN.getTable(TABLE_NAME);
106  }
107
108  private static AsyncTable<?> getTable() {
109    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
110  }
111
112  @Parameters
113  public static List<Object[]> params() {
114    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
115      new Supplier<?>[] { TestAsyncTable::getTable });
116  }
117
118  @BeforeClass
119  public static void setUpBeforeClass() throws Exception {
120    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
121      MAX_KEY_VALUE_SIZE);
122    TEST_UTIL.startMiniCluster(1);
123    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
124    TEST_UTIL.waitTableAvailable(TABLE_NAME);
125    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
126    assertFalse(ASYNC_CONN.isClosed());
127  }
128
129  @AfterClass
130  public static void tearDownAfterClass() throws Exception {
131    IOUtils.closeQuietly(ASYNC_CONN);
132    assertTrue(ASYNC_CONN.isClosed());
133    TEST_UTIL.shutdownMiniCluster();
134  }
135
136  @Before
137  public void setUp() throws IOException, InterruptedException, ExecutionException {
138    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
139    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
140      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
141    }
142  }
143
144  @Test
145  public void testSimple() throws Exception {
146    AsyncTable<?> table = getTable.get();
147    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
148    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
149    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
150    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
151    table.delete(new Delete(row)).get();
152    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
153    assertTrue(result.isEmpty());
154    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
155  }
156
157  private byte[] concat(byte[] base, int index) {
158    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
159  }
160
161  @SuppressWarnings("FutureReturnValueIgnored")
162  @Test
163  public void testSimpleMultiple() throws Exception {
164    AsyncTable<?> table = getTable.get();
165    int count = 100;
166    CountDownLatch putLatch = new CountDownLatch(count);
167    IntStream.range(0, count).forEach(
168      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
169        .thenAccept(x -> putLatch.countDown()));
170    putLatch.await();
171    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
172    IntStream.range(0, count)
173      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
174        .thenAccept(x -> existsResp.add(x)));
175    for (int i = 0; i < count; i++) {
176      assertTrue(existsResp.take());
177    }
178    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
179    IntStream.range(0, count)
180      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
181        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
182    for (int i = 0; i < count; i++) {
183      Pair<Integer, Result> pair = getResp.take();
184      assertArrayEquals(concat(VALUE, pair.getFirst()),
185        pair.getSecond().getValue(FAMILY, QUALIFIER));
186    }
187    CountDownLatch deleteLatch = new CountDownLatch(count);
188    IntStream.range(0, count).forEach(
189      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
190    deleteLatch.await();
191    IntStream.range(0, count)
192      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
193        .thenAccept(x -> existsResp.add(x)));
194    for (int i = 0; i < count; i++) {
195      assertFalse(existsResp.take());
196    }
197    IntStream.range(0, count)
198      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
199        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
200    for (int i = 0; i < count; i++) {
201      Pair<Integer, Result> pair = getResp.take();
202      assertTrue(pair.getSecond().isEmpty());
203    }
204  }
205
206  @SuppressWarnings("FutureReturnValueIgnored")
207  @Test
208  public void testIncrement() throws InterruptedException, ExecutionException {
209    AsyncTable<?> table = getTable.get();
210    int count = 100;
211    CountDownLatch latch = new CountDownLatch(count);
212    AtomicLong sum = new AtomicLong(0L);
213    IntStream.range(0, count)
214      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
215        sum.addAndGet(x);
216        latch.countDown();
217      }));
218    latch.await();
219    assertEquals(count, Bytes.toLong(
220      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
221    assertEquals((1 + count) * count / 2, sum.get());
222  }
223
224  @SuppressWarnings("FutureReturnValueIgnored")
225  @Test
226  public void testAppend() throws InterruptedException, ExecutionException {
227    AsyncTable<?> table = getTable.get();
228    int count = 10;
229    CountDownLatch latch = new CountDownLatch(count);
230    char suffix = ':';
231    AtomicLong suffixCount = new AtomicLong(0L);
232    IntStream.range(0, count)
233      .forEachOrdered(i -> table
234        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
235        .thenAccept(r -> {
236          suffixCount.addAndGet(
237            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
238          latch.countDown();
239        }));
240    latch.await();
241    assertEquals((1 + count) * count / 2, suffixCount.get());
242    String value = Bytes.toString(
243      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
244    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
245      .sorted().toArray();
246    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
247  }
248
249  @Test
250  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
251    AsyncTable<?> table = getTable.get();
252    RowMutations mutation = new RowMutations(row);
253    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
254    Result result = table.mutateRow(mutation).get();
255    assertTrue(result.getExists());
256    assertTrue(result.isEmpty());
257
258    result = table.get(new Get(row)).get();
259    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
260
261    mutation = new RowMutations(row);
262    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
263    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
264    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
265    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
266    result = table.mutateRow(mutation).get();
267    assertTrue(result.getExists());
268    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
269    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
270
271    result = table.get(new Get(row)).get();
272    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
273    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
274    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
275    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
276  }
277
278  // Tests for old checkAndMutate API
279
280  @SuppressWarnings("FutureReturnValueIgnored")
281  @Test
282  @Deprecated
283  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
284    AsyncTable<?> table = getTable.get();
285    AtomicInteger successCount = new AtomicInteger(0);
286    AtomicInteger successIndex = new AtomicInteger(-1);
287    int count = 10;
288    CountDownLatch latch = new CountDownLatch(count);
289    IntStream.range(0, count)
290      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
291        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
292          if (x) {
293            successCount.incrementAndGet();
294            successIndex.set(i);
295          }
296          latch.countDown();
297        }));
298    latch.await();
299    assertEquals(1, successCount.get());
300    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
301    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
302  }
303
304  @SuppressWarnings("FutureReturnValueIgnored")
305  @Test
306  @Deprecated
307  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
308    AsyncTable<?> table = getTable.get();
309    int count = 10;
310    CountDownLatch putLatch = new CountDownLatch(count + 1);
311    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
312    IntStream.range(0, count)
313      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
314        .thenRun(() -> putLatch.countDown()));
315    putLatch.await();
316
317    AtomicInteger successCount = new AtomicInteger(0);
318    AtomicInteger successIndex = new AtomicInteger(-1);
319    CountDownLatch deleteLatch = new CountDownLatch(count);
320    IntStream.range(0, count)
321      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
322        .thenDelete(
323          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
324        .thenAccept(x -> {
325          if (x) {
326            successCount.incrementAndGet();
327            successIndex.set(i);
328          }
329          deleteLatch.countDown();
330        }));
331    deleteLatch.await();
332    assertEquals(1, successCount.get());
333    Result result = table.get(new Get(row)).get();
334    IntStream.range(0, count).forEach(i -> {
335      if (i == successIndex.get()) {
336        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
337      } else {
338        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
339      }
340    });
341  }
342
343  @SuppressWarnings("FutureReturnValueIgnored")
344  @Test
345  @Deprecated
346  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
347    AsyncTable<?> table = getTable.get();
348    int count = 10;
349    CountDownLatch putLatch = new CountDownLatch(count + 1);
350    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
351    IntStream.range(0, count)
352      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
353        .thenRun(() -> putLatch.countDown()));
354    putLatch.await();
355
356    AtomicInteger successCount = new AtomicInteger(0);
357    AtomicInteger successIndex = new AtomicInteger(-1);
358    CountDownLatch mutateLatch = new CountDownLatch(count);
359    IntStream.range(0, count).forEach(i -> {
360      RowMutations mutation = new RowMutations(row);
361      try {
362        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
363        mutation
364          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
365      } catch (IOException e) {
366        throw new UncheckedIOException(e);
367      }
368      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
369        .thenAccept(x -> {
370          if (x) {
371            successCount.incrementAndGet();
372            successIndex.set(i);
373          }
374          mutateLatch.countDown();
375        });
376    });
377    mutateLatch.await();
378    assertEquals(1, successCount.get());
379    Result result = table.get(new Get(row)).get();
380    IntStream.range(0, count).forEach(i -> {
381      if (i == successIndex.get()) {
382        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
383      } else {
384        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
385      }
386    });
387  }
388
389  @Test
390  @Deprecated
391  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
392    AsyncTable<?> table = getTable.get();
393    final long ts = System.currentTimeMillis() / 2;
394    Put put = new Put(row);
395    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
396
397    boolean ok =
398      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
399    assertTrue(ok);
400
401    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
402      .ifEquals(VALUE).thenPut(put).get();
403    assertFalse(ok);
404
405    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
406      .ifEquals(VALUE).thenPut(put).get();
407    assertTrue(ok);
408
409    RowMutations rm = new RowMutations(row).add((Mutation) put);
410
411    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
412      .ifEquals(VALUE).thenMutate(rm).get();
413    assertFalse(ok);
414
415    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
416      .ifEquals(VALUE).thenMutate(rm).get();
417    assertTrue(ok);
418
419    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
420
421    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
422      .ifEquals(VALUE).thenDelete(delete).get();
423    assertFalse(ok);
424
425    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
426      .ifEquals(VALUE).thenDelete(delete).get();
427    assertTrue(ok);
428  }
429
430  @Test
431  @Deprecated
432  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
433    AsyncTable<?> table = getTable.get();
434
435    // Put one row
436    Put put = new Put(row);
437    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
438    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
439    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
440    table.put(put).get();
441
442    // Put with success
443    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
444        CompareOperator.EQUAL, Bytes.toBytes("a")))
445      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
446      .get();
447    assertTrue(ok);
448
449    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
450    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
451
452    // Put with failure
453    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
454        CompareOperator.EQUAL, Bytes.toBytes("b")))
455      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
456      .get();
457    assertFalse(ok);
458
459    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
460
461    // Delete with success
462    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
463        CompareOperator.EQUAL, Bytes.toBytes("a")))
464      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
465      .get();
466    assertTrue(ok);
467
468    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
469
470    // Mutate with success
471    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
472        CompareOperator.EQUAL, Bytes.toBytes("b")))
473      .thenMutate(new RowMutations(row)
474        .add((Mutation) new Put(row)
475          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
476        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
477      .get();
478    assertTrue(ok);
479
480    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
481    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
482
483    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
484  }
485
486  @Test
487  @Deprecated
488  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
489    AsyncTable<?> table = getTable.get();
490
491    // Put one row
492    Put put = new Put(row);
493    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
494    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
495    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
496    table.put(put).get();
497
498    // Put with success
499    boolean ok = table.checkAndMutate(row, new FilterList(
500        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
501          Bytes.toBytes("a")),
502        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
503          Bytes.toBytes("b"))
504      ))
505      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
506      .get();
507    assertTrue(ok);
508
509    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
510    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
511
512    // Put with failure
513    ok = table.checkAndMutate(row, new FilterList(
514        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
515          Bytes.toBytes("a")),
516        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
517          Bytes.toBytes("c"))
518      ))
519      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
520      .get();
521    assertFalse(ok);
522
523    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
524
525    // Delete with success
526    ok = table.checkAndMutate(row, new FilterList(
527        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
528          Bytes.toBytes("a")),
529        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
530          Bytes.toBytes("b"))
531      ))
532      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
533      .get();
534    assertTrue(ok);
535
536    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
537
538    // Mutate with success
539    ok = table.checkAndMutate(row, new FilterList(
540        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
541          Bytes.toBytes("a")),
542        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
543          Bytes.toBytes("b"))
544      ))
545      .thenMutate(new RowMutations(row)
546        .add((Mutation) new Put(row)
547          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
548        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
549      .get();
550    assertTrue(ok);
551
552    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
553    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
554
555    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
556  }
557
558  @Test
559  @Deprecated
560  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
561    AsyncTable<?> table = getTable.get();
562
563    // Put with specifying the timestamp
564    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
565
566    // Put with success
567    boolean ok = table.checkAndMutate(row, new FilterList(
568        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
569        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
570        new TimestampsFilter(Collections.singletonList(100L))
571      ))
572      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
573      .get();
574    assertTrue(ok);
575
576    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
577    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
578
579    // Put with failure
580    ok = table.checkAndMutate(row, new FilterList(
581        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
582        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
583        new TimestampsFilter(Collections.singletonList(101L))
584      ))
585      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
586      .get();
587    assertFalse(ok);
588
589    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
590  }
591
592  @Test
593  @Deprecated
594  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
595    AsyncTable<?> table = getTable.get();
596
597    // Put with specifying the timestamp
598    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
599      .get();
600
601    // Put with success
602    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
603        CompareOperator.EQUAL, Bytes.toBytes("a")))
604      .timeRange(TimeRange.between(0, 101))
605      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
606      .get();
607    assertTrue(ok);
608
609    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
610    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
611
612    // Put with failure
613    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
614        CompareOperator.EQUAL, Bytes.toBytes("a")))
615      .timeRange(TimeRange.between(0, 100))
616      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
617      .get();
618    assertFalse(ok);
619
620    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
621  }
622
623  @Test(expected = NullPointerException.class)
624  @Deprecated
625  public void testCheckAndMutateWithoutConditionForOldApi() {
626    getTable.get().checkAndMutate(row, FAMILY)
627      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
628  }
629
630  // Tests for new CheckAndMutate API
631
632  @SuppressWarnings("FutureReturnValueIgnored")
633  @Test
634  public void testCheckAndPut() throws InterruptedException, ExecutionException {
635    AsyncTable<?> table = getTable.get();
636    AtomicInteger successCount = new AtomicInteger(0);
637    AtomicInteger successIndex = new AtomicInteger(-1);
638    int count = 10;
639    CountDownLatch latch = new CountDownLatch(count);
640
641    IntStream.range(0, count)
642      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
643          .ifNotExists(FAMILY, QUALIFIER)
644          .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
645        .thenAccept(x -> {
646          if (x.isSuccess()) {
647            successCount.incrementAndGet();
648            successIndex.set(i);
649          }
650          assertNull(x.getResult());
651          latch.countDown();
652        }));
653    latch.await();
654    assertEquals(1, successCount.get());
655    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
656    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
657  }
658
659  @SuppressWarnings("FutureReturnValueIgnored")
660  @Test
661  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
662    AsyncTable<?> table = getTable.get();
663    int count = 10;
664    CountDownLatch putLatch = new CountDownLatch(count + 1);
665    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
666    IntStream.range(0, count)
667      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
668        .thenRun(() -> putLatch.countDown()));
669    putLatch.await();
670
671    AtomicInteger successCount = new AtomicInteger(0);
672    AtomicInteger successIndex = new AtomicInteger(-1);
673    CountDownLatch deleteLatch = new CountDownLatch(count);
674
675    IntStream.range(0, count)
676      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
677          .ifEquals(FAMILY, QUALIFIER, VALUE)
678          .build(
679            new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
680        .thenAccept(x -> {
681          if (x.isSuccess()) {
682            successCount.incrementAndGet();
683            successIndex.set(i);
684          }
685          assertNull(x.getResult());
686          deleteLatch.countDown();
687        }));
688    deleteLatch.await();
689    assertEquals(1, successCount.get());
690    Result result = table.get(new Get(row)).get();
691    IntStream.range(0, count).forEach(i -> {
692      if (i == successIndex.get()) {
693        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
694      } else {
695        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
696      }
697    });
698  }
699
700  @SuppressWarnings("FutureReturnValueIgnored")
701  @Test
702  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
703    AsyncTable<?> table = getTable.get();
704    int count = 10;
705    CountDownLatch putLatch = new CountDownLatch(count + 1);
706    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
707    IntStream.range(0, count)
708      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
709        .thenRun(() -> putLatch.countDown()));
710    putLatch.await();
711
712    AtomicInteger successCount = new AtomicInteger(0);
713    AtomicInteger successIndex = new AtomicInteger(-1);
714    CountDownLatch mutateLatch = new CountDownLatch(count);
715    IntStream.range(0, count).forEach(i -> {
716      RowMutations mutation = new RowMutations(row);
717      try {
718        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
719        mutation
720          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
721      } catch (IOException e) {
722        throw new UncheckedIOException(e);
723      }
724
725      table.checkAndMutate(CheckAndMutate.newBuilder(row)
726          .ifEquals(FAMILY, QUALIFIER, VALUE)
727          .build(mutation))
728        .thenAccept(x -> {
729          if (x.isSuccess()) {
730            successCount.incrementAndGet();
731            successIndex.set(i);
732          }
733          assertNull(x.getResult());
734          mutateLatch.countDown();
735        });
736    });
737    mutateLatch.await();
738    assertEquals(1, successCount.get());
739    Result result = table.get(new Get(row)).get();
740    IntStream.range(0, count).forEach(i -> {
741      if (i == successIndex.get()) {
742        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
743      } else {
744        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
745      }
746    });
747  }
748
749  @Test
750  public void testCheckAndMutateWithTimeRange() throws Exception {
751    AsyncTable<?> table = getTable.get();
752    final long ts = System.currentTimeMillis() / 2;
753    Put put = new Put(row);
754    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
755
756    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
757      .ifNotExists(FAMILY, QUALIFIER)
758      .build(put)).get();
759    assertTrue(result.isSuccess());
760    assertNull(result.getResult());
761
762    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
763      .ifEquals(FAMILY, QUALIFIER, VALUE)
764      .timeRange(TimeRange.at(ts + 10000))
765      .build(put)).get();
766    assertFalse(result.isSuccess());
767    assertNull(result.getResult());
768
769    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
770      .ifEquals(FAMILY, QUALIFIER, VALUE)
771      .timeRange(TimeRange.at(ts))
772      .build(put)).get();
773    assertTrue(result.isSuccess());
774    assertNull(result.getResult());
775
776    RowMutations rm = new RowMutations(row).add((Mutation) put);
777
778    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
779      .ifEquals(FAMILY, QUALIFIER, VALUE)
780      .timeRange(TimeRange.at(ts + 10000))
781      .build(rm)).get();
782    assertFalse(result.isSuccess());
783    assertNull(result.getResult());
784
785    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
786      .ifEquals(FAMILY, QUALIFIER, VALUE)
787      .timeRange(TimeRange.at(ts))
788      .build(rm)).get();
789    assertTrue(result.isSuccess());
790    assertNull(result.getResult());
791
792    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
793
794    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
795      .ifEquals(FAMILY, QUALIFIER, VALUE)
796      .timeRange(TimeRange.at(ts + 10000))
797      .build(delete)).get();
798    assertFalse(result.isSuccess());
799    assertNull(result.getResult());
800
801    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
802      .ifEquals(FAMILY, QUALIFIER, VALUE)
803      .timeRange(TimeRange.at(ts))
804      .build(delete)).get();
805    assertTrue(result.isSuccess());
806    assertNull(result.getResult());
807  }
808
809  @Test
810  public void testCheckAndMutateWithSingleFilter() throws Throwable {
811    AsyncTable<?> table = getTable.get();
812
813    // Put one row
814    Put put = new Put(row);
815    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
816    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
817    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
818    table.put(put).get();
819
820    // Put with success
821    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
822      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
823        CompareOperator.EQUAL, Bytes.toBytes("a")))
824      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
825    assertTrue(result.isSuccess());
826    assertNull(result.getResult());
827
828    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
829    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
830
831    // Put with failure
832    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
833      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
834        CompareOperator.EQUAL, Bytes.toBytes("b")))
835      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
836    assertFalse(result.isSuccess());
837    assertNull(result.getResult());
838
839    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
840
841    // Delete with success
842    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
843      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
844        CompareOperator.EQUAL, Bytes.toBytes("a")))
845      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
846    assertTrue(result.isSuccess());
847    assertNull(result.getResult());
848
849    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
850
851    // Mutate with success
852    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
853      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
854        CompareOperator.EQUAL, Bytes.toBytes("b")))
855      .build(new RowMutations(row)
856        .add((Mutation) new Put(row)
857          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
858        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
859    assertTrue(result.isSuccess());
860    assertNull(result.getResult());
861
862    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
863    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
864
865    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
866  }
867
868  @Test
869  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
870    AsyncTable<?> table = getTable.get();
871
872    // Put one row
873    Put put = new Put(row);
874    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
875    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
876    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
877    table.put(put).get();
878
879    // Put with success
880    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
881      .ifMatches(new FilterList(
882        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
883          Bytes.toBytes("a")),
884        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
885          Bytes.toBytes("b"))))
886      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
887    assertTrue(result.isSuccess());
888    assertNull(result.getResult());
889
890    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
891    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
892
893    // Put with failure
894    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
895      .ifMatches(new FilterList(
896        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
897          Bytes.toBytes("a")),
898        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
899          Bytes.toBytes("c"))))
900      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
901    assertFalse(result.isSuccess());
902    assertNull(result.getResult());
903
904    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
905
906    // Delete with success
907    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
908      .ifMatches(new FilterList(
909        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
910          Bytes.toBytes("a")),
911        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
912          Bytes.toBytes("b"))))
913      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
914    assertTrue(result.isSuccess());
915    assertNull(result.getResult());
916
917    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
918
919    // Mutate with success
920    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
921      .ifMatches(new FilterList(
922        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
923          Bytes.toBytes("a")),
924        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
925          Bytes.toBytes("b"))))
926      .build(new RowMutations(row)
927        .add((Mutation) new Put(row)
928          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
929        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
930    assertTrue(result.isSuccess());
931    assertNull(result.getResult());
932
933    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
934    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
935
936    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
937  }
938
939  @Test
940  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
941    AsyncTable<?> table = getTable.get();
942
943    // Put with specifying the timestamp
944    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
945
946    // Put with success
947    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
948      .ifMatches(new FilterList(
949        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
950        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
951        new TimestampsFilter(Collections.singletonList(100L))))
952      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
953    assertTrue(result.isSuccess());
954    assertNull(result.getResult());
955
956    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
957    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
958
959    // Put with failure
960    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
961      .ifMatches(new FilterList(
962        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
963        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
964        new TimestampsFilter(Collections.singletonList(101L))))
965      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
966    assertFalse(result.isSuccess());
967    assertNull(result.getResult());
968
969    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
970  }
971
972  @Test
973  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
974    AsyncTable<?> table = getTable.get();
975
976    // Put with specifying the timestamp
977    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
978      .get();
979
980    // Put with success
981    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
982      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
983        CompareOperator.EQUAL, Bytes.toBytes("a")))
984      .timeRange(TimeRange.between(0, 101))
985      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
986    assertTrue(result.isSuccess());
987    assertNull(result.getResult());
988
989    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
990    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
991
992    // Put with failure
993    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
994      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
995        CompareOperator.EQUAL, Bytes.toBytes("a")))
996      .timeRange(TimeRange.between(0, 100))
997      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
998      .get();
999    assertFalse(result.isSuccess());
1000    assertNull(result.getResult());
1001
1002    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1003  }
1004
1005  @Test
1006  public void testCheckAndIncrement() throws Throwable {
1007    AsyncTable<?> table = getTable.get();
1008
1009    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1010
1011    // CheckAndIncrement with correct value
1012    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1013      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1014      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
1015    assertTrue(res.isSuccess());
1016    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1017
1018    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1019    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1020
1021    // CheckAndIncrement with wrong value
1022    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1023      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1024      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
1025    assertFalse(res.isSuccess());
1026    assertNull(res.getResult());
1027
1028    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1029    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1030
1031    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1032
1033    // CheckAndIncrement with a filter and correct value
1034    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1035      .ifMatches(new FilterList(
1036        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1037          Bytes.toBytes("a")),
1038        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1039          Bytes.toBytes("c"))))
1040      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1041    assertTrue(res.isSuccess());
1042    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1043
1044    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1045    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1046
1047    // CheckAndIncrement with a filter and correct value
1048    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1049      .ifMatches(new FilterList(
1050        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1051          Bytes.toBytes("b")),
1052        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1053          Bytes.toBytes("d"))))
1054      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1055    assertFalse(res.isSuccess());
1056    assertNull(res.getResult());
1057
1058    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1059    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1060  }
1061
1062  @Test
1063  public void testCheckAndAppend() throws Throwable {
1064    AsyncTable<?> table = getTable.get();
1065
1066    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1067
1068    // CheckAndAppend with correct value
1069    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1070      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1071      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
1072    assertTrue(res.isSuccess());
1073    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1074
1075    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1076    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1077
1078    // CheckAndAppend with correct value
1079    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1080      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1081      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
1082    assertFalse(res.isSuccess());
1083    assertNull(res.getResult());
1084
1085    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1086    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1087
1088    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1089
1090    // CheckAndAppend with a filter and correct value
1091    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1092      .ifMatches(new FilterList(
1093        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1094          Bytes.toBytes("a")),
1095        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1096          Bytes.toBytes("c"))))
1097      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1098    assertTrue(res.isSuccess());
1099    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1100
1101    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1102    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1103
1104    // CheckAndAppend with a filter and wrong value
1105    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1106      .ifMatches(new FilterList(
1107        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1108          Bytes.toBytes("b")),
1109        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1110          Bytes.toBytes("d"))))
1111      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1112    assertFalse(res.isSuccess());
1113    assertNull(res.getResult());
1114
1115    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1116    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1117  }
1118
1119  @Test
1120  public void testCheckAndRowMutations() throws Throwable {
1121    final byte[] q1 = Bytes.toBytes("q1");
1122    final byte[] q2 = Bytes.toBytes("q2");
1123    final byte[] q3 = Bytes.toBytes("q3");
1124    final byte[] q4 = Bytes.toBytes("q4");
1125    final String v1 = "v1";
1126
1127    AsyncTable<?> table = getTable.get();
1128
1129    // Initial values
1130    table.putAll(Arrays.asList(
1131      new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
1132      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
1133      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
1134
1135    // Do CheckAndRowMutations
1136    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
1137      .ifNotExists(FAMILY, q1)
1138      .build(new RowMutations(row).add(Arrays.asList(
1139        new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
1140        new Delete(row).addColumns(FAMILY, q2),
1141        new Increment(row).addColumn(FAMILY, q3, 1),
1142        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
1143      );
1144
1145    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
1146    assertTrue(result.isSuccess());
1147    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
1148    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
1149
1150    // Verify the value
1151    Result r = table.get(new Get(row)).get();
1152    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1153    assertNull(r.getValue(FAMILY, q2));
1154    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1155    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1156
1157    // Do CheckAndRowMutations again
1158    checkAndMutate = CheckAndMutate.newBuilder(row)
1159      .ifNotExists(FAMILY, q1)
1160      .build(new RowMutations(row).add(Arrays.asList(
1161        new Delete(row).addColumns(FAMILY, q1),
1162        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
1163        new Increment(row).addColumn(FAMILY, q3, 1),
1164        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
1165      );
1166
1167    result = table.checkAndMutate(checkAndMutate).get();
1168    assertFalse(result.isSuccess());
1169    assertNull(result.getResult());
1170
1171    // Verify the value
1172    r = table.get(new Get(row)).get();
1173    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1174    assertNull(r.getValue(FAMILY, q2));
1175    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1176    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1177  }
1178
1179  // Tests for batch version of checkAndMutate
1180
1181  @Test
1182  public void testCheckAndMutateBatch() throws Throwable {
1183    AsyncTable<?> table = getTable.get();
1184    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1185    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1186    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1187
1188    table.putAll(Arrays.asList(
1189      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1190      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1191      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1192      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
1193
1194    // Test for Put
1195    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1196      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1197      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1198
1199    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1200      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1201      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1202
1203    List<CheckAndMutateResult> results =
1204      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1205
1206    assertTrue(results.get(0).isSuccess());
1207    assertNull(results.get(0).getResult());
1208    assertFalse(results.get(1).isSuccess());
1209    assertNull(results.get(1).getResult());
1210
1211    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1212    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1213
1214    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1215    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1216
1217    // Test for Delete
1218    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1219      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
1220      .build(new Delete(row));
1221
1222    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1223      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1224      .build(new Delete(row2));
1225
1226    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1227
1228    assertTrue(results.get(0).isSuccess());
1229    assertNull(results.get(0).getResult());
1230    assertFalse(results.get(1).isSuccess());
1231    assertNull(results.get(1).getResult());
1232
1233    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
1234
1235    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1236    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1237
1238    // Test for RowMutations
1239    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1240      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1241      .build(new RowMutations(row3)
1242        .add((Mutation) new Put(row3)
1243          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1244        .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
1245
1246    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1247      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
1248      .build(new RowMutations(row4)
1249        .add((Mutation) new Put(row4)
1250          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1251        .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
1252
1253    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1254
1255    assertTrue(results.get(0).isSuccess());
1256    assertNull(results.get(0).getResult());
1257    assertFalse(results.get(1).isSuccess());
1258    assertNull(results.get(1).getResult());
1259
1260    result = table.get(new Get(row3)).get();
1261    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1262    assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
1263
1264    result = table.get(new Get(row4)).get();
1265    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
1266    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1267  }
1268
1269  @Test
1270  public void testCheckAndMutateBatch2() throws Throwable {
1271    AsyncTable<?> table = getTable.get();
1272    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1273    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1274    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1275
1276    table.putAll(Arrays.asList(
1277      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1278      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1279      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
1280      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get();
1281
1282    // Test for ifNotExists()
1283    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1284      .ifNotExists(FAMILY, Bytes.toBytes("B"))
1285      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1286
1287    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1288      .ifNotExists(FAMILY, Bytes.toBytes("B"))
1289      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1290
1291    List<CheckAndMutateResult> results =
1292      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1293
1294    assertTrue(results.get(0).isSuccess());
1295    assertNull(results.get(0).getResult());
1296    assertFalse(results.get(1).isSuccess());
1297    assertNull(results.get(1).getResult());
1298
1299    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1300    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1301
1302    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1303    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1304
1305    // Test for ifMatches()
1306    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1307      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
1308      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
1309
1310    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1311      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
1312      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1313
1314    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1315
1316    assertTrue(results.get(0).isSuccess());
1317    assertNull(results.get(0).getResult());
1318    assertFalse(results.get(1).isSuccess());
1319    assertNull(results.get(1).getResult());
1320
1321    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1322    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1323
1324    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1325    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1326
1327    // Test for timeRange()
1328    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1329      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1330      .timeRange(TimeRange.between(0, 101))
1331      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
1332
1333    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1334      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1335      .timeRange(TimeRange.between(0, 100))
1336      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
1337
1338    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1339
1340    assertTrue(results.get(0).isSuccess());
1341    assertNull(results.get(0).getResult());
1342    assertFalse(results.get(1).isSuccess());
1343    assertNull(results.get(1).getResult());
1344
1345    result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1346    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1347
1348    result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1349    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1350  }
1351
1352  @Test
1353  public void testCheckAndMutateBatchWithFilter() throws Throwable {
1354    AsyncTable<?> table = getTable.get();
1355    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1356
1357    table.putAll(Arrays.asList(
1358      new Put(row)
1359        .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1360        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1361        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1362      new Put(row2)
1363        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1364        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
1365        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
1366
1367    // Test for Put
1368    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1369      .ifMatches(new FilterList(
1370        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1371          Bytes.toBytes("a")),
1372        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1373          Bytes.toBytes("b"))))
1374      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1375
1376    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1377      .ifMatches(new FilterList(
1378        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1379          Bytes.toBytes("a")),
1380        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1381          Bytes.toBytes("b"))))
1382      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1383
1384    List<CheckAndMutateResult> results =
1385      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1386
1387    assertTrue(results.get(0).isSuccess());
1388    assertNull(results.get(0).getResult());
1389    assertFalse(results.get(1).isSuccess());
1390    assertNull(results.get(1).getResult());
1391
1392    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1393    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1394
1395    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1396    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1397
1398    // Test for Delete
1399    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1400      .ifMatches(new FilterList(
1401        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1402          Bytes.toBytes("a")),
1403        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1404          Bytes.toBytes("b"))))
1405      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
1406
1407    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1408      .ifMatches(new FilterList(
1409        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1410          Bytes.toBytes("a")),
1411        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1412          Bytes.toBytes("b"))))
1413      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
1414
1415    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1416
1417    assertTrue(results.get(0).isSuccess());
1418    assertNull(results.get(0).getResult());
1419    assertFalse(results.get(1).isSuccess());
1420    assertNull(results.get(1).getResult());
1421
1422    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1423
1424    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1425    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1426
1427    // Test for RowMutations
1428    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1429      .ifMatches(new FilterList(
1430        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1431          Bytes.toBytes("a")),
1432        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1433          Bytes.toBytes("b"))))
1434      .build(new RowMutations(row)
1435        .add((Mutation) new Put(row)
1436          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
1437        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
1438
1439    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1440      .ifMatches(new FilterList(
1441        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1442          Bytes.toBytes("a")),
1443        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1444          Bytes.toBytes("b"))))
1445      .build(new RowMutations(row2)
1446        .add((Mutation) new Put(row2)
1447          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
1448        .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
1449
1450    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1451
1452    assertTrue(results.get(0).isSuccess());
1453    assertNull(results.get(0).getResult());
1454    assertFalse(results.get(1).isSuccess());
1455    assertNull(results.get(1).getResult());
1456
1457    result = table.get(new Get(row)).get();
1458    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
1459    assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1460
1461    result = table.get(new Get(row2)).get();
1462    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1463    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1464  }
1465
1466  @Test
1467  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
1468    AsyncTable<?> table = getTable.get();
1469    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1470
1471    table.putAll(Arrays.asList(
1472      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
1473        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
1474        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1475      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
1476        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
1477        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
1478
1479    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1480      .ifMatches(new FilterList(
1481        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1482          Bytes.toBytes("a")),
1483        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1484          Bytes.toBytes("b"))))
1485      .timeRange(TimeRange.between(0, 101))
1486      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1487
1488    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1489      .ifMatches(new FilterList(
1490        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1491          Bytes.toBytes("d")),
1492        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1493          Bytes.toBytes("e"))))
1494      .timeRange(TimeRange.between(0, 100))
1495      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1496
1497    List<CheckAndMutateResult> results =
1498      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1499
1500    assertTrue(results.get(0).isSuccess());
1501    assertNull(results.get(0).getResult());
1502    assertFalse(results.get(1).isSuccess());
1503    assertNull(results.get(1).getResult());
1504
1505    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1506    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1507
1508    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1509    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1510  }
1511
1512  @Test
1513  public void testCheckAndIncrementBatch() throws Throwable {
1514    AsyncTable<?> table = getTable.get();
1515    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1516
1517    table.putAll(Arrays.asList(
1518      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1519        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
1520      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1521        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
1522
1523    // CheckAndIncrement with correct value
1524    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1525      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1526      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
1527
1528    // CheckAndIncrement with wrong value
1529    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1530      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1531      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
1532
1533    List<CheckAndMutateResult> results =
1534      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1535
1536    assertTrue(results.get(0).isSuccess());
1537    assertEquals(1, Bytes.toLong(results.get(0).getResult()
1538      .getValue(FAMILY, Bytes.toBytes("B"))));
1539    assertFalse(results.get(1).isSuccess());
1540    assertNull(results.get(1).getResult());
1541
1542    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1543    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1544
1545    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1546    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
1547  }
1548
1549  @Test
1550  public void testCheckAndAppendBatch() throws Throwable {
1551    AsyncTable<?> table = getTable.get();
1552    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1553
1554    table.putAll(Arrays.asList(
1555      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1556        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1557      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1558        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
1559
1560    // CheckAndAppend with correct value
1561    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1562      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1563      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
1564
1565    // CheckAndAppend with wrong value
1566    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1567      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1568      .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
1569
1570    List<CheckAndMutateResult> results =
1571      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1572
1573    assertTrue(results.get(0).isSuccess());
1574    assertEquals("bb", Bytes.toString(results.get(0).getResult()
1575      .getValue(FAMILY, Bytes.toBytes("B"))));
1576    assertFalse(results.get(1).isSuccess());
1577    assertNull(results.get(1).getResult());
1578
1579    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1580    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1581
1582    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1583    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1584  }
1585
1586  @Test
1587  public void testCheckAndRowMutationsBatch() throws Throwable {
1588    AsyncTable<?> table = getTable.get();
1589    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1590
1591    table.putAll(Arrays.asList(
1592      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1593        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
1594        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
1595      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
1596        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
1597        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
1598    ).get();
1599
1600    // CheckAndIncrement with correct value
1601    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1602      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1603      .build(new RowMutations(row).add(Arrays.asList(
1604        new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1605        new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
1606        new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
1607        new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1608      )));
1609
1610    // CheckAndIncrement with wrong value
1611    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1612      .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
1613      .build(new RowMutations(row2).add(Arrays.asList(
1614        new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
1615        new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
1616        new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
1617        new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))
1618      )));
1619
1620    List<CheckAndMutateResult> results =
1621      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1622
1623    assertTrue(results.get(0).isSuccess());
1624    assertEquals(2, Bytes.toLong(results.get(0).getResult()
1625      .getValue(FAMILY, Bytes.toBytes("C"))));
1626    assertEquals("dd", Bytes.toString(results.get(0).getResult()
1627      .getValue(FAMILY, Bytes.toBytes("D"))));
1628
1629    assertFalse(results.get(1).isSuccess());
1630    assertNull(results.get(1).getResult());
1631
1632    Result result = table.get(new Get(row)).get();
1633    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1634    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
1635    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
1636    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1637
1638    result = table.get(new Get(row2)).get();
1639    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
1640    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1641    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
1642    assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
1643  }
1644
1645  @Test
1646  public void testDisabled() throws InterruptedException, ExecutionException {
1647    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
1648    try {
1649      getTable.get().get(new Get(row)).get();
1650      fail("Should fail since table has been disabled");
1651    } catch (ExecutionException e) {
1652      Throwable cause = e.getCause();
1653      assertThat(cause, instanceOf(TableNotEnabledException.class));
1654      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
1655    }
1656  }
1657
1658  @Test
1659  public void testInvalidPut() {
1660    try {
1661      getTable.get().put(new Put(Bytes.toBytes(0)));
1662      fail("Should fail since the put does not contain any cells");
1663    } catch (IllegalArgumentException e) {
1664      assertThat(e.getMessage(), containsString("No columns to insert"));
1665    }
1666
1667    try {
1668      getTable.get()
1669        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
1670      fail("Should fail since the put exceeds the max key value size");
1671    } catch (IllegalArgumentException e) {
1672      assertThat(e.getMessage(), containsString("KeyValue size too large"));
1673    }
1674  }
1675}