001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertThat; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.List; 034import java.util.concurrent.ArrayBlockingQueue; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicLong; 041import java.util.function.Supplier; 042import java.util.stream.IntStream; 043import org.apache.commons.io.IOUtils; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.TableNotEnabledException; 048import org.apache.hadoop.hbase.io.TimeRange; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.Pair; 053import org.junit.AfterClass; 054import org.junit.Before; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061import org.junit.runner.RunWith; 062import org.junit.runners.Parameterized; 063import org.junit.runners.Parameterized.Parameter; 064import org.junit.runners.Parameterized.Parameters; 065 066@RunWith(Parameterized.class) 067@Category({ MediumTests.class, ClientTests.class }) 068public class TestAsyncTable { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestAsyncTable.class); 073 074 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 076 private static TableName TABLE_NAME = TableName.valueOf("async"); 077 078 private static byte[] FAMILY = Bytes.toBytes("cf"); 079 080 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 081 082 private static byte[] VALUE = Bytes.toBytes("value"); 083 084 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 085 086 private static AsyncConnection ASYNC_CONN; 087 088 @Rule 089 public TestName testName = new TestName(); 090 091 private byte[] row; 092 093 @Parameter 094 public Supplier<AsyncTable<?>> getTable; 095 096 private static AsyncTable<?> getRawTable() { 097 return ASYNC_CONN.getTable(TABLE_NAME); 098 } 099 100 private static AsyncTable<?> getTable() { 101 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 102 } 103 104 @Parameters 105 public static List<Object[]> params() { 106 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable }, 107 new Supplier<?>[] { TestAsyncTable::getTable }); 108 } 109 110 @BeforeClass 111 public static void setUpBeforeClass() throws Exception { 112 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 113 MAX_KEY_VALUE_SIZE); 114 TEST_UTIL.startMiniCluster(1); 115 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 116 TEST_UTIL.waitTableAvailable(TABLE_NAME); 117 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 118 assertFalse(ASYNC_CONN.isClosed()); 119 } 120 121 @AfterClass 122 public static void tearDownAfterClass() throws Exception { 123 IOUtils.closeQuietly(ASYNC_CONN); 124 assertTrue(ASYNC_CONN.isClosed()); 125 TEST_UTIL.shutdownMiniCluster(); 126 } 127 128 @Before 129 public void setUp() throws IOException, InterruptedException, ExecutionException { 130 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 131 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 132 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 133 } 134 } 135 136 @Test 137 public void testSimple() throws Exception { 138 AsyncTable<?> table = getTable.get(); 139 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 140 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 141 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 142 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 143 table.delete(new Delete(row)).get(); 144 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 145 assertTrue(result.isEmpty()); 146 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 147 } 148 149 private byte[] concat(byte[] base, int index) { 150 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 151 } 152 153 @SuppressWarnings("FutureReturnValueIgnored") 154 @Test 155 public void testSimpleMultiple() throws Exception { 156 AsyncTable<?> table = getTable.get(); 157 int count = 100; 158 CountDownLatch putLatch = new CountDownLatch(count); 159 IntStream.range(0, count).forEach( 160 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 161 .thenAccept(x -> putLatch.countDown())); 162 putLatch.await(); 163 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 164 IntStream.range(0, count) 165 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 166 .thenAccept(x -> existsResp.add(x))); 167 for (int i = 0; i < count; i++) { 168 assertTrue(existsResp.take()); 169 } 170 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 171 IntStream.range(0, count) 172 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 173 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 174 for (int i = 0; i < count; i++) { 175 Pair<Integer, Result> pair = getResp.take(); 176 assertArrayEquals(concat(VALUE, pair.getFirst()), 177 pair.getSecond().getValue(FAMILY, QUALIFIER)); 178 } 179 CountDownLatch deleteLatch = new CountDownLatch(count); 180 IntStream.range(0, count).forEach( 181 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 182 deleteLatch.await(); 183 IntStream.range(0, count) 184 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 185 .thenAccept(x -> existsResp.add(x))); 186 for (int i = 0; i < count; i++) { 187 assertFalse(existsResp.take()); 188 } 189 IntStream.range(0, count) 190 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 191 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 192 for (int i = 0; i < count; i++) { 193 Pair<Integer, Result> pair = getResp.take(); 194 assertTrue(pair.getSecond().isEmpty()); 195 } 196 } 197 198 @SuppressWarnings("FutureReturnValueIgnored") 199 @Test 200 public void testIncrement() throws InterruptedException, ExecutionException { 201 AsyncTable<?> table = getTable.get(); 202 int count = 100; 203 CountDownLatch latch = new CountDownLatch(count); 204 AtomicLong sum = new AtomicLong(0L); 205 IntStream.range(0, count) 206 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 207 sum.addAndGet(x); 208 latch.countDown(); 209 })); 210 latch.await(); 211 assertEquals(count, Bytes.toLong( 212 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 213 assertEquals((1 + count) * count / 2, sum.get()); 214 } 215 216 @SuppressWarnings("FutureReturnValueIgnored") 217 @Test 218 public void testAppend() throws InterruptedException, ExecutionException { 219 AsyncTable<?> table = getTable.get(); 220 int count = 10; 221 CountDownLatch latch = new CountDownLatch(count); 222 char suffix = ':'; 223 AtomicLong suffixCount = new AtomicLong(0L); 224 IntStream.range(0, count) 225 .forEachOrdered(i -> table 226 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 227 .thenAccept(r -> { 228 suffixCount.addAndGet( 229 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); 230 latch.countDown(); 231 })); 232 latch.await(); 233 assertEquals((1 + count) * count / 2, suffixCount.get()); 234 String value = Bytes.toString( 235 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 236 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 237 .sorted().toArray(); 238 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 239 } 240 241 @SuppressWarnings("FutureReturnValueIgnored") 242 @Test 243 public void testCheckAndPut() throws InterruptedException, ExecutionException { 244 AsyncTable<?> table = getTable.get(); 245 AtomicInteger successCount = new AtomicInteger(0); 246 AtomicInteger successIndex = new AtomicInteger(-1); 247 int count = 10; 248 CountDownLatch latch = new CountDownLatch(count); 249 IntStream.range(0, count) 250 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 251 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 252 if (x) { 253 successCount.incrementAndGet(); 254 successIndex.set(i); 255 } 256 latch.countDown(); 257 })); 258 latch.await(); 259 assertEquals(1, successCount.get()); 260 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 261 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 262 } 263 264 @SuppressWarnings("FutureReturnValueIgnored") 265 @Test 266 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 267 AsyncTable<?> table = getTable.get(); 268 int count = 10; 269 CountDownLatch putLatch = new CountDownLatch(count + 1); 270 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 271 IntStream.range(0, count) 272 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 273 .thenRun(() -> putLatch.countDown())); 274 putLatch.await(); 275 276 AtomicInteger successCount = new AtomicInteger(0); 277 AtomicInteger successIndex = new AtomicInteger(-1); 278 CountDownLatch deleteLatch = new CountDownLatch(count); 279 IntStream.range(0, count) 280 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 281 .thenDelete( 282 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 283 .thenAccept(x -> { 284 if (x) { 285 successCount.incrementAndGet(); 286 successIndex.set(i); 287 } 288 deleteLatch.countDown(); 289 })); 290 deleteLatch.await(); 291 assertEquals(1, successCount.get()); 292 Result result = table.get(new Get(row)).get(); 293 IntStream.range(0, count).forEach(i -> { 294 if (i == successIndex.get()) { 295 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 296 } else { 297 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 298 } 299 }); 300 } 301 302 @Test 303 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 304 AsyncTable<?> table = getTable.get(); 305 RowMutations mutation = new RowMutations(row); 306 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 307 table.mutateRow(mutation).get(); 308 Result result = table.get(new Get(row)).get(); 309 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 310 311 mutation = new RowMutations(row); 312 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 313 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 314 table.mutateRow(mutation).get(); 315 result = table.get(new Get(row)).get(); 316 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 317 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 318 } 319 320 @SuppressWarnings("FutureReturnValueIgnored") 321 @Test 322 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 323 AsyncTable<?> table = getTable.get(); 324 int count = 10; 325 CountDownLatch putLatch = new CountDownLatch(count + 1); 326 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 327 IntStream.range(0, count) 328 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 329 .thenRun(() -> putLatch.countDown())); 330 putLatch.await(); 331 332 AtomicInteger successCount = new AtomicInteger(0); 333 AtomicInteger successIndex = new AtomicInteger(-1); 334 CountDownLatch mutateLatch = new CountDownLatch(count); 335 IntStream.range(0, count).forEach(i -> { 336 RowMutations mutation = new RowMutations(row); 337 try { 338 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 339 mutation 340 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 341 } catch (IOException e) { 342 throw new UncheckedIOException(e); 343 } 344 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 345 .thenAccept(x -> { 346 if (x) { 347 successCount.incrementAndGet(); 348 successIndex.set(i); 349 } 350 mutateLatch.countDown(); 351 }); 352 }); 353 mutateLatch.await(); 354 assertEquals(1, successCount.get()); 355 Result result = table.get(new Get(row)).get(); 356 IntStream.range(0, count).forEach(i -> { 357 if (i == successIndex.get()) { 358 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 359 } else { 360 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 361 } 362 }); 363 } 364 365 @Test 366 public void testCheckAndMutateWithTimeRange() throws Exception { 367 AsyncTable<?> table = getTable.get(); 368 final long ts = System.currentTimeMillis() / 2; 369 Put put = new Put(row); 370 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 371 372 boolean ok = 373 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); 374 assertTrue(ok); 375 376 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 377 .ifEquals(VALUE).thenPut(put).get(); 378 assertFalse(ok); 379 380 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 381 .ifEquals(VALUE).thenPut(put).get(); 382 assertTrue(ok); 383 384 RowMutations rm = new RowMutations(row).add((Mutation) put); 385 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 386 .ifEquals(VALUE).thenMutate(rm).get(); 387 assertFalse(ok); 388 389 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 390 .ifEquals(VALUE).thenMutate(rm).get(); 391 assertTrue(ok); 392 393 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 394 395 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 396 .ifEquals(VALUE).thenDelete(delete).get(); 397 assertFalse(ok); 398 399 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 400 .ifEquals(VALUE).thenDelete(delete).get(); 401 assertTrue(ok); 402 } 403 404 @Test 405 public void testDisabled() throws InterruptedException, ExecutionException { 406 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 407 try { 408 getTable.get().get(new Get(row)).get(); 409 fail("Should fail since table has been disabled"); 410 } catch (ExecutionException e) { 411 Throwable cause = e.getCause(); 412 assertThat(cause, instanceOf(TableNotEnabledException.class)); 413 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 414 } 415 } 416 417 @Test 418 public void testInvalidPut() { 419 try { 420 getTable.get().put(new Put(Bytes.toBytes(0))); 421 fail("Should fail since the put does not contain any cells"); 422 } catch (IllegalArgumentException e) { 423 assertThat(e.getMessage(), containsString("No columns to insert")); 424 } 425 426 try { 427 getTable.get() 428 .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])); 429 fail("Should fail since the put exceeds the max key value size"); 430 } catch (IllegalArgumentException e) { 431 assertThat(e.getMessage(), containsString("KeyValue size too large")); 432 } 433 } 434}