001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertThat; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.io.UncheckedIOException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.Optional; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.ExecutionException; 037import java.util.concurrent.ForkJoinPool; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041import java.util.function.Function; 042import java.util.stream.Collectors; 043import java.util.stream.IntStream; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.coprocessor.ObserverContext; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 051import org.apache.hadoop.hbase.coprocessor.RegionObserver; 052import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 053import org.apache.hadoop.hbase.testclassification.ClientTests; 054import org.apache.hadoop.hbase.testclassification.LargeTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.junit.After; 057import org.junit.AfterClass; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.runner.RunWith; 064import org.junit.runners.Parameterized; 065import org.junit.runners.Parameterized.Parameter; 066import org.junit.runners.Parameterized.Parameters; 067 068@RunWith(Parameterized.class) 069@Category({ LargeTests.class, ClientTests.class }) 070public class TestAsyncTableBatch { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestAsyncTableBatch.class); 075 076 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 077 078 private static TableName TABLE_NAME = TableName.valueOf("async"); 079 080 private static byte[] FAMILY = Bytes.toBytes("cf"); 081 082 private static byte[] CQ = Bytes.toBytes("cq"); 083 private static byte[] CQ1 = Bytes.toBytes("cq1"); 084 085 private static int COUNT = 1000; 086 087 private static AsyncConnection CONN; 088 089 private static byte[][] SPLIT_KEYS; 090 091 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 092 093 @Parameter(0) 094 public String tableType; 095 096 @Parameter(1) 097 public Function<TableName, AsyncTable<?>> tableGetter; 098 099 private static AsyncTable<?> getRawTable(TableName tableName) { 100 return CONN.getTable(tableName); 101 } 102 103 private static AsyncTable<?> getTable(TableName tableName) { 104 return CONN.getTable(tableName, ForkJoinPool.commonPool()); 105 } 106 107 @Parameters(name = "{index}: type={0}") 108 public static List<Object[]> params() { 109 Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable; 110 Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable; 111 return Arrays.asList(new Object[] { "raw", rawTableGetter }, 112 new Object[] { "normal", tableGetter }); 113 } 114 115 @BeforeClass 116 public static void setUp() throws Exception { 117 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 118 MAX_KEY_VALUE_SIZE); 119 TEST_UTIL.startMiniCluster(3); 120 SPLIT_KEYS = new byte[8][]; 121 for (int i = 111; i < 999; i += 111) { 122 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 123 } 124 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 125 } 126 127 @AfterClass 128 public static void tearDown() throws Exception { 129 CONN.close(); 130 TEST_UTIL.shutdownMiniCluster(); 131 } 132 133 @Before 134 public void setUpBeforeTest() throws IOException, InterruptedException { 135 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 136 TEST_UTIL.waitTableAvailable(TABLE_NAME); 137 } 138 139 @After 140 public void tearDownAfterTest() throws IOException { 141 Admin admin = TEST_UTIL.getAdmin(); 142 if (admin.isTableEnabled(TABLE_NAME)) { 143 admin.disableTable(TABLE_NAME); 144 } 145 admin.deleteTable(TABLE_NAME); 146 } 147 148 private byte[] getRow(int i) { 149 return Bytes.toBytes(String.format("%03d", i)); 150 } 151 152 @Test 153 public void test() 154 throws InterruptedException, ExecutionException, IOException, TimeoutException { 155 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 156 table.putAll(IntStream.range(0, COUNT) 157 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 158 .collect(Collectors.toList())).get(); 159 List<Result> results = table.getAll(IntStream.range(0, COUNT) 160 .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) 161 .flatMap(l -> l.stream()).collect(Collectors.toList())).get(); 162 assertEquals(2 * COUNT, results.size()); 163 for (int i = 0; i < COUNT; i++) { 164 assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ))); 165 assertTrue(results.get(2 * i + 1).isEmpty()); 166 } 167 Admin admin = TEST_UTIL.getAdmin(); 168 admin.flush(TABLE_NAME); 169 List<Future<?>> splitFutures = 170 TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> { 171 byte[] startKey = r.getRegionInfo().getStartKey(); 172 int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); 173 byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); 174 try { 175 return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint); 176 } catch (IOException e) { 177 throw new UncheckedIOException(e); 178 } 179 }).collect(Collectors.toList()); 180 for (Future<?> future : splitFutures) { 181 future.get(30, TimeUnit.SECONDS); 182 } 183 table.deleteAll( 184 IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList())) 185 .get(); 186 results = table 187 .getAll( 188 IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 189 .get(); 190 assertEquals(COUNT, results.size()); 191 results.forEach(r -> assertTrue(r.isEmpty())); 192 } 193 194 @Test 195 public void testWithRegionServerFailover() throws Exception { 196 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 197 table.putAll(IntStream.range(0, COUNT) 198 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 199 .collect(Collectors.toList())).get(); 200 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests"); 201 Thread.sleep(100); 202 table.putAll(IntStream.range(COUNT, 2 * COUNT) 203 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 204 .collect(Collectors.toList())).get(); 205 List<Result> results = table.getAll( 206 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 207 .get(); 208 assertEquals(2 * COUNT, results.size()); 209 results.forEach(r -> assertFalse(r.isEmpty())); 210 table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i))) 211 .collect(Collectors.toList())).get(); 212 results = table.getAll( 213 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 214 .get(); 215 assertEquals(2 * COUNT, results.size()); 216 results.forEach(r -> assertTrue(r.isEmpty())); 217 } 218 219 @Test 220 public void testMixed() throws InterruptedException, ExecutionException, IOException { 221 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 222 table.putAll(IntStream.range(0, 7) 223 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) 224 .collect(Collectors.toList())).get(); 225 List<Row> actions = new ArrayList<>(); 226 actions.add(new Get(Bytes.toBytes(0))); 227 actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L))); 228 actions.add(new Delete(Bytes.toBytes(2))); 229 actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); 230 actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); 231 RowMutations rm = new RowMutations(Bytes.toBytes(5)); 232 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); 233 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); 234 actions.add(rm); 235 actions.add(new Get(Bytes.toBytes(6))); 236 237 List<Object> results = table.batchAll(actions).get(); 238 assertEquals(7, results.size()); 239 Result getResult = (Result) results.get(0); 240 assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 241 assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); 242 assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty()); 243 Result incrementResult = (Result) results.get(3); 244 assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ))); 245 Result appendResult = (Result) results.get(4); 246 byte[] appendValue = appendResult.getValue(FAMILY, CQ); 247 assertEquals(12, appendValue.length); 248 assertEquals(4, Bytes.toLong(appendValue)); 249 assertEquals(4, Bytes.toInt(appendValue, 8)); 250 assertEquals(100, 251 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); 252 assertEquals(200, 253 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); 254 getResult = (Result) results.get(6); 255 assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 256 } 257 258 public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver { 259 260 @Override 261 public Optional<RegionObserver> getRegionObserver() { 262 return Optional.of(this); 263 } 264 265 @Override 266 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 267 List<Cell> results) throws IOException { 268 if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) { 269 throw new DoNotRetryRegionException("Inject Error"); 270 } 271 } 272 } 273 274 @Test 275 public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException { 276 Admin admin = TEST_UTIL.getAdmin(); 277 TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME)) 278 .setCoprocessor(ErrorInjectObserver.class.getName()).build(); 279 admin.modifyTable(htd); 280 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 281 table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k)) 282 .collect(Collectors.toList())).get(); 283 List<CompletableFuture<Result>> futures = table 284 .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList())); 285 for (int i = 0; i < SPLIT_KEYS.length - 1; i++) { 286 assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ)); 287 } 288 try { 289 futures.get(SPLIT_KEYS.length - 1).get(); 290 fail(); 291 } catch (ExecutionException e) { 292 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 293 } 294 } 295 296 @Test 297 public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException { 298 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 299 List<CompletableFuture<Object>> futures = table.batch(Arrays.asList( 300 new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ, 301 Bytes.toBytes("bad")), 302 new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1), 303 new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good")))); 304 try { 305 futures.get(0).get(); 306 fail(); 307 } catch (ExecutionException e) { 308 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 309 assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class)); 310 } 311 assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ))); 312 assertTrue(((Result) futures.get(2).get()).isEmpty()); 313 assertEquals("good", 314 Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ))); 315 } 316 317 @Test 318 public void testInvalidPut() { 319 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 320 try { 321 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0)))); 322 fail("Should fail since the put does not contain any cells"); 323 } catch (IllegalArgumentException e) { 324 assertThat(e.getMessage(), containsString("No columns to insert")); 325 } 326 327 try { 328 table.batch( 329 Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 330 new Delete(Bytes.toBytes(0)))); 331 fail("Should fail since the put exceeds the max key value size"); 332 } catch (IllegalArgumentException e) { 333 assertThat(e.getMessage(), containsString("KeyValue size too large")); 334 } 335 } 336}