/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests the batch operations of {@link AsyncTable} (putAll / getAll / deleteAll / batchAll and the
 * list flavour of get) against a mini cluster.
 * <p>
 * Every test is run twice: once with the table obtained through
 * {@code CONN.getTable(tableName)} ("raw") and once with the table obtained through
 * {@code CONN.getTable(tableName, ForkJoinPool.commonPool())} ("normal").
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableBatch {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableBatch.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("async");

  private static final byte[] FAMILY = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");
  private static final byte[] CQ1 = Bytes.toBytes("cq1");

  /** Number of rows written by the bulk tests. */
  private static final int COUNT = 1000;

  /** Shared connection, created in {@link #setUp()} and closed in {@link #tearDown()}. */
  private static AsyncConnection CONN;

  /** Split keys "111", "222", ..., "888"; populated in {@link #setUp()}. */
  private static byte[][] SPLIT_KEYS;

  /** Human readable name of the table flavour, only used in the parameterized test name. */
  @Parameter(0)
  public String tableType;

  /** Factory producing the {@link AsyncTable} flavour under test. */
  @Parameter(1)
  public Function<TableName, AsyncTable<?>> tableGetter;

  /** Returns the table obtained without passing an executor. */
  private static AsyncTable<?> getRawTable(TableName tableName) {
    return CONN.getTable(tableName);
  }

  /** Returns the table obtained with the common fork-join pool as executor. */
  private static AsyncTable<?> getTable(TableName tableName) {
    return CONN.getTable(tableName, ForkJoinPool.commonPool());
  }

  /** Supplies the two (name, table factory) parameter pairs: "raw" and "normal". */
  @Parameters(name = "{index}: type={0}")
  public static List<Object[]> params() {
    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
    return Arrays.asList(new Object[] { "raw", rawTableGetter },
      new Object[] { "normal", tableGetter });
  }

  /** Starts a 3 node mini cluster, builds the split keys and opens the shared connection. */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(3);
    SPLIT_KEYS = new byte[8][];
    for (int i = 111; i < 999; i += 111) {
      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
    }
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /** Closes the shared connection and always shuts the mini cluster down. */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      // CONN stays null when setUp failed before the connection was created.
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      // Shut the cluster down even if closing the connection threw.
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /** Creates the pre-split test table and waits until it is available. */
  @Before
  public void setUpBeforeTest() throws IOException, InterruptedException {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
    TEST_UTIL.waitTableAvailable(TABLE_NAME);
  }

  /** Disables the test table if it is enabled, then deletes it. */
  @After
  public void tearDownAfterTest() throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    if (admin.isTableEnabled(TABLE_NAME)) {
      admin.disableTable(TABLE_NAME);
    }
    admin.deleteTable(TABLE_NAME);
  }

  /** Returns the row key for {@code i}: its decimal form, zero padded to at least 3 digits. */
  private static byte[] getRow(int i) {
    return Bytes.toBytes(String.format("%03d", i));
  }

  /** Builds one Put per index in {@code [startInclusive, endExclusive)} storing the index. */
  private static List<Put> createPuts(int startInclusive, int endExclusive) {
    return IntStream.range(startInclusive, endExclusive)
      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
      .collect(Collectors.toList());
  }

  /** Builds one Get per index in {@code [startInclusive, endExclusive)}. */
  private static List<Get> createGets(int startInclusive, int endExclusive) {
    return IntStream.range(startInclusive, endExclusive).mapToObj(i -> new Get(getRow(i)))
      .collect(Collectors.toList());
  }

  /** Builds one Delete per index in {@code [startInclusive, endExclusive)}. */
  private static List<Delete> createDeletes(int startInclusive, int endExclusive) {
    return IntStream.range(startInclusive, endExclusive).mapToObj(i -> new Delete(getRow(i)))
      .collect(Collectors.toList());
  }

  /**
   * Writes {@code COUNT} rows, reads them back interleaved with gets for rows that were never
   * written, splits every region, then deletes all rows and verifies they are gone.
   */
  @Test
  public void test()
      throws InterruptedException, ExecutionException, IOException, TimeoutException {
    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
    table.putAll(createPuts(0, COUNT)).get();

    // For every row issue two gets: one for the row itself and one for the same key padded to
    // 4 bytes with a trailing zero byte, which was never written.
    List<Get> gets = IntStream.range(0, COUNT)
      .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
      .flatMap(List::stream).collect(Collectors.toList());
    List<Result> results = table.getAll(gets).get();
    assertEquals(2 * COUNT, results.size());
    for (int i = 0; i < COUNT; i++) {
      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
      assertTrue(results.get(2 * i + 1).isEmpty());
    }

    Admin admin = TEST_UTIL.getAdmin();
    admin.flush(TABLE_NAME);
    List<Future<?>> splitFutures =
      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
        byte[] startKey = r.getRegionInfo().getStartKey();
        // The first region has an empty start key; 55 is used for it, so its split point is
        // "110", which still lies before the first split key "111".
        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
        try {
          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
        } catch (IOException e) {
          // The lambda cannot throw a checked exception, so wrap it.
          throw new UncheckedIOException(e);
        }
      }).collect(Collectors.toList());
    for (Future<?> future : splitFutures) {
      future.get(30, TimeUnit.SECONDS);
    }

    table.deleteAll(createDeletes(0, COUNT)).get();
    results = table.getAll(createGets(0, COUNT)).get();
    assertEquals(COUNT, results.size());
    results.forEach(r -> assertTrue(r.isEmpty()));
  }

  /**
   * Writes a first batch, aborts region server 0, then verifies that further batch puts, gets
   * and deletes still complete.
   */
  @Test
  public void testWithRegionServerFailover() throws Exception {
    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
    table.putAll(createPuts(0, COUNT)).get();
    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
    Thread.sleep(100);
    table.putAll(createPuts(COUNT, 2 * COUNT)).get();

    List<Result> results = table.getAll(createGets(0, 2 * COUNT)).get();
    assertEquals(2 * COUNT, results.size());
    results.forEach(r -> assertFalse(r.isEmpty()));

    table.deleteAll(createDeletes(0, 2 * COUNT)).get();
    results = table.getAll(createGets(0, 2 * COUNT)).get();
    assertEquals(2 * COUNT, results.size());
    results.forEach(r -> assertTrue(r.isEmpty()));
  }

  /**
   * Sends a single batch mixing Get, Put, Delete, Increment, Append and RowMutations and checks
   * the outcome of each action.
   */
  @Test
  public void testMixed() throws InterruptedException, ExecutionException, IOException {
    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
    // Seed rows 0..6; each row holds its own index as a long.
    table.putAll(IntStream.range(0, 7)
      .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
      .collect(Collectors.toList())).get();

    List<Row> actions = new ArrayList<>();
    actions.add(new Get(Bytes.toBytes(0)));
    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
    actions.add(new Delete(Bytes.toBytes(2)));
    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
    RowMutations rm = new RowMutations(Bytes.toBytes(5));
    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
    actions.add(rm);
    actions.add(new Get(Bytes.toBytes(6)));

    List<Object> results = table.batchAll(actions).get();
    assertEquals(7, results.size());

    // Get on row 0.
    Result getResult = (Result) results.get(0);
    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
    // Put on row 1 and Delete on row 2 are verified by reading the rows back.
    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
    // Increment on row 3: 3 + 1.
    Result incrementResult = (Result) results.get(3);
    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
    // Append on row 4: the seeded 8 byte long followed by the appended 4 byte int.
    Result appendResult = (Result) results.get(4);
    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
    assertEquals(12, appendValue.length);
    assertEquals(4, Bytes.toLong(appendValue));
    assertEquals(4, Bytes.toInt(appendValue, 8));
    // RowMutations on row 5 wrote both qualifiers.
    assertEquals(100,
      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
    assertEquals(200,
      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
    // Get on row 6.
    getResult = (Result) results.get(6);
    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
  }

  /**
   * Region observer that rejects every Get served by a region whose end key is empty, i.e. the
   * last region of the table, with a {@link DoNotRetryRegionException}.
   */
  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
        throw new DoNotRetryRegionException("Inject Error");
      }
    }
  }

  /**
   * Installs {@link ErrorInjectObserver} on the table and issues one Get per split key. All gets
   * but the last must succeed; the last one must fail with a
   * {@link RetriesExhaustedException} as cause.
   */
  @Test
  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
    Admin admin = TEST_UTIL.getAdmin();
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
      .setCoprocessor(ErrorInjectObserver.class.getName()).build();
    admin.modifyTable(htd);
    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
    table.putAll(Arrays.stream(SPLIT_KEYS).map(k -> new Put(k).addColumn(FAMILY, CQ, k))
      .collect(Collectors.toList())).get();
    List<CompletableFuture<Result>> futures =
      table.get(Arrays.stream(SPLIT_KEYS).map(k -> new Get(k)).collect(Collectors.toList()));
    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
    }
    try {
      futures.get(SPLIT_KEYS.length - 1).get();
      // Without this the test would silently pass if the injected error were never raised.
      fail("Get on the last split key should have failed because of the injected error");
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
    }
  }
}