001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertThat; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.concurrent.ArrayBlockingQueue; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.ForkJoinPool; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.function.Supplier; 043import java.util.stream.IntStream; 044import org.apache.commons.io.IOUtils; 045import 
org.apache.hadoop.hbase.CompareOperator; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNotEnabledException; 050import org.apache.hadoop.hbase.filter.BinaryComparator; 051import org.apache.hadoop.hbase.filter.FamilyFilter; 052import org.apache.hadoop.hbase.filter.FilterList; 053import org.apache.hadoop.hbase.filter.QualifierFilter; 054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 055import org.apache.hadoop.hbase.filter.TimestampsFilter; 056import org.apache.hadoop.hbase.io.TimeRange; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.Pair; 061import org.junit.AfterClass; 062import org.junit.Before; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.junit.runners.Parameterized.Parameter; 072import org.junit.runners.Parameterized.Parameters; 073 074@RunWith(Parameterized.class) 075@Category({ MediumTests.class, ClientTests.class }) 076public class TestAsyncTable { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestAsyncTable.class); 081 082 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 084 private static TableName TABLE_NAME = TableName.valueOf("async"); 085 086 private static byte[] FAMILY = Bytes.toBytes("cf"); 087 088 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 089 090 private static byte[] VALUE = Bytes.toBytes("value"); 091 092 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 093 094 
private static AsyncConnection ASYNC_CONN;

  /** Exposes the running test method's name; used to derive a per-test row key. */
  @Rule
  public TestName testName = new TestName();

  /** Row key for the current test, assigned in {@link #setUp()}. */
  private byte[] row;

  /** Parameterized factory that yields the {@link AsyncTable} flavour under test. */
  @Parameter
  public Supplier<AsyncTable<?>> getTable;

  /** Returns the table obtained from the shared connection without an explicit executor. */
  private static AsyncTable<?> getRawTable() {
    return ASYNC_CONN.getTable(TABLE_NAME);
  }

  /** Returns the table obtained from the shared connection with the common fork-join pool. */
  private static AsyncTable<?> getTable() {
    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
  }

  /** Supplies the two table factories the test class is parameterized over. */
  @Parameters
  public static List<Object[]> params() {
    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
      new Supplier<?>[] { TestAsyncTable::getTable });
  }

  /**
   * Configures the maximum key-value size, starts a one-node mini cluster, creates the test table
   * and opens the shared asynchronous connection.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
      MAX_KEY_VALUE_SIZE);
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
    TEST_UTIL.waitTableAvailable(TABLE_NAME);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    assertFalse(ASYNC_CONN.isClosed());
  }

  /**
   * Closes the shared connection, verifies it reports closed, and shuts the mini cluster down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      IOUtils.closeQuietly(ASYNC_CONN);
      assertTrue(ASYNC_CONN.isClosed());
    } finally {
      // Always stop the cluster, even if the connection check above throws; otherwise the
      // mini cluster would be left running.
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Derives the row key from the test method name (non-alphanumeric characters become
   * underscores) and re-enables the table if it is currently disabled.
   */
  @Before
  public void setUp() throws IOException, InterruptedException, ExecutionException {
    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
    }
  }

  /** Puts one cell, reads it back via exists/get, deletes the row and checks it is gone. */
  @Test
  public void testSimple() throws Exception {
    AsyncTable<?> table = getTable.get();
    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
    table.delete(new Delete(row)).get();
    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
    assertTrue(result.isEmpty());
    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
  }

  /** Returns the bytes of {@code base} (decoded as a string) followed by "-" and {@code index}. */
  private byte[] concat(byte[] base, int index) {
    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
  }

  /**
   * Issues 100 concurrent puts, then exists/get checks, then deletes, then verifies through
   * exists/get that every row is gone.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testSimpleMultiple() throws Exception {
    AsyncTable<?> table = getTable.get();
    int count = 100;
    CountDownLatch putLatch = new CountDownLatch(count);
    IntStream.range(0, count).forEach(
      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
        .thenAccept(x -> putLatch.countDown()));
    putLatch.await();
    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
    IntStream.range(0, count)
      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
        .thenAccept(existsResp::add));
    for (int i = 0; i < count; i++) {
      assertTrue(existsResp.take());
    }
    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
    IntStream.range(0, count)
      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
    for (int i = 0; i < count; i++) {
      Pair<Integer, Result> pair = getResp.take();
      assertArrayEquals(concat(VALUE, pair.getFirst()),
        pair.getSecond().getValue(FAMILY, QUALIFIER));
    }
    CountDownLatch deleteLatch = new CountDownLatch(count);
    IntStream.range(0, count).forEach(
      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
    deleteLatch.await();
    IntStream.range(0, count)
      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
        .thenAccept(existsResp::add));
    for (int i = 0; i < count; i++) {
      assertFalse(existsResp.take());
    }
    IntStream.range(0, count)
      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
    for (int i = 0; i < count; i++) {
      Pair<Integer, Result> pair = getResp.take();
      assertTrue(pair.getSecond().isEmpty());
    }
  }

  /**
   * Fires 100 concurrent increments of 1 on one cell and checks both the final stored value and
   * the sum of the values returned to the callbacks (1 + 2 + ... + count).
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testIncrement() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 100;
    CountDownLatch latch = new CountDownLatch(count);
    AtomicLong sum = new AtomicLong(0L);
    IntStream.range(0, count)
      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
        sum.addAndGet(x);
        latch.countDown();
      }));
    latch.await();
    assertEquals(count, Bytes.toLong(
      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
    assertEquals((1 + count) * count / 2, sum.get());
  }

  /**
   * Appends "0:", "1:", ... "9:" to one cell concurrently, then checks the total number of
   * separators seen by the callbacks and that the final value holds each index exactly once.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testAppend() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 10;
    CountDownLatch latch = new CountDownLatch(count);
    char suffix = ':';
    AtomicLong suffixCount = new AtomicLong(0L);
    IntStream.range(0, count)
      .forEachOrdered(i -> table
        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
        .thenAccept(r -> {
          suffixCount.addAndGet(
            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
          latch.countDown();
        }));
    latch.await();
    assertEquals((1 + count) * count / 2, suffixCount.get());
    String value = Bytes.toString(
      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
    // The appends may land in any order, so sort the parsed indices before comparing.
    int[] actual = Arrays.stream(value.split(String.valueOf(suffix)))
      .mapToInt(Integer::parseInt).sorted().toArray();
    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
  }

  /**
   * Applies a single-put {@link RowMutations}, then a mixed delete/put/increment/append one, and
   * checks both the returned result and the stored row after each.
   */
  @Test
  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
    AsyncTable<?> table = getTable.get();
    RowMutations mutation = new RowMutations(row);
    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
    Result result = table.mutateRow(mutation).get();
    assertTrue(result.getExists());
    assertTrue(result.isEmpty());

    result = table.get(new Get(row)).get();
    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));

    mutation = new RowMutations(row);
    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
    result = table.mutateRow(mutation).get();
    assertTrue(result.getExists());
    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));

    result = table.get(new Get(row)).get();
    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
  }

  // Tests for old checkAndMutate API

  /** Ten concurrent if-not-exists puts on one cell: exactly one must succeed. */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  @Deprecated
  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    int count = 10;
    CountDownLatch latch = new CountDownLatch(count);
    IntStream.range(0, count)
      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
          if (x) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          latch.countDown();
        }));
    latch.await();
    assertEquals(1, successCount.get());
    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
  }

  /**
   * Ten concurrent conditional deletes guarded by the same cell value: exactly one must succeed,
   * and only the column targeted by the winner may be missing afterwards.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  @Deprecated
  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 10;
    CountDownLatch putLatch = new CountDownLatch(count + 1);
    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(putLatch::countDown);
    IntStream.range(0, count)
      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
        .thenRun(putLatch::countDown));
    putLatch.await();

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    CountDownLatch deleteLatch = new CountDownLatch(count);
    IntStream.range(0, count)
      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
        .thenDelete(
          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
        .thenAccept(x -> {
          if (x) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          deleteLatch.countDown();
        }));
    deleteLatch.await();
    assertEquals(1, successCount.get());
    Result result = table.get(new Get(row)).get();
    IntStream.range(0, count).forEach(i -> {
      if (i == successIndex.get()) {
        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
      } else {
        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
      }
    });
  }

  /**
   * Ten concurrent conditional {@link RowMutations} guarded by the same cell value: exactly one
   * must succeed, and only the winner's column may carry the rewritten value.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  @Deprecated
  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 10;
    CountDownLatch putLatch = new CountDownLatch(count + 1);
    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(putLatch::countDown);
    IntStream.range(0, count)
      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
        .thenRun(putLatch::countDown));
    putLatch.await();

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    CountDownLatch mutateLatch = new CountDownLatch(count);
    IntStream.range(0, count).forEach(i -> {
      RowMutations mutation = new RowMutations(row);
      try {
        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
        mutation
          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
        .thenAccept(x -> {
          if (x) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          mutateLatch.countDown();
        });
    });
    mutateLatch.await();
    assertEquals(1, successCount.get());
    Result result = table.get(new Get(row)).get();
    IntStream.range(0, count).forEach(i -> {
      if (i == successIndex.get()) {
        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
      } else {
        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
      }
    });
  }

  /**
   * Checks that the time range of the condition is honoured for put, row-mutation and delete:
   * a range at {@code ts + 10000} must not match the cell written at {@code ts}, a range at
   * {@code ts} must.
   */
  @Test
  @Deprecated
  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
    AsyncTable<?> table = getTable.get();
    final long ts = System.currentTimeMillis() / 2;
    Put put = new Put(row);
    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);

    boolean ok =
      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
    assertTrue(ok);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
      .ifEquals(VALUE).thenPut(put).get();
    assertFalse(ok);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
      .ifEquals(VALUE).thenPut(put).get();
    assertTrue(ok);

    RowMutations rm = new RowMutations(row).add((Mutation) put);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
      .ifEquals(VALUE).thenMutate(rm).get();
    assertFalse(ok);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
      .ifEquals(VALUE).thenMutate(rm).get();
    assertTrue(ok);

    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
      .ifEquals(VALUE).thenDelete(delete).get();
    assertFalse(ok);

    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
      .ifEquals(VALUE).thenDelete(delete).get();
    assertTrue(ok);
  }

  /** Exercises put, delete and row-mutation guarded by a single column-value filter. */
  @Test
  @Deprecated
  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put one row
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
    table.put(put).get();

    // Put with success
    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
      .get();
    assertTrue(ok);

    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

    // Put with failure
    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("b")))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
      .get();
    assertFalse(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());

    // Delete with success
    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
      .get();
    assertTrue(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());

    // Mutate with success
    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
        CompareOperator.EQUAL, Bytes.toBytes("b")))
      .thenMutate(new RowMutations(row)
        .add((Mutation) new Put(row)
          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
      .get();
    assertTrue(ok);

    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
  }

  /** Exercises put, delete and row-mutation guarded by a list of two column-value filters. */
  @Test
  @Deprecated
  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put one row
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
    table.put(put).get();

    // Put with success
    boolean ok = table.checkAndMutate(row, new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))
      ))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
      .get();
    assertTrue(ok);

    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

    // Put with failure
    ok = table.checkAndMutate(row, new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("c"))
      ))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
      .get();
    assertFalse(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());

    // Delete with success
    ok = table.checkAndMutate(row, new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))
      ))
      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
      .get();
    assertTrue(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());

    // Mutate with success
    ok = table.checkAndMutate(row, new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))
      ))
      .thenMutate(new RowMutations(row)
        .add((Mutation) new Put(row)
          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
      .get();
    assertTrue(ok);

    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
  }

  /**
   * Guards a put with a family/qualifier/timestamps filter list: the put must succeed when the
   * timestamp filter lists 100 (the cell's timestamp) and fail when it lists 101.
   */
  @Test
  @Deprecated
  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put with specifying the timestamp
    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();

    // Put with success
    boolean ok = table.checkAndMutate(row, new FilterList(
        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
        new TimestampsFilter(Collections.singletonList(100L))
      ))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
      .get();
    assertTrue(ok);

    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));

    // Put with failure
    ok = table.checkAndMutate(row, new FilterList(
        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
        new TimestampsFilter(Collections.singletonList(101L))
      ))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
      .get();
    assertFalse(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
  }

  /**
   * Combines a column-value filter with a time range: the put must succeed with the range
   * between 0 and 101 and fail with the range between 0 and 100, for a cell written at
   * timestamp 100.
   */
  @Test
  @Deprecated
  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put with specifying the timestamp
    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
      .get();

    // Put with success
    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .timeRange(TimeRange.between(0, 101))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
      .get();
    assertTrue(ok);

    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));

    // Put with failure
    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .timeRange(TimeRange.between(0, 100))
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
      .get();
    assertFalse(ok);

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
  }

  /** Expects a {@link NullPointerException} when no condition is set before {@code thenPut}. */
  @Test(expected = NullPointerException.class)
  @Deprecated
  public void testCheckAndMutateWithoutConditionForOldApi() {
    getTable.get().checkAndMutate(row, FAMILY)
      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
  }

  // Tests for new CheckAndMutate API

  /**
   * Ten concurrent if-not-exists puts on one cell: exactly one must succeed and no response may
   * carry a {@link Result}.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testCheckAndPut() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    // Number of responses that unexpectedly carried a Result. Recorded here and asserted on the
    // test thread: an assertion thrown inside the callback would be lost in the discarded
    // future and would skip countDown(), leaving the test blocked on the latch.
    AtomicInteger unexpectedResults = new AtomicInteger(0);
    int count = 10;
    CountDownLatch latch = new CountDownLatch(count);

    IntStream.range(0, count)
      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
        .ifNotExists(FAMILY, QUALIFIER)
        .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
        .thenAccept(x -> {
          if (x.isSuccess()) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          if (x.getResult() != null) {
            unexpectedResults.incrementAndGet();
          }
          latch.countDown();
        }));
    latch.await();
    assertEquals(0, unexpectedResults.get());
    assertEquals(1, successCount.get());
    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
  }

  /**
   * Ten concurrent conditional deletes guarded by the same cell value: exactly one must succeed,
   * no response may carry a {@link Result}, and only the winner's column may be missing.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 10;
    CountDownLatch putLatch = new CountDownLatch(count + 1);
    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(putLatch::countDown);
    IntStream.range(0, count)
      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
        .thenRun(putLatch::countDown));
    putLatch.await();

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    // See testCheckAndPut: violations are counted in the callback and asserted afterwards so a
    // failure can neither be swallowed nor block the latch.
    AtomicInteger unexpectedResults = new AtomicInteger(0);
    CountDownLatch deleteLatch = new CountDownLatch(count);

    IntStream.range(0, count)
      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
        .ifEquals(FAMILY, QUALIFIER, VALUE)
        .build(
          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
        .thenAccept(x -> {
          if (x.isSuccess()) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          if (x.getResult() != null) {
            unexpectedResults.incrementAndGet();
          }
          deleteLatch.countDown();
        }));
    deleteLatch.await();
    assertEquals(0, unexpectedResults.get());
    assertEquals(1, successCount.get());
    Result result = table.get(new Get(row)).get();
    IntStream.range(0, count).forEach(i -> {
      if (i == successIndex.get()) {
        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
      } else {
        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
      }
    });
  }

  /**
   * Ten concurrent conditional {@link RowMutations} guarded by the same cell value: exactly one
   * must succeed, no response may carry a {@link Result}, and only the winner's column may hold
   * the rewritten value.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Test
  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
    AsyncTable<?> table = getTable.get();
    int count = 10;
    CountDownLatch putLatch = new CountDownLatch(count + 1);
    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(putLatch::countDown);
    IntStream.range(0, count)
      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
        .thenRun(putLatch::countDown));
    putLatch.await();

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger successIndex = new AtomicInteger(-1);
    // See testCheckAndPut: violations are counted in the callback and asserted afterwards so a
    // failure can neither be swallowed nor block the latch.
    AtomicInteger unexpectedResults = new AtomicInteger(0);
    CountDownLatch mutateLatch = new CountDownLatch(count);
    IntStream.range(0, count).forEach(i -> {
      RowMutations mutation = new RowMutations(row);
      try {
        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
        mutation
          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }

      table.checkAndMutate(CheckAndMutate.newBuilder(row)
        .ifEquals(FAMILY, QUALIFIER, VALUE)
        .build(mutation))
        .thenAccept(x -> {
          if (x.isSuccess()) {
            successCount.incrementAndGet();
            successIndex.set(i);
          }
          if (x.getResult() != null) {
            unexpectedResults.incrementAndGet();
          }
          mutateLatch.countDown();
        });
    });
    mutateLatch.await();
    assertEquals(0, unexpectedResults.get());
    assertEquals(1, successCount.get());
    Result result = table.get(new Get(row)).get();
    IntStream.range(0, count).forEach(i -> {
      if (i == successIndex.get()) {
        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
      } else {
        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
      }
    });
  }

  /**
   * Checks that the time range of the condition is honoured for put, row-mutation and delete:
   * a range at {@code ts + 10000} must not match the cell written at {@code ts}, a range at
   * {@code ts} must. Every response is expected to carry no {@link Result}.
   */
  @Test
  public void testCheckAndMutateWithTimeRange() throws Exception {
    AsyncTable<?> table = getTable.get();
    final long ts = System.currentTimeMillis() / 2;
    Put put = new Put(row);
    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);

    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifNotExists(FAMILY, QUALIFIER)
      .build(put)).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts + 10000))
      .build(put)).get();
    assertFalse(result.isSuccess());
    assertNull(result.getResult());

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts))
      .build(put)).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    RowMutations rm = new RowMutations(row).add((Mutation) put);

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts + 10000))
      .build(rm)).get();
    assertFalse(result.isSuccess());
    assertNull(result.getResult());

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts))
      .build(rm)).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts + 10000))
      .build(delete)).get();
    assertFalse(result.isSuccess());
    assertNull(result.getResult());

    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifEquals(FAMILY, QUALIFIER, VALUE)
      .timeRange(TimeRange.at(ts))
      .build(delete)).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());
  }

  /** Exercises put, delete and row-mutation guarded by a single column-value filter. */
  @Test
  public void testCheckAndMutateWithSingleFilter() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put one row
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
    table.put(put).get();

    // Put with success
    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));

    // Put with failure
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("b")))
      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
    assertFalse(result.isSuccess());
    assertNull(result.getResult());

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());

    // Delete with success
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
        CompareOperator.EQUAL, Bytes.toBytes("a")))
      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());

    // Mutate with success
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
        CompareOperator.EQUAL, Bytes.toBytes("b")))
      .build(new RowMutations(row)
        .add((Mutation) new Put(row)
          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
  }

  /** Exercises put, delete and row-mutation guarded by a list of two column-value filters. */
  @Test
  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
    AsyncTable<?> table = getTable.get();

    // Put one row
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
    table.put(put).get();

    // Put with success
    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))))
      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));

    // Put with failure
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("c"))))
      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
    assertFalse(result.isSuccess());
    assertNull(result.getResult());

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());

    // Delete with success
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))))
      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());

    // Mutate with success
    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
      .ifMatches(new FilterList(
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
          Bytes.toBytes("a")),
        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
          Bytes.toBytes("b"))))
      .build(new RowMutations(row)
        .add((Mutation) new Put(row)
          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
    assertTrue(result.isSuccess());
    assertNull(result.getResult());

    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));

    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
  }

  @Test
  public void
testCheckAndMutateWithTimestampFilter() throws Throwable { 941 AsyncTable<?> table = getTable.get(); 942 943 // Put with specifying the timestamp 944 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 945 946 // Put with success 947 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 948 .ifMatches(new FilterList( 949 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 950 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 951 new TimestampsFilter(Collections.singletonList(100L)))) 952 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 953 assertTrue(result.isSuccess()); 954 assertNull(result.getResult()); 955 956 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 957 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 958 959 // Put with failure 960 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 961 .ifMatches(new FilterList( 962 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 963 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 964 new TimestampsFilter(Collections.singletonList(101L)))) 965 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 966 assertFalse(result.isSuccess()); 967 assertNull(result.getResult()); 968 969 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 970 } 971 972 @Test 973 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { 974 AsyncTable<?> table = getTable.get(); 975 976 // Put with specifying the timestamp 977 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))) 978 .get(); 979 980 // Put with success 981 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 982 .ifMatches(new 
SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 983 CompareOperator.EQUAL, Bytes.toBytes("a"))) 984 .timeRange(TimeRange.between(0, 101)) 985 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 986 assertTrue(result.isSuccess()); 987 assertNull(result.getResult()); 988 989 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 990 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 991 992 // Put with failure 993 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 994 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 995 CompareOperator.EQUAL, Bytes.toBytes("a"))) 996 .timeRange(TimeRange.between(0, 100)) 997 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))) 998 .get(); 999 assertFalse(result.isSuccess()); 1000 assertNull(result.getResult()); 1001 1002 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 1003 } 1004 1005 @Test 1006 public void testCheckAndIncrement() throws Throwable { 1007 AsyncTable<?> table = getTable.get(); 1008 1009 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1010 1011 // CheckAndIncrement with correct value 1012 CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1013 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1014 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get(); 1015 assertTrue(res.isSuccess()); 1016 assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1017 1018 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1019 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1020 1021 // CheckAndIncrement with wrong value 1022 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1023 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1024 .build(new 
Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get(); 1025 assertFalse(res.isSuccess()); 1026 assertNull(res.getResult()); 1027 1028 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1029 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1030 1031 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1032 1033 // CheckAndIncrement with a filter and correct value 1034 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1035 .ifMatches(new FilterList( 1036 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1037 Bytes.toBytes("a")), 1038 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1039 Bytes.toBytes("c")))) 1040 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1041 assertTrue(res.isSuccess()); 1042 assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1043 1044 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1045 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1046 1047 // CheckAndIncrement with a filter and correct value 1048 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1049 .ifMatches(new FilterList( 1050 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1051 Bytes.toBytes("b")), 1052 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1053 Bytes.toBytes("d")))) 1054 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1055 assertFalse(res.isSuccess()); 1056 assertNull(res.getResult()); 1057 1058 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1059 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1060 } 1061 1062 @Test 1063 public void testCheckAndAppend() throws Throwable { 1064 AsyncTable<?> table = getTable.get(); 1065 1066 table.put(new 
Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1067 1068 // CheckAndAppend with correct value 1069 CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1070 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1071 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 1072 assertTrue(res.isSuccess()); 1073 assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1074 1075 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1076 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1077 1078 // CheckAndAppend with correct value 1079 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1080 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1081 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 1082 assertFalse(res.isSuccess()); 1083 assertNull(res.getResult()); 1084 1085 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1086 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1087 1088 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1089 1090 // CheckAndAppend with a filter and correct value 1091 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1092 .ifMatches(new FilterList( 1093 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1094 Bytes.toBytes("a")), 1095 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1096 Bytes.toBytes("c")))) 1097 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1098 assertTrue(res.isSuccess()); 1099 assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1100 1101 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1102 assertEquals("bbb", 
Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1103 1104 // CheckAndAppend with a filter and wrong value 1105 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1106 .ifMatches(new FilterList( 1107 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1108 Bytes.toBytes("b")), 1109 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1110 Bytes.toBytes("d")))) 1111 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1112 assertFalse(res.isSuccess()); 1113 assertNull(res.getResult()); 1114 1115 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1116 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1117 } 1118 1119 @Test 1120 public void testCheckAndRowMutations() throws Throwable { 1121 final byte[] q1 = Bytes.toBytes("q1"); 1122 final byte[] q2 = Bytes.toBytes("q2"); 1123 final byte[] q3 = Bytes.toBytes("q3"); 1124 final byte[] q4 = Bytes.toBytes("q4"); 1125 final String v1 = "v1"; 1126 1127 AsyncTable<?> table = getTable.get(); 1128 1129 // Initial values 1130 table.putAll(Arrays.asList( 1131 new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), 1132 new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)), 1133 new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get(); 1134 1135 // Do CheckAndRowMutations 1136 CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row) 1137 .ifNotExists(FAMILY, q1) 1138 .build(new RowMutations(row).add(Arrays.asList( 1139 new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)), 1140 new Delete(row).addColumns(FAMILY, q2), 1141 new Increment(row).addColumn(FAMILY, q3, 1), 1142 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))) 1143 ); 1144 1145 CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get(); 1146 assertTrue(result.isSuccess()); 1147 assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); 1148 
assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); 1149 1150 // Verify the value 1151 Result r = table.get(new Get(row)).get(); 1152 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1153 assertNull(r.getValue(FAMILY, q2)); 1154 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1155 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1156 1157 // Do CheckAndRowMutations again 1158 checkAndMutate = CheckAndMutate.newBuilder(row) 1159 .ifNotExists(FAMILY, q1) 1160 .build(new RowMutations(row).add(Arrays.asList( 1161 new Delete(row).addColumns(FAMILY, q1), 1162 new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)), 1163 new Increment(row).addColumn(FAMILY, q3, 1), 1164 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))) 1165 ); 1166 1167 result = table.checkAndMutate(checkAndMutate).get(); 1168 assertFalse(result.isSuccess()); 1169 assertNull(result.getResult()); 1170 1171 // Verify the value 1172 r = table.get(new Get(row)).get(); 1173 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1174 assertNull(r.getValue(FAMILY, q2)); 1175 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1176 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1177 } 1178 1179 // Tests for batch version of checkAndMutate 1180 1181 @Test 1182 public void testCheckAndMutateBatch() throws Throwable { 1183 AsyncTable<?> table = getTable.get(); 1184 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1185 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1186 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1187 1188 table.putAll(Arrays.asList( 1189 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1190 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1191 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1192 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 1193 1194 // Test for Put 1195 CheckAndMutate 
checkAndMutate1 = CheckAndMutate.newBuilder(row) 1196 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1197 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1198 1199 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1200 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) 1201 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1202 1203 List<CheckAndMutateResult> results = 1204 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1205 1206 assertTrue(results.get(0).isSuccess()); 1207 assertNull(results.get(0).getResult()); 1208 assertFalse(results.get(1).isSuccess()); 1209 assertNull(results.get(1).getResult()); 1210 1211 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1212 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1213 1214 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1215 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1216 1217 // Test for Delete 1218 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1219 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")) 1220 .build(new Delete(row)); 1221 1222 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1223 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) 1224 .build(new Delete(row2)); 1225 1226 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1227 1228 assertTrue(results.get(0).isSuccess()); 1229 assertNull(results.get(0).getResult()); 1230 assertFalse(results.get(1).isSuccess()); 1231 assertNull(results.get(1).getResult()); 1232 1233 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 1234 1235 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1236 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1237 1238 // Test for 
RowMutations 1239 checkAndMutate1 = CheckAndMutate.newBuilder(row3) 1240 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1241 .build(new RowMutations(row3) 1242 .add((Mutation) new Put(row3) 1243 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1244 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); 1245 1246 checkAndMutate2 = CheckAndMutate.newBuilder(row4) 1247 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) 1248 .build(new RowMutations(row4) 1249 .add((Mutation) new Put(row4) 1250 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1251 .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); 1252 1253 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1254 1255 assertTrue(results.get(0).isSuccess()); 1256 assertNull(results.get(0).getResult()); 1257 assertFalse(results.get(1).isSuccess()); 1258 assertNull(results.get(1).getResult()); 1259 1260 result = table.get(new Get(row3)).get(); 1261 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1262 assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); 1263 1264 result = table.get(new Get(row4)).get(); 1265 assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); 1266 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1267 } 1268 1269 @Test 1270 public void testCheckAndMutateBatch2() throws Throwable { 1271 AsyncTable<?> table = getTable.get(); 1272 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1273 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1274 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1275 1276 table.putAll(Arrays.asList( 1277 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1278 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1279 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), 1280 new Put(row4).addColumn(FAMILY, 
Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get(); 1281 1282 // Test for ifNotExists() 1283 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1284 .ifNotExists(FAMILY, Bytes.toBytes("B")) 1285 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1286 1287 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1288 .ifNotExists(FAMILY, Bytes.toBytes("B")) 1289 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1290 1291 List<CheckAndMutateResult> results = 1292 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1293 1294 assertTrue(results.get(0).isSuccess()); 1295 assertNull(results.get(0).getResult()); 1296 assertFalse(results.get(1).isSuccess()); 1297 assertNull(results.get(1).getResult()); 1298 1299 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1300 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1301 1302 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1303 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1304 1305 // Test for ifMatches() 1306 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1307 .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) 1308 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); 1309 1310 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1311 .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) 1312 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1313 1314 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1315 1316 assertTrue(results.get(0).isSuccess()); 1317 assertNull(results.get(0).getResult()); 1318 assertFalse(results.get(1).isSuccess()); 1319 assertNull(results.get(1).getResult()); 1320 1321 result = table.get(new 
Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1322 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1323 1324 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1325 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1326 1327 // Test for timeRange() 1328 checkAndMutate1 = CheckAndMutate.newBuilder(row3) 1329 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1330 .timeRange(TimeRange.between(0, 101)) 1331 .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); 1332 1333 checkAndMutate2 = CheckAndMutate.newBuilder(row4) 1334 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) 1335 .timeRange(TimeRange.between(0, 100)) 1336 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); 1337 1338 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1339 1340 assertTrue(results.get(0).isSuccess()); 1341 assertNull(results.get(0).getResult()); 1342 assertFalse(results.get(1).isSuccess()); 1343 assertNull(results.get(1).getResult()); 1344 1345 result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1346 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1347 1348 result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1349 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1350 } 1351 1352 @Test 1353 public void testCheckAndMutateBatchWithFilter() throws Throwable { 1354 AsyncTable<?> table = getTable.get(); 1355 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1356 1357 table.putAll(Arrays.asList( 1358 new Put(row) 1359 .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1360 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1361 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1362 new Put(row2) 1363 .addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")) 1364 .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) 1365 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); 1366 1367 // Test for Put 1368 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1369 .ifMatches(new FilterList( 1370 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1371 Bytes.toBytes("a")), 1372 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1373 Bytes.toBytes("b")))) 1374 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1375 1376 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1377 .ifMatches(new FilterList( 1378 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1379 Bytes.toBytes("a")), 1380 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1381 Bytes.toBytes("b")))) 1382 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1383 1384 List<CheckAndMutateResult> results = 1385 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1386 1387 assertTrue(results.get(0).isSuccess()); 1388 assertNull(results.get(0).getResult()); 1389 assertFalse(results.get(1).isSuccess()); 1390 assertNull(results.get(1).getResult()); 1391 1392 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1393 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1394 1395 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1396 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1397 1398 // Test for Delete 1399 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1400 .ifMatches(new FilterList( 1401 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1402 Bytes.toBytes("a")), 1403 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1404 
Bytes.toBytes("b")))) 1405 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); 1406 1407 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1408 .ifMatches(new FilterList( 1409 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1410 Bytes.toBytes("a")), 1411 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1412 Bytes.toBytes("b")))) 1413 .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); 1414 1415 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1416 1417 assertTrue(results.get(0).isSuccess()); 1418 assertNull(results.get(0).getResult()); 1419 assertFalse(results.get(1).isSuccess()); 1420 assertNull(results.get(1).getResult()); 1421 1422 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 1423 1424 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1425 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1426 1427 // Test for RowMutations 1428 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1429 .ifMatches(new FilterList( 1430 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1431 Bytes.toBytes("a")), 1432 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1433 Bytes.toBytes("b")))) 1434 .build(new RowMutations(row) 1435 .add((Mutation) new Put(row) 1436 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 1437 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 1438 1439 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1440 .ifMatches(new FilterList( 1441 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1442 Bytes.toBytes("a")), 1443 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1444 Bytes.toBytes("b")))) 1445 .build(new RowMutations(row2) 1446 .add((Mutation) new Put(row2) 1447 .addColumn(FAMILY, 
Bytes.toBytes("F"), Bytes.toBytes("g"))) 1448 .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); 1449 1450 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1451 1452 assertTrue(results.get(0).isSuccess()); 1453 assertNull(results.get(0).getResult()); 1454 assertFalse(results.get(1).isSuccess()); 1455 assertNull(results.get(1).getResult()); 1456 1457 result = table.get(new Get(row)).get(); 1458 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 1459 assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1460 1461 result = table.get(new Get(row2)).get(); 1462 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1463 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1464 } 1465 1466 @Test 1467 public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { 1468 AsyncTable<?> table = getTable.get(); 1469 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1470 1471 table.putAll(Arrays.asList( 1472 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) 1473 .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) 1474 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1475 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) 1476 .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) 1477 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); 1478 1479 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1480 .ifMatches(new FilterList( 1481 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1482 Bytes.toBytes("a")), 1483 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1484 Bytes.toBytes("b")))) 1485 .timeRange(TimeRange.between(0, 101)) 1486 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1487 1488 CheckAndMutate 
checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1489 .ifMatches(new FilterList( 1490 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1491 Bytes.toBytes("d")), 1492 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1493 Bytes.toBytes("e")))) 1494 .timeRange(TimeRange.between(0, 100)) 1495 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1496 1497 List<CheckAndMutateResult> results = 1498 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1499 1500 assertTrue(results.get(0).isSuccess()); 1501 assertNull(results.get(0).getResult()); 1502 assertFalse(results.get(1).isSuccess()); 1503 assertNull(results.get(1).getResult()); 1504 1505 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1506 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1507 1508 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1509 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1510 } 1511 1512 @Test 1513 public void testCheckAndIncrementBatch() throws Throwable { 1514 AsyncTable<?> table = getTable.get(); 1515 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1516 1517 table.putAll(Arrays.asList( 1518 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1519 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)), 1520 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1521 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get(); 1522 1523 // CheckAndIncrement with correct value 1524 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1525 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1526 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)); 1527 1528 // CheckAndIncrement with wrong value 1529 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 
1530 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1531 .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1)); 1532 1533 List<CheckAndMutateResult> results = 1534 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1535 1536 assertTrue(results.get(0).isSuccess()); 1537 assertEquals(1, Bytes.toLong(results.get(0).getResult() 1538 .getValue(FAMILY, Bytes.toBytes("B")))); 1539 assertFalse(results.get(1).isSuccess()); 1540 assertNull(results.get(1).getResult()); 1541 1542 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1543 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1544 1545 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1546 assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D")))); 1547 } 1548 1549 @Test 1550 public void testCheckAndAppendBatch() throws Throwable { 1551 AsyncTable<?> table = getTable.get(); 1552 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1553 1554 table.putAll(Arrays.asList( 1555 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1556 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1557 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1558 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 1559 1560 // CheckAndAppend with correct value 1561 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1562 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1563 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 1564 1565 // CheckAndAppend with wrong value 1566 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1567 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1568 .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 1569 1570 List<CheckAndMutateResult> results = 1571 
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1572 1573 assertTrue(results.get(0).isSuccess()); 1574 assertEquals("bb", Bytes.toString(results.get(0).getResult() 1575 .getValue(FAMILY, Bytes.toBytes("B")))); 1576 assertFalse(results.get(1).isSuccess()); 1577 assertNull(results.get(1).getResult()); 1578 1579 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1580 assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1581 1582 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1583 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1584 } 1585 1586 @Test 1587 public void testCheckAndRowMutationsBatch() throws Throwable { 1588 AsyncTable<?> table = getTable.get(); 1589 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1590 1591 table.putAll(Arrays.asList( 1592 new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1593 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) 1594 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 1595 new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) 1596 .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) 1597 .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))) 1598 ).get(); 1599 1600 // CheckAndIncrement with correct value 1601 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1602 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1603 .build(new RowMutations(row).add(Arrays.asList( 1604 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1605 new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")), 1606 new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L), 1607 new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) 1608 ))); 1609 1610 // CheckAndIncrement with wrong value 1611 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1612 
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) 1613 .build(new RowMutations(row2).add(Arrays.asList( 1614 new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 1615 new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")), 1616 new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), 1617 new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")) 1618 ))); 1619 1620 List<CheckAndMutateResult> results = 1621 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1622 1623 assertTrue(results.get(0).isSuccess()); 1624 assertEquals(2, Bytes.toLong(results.get(0).getResult() 1625 .getValue(FAMILY, Bytes.toBytes("C")))); 1626 assertEquals("dd", Bytes.toString(results.get(0).getResult() 1627 .getValue(FAMILY, Bytes.toBytes("D")))); 1628 1629 assertFalse(results.get(1).isSuccess()); 1630 assertNull(results.get(1).getResult()); 1631 1632 Result result = table.get(new Get(row)).get(); 1633 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1634 assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); 1635 assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 1636 assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1637 1638 result = table.get(new Get(row2)).get(); 1639 assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); 1640 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1641 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); 1642 assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); 1643 } 1644 1645 @Test 1646 public void testDisabled() throws InterruptedException, ExecutionException { 1647 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 1648 try { 1649 getTable.get().get(new Get(row)).get(); 1650 fail("Should fail since table has been disabled"); 1651 } catch (ExecutionException e) { 1652 Throwable cause = e.getCause(); 1653 
assertThat(cause, instanceOf(TableNotEnabledException.class)); 1654 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 1655 } 1656 } 1657 1658 @Test 1659 public void testInvalidPut() { 1660 try { 1661 getTable.get().put(new Put(Bytes.toBytes(0))); 1662 fail("Should fail since the put does not contain any cells"); 1663 } catch (IllegalArgumentException e) { 1664 assertThat(e.getMessage(), containsString("No columns to insert")); 1665 } 1666 1667 try { 1668 getTable.get() 1669 .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])); 1670 fail("Should fail since the put exceeds the max key value size"); 1671 } catch (IllegalArgumentException e) { 1672 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1673 } 1674 } 1675}