001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertThat; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.List; 034import java.util.concurrent.ArrayBlockingQueue; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicLong; 041import java.util.function.Supplier; 042import java.util.stream.IntStream; 043import org.apache.commons.io.IOUtils; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.TableNotEnabledException; 048import org.apache.hadoop.hbase.io.TimeRange; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.Pair; 053import org.junit.AfterClass; 054import org.junit.Before; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061import org.junit.runner.RunWith; 062import org.junit.runners.Parameterized; 063import org.junit.runners.Parameterized.Parameter; 064import org.junit.runners.Parameterized.Parameters; 065 066@RunWith(Parameterized.class) 067@Category({ MediumTests.class, ClientTests.class }) 068public class TestAsyncTable { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestAsyncTable.class); 073 074 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 076 private static TableName TABLE_NAME = TableName.valueOf("async"); 077 078 private static byte[] FAMILY = Bytes.toBytes("cf"); 079 080 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 081 082 private static byte[] VALUE = Bytes.toBytes("value"); 083 084 private static AsyncConnection ASYNC_CONN; 085 086 @Rule 087 public TestName testName = new TestName(); 088 089 private byte[] row; 090 091 @Parameter 092 public Supplier<AsyncTable<?>> getTable; 093 094 private static AsyncTable<?> getRawTable() { 095 return ASYNC_CONN.getTable(TABLE_NAME); 096 } 097 098 private static AsyncTable<?> getTable() { 099 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 100 } 101 102 @Parameters 103 public static List<Object[]> params() { 104 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable }, 105 new Supplier<?>[] { TestAsyncTable::getTable }); 106 } 107 108 @BeforeClass 109 public static void setUpBeforeClass() throws Exception { 110 TEST_UTIL.startMiniCluster(1); 111 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 112 TEST_UTIL.waitTableAvailable(TABLE_NAME); 113 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 114 } 115 116 @AfterClass 117 public static void tearDownAfterClass() throws Exception { 118 IOUtils.closeQuietly(ASYNC_CONN); 119 TEST_UTIL.shutdownMiniCluster(); 120 } 121 122 @Before 123 public void setUp() throws IOException, InterruptedException, ExecutionException { 124 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 125 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 126 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 127 } 128 } 129 130 @Test 131 public void testSimple() throws Exception { 132 AsyncTable<?> table = getTable.get(); 133 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 134 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 135 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 136 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 137 table.delete(new Delete(row)).get(); 138 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 139 assertTrue(result.isEmpty()); 140 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 141 } 142 143 private byte[] concat(byte[] base, int index) { 144 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 145 } 146 147 @Test 148 public void testSimpleMultiple() throws Exception { 149 AsyncTable<?> table = getTable.get(); 150 int count = 100; 151 CountDownLatch putLatch = new CountDownLatch(count); 152 IntStream.range(0, count).forEach( 153 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 154 .thenAccept(x -> putLatch.countDown())); 155 putLatch.await(); 156 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 157 IntStream.range(0, count) 158 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 159 .thenAccept(x -> existsResp.add(x))); 160 for (int i = 0; i < count; i++) { 161 assertTrue(existsResp.take()); 162 } 163 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 164 IntStream.range(0, count) 165 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 166 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 167 for (int i = 0; i < count; i++) { 168 Pair<Integer, Result> pair = getResp.take(); 169 assertArrayEquals(concat(VALUE, pair.getFirst()), 170 pair.getSecond().getValue(FAMILY, QUALIFIER)); 171 } 172 CountDownLatch deleteLatch = new CountDownLatch(count); 173 IntStream.range(0, count).forEach( 174 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 175 deleteLatch.await(); 176 IntStream.range(0, count) 177 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 178 .thenAccept(x -> existsResp.add(x))); 179 for (int i = 0; i < count; i++) { 180 assertFalse(existsResp.take()); 181 } 182 IntStream.range(0, count) 183 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 184 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 185 for (int i = 0; i < count; i++) { 186 Pair<Integer, Result> pair = getResp.take(); 187 assertTrue(pair.getSecond().isEmpty()); 188 } 189 } 190 191 @Test 192 public void testIncrement() throws InterruptedException, ExecutionException { 193 AsyncTable<?> table = getTable.get(); 194 int count = 100; 195 CountDownLatch latch = new CountDownLatch(count); 196 AtomicLong sum = new AtomicLong(0L); 197 IntStream.range(0, count) 198 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 199 sum.addAndGet(x); 200 latch.countDown(); 201 })); 202 latch.await(); 203 assertEquals(count, Bytes.toLong( 204 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 205 assertEquals((1 + count) * count / 2, sum.get()); 206 } 207 208 @Test 209 public void testAppend() throws InterruptedException, ExecutionException { 210 AsyncTable<?> table = getTable.get(); 211 int count = 10; 212 CountDownLatch latch = new CountDownLatch(count); 213 char suffix = ':'; 214 AtomicLong suffixCount = new AtomicLong(0L); 215 IntStream.range(0, count).forEachOrdered( 216 i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 217 .thenAccept(r -> { 218 suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars() 219 .filter(x -> x == suffix).count()); 220 latch.countDown(); 221 })); 222 latch.await(); 223 assertEquals((1 + count) * count / 2, suffixCount.get()); 224 String value = Bytes.toString( 225 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 226 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 227 .sorted().toArray(); 228 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 229 } 230 231 @Test 232 public void testCheckAndPut() throws InterruptedException, ExecutionException { 233 AsyncTable<?> table = getTable.get(); 234 AtomicInteger successCount = new AtomicInteger(0); 235 AtomicInteger successIndex = new AtomicInteger(-1); 236 int count = 10; 237 CountDownLatch latch = new CountDownLatch(count); 238 IntStream.range(0, count) 239 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 240 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 241 if (x) { 242 successCount.incrementAndGet(); 243 successIndex.set(i); 244 } 245 latch.countDown(); 246 })); 247 latch.await(); 248 assertEquals(1, successCount.get()); 249 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 250 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 251 } 252 253 @Test 254 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 255 AsyncTable<?> table = getTable.get(); 256 int count = 10; 257 CountDownLatch putLatch = new CountDownLatch(count + 1); 258 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 259 IntStream.range(0, count) 260 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 261 .thenRun(() -> putLatch.countDown())); 262 putLatch.await(); 263 264 AtomicInteger successCount = new AtomicInteger(0); 265 AtomicInteger successIndex = new AtomicInteger(-1); 266 CountDownLatch deleteLatch = new CountDownLatch(count); 267 IntStream.range(0, count) 268 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 269 .thenDelete( 270 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 271 .thenAccept(x -> { 272 if (x) { 273 successCount.incrementAndGet(); 274 successIndex.set(i); 275 } 276 deleteLatch.countDown(); 277 })); 278 deleteLatch.await(); 279 assertEquals(1, successCount.get()); 280 Result result = table.get(new Get(row)).get(); 281 IntStream.range(0, count).forEach(i -> { 282 if (i == successIndex.get()) { 283 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 284 } else { 285 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 286 } 287 }); 288 } 289 290 @Test 291 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 292 AsyncTable<?> table = getTable.get(); 293 RowMutations mutation = new RowMutations(row); 294 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 295 table.mutateRow(mutation).get(); 296 Result result = table.get(new Get(row)).get(); 297 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 298 299 mutation = new RowMutations(row); 300 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 301 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 302 table.mutateRow(mutation).get(); 303 result = table.get(new Get(row)).get(); 304 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 305 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 306 } 307 308 @Test 309 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 310 AsyncTable<?> table = getTable.get(); 311 int count = 10; 312 CountDownLatch putLatch = new CountDownLatch(count + 1); 313 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 314 IntStream.range(0, count) 315 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 316 .thenRun(() -> putLatch.countDown())); 317 putLatch.await(); 318 319 AtomicInteger successCount = new AtomicInteger(0); 320 AtomicInteger successIndex = new AtomicInteger(-1); 321 CountDownLatch mutateLatch = new CountDownLatch(count); 322 IntStream.range(0, count).forEach(i -> { 323 RowMutations mutation = new RowMutations(row); 324 try { 325 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 326 mutation 327 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 328 } catch (IOException e) { 329 throw new UncheckedIOException(e); 330 } 331 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 332 .thenAccept(x -> { 333 if (x) { 334 successCount.incrementAndGet(); 335 successIndex.set(i); 336 } 337 mutateLatch.countDown(); 338 }); 339 }); 340 mutateLatch.await(); 341 assertEquals(1, successCount.get()); 342 Result result = table.get(new Get(row)).get(); 343 IntStream.range(0, count).forEach(i -> { 344 if (i == successIndex.get()) { 345 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 346 } else { 347 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 348 } 349 }); 350 } 351 352 @Test 353 public void testCheckAndMutateWithTimeRange() throws Exception { 354 AsyncTable<?> table = getTable.get(); 355 final long ts = System.currentTimeMillis() / 2; 356 Put put = new Put(row); 357 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 358 359 boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 360 .ifNotExists() 361 .thenPut(put) 362 .get(); 363 assertTrue(ok); 364 365 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 366 .timeRange(TimeRange.at(ts + 10000)) 367 .ifEquals(VALUE) 368 .thenPut(put) 369 .get(); 370 assertFalse(ok); 371 372 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 373 .timeRange(TimeRange.at(ts)) 374 .ifEquals(VALUE) 375 .thenPut(put) 376 .get(); 377 assertTrue(ok); 378 379 RowMutations rm = new RowMutations(row) 380 .add((Mutation) put); 381 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 382 .timeRange(TimeRange.at(ts + 10000)) 383 .ifEquals(VALUE) 384 .thenMutate(rm) 385 .get(); 386 assertFalse(ok); 387 388 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 389 .timeRange(TimeRange.at(ts)) 390 .ifEquals(VALUE) 391 .thenMutate(rm) 392 .get(); 393 assertTrue(ok); 394 395 Delete delete = new Delete(row) 396 .addColumn(FAMILY, QUALIFIER); 397 398 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 399 .timeRange(TimeRange.at(ts + 10000)) 400 .ifEquals(VALUE) 401 .thenDelete(delete) 402 .get(); 403 assertFalse(ok); 404 405 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) 406 .timeRange(TimeRange.at(ts)) 407 .ifEquals(VALUE) 408 .thenDelete(delete) 409 .get(); 410 assertTrue(ok); 411 } 412 413 @Test 414 public void testDisabled() throws InterruptedException, ExecutionException { 415 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 416 try { 417 getTable.get().get(new Get(row)).get(); 418 fail("Should fail since table has been disabled"); 419 } catch (ExecutionException e) { 420 Throwable cause = e.getCause(); 421 assertThat(cause, instanceOf(TableNotEnabledException.class)); 422 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 423 } 424 } 425}