001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.concurrent.ArrayBlockingQueue; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.ForkJoinPool; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.function.Consumer; 043import java.util.function.Supplier; 044import java.util.stream.IntStream; 045import org.apache.hadoop.hbase.CompareOperator; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtil; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNotEnabledException; 050import org.apache.hadoop.hbase.filter.BinaryComparator; 051import org.apache.hadoop.hbase.filter.FamilyFilter; 052import org.apache.hadoop.hbase.filter.FilterList; 053import org.apache.hadoop.hbase.filter.QualifierFilter; 054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 055import org.apache.hadoop.hbase.filter.TimestampsFilter; 056import org.apache.hadoop.hbase.io.TimeRange; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.Pair; 062import org.junit.AfterClass; 063import org.junit.Before; 064import org.junit.BeforeClass; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.rules.TestName; 070import org.junit.runner.RunWith; 071import org.junit.runners.Parameterized; 072import org.junit.runners.Parameterized.Parameter; 073import org.junit.runners.Parameterized.Parameters; 074 075import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 076 077@RunWith(Parameterized.class) 078@Category({ MediumTests.class, ClientTests.class }) 079public class TestAsyncTable { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestAsyncTable.class); 084 085 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 086 087 private static TableName TABLE_NAME = TableName.valueOf("async"); 088 089 private static byte[] FAMILY = Bytes.toBytes("cf"); 090 091 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 092 093 private static byte[] VALUE = Bytes.toBytes("value"); 094 095 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 096 097 private static AsyncConnection ASYNC_CONN; 098 099 @Rule 100 public TestName testName = new TestName(); 101 102 private byte[] row; 103 104 @Parameter 105 public Supplier<AsyncTable<?>> getTable; 106 107 private static AsyncTable<?> getRawTable() { 108 return ASYNC_CONN.getTable(TABLE_NAME); 109 } 110 111 private static AsyncTable<?> getTable() { 112 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 113 } 114 115 @Parameters 116 public static List<Object[]> params() { 117 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable }, 118 new Supplier<?>[] { TestAsyncTable::getTable }); 119 } 120 121 @BeforeClass 122 public static void setUpBeforeClass() throws Exception { 123 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 124 MAX_KEY_VALUE_SIZE); 125 TEST_UTIL.startMiniCluster(1); 126 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 127 TEST_UTIL.waitTableAvailable(TABLE_NAME); 128 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 129 assertFalse(ASYNC_CONN.isClosed()); 130 } 131 132 @AfterClass 133 public static void tearDownAfterClass() throws Exception { 134 Closeables.close(ASYNC_CONN, true); 135 assertTrue(ASYNC_CONN.isClosed()); 136 TEST_UTIL.shutdownMiniCluster(); 137 } 138 139 @Before 140 public void setUp() throws IOException, InterruptedException, ExecutionException { 141 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 142 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 143 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 144 } 145 } 146 147 @Test 148 public void testSimple() throws Exception { 149 AsyncTable<?> table = getTable.get(); 150 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 151 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 152 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 153 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 154 table.delete(new Delete(row)).get(); 155 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 156 assertTrue(result.isEmpty()); 157 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 158 } 159 160 private byte[] concat(byte[] base, int index) { 161 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 162 } 163 164 @SuppressWarnings("FutureReturnValueIgnored") 165 @Test 166 public void testSimpleMultiple() throws Exception { 167 AsyncTable<?> table = getTable.get(); 168 int count = 100; 169 CountDownLatch putLatch = new CountDownLatch(count); 170 IntStream.range(0, count).forEach( 171 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 172 .thenAccept(x -> putLatch.countDown())); 173 putLatch.await(); 174 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 175 IntStream.range(0, count) 176 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 177 .thenAccept(x -> existsResp.add(x))); 178 for (int i = 0; i < count; i++) { 179 assertTrue(existsResp.take()); 180 } 181 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 182 IntStream.range(0, count) 183 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 184 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 185 for (int i = 0; i < count; i++) { 186 Pair<Integer, Result> pair = getResp.take(); 187 assertArrayEquals(concat(VALUE, pair.getFirst()), 188 pair.getSecond().getValue(FAMILY, QUALIFIER)); 189 } 190 CountDownLatch deleteLatch = new CountDownLatch(count); 191 IntStream.range(0, count).forEach( 192 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 193 deleteLatch.await(); 194 IntStream.range(0, count) 195 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 196 .thenAccept(x -> existsResp.add(x))); 197 for (int i = 0; i < count; i++) { 198 assertFalse(existsResp.take()); 199 } 200 IntStream.range(0, count) 201 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 202 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 203 for (int i = 0; i < count; i++) { 204 Pair<Integer, Result> pair = getResp.take(); 205 assertTrue(pair.getSecond().isEmpty()); 206 } 207 } 208 209 @SuppressWarnings("FutureReturnValueIgnored") 210 @Test 211 public void testIncrement() throws InterruptedException, ExecutionException { 212 AsyncTable<?> table = getTable.get(); 213 int count = 100; 214 CountDownLatch latch = new CountDownLatch(count); 215 AtomicLong sum = new AtomicLong(0L); 216 IntStream.range(0, count) 217 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 218 sum.addAndGet(x); 219 latch.countDown(); 220 })); 221 latch.await(); 222 assertEquals(count, Bytes.toLong( 223 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 224 assertEquals((1 + count) * count / 2, sum.get()); 225 } 226 227 @SuppressWarnings("FutureReturnValueIgnored") 228 @Test 229 public void testAppend() throws InterruptedException, ExecutionException { 230 AsyncTable<?> table = getTable.get(); 231 int count = 10; 232 CountDownLatch latch = new CountDownLatch(count); 233 char suffix = ':'; 234 AtomicLong suffixCount = new AtomicLong(0L); 235 IntStream.range(0, count) 236 .forEachOrdered(i -> table 237 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 238 .thenAccept(r -> { 239 suffixCount.addAndGet( 240 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); 241 latch.countDown(); 242 })); 243 latch.await(); 244 assertEquals((1 + count) * count / 2, suffixCount.get()); 245 String value = Bytes.toString( 246 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 247 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 248 .sorted().toArray(); 249 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 250 } 251 252 @Test 253 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 254 AsyncTable<?> table = getTable.get(); 255 RowMutations mutation = new RowMutations(row); 256 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 257 Result result = table.mutateRow(mutation).get(); 258 assertTrue(result.getExists()); 259 assertTrue(result.isEmpty()); 260 261 result = table.get(new Get(row)).get(); 262 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 263 264 mutation = new RowMutations(row); 265 mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 266 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 267 mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L)); 268 mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc"))); 269 result = table.mutateRow(mutation).get(); 270 assertTrue(result.getExists()); 271 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 272 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 273 274 result = table.get(new Get(row)).get(); 275 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 276 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 277 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 278 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 279 } 280 281 // Tests for old checkAndMutate API 282 283 @SuppressWarnings("FutureReturnValueIgnored") 284 @Test 285 @Deprecated 286 public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException { 287 AsyncTable<?> table = getTable.get(); 288 AtomicInteger successCount = new AtomicInteger(0); 289 AtomicInteger successIndex = new AtomicInteger(-1); 290 int count = 10; 291 CountDownLatch latch = new CountDownLatch(count); 292 IntStream.range(0, count) 293 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 294 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 295 if (x) { 296 successCount.incrementAndGet(); 297 successIndex.set(i); 298 } 299 latch.countDown(); 300 })); 301 latch.await(); 302 assertEquals(1, successCount.get()); 303 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 304 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 305 } 306 307 @SuppressWarnings("FutureReturnValueIgnored") 308 @Test 309 @Deprecated 310 public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException { 311 AsyncTable<?> table = getTable.get(); 312 int count = 10; 313 CountDownLatch putLatch = new CountDownLatch(count + 1); 314 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 315 IntStream.range(0, count) 316 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 317 .thenRun(() -> putLatch.countDown())); 318 putLatch.await(); 319 320 AtomicInteger successCount = new AtomicInteger(0); 321 AtomicInteger successIndex = new AtomicInteger(-1); 322 CountDownLatch deleteLatch = new CountDownLatch(count); 323 IntStream.range(0, count) 324 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 325 .thenDelete( 326 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 327 .thenAccept(x -> { 328 if (x) { 329 successCount.incrementAndGet(); 330 successIndex.set(i); 331 } 332 deleteLatch.countDown(); 333 })); 334 deleteLatch.await(); 335 assertEquals(1, successCount.get()); 336 Result result = table.get(new Get(row)).get(); 337 IntStream.range(0, count).forEach(i -> { 338 if (i == successIndex.get()) { 339 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 340 } else { 341 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 342 } 343 }); 344 } 345 346 @SuppressWarnings("FutureReturnValueIgnored") 347 @Test 348 @Deprecated 349 public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException { 350 AsyncTable<?> table = getTable.get(); 351 int count = 10; 352 CountDownLatch putLatch = new CountDownLatch(count + 1); 353 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 354 IntStream.range(0, count) 355 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 356 .thenRun(() -> putLatch.countDown())); 357 putLatch.await(); 358 359 AtomicInteger successCount = new AtomicInteger(0); 360 AtomicInteger successIndex = new AtomicInteger(-1); 361 CountDownLatch mutateLatch = new CountDownLatch(count); 362 IntStream.range(0, count).forEach(i -> { 363 RowMutations mutation = new RowMutations(row); 364 try { 365 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 366 mutation 367 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 368 } catch (IOException e) { 369 throw new UncheckedIOException(e); 370 } 371 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 372 .thenAccept(x -> { 373 if (x) { 374 successCount.incrementAndGet(); 375 successIndex.set(i); 376 } 377 mutateLatch.countDown(); 378 }); 379 }); 380 mutateLatch.await(); 381 assertEquals(1, successCount.get()); 382 Result result = table.get(new Get(row)).get(); 383 IntStream.range(0, count).forEach(i -> { 384 if (i == successIndex.get()) { 385 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 386 } else { 387 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 388 } 389 }); 390 } 391 392 @Test 393 @Deprecated 394 public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception { 395 AsyncTable<?> table = getTable.get(); 396 final long ts = EnvironmentEdgeManager.currentTime() / 2; 397 Put put = new Put(row); 398 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 399 400 boolean ok = 401 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); 402 assertTrue(ok); 403 404 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 405 .ifEquals(VALUE).thenPut(put).get(); 406 assertFalse(ok); 407 408 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 409 .ifEquals(VALUE).thenPut(put).get(); 410 assertTrue(ok); 411 412 RowMutations rm = new RowMutations(row).add((Mutation) put); 413 414 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 415 .ifEquals(VALUE).thenMutate(rm).get(); 416 assertFalse(ok); 417 418 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 419 .ifEquals(VALUE).thenMutate(rm).get(); 420 assertTrue(ok); 421 422 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 423 424 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 425 .ifEquals(VALUE).thenDelete(delete).get(); 426 assertFalse(ok); 427 428 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 429 .ifEquals(VALUE).thenDelete(delete).get(); 430 assertTrue(ok); 431 } 432 433 @Test 434 @Deprecated 435 public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable { 436 AsyncTable<?> table = getTable.get(); 437 438 // Put one row 439 Put put = new Put(row); 440 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 441 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 442 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 443 table.put(put).get(); 444 445 // Put with success 446 boolean ok = table 447 .checkAndMutate(row, 448 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 449 Bytes.toBytes("a"))) 450 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 451 assertTrue(ok); 452 453 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 454 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 455 456 // Put with failure 457 ok = table 458 .checkAndMutate(row, 459 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 460 Bytes.toBytes("b"))) 461 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 462 assertFalse(ok); 463 464 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 465 466 // Delete with success 467 ok = table 468 .checkAndMutate(row, 469 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 470 Bytes.toBytes("a"))) 471 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 472 assertTrue(ok); 473 474 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 475 476 // Mutate with success 477 ok = table 478 .checkAndMutate(row, 479 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 480 Bytes.toBytes("b"))) 481 .thenMutate(new RowMutations(row) 482 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 483 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 484 .get(); 485 assertTrue(ok); 486 487 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 488 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 489 490 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 491 } 492 493 @Test 494 @Deprecated 495 public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable { 496 AsyncTable<?> table = getTable.get(); 497 498 // Put one row 499 Put put = new Put(row); 500 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 501 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 502 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 503 table.put(put).get(); 504 505 // Put with success 506 boolean ok = table 507 .checkAndMutate(row, 508 new FilterList( 509 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 510 Bytes.toBytes("a")), 511 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 512 Bytes.toBytes("b")))) 513 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 514 assertTrue(ok); 515 516 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 517 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 518 519 // Put with failure 520 ok = table 521 .checkAndMutate(row, 522 new FilterList( 523 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 524 Bytes.toBytes("a")), 525 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 526 Bytes.toBytes("c")))) 527 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 528 assertFalse(ok); 529 530 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 531 532 // Delete with success 533 ok = table 534 .checkAndMutate(row, 535 new FilterList( 536 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 537 Bytes.toBytes("a")), 538 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 539 Bytes.toBytes("b")))) 540 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 541 assertTrue(ok); 542 543 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 544 545 // Mutate with success 546 ok = table 547 .checkAndMutate(row, 548 new FilterList( 549 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 550 Bytes.toBytes("a")), 551 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 552 Bytes.toBytes("b")))) 553 .thenMutate(new RowMutations(row) 554 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 555 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 556 .get(); 557 assertTrue(ok); 558 559 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 560 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 561 562 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 563 } 564 565 @Test 566 @Deprecated 567 public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable { 568 AsyncTable<?> table = getTable.get(); 569 570 // Put with specifying the timestamp 571 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 572 573 // Put with success 574 boolean ok = table 575 .checkAndMutate(row, 576 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 577 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 578 new TimestampsFilter(Collections.singletonList(100L)))) 579 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 580 assertTrue(ok); 581 582 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 583 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 584 585 // Put with failure 586 ok = table 587 .checkAndMutate(row, 588 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 589 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 590 new TimestampsFilter(Collections.singletonList(101L)))) 591 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 592 assertFalse(ok); 593 594 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 595 } 596 597 @Test 598 @Deprecated 599 public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable { 600 AsyncTable<?> table = getTable.get(); 601 602 // Put with specifying the timestamp 603 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 604 605 // Put with success 606 boolean ok = table 607 .checkAndMutate(row, 608 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 609 Bytes.toBytes("a"))) 610 .timeRange(TimeRange.between(0, 101)) 611 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 612 assertTrue(ok); 613 614 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 615 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 616 617 // Put with failure 618 ok = table 619 .checkAndMutate(row, 620 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 621 Bytes.toBytes("a"))) 622 .timeRange(TimeRange.between(0, 100)) 623 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 624 assertFalse(ok); 625 626 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 627 } 628 629 @Test(expected = NullPointerException.class) 630 @Deprecated 631 public void testCheckAndMutateWithoutConditionForOldApi() { 632 getTable.get().checkAndMutate(row, FAMILY) 633 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 634 } 635 636 // Tests for new CheckAndMutate API 637 638 @SuppressWarnings("FutureReturnValueIgnored") 639 @Test 640 public void testCheckAndPut() throws InterruptedException, ExecutionException { 641 AsyncTable<?> table = getTable.get(); 642 AtomicInteger successCount = new AtomicInteger(0); 643 AtomicInteger successIndex = new AtomicInteger(-1); 644 int count = 10; 645 CountDownLatch latch = new CountDownLatch(count); 646 647 IntStream.range(0, count).forEach( 648 i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 649 .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> { 650 if (x.isSuccess()) { 651 successCount.incrementAndGet(); 652 successIndex.set(i); 653 } 654 assertNull(x.getResult()); 655 latch.countDown(); 656 })); 657 latch.await(); 658 assertEquals(1, successCount.get()); 659 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 660 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 661 } 662 663 @SuppressWarnings("FutureReturnValueIgnored") 664 @Test 665 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 666 AsyncTable<?> table = getTable.get(); 667 int count = 10; 668 CountDownLatch putLatch = new CountDownLatch(count + 1); 669 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 670 IntStream.range(0, count) 671 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 672 .thenRun(() -> putLatch.countDown())); 673 putLatch.await(); 674 675 AtomicInteger successCount = new AtomicInteger(0); 676 AtomicInteger successIndex = new AtomicInteger(-1); 677 CountDownLatch deleteLatch = new CountDownLatch(count); 678 679 IntStream.range(0, count) 680 .forEach(i -> table 681 .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build( 682 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))) 683 .thenAccept(x -> { 684 if (x.isSuccess()) { 685 successCount.incrementAndGet(); 686 successIndex.set(i); 687 } 688 assertNull(x.getResult()); 689 deleteLatch.countDown(); 690 })); 691 deleteLatch.await(); 692 assertEquals(1, successCount.get()); 693 Result result = table.get(new Get(row)).get(); 694 IntStream.range(0, count).forEach(i -> { 695 if (i == successIndex.get()) { 696 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 697 } else { 698 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 699 } 700 }); 701 } 702 703 @SuppressWarnings("FutureReturnValueIgnored") 704 @Test 705 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 706 AsyncTable<?> table = getTable.get(); 707 int count = 10; 708 CountDownLatch putLatch = new CountDownLatch(count + 1); 709 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 710 IntStream.range(0, count) 711 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 712 .thenRun(() -> putLatch.countDown())); 713 putLatch.await(); 714 715 AtomicInteger successCount = new AtomicInteger(0); 716 AtomicInteger successIndex = new AtomicInteger(-1); 717 CountDownLatch mutateLatch = new CountDownLatch(count); 718 IntStream.range(0, count).forEach(i -> { 719 RowMutations mutation = new RowMutations(row); 720 try { 721 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 722 mutation 723 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 724 } catch (IOException e) { 725 throw new UncheckedIOException(e); 726 } 727 728 table 729 .checkAndMutate( 730 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation)) 731 .thenAccept(x -> { 732 if (x.isSuccess()) { 733 successCount.incrementAndGet(); 734 successIndex.set(i); 735 } 736 assertNull(x.getResult()); 737 mutateLatch.countDown(); 738 }); 739 }); 740 mutateLatch.await(); 741 assertEquals(1, successCount.get()); 742 Result result = table.get(new Get(row)).get(); 743 IntStream.range(0, count).forEach(i -> { 744 if (i == successIndex.get()) { 745 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 746 } else { 747 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 748 } 749 }); 750 } 751 752 @Test 753 public void testCheckAndMutateWithTimeRange() throws Exception { 754 AsyncTable<?> table = getTable.get(); 755 final long ts = EnvironmentEdgeManager.currentTime() / 2; 756 Put put = new Put(row); 757 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 758 759 CheckAndMutateResult result = 760 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put)) 761 .get(); 762 assertTrue(result.isSuccess()); 763 assertNull(result.getResult()); 764 765 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 766 .timeRange(TimeRange.at(ts + 10000)).build(put)).get(); 767 assertFalse(result.isSuccess()); 768 assertNull(result.getResult()); 769 770 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 771 .timeRange(TimeRange.at(ts)).build(put)).get(); 772 assertTrue(result.isSuccess()); 773 assertNull(result.getResult()); 774 775 RowMutations rm = new RowMutations(row).add((Mutation) put); 776 777 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 778 .timeRange(TimeRange.at(ts + 10000)).build(rm)).get(); 779 assertFalse(result.isSuccess()); 780 assertNull(result.getResult()); 781 782 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 783 .timeRange(TimeRange.at(ts)).build(rm)).get(); 784 assertTrue(result.isSuccess()); 785 assertNull(result.getResult()); 786 787 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 788 789 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 790 .timeRange(TimeRange.at(ts + 10000)).build(delete)).get(); 791 assertFalse(result.isSuccess()); 792 assertNull(result.getResult()); 793 794 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 795 .timeRange(TimeRange.at(ts)).build(delete)).get(); 796 assertTrue(result.isSuccess()); 797 assertNull(result.getResult()); 798 } 799 800 @Test 801 public void testCheckAndMutateWithSingleFilter() throws Throwable { 802 AsyncTable<?> table = getTable.get(); 803 804 // Put one row 805 Put put = new Put(row); 806 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 807 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 808 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 809 table.put(put).get(); 810 811 // Put with success 812 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 813 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 814 Bytes.toBytes("a"))) 815 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 816 assertTrue(result.isSuccess()); 817 assertNull(result.getResult()); 818 819 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 820 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 821 822 // Put with failure 823 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 824 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 825 Bytes.toBytes("b"))) 826 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 827 assertFalse(result.isSuccess()); 828 assertNull(result.getResult()); 829 830 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 831 832 // Delete with success 833 result = 834 table 835 .checkAndMutate(CheckAndMutate.newBuilder(row) 836 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 837 Bytes.toBytes("a"))) 838 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 839 .get(); 840 assertTrue(result.isSuccess()); 841 assertNull(result.getResult()); 842 843 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 844 845 // Mutate with success 846 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 847 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 848 Bytes.toBytes("b"))) 849 .build(new RowMutations(row) 850 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 851 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 852 .get(); 853 assertTrue(result.isSuccess()); 854 assertNull(result.getResult()); 855 856 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 857 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 858 859 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 860 } 861 862 @Test 863 public void testCheckAndMutateWithMultipleFilters() throws Throwable { 864 AsyncTable<?> table = getTable.get(); 865 866 // Put one row 867 Put put = new Put(row); 868 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 869 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 870 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 871 table.put(put).get(); 872 873 // Put with success 874 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 875 .ifMatches(new FilterList( 876 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 877 Bytes.toBytes("a")), 878 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 879 Bytes.toBytes("b")))) 880 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 881 assertTrue(result.isSuccess()); 882 assertNull(result.getResult()); 883 884 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 885 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 886 887 // Put with failure 888 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 889 .ifMatches(new FilterList( 890 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 891 Bytes.toBytes("a")), 892 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 893 Bytes.toBytes("c")))) 894 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 895 assertFalse(result.isSuccess()); 896 assertNull(result.getResult()); 897 898 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 899 900 // Delete with success 901 result = table 902 .checkAndMutate(CheckAndMutate.newBuilder(row) 903 .ifMatches(new FilterList( 904 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 905 Bytes.toBytes("a")), 906 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 907 Bytes.toBytes("b")))) 908 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 909 .get(); 910 assertTrue(result.isSuccess()); 911 assertNull(result.getResult()); 912 913 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 914 915 // Mutate with success 916 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 917 .ifMatches(new FilterList( 918 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 919 Bytes.toBytes("a")), 920 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 921 Bytes.toBytes("b")))) 922 .build(new RowMutations(row) 923 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 924 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 925 .get(); 926 assertTrue(result.isSuccess()); 927 assertNull(result.getResult()); 928 929 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 930 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 931 932 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 933 } 934 935 @Test 936 public void testCheckAndMutateWithTimestampFilter() throws Throwable { 937 AsyncTable<?> table = getTable.get(); 938 939 // Put with specifying the timestamp 940 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 941 942 // Put with success 943 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 944 .ifMatches( 945 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 946 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 947 new TimestampsFilter(Collections.singletonList(100L)))) 948 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 949 assertTrue(result.isSuccess()); 950 assertNull(result.getResult()); 951 952 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 953 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 954 955 // Put with failure 956 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 957 .ifMatches( 958 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 959 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 960 new TimestampsFilter(Collections.singletonList(101L)))) 961 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 962 assertFalse(result.isSuccess()); 963 assertNull(result.getResult()); 964 965 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 966 } 967 968 @Test 969 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { 970 AsyncTable<?> table = getTable.get(); 971 972 // Put with specifying the timestamp 973 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 974 975 // Put with success 976 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 977 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 978 Bytes.toBytes("a"))) 979 .timeRange(TimeRange.between(0, 101)) 980 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 981 assertTrue(result.isSuccess()); 982 assertNull(result.getResult()); 983 984 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 985 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 986 987 // Put with failure 988 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 989 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 990 Bytes.toBytes("a"))) 991 .timeRange(TimeRange.between(0, 100)) 992 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 993 assertFalse(result.isSuccess()); 994 assertNull(result.getResult()); 995 996 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 997 } 998 999 @Test 1000 public void testCheckAndIncrement() throws Throwable { 1001 AsyncTable<?> table = getTable.get(); 1002 1003 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1004 1005 // CheckAndIncrement with correct value 1006 CheckAndMutateResult res = table.checkAndMutate( 1007 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1008 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1009 .get(); 1010 assertTrue(res.isSuccess()); 1011 assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1012 1013 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1014 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1015 1016 // CheckAndIncrement with wrong value 1017 res = table.checkAndMutate( 1018 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1019 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1020 .get(); 1021 assertFalse(res.isSuccess()); 1022 assertNull(res.getResult()); 1023 1024 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1025 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1026 1027 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1028 1029 // CheckAndIncrement with a filter and correct value 1030 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1031 .ifMatches(new FilterList( 1032 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1033 Bytes.toBytes("a")), 1034 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1035 Bytes.toBytes("c")))) 1036 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1037 assertTrue(res.isSuccess()); 1038 assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1039 1040 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1041 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1042 1043 // CheckAndIncrement with a filter and correct value 1044 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1045 .ifMatches(new FilterList( 1046 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1047 Bytes.toBytes("b")), 1048 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1049 Bytes.toBytes("d")))) 1050 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1051 assertFalse(res.isSuccess()); 1052 assertNull(res.getResult()); 1053 1054 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1055 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1056 } 1057 1058 @Test 1059 public void testCheckAndAppend() throws Throwable { 1060 AsyncTable<?> table = getTable.get(); 1061 1062 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1063 1064 // CheckAndAppend with correct value 1065 CheckAndMutateResult res = 1066 table 1067 .checkAndMutate( 1068 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1069 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1070 .get(); 1071 assertTrue(res.isSuccess()); 1072 assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1073 1074 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1075 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1076 1077 // CheckAndAppend with correct value 1078 res = 1079 table 1080 .checkAndMutate( 1081 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1082 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1083 .get(); 1084 assertFalse(res.isSuccess()); 1085 assertNull(res.getResult()); 1086 1087 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1088 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1089 1090 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1091 1092 // CheckAndAppend with a filter and correct value 1093 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1094 .ifMatches(new FilterList( 1095 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1096 Bytes.toBytes("a")), 1097 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1098 Bytes.toBytes("c")))) 1099 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1100 assertTrue(res.isSuccess()); 1101 assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1102 1103 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1104 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1105 1106 // CheckAndAppend with a filter and wrong value 1107 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1108 .ifMatches(new FilterList( 1109 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1110 Bytes.toBytes("b")), 1111 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1112 Bytes.toBytes("d")))) 1113 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1114 assertFalse(res.isSuccess()); 1115 assertNull(res.getResult()); 1116 1117 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1118 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1119 } 1120 1121 @Test 1122 public void testCheckAndRowMutations() throws Throwable { 1123 final byte[] q1 = Bytes.toBytes("q1"); 1124 final byte[] q2 = Bytes.toBytes("q2"); 1125 final byte[] q3 = Bytes.toBytes("q3"); 1126 final byte[] q4 = Bytes.toBytes("q4"); 1127 final String v1 = "v1"; 1128 1129 AsyncTable<?> table = getTable.get(); 1130 1131 // Initial values 1132 table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), 1133 new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)), 1134 new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get(); 1135 1136 // Do CheckAndRowMutations 1137 CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build( 1138 new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)), 1139 new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1), 1140 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1141 1142 CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get(); 1143 assertTrue(result.isSuccess()); 1144 assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); 1145 assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); 1146 1147 // Verify the value 1148 Result r = table.get(new Get(row)).get(); 1149 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1150 assertNull(r.getValue(FAMILY, q2)); 1151 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1152 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1153 1154 // Do CheckAndRowMutations again 1155 checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1) 1156 .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1), 1157 new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)), 1158 new Increment(row).addColumn(FAMILY, q3, 1), 1159 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1160 1161 result = table.checkAndMutate(checkAndMutate).get(); 1162 assertFalse(result.isSuccess()); 1163 assertNull(result.getResult()); 1164 1165 // Verify the value 1166 r = table.get(new Get(row)).get(); 1167 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1168 assertNull(r.getValue(FAMILY, q2)); 1169 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1170 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1171 } 1172 1173 // Tests for batch version of checkAndMutate 1174 1175 @Test 1176 public void testCheckAndMutateBatch() throws Throwable { 1177 AsyncTable<?> table = getTable.get(); 1178 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1179 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1180 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1181 1182 table 1183 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1184 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1185 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1186 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))) 1187 .get(); 1188 1189 // Test for Put 1190 CheckAndMutate checkAndMutate1 = 1191 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1192 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1193 1194 CheckAndMutate checkAndMutate2 = 1195 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) 1196 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1197 1198 List<CheckAndMutateResult> results = 1199 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1200 1201 assertTrue(results.get(0).isSuccess()); 1202 assertNull(results.get(0).getResult()); 1203 assertFalse(results.get(1).isSuccess()); 1204 assertNull(results.get(1).getResult()); 1205 1206 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1207 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1208 1209 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1210 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1211 1212 // Test for Delete 1213 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1214 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row)); 1215 1216 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1217 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2)); 1218 1219 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1220 1221 assertTrue(results.get(0).isSuccess()); 1222 assertNull(results.get(0).getResult()); 1223 assertFalse(results.get(1).isSuccess()); 1224 assertNull(results.get(1).getResult()); 1225 1226 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 1227 1228 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1229 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1230 1231 // Test for RowMutations 1232 checkAndMutate1 = 1233 CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1234 .build(new RowMutations(row3) 1235 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1236 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); 1237 1238 checkAndMutate2 = 1239 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) 1240 .build(new RowMutations(row4) 1241 .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1242 .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); 1243 1244 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1245 1246 assertTrue(results.get(0).isSuccess()); 1247 assertNull(results.get(0).getResult()); 1248 assertFalse(results.get(1).isSuccess()); 1249 assertNull(results.get(1).getResult()); 1250 1251 result = table.get(new Get(row3)).get(); 1252 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1253 assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); 1254 1255 result = table.get(new Get(row4)).get(); 1256 assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); 1257 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1258 } 1259 1260 @Test 1261 public void testCheckAndMutateBatch2() throws Throwable { 1262 AsyncTable<?> table = getTable.get(); 1263 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1264 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1265 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1266 1267 table 1268 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1269 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1270 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), 1271 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))) 1272 .get(); 1273 1274 // Test for ifNotExists() 1275 CheckAndMutate checkAndMutate1 = 1276 CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B")) 1277 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1278 1279 CheckAndMutate checkAndMutate2 = 1280 CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B")) 1281 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1282 1283 List<CheckAndMutateResult> results = 1284 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1285 1286 assertTrue(results.get(0).isSuccess()); 1287 assertNull(results.get(0).getResult()); 1288 assertFalse(results.get(1).isSuccess()); 1289 assertNull(results.get(1).getResult()); 1290 1291 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1292 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1293 1294 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1295 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1296 1297 // Test for ifMatches() 1298 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1299 .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) 1300 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); 1301 1302 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1303 .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) 1304 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1305 1306 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1307 1308 assertTrue(results.get(0).isSuccess()); 1309 assertNull(results.get(0).getResult()); 1310 assertFalse(results.get(1).isSuccess()); 1311 assertNull(results.get(1).getResult()); 1312 1313 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1314 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1315 1316 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1317 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1318 1319 // Test for timeRange() 1320 checkAndMutate1 = CheckAndMutate.newBuilder(row3) 1321 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101)) 1322 .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); 1323 1324 checkAndMutate2 = CheckAndMutate.newBuilder(row4) 1325 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100)) 1326 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); 1327 1328 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1329 1330 assertTrue(results.get(0).isSuccess()); 1331 assertNull(results.get(0).getResult()); 1332 assertFalse(results.get(1).isSuccess()); 1333 assertNull(results.get(1).getResult()); 1334 1335 result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1336 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1337 1338 result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1339 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1340 } 1341 1342 @Test 1343 public void testCheckAndMutateBatchWithFilter() throws Throwable { 1344 AsyncTable<?> table = getTable.get(); 1345 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1346 1347 table.putAll(Arrays.asList( 1348 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1349 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1350 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1351 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) 1352 .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) 1353 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1354 .get(); 1355 1356 // Test for Put 1357 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1358 .ifMatches(new FilterList( 1359 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1360 Bytes.toBytes("a")), 1361 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1362 Bytes.toBytes("b")))) 1363 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1364 1365 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1366 .ifMatches(new FilterList( 1367 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1368 Bytes.toBytes("a")), 1369 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1370 Bytes.toBytes("b")))) 1371 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1372 1373 List<CheckAndMutateResult> results = 1374 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1375 1376 assertTrue(results.get(0).isSuccess()); 1377 assertNull(results.get(0).getResult()); 1378 assertFalse(results.get(1).isSuccess()); 1379 assertNull(results.get(1).getResult()); 1380 1381 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1382 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1383 1384 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1385 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1386 1387 // Test for Delete 1388 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1389 .ifMatches(new FilterList( 1390 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1391 Bytes.toBytes("a")), 1392 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1393 Bytes.toBytes("b")))) 1394 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); 1395 1396 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1397 .ifMatches(new FilterList( 1398 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1399 Bytes.toBytes("a")), 1400 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1401 Bytes.toBytes("b")))) 1402 .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); 1403 1404 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1405 1406 assertTrue(results.get(0).isSuccess()); 1407 assertNull(results.get(0).getResult()); 1408 assertFalse(results.get(1).isSuccess()); 1409 assertNull(results.get(1).getResult()); 1410 1411 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 1412 1413 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1414 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1415 1416 // Test for RowMutations 1417 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1418 .ifMatches(new FilterList( 1419 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1420 Bytes.toBytes("a")), 1421 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1422 Bytes.toBytes("b")))) 1423 .build(new RowMutations(row) 1424 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 1425 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 1426 1427 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1428 .ifMatches(new FilterList( 1429 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1430 Bytes.toBytes("a")), 1431 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1432 Bytes.toBytes("b")))) 1433 .build(new RowMutations(row2) 1434 .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) 1435 .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); 1436 1437 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1438 1439 assertTrue(results.get(0).isSuccess()); 1440 assertNull(results.get(0).getResult()); 1441 assertFalse(results.get(1).isSuccess()); 1442 assertNull(results.get(1).getResult()); 1443 1444 result = table.get(new Get(row)).get(); 1445 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 1446 assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1447 1448 result = table.get(new Get(row2)).get(); 1449 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1450 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1451 } 1452 1453 @Test 1454 public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { 1455 AsyncTable<?> table = getTable.get(); 1456 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1457 1458 table.putAll(Arrays.asList( 1459 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) 1460 .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) 1461 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1462 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) 1463 .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) 1464 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1465 .get(); 1466 1467 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1468 .ifMatches(new FilterList( 1469 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1470 Bytes.toBytes("a")), 1471 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1472 Bytes.toBytes("b")))) 1473 .timeRange(TimeRange.between(0, 101)) 1474 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1475 1476 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1477 .ifMatches(new FilterList( 1478 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1479 Bytes.toBytes("d")), 1480 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1481 Bytes.toBytes("e")))) 1482 .timeRange(TimeRange.between(0, 100)) 1483 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1484 1485 List<CheckAndMutateResult> results = 1486 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1487 1488 assertTrue(results.get(0).isSuccess()); 1489 assertNull(results.get(0).getResult()); 1490 assertFalse(results.get(1).isSuccess()); 1491 assertNull(results.get(1).getResult()); 1492 1493 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1494 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1495 1496 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1497 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1498 } 1499 1500 @Test 1501 public void testCheckAndIncrementBatch() throws Throwable { 1502 AsyncTable<?> table = getTable.get(); 1503 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1504 1505 table.putAll(Arrays.asList( 1506 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1507 Bytes.toBytes("B"), Bytes.toBytes(0L)), 1508 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1509 Bytes.toBytes("D"), Bytes.toBytes(0L)))) 1510 .get(); 1511 1512 // CheckAndIncrement with correct value 1513 CheckAndMutate checkAndMutate1 = 1514 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1515 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)); 1516 1517 // CheckAndIncrement with wrong value 1518 CheckAndMutate checkAndMutate2 = 1519 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1520 .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1)); 1521 1522 List<CheckAndMutateResult> results = 1523 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1524 1525 assertTrue(results.get(0).isSuccess()); 1526 assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1527 assertFalse(results.get(1).isSuccess()); 1528 assertNull(results.get(1).getResult()); 1529 1530 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1531 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1532 1533 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1534 assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D")))); 1535 } 1536 1537 @Test 1538 public void testCheckAndAppendBatch() throws Throwable { 1539 AsyncTable<?> table = getTable.get(); 1540 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1541 1542 table.putAll(Arrays.asList( 1543 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1544 Bytes.toBytes("B"), Bytes.toBytes("b")), 1545 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1546 Bytes.toBytes("D"), Bytes.toBytes("d")))) 1547 .get(); 1548 1549 // CheckAndAppend with correct value 1550 CheckAndMutate checkAndMutate1 = 1551 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1552 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 1553 1554 // CheckAndAppend with wrong value 1555 CheckAndMutate checkAndMutate2 = 1556 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1557 .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 1558 1559 List<CheckAndMutateResult> results = 1560 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1561 1562 assertTrue(results.get(0).isSuccess()); 1563 assertEquals("bb", 1564 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1565 assertFalse(results.get(1).isSuccess()); 1566 assertNull(results.get(1).getResult()); 1567 1568 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1569 assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1570 1571 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1572 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1573 } 1574 1575 @Test 1576 public void testCheckAndRowMutationsBatch() throws Throwable { 1577 AsyncTable<?> table = getTable.get(); 1578 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1579 1580 table.putAll(Arrays.asList( 1581 new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1582 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) 1583 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 1584 new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) 1585 .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) 1586 .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))) 1587 .get(); 1588 1589 // CheckAndIncrement with correct value 1590 CheckAndMutate checkAndMutate1 = 1591 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1592 .build(new RowMutations(row) 1593 .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1594 new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")), 1595 new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L), 1596 new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))); 1597 1598 // CheckAndIncrement with wrong value 1599 CheckAndMutate checkAndMutate2 = 1600 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) 1601 .build(new RowMutations(row2).add( 1602 Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 1603 new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")), 1604 new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), 1605 new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))); 1606 1607 List<CheckAndMutateResult> results = 1608 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1609 1610 assertTrue(results.get(0).isSuccess()); 1611 assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C")))); 1612 assertEquals("dd", 1613 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D")))); 1614 1615 assertFalse(results.get(1).isSuccess()); 1616 assertNull(results.get(1).getResult()); 1617 1618 Result result = table.get(new Get(row)).get(); 1619 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1620 assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); 1621 assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 1622 assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1623 1624 result = table.get(new Get(row2)).get(); 1625 assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); 1626 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1627 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); 1628 assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); 1629 } 1630 1631 @Test 1632 public void testDisabled() throws InterruptedException, ExecutionException { 1633 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 1634 try { 1635 getTable.get().get(new Get(row)).get(); 1636 fail("Should fail since table has been disabled"); 1637 } catch (ExecutionException e) { 1638 Throwable cause = e.getCause(); 1639 assertThat(cause, instanceOf(TableNotEnabledException.class)); 1640 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 1641 } 1642 } 1643 1644 @Test 1645 public void testInvalidMutation() throws Exception { 1646 Consumer<Mutation> executeMutation = mutation -> { 1647 if (mutation instanceof Put) { 1648 getTable.get().put((Put) mutation); 1649 } else if (mutation instanceof Increment) { 1650 getTable.get().increment((Increment) mutation); 1651 } else if (mutation instanceof Append) { 1652 getTable.get().append((Append) mutation); 1653 } 1654 }; 1655 1656 Mutation[] emptyMutations = 1657 { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) }; 1658 1659 String[] emptyMessages = 1660 { "No columns to put", "No columns to increment", "No columns to append" }; 1661 1662 Mutation[] oversizedMutations = 1663 { new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1664 new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1665 new Append(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1666 1667 for (int i = 0; i < emptyMutations.length; i++) { 1668 // Test empty mutation 1669 try { 1670 executeMutation.accept(emptyMutations[i]); 1671 fail("Should fail since the mutation does not contain any cells"); 1672 } catch (IllegalArgumentException e) { 1673 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1674 } 1675 1676 // Test oversized mutation 1677 try { 1678 executeMutation.accept(oversizedMutations[i]); 1679 fail("Should fail since the mutation exceeds the max key value size"); 1680 } catch (IllegalArgumentException e) { 1681 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1682 } 1683 } 1684 } 1685 1686 @Test 1687 public void testInvalidMutationInRowMutations() throws IOException { 1688 final byte[] row = Bytes.toBytes(0); 1689 1690 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 1691 1692 String[] emptyMessages = 1693 { "No columns to put", "No columns to increment", "No columns to append" }; 1694 1695 Mutation[] oversizedMutations = 1696 { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1697 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1698 new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1699 1700 for (int i = 0; i < emptyMutations.length; i++) { 1701 // Test empty mutation 1702 try { 1703 getTable.get().mutateRow(new RowMutations(row).add(emptyMutations[i])); 1704 fail("Should fail since the mutation does not contain any cells"); 1705 } catch (IllegalArgumentException e) { 1706 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1707 } 1708 1709 // Test oversized mutation 1710 try { 1711 getTable.get().mutateRow(new RowMutations(row).add(oversizedMutations[i])); 1712 fail("Should fail since the mutation exceeds the max key value size"); 1713 } catch (IllegalArgumentException e) { 1714 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1715 } 1716 } 1717 } 1718 1719 @Test 1720 public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException { 1721 final byte[] row = Bytes.toBytes(0); 1722 1723 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 1724 1725 String[] emptyMessages = 1726 { "No columns to put", "No columns to increment", "No columns to append" }; 1727 1728 Mutation[] oversizedMutations = 1729 { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1730 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1731 new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1732 1733 for (int i = 0; i < emptyMutations.length; i++) { 1734 // Test empty mutation 1735 try { 1736 getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 1737 .build(new RowMutations(row).add(emptyMutations[i]))); 1738 fail("Should fail since the mutation does not contain any cells"); 1739 } catch (IllegalArgumentException e) { 1740 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1741 } 1742 1743 // Test oversized mutation 1744 try { 1745 getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 1746 .build(new RowMutations(row).add(oversizedMutations[i]))); 1747 fail("Should fail since the mutation exceeds the max key value size"); 1748 } catch (IllegalArgumentException e) { 1749 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1750 } 1751 } 1752 } 1753}