001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertThat;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.io.UncheckedIOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.Optional;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.ForkJoinPool;
036import java.util.concurrent.Future;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.TimeoutException;
039import java.util.function.Function;
040import java.util.stream.Collectors;
041import java.util.stream.IntStream;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.After;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062import org.junit.runners.Parameterized.Parameter;
063import org.junit.runners.Parameterized.Parameters;
064
065@RunWith(Parameterized.class)
066@Category({ LargeTests.class, ClientTests.class })
067public class TestAsyncTableBatch {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestAsyncTableBatch.class);
072
073  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
074
075  private static TableName TABLE_NAME = TableName.valueOf("async");
076
077  private static byte[] FAMILY = Bytes.toBytes("cf");
078
079  private static byte[] CQ = Bytes.toBytes("cq");
080  private static byte[] CQ1 = Bytes.toBytes("cq1");
081
082  private static int COUNT = 1000;
083
084  private static AsyncConnection CONN;
085
086  private static byte[][] SPLIT_KEYS;
087
088  @Parameter(0)
089  public String tableType;
090
091  @Parameter(1)
092  public Function<TableName, AsyncTable<?>> tableGetter;
093
094  private static AsyncTable<?> getRawTable(TableName tableName) {
095    return CONN.getTable(tableName);
096  }
097
098  private static AsyncTable<?> getTable(TableName tableName) {
099    return CONN.getTable(tableName, ForkJoinPool.commonPool());
100  }
101
102  @Parameters(name = "{index}: type={0}")
103  public static List<Object[]> params() {
104    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
105    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
106    return Arrays.asList(new Object[] { "raw", rawTableGetter },
107      new Object[] { "normal", tableGetter });
108  }
109
110  @BeforeClass
111  public static void setUp() throws Exception {
112    TEST_UTIL.startMiniCluster(3);
113    SPLIT_KEYS = new byte[8][];
114    for (int i = 111; i < 999; i += 111) {
115      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
116    }
117    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
118  }
119
120  @AfterClass
121  public static void tearDown() throws Exception {
122    CONN.close();
123    TEST_UTIL.shutdownMiniCluster();
124  }
125
126  @Before
127  public void setUpBeforeTest() throws IOException, InterruptedException {
128    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
129    TEST_UTIL.waitTableAvailable(TABLE_NAME);
130  }
131
132  @After
133  public void tearDownAfterTest() throws IOException {
134    Admin admin = TEST_UTIL.getAdmin();
135    if (admin.isTableEnabled(TABLE_NAME)) {
136      admin.disableTable(TABLE_NAME);
137    }
138    admin.deleteTable(TABLE_NAME);
139  }
140
141  private byte[] getRow(int i) {
142    return Bytes.toBytes(String.format("%03d", i));
143  }
144
145  @Test
146  public void test()
147      throws InterruptedException, ExecutionException, IOException, TimeoutException {
148    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
149    table.putAll(IntStream.range(0, COUNT)
150        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
151        .collect(Collectors.toList())).get();
152    List<Result> results = table.getAll(IntStream.range(0, COUNT)
153        .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
154        .flatMap(l -> l.stream()).collect(Collectors.toList())).get();
155    assertEquals(2 * COUNT, results.size());
156    for (int i = 0; i < COUNT; i++) {
157      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
158      assertTrue(results.get(2 * i + 1).isEmpty());
159    }
160    Admin admin = TEST_UTIL.getAdmin();
161    admin.flush(TABLE_NAME);
162    List<Future<?>> splitFutures =
163      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
164        byte[] startKey = r.getRegionInfo().getStartKey();
165        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
166        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
167        try {
168          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
169        } catch (IOException e) {
170          throw new UncheckedIOException(e);
171        }
172      }).collect(Collectors.toList());
173    for (Future<?> future : splitFutures) {
174      future.get(30, TimeUnit.SECONDS);
175    }
176    table.deleteAll(
177      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
178        .get();
179    results = table
180        .getAll(
181          IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
182        .get();
183    assertEquals(COUNT, results.size());
184    results.forEach(r -> assertTrue(r.isEmpty()));
185  }
186
187  @Test
188  public void testWithRegionServerFailover() throws Exception {
189    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
190    table.putAll(IntStream.range(0, COUNT)
191        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
192        .collect(Collectors.toList())).get();
193    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
194    Thread.sleep(100);
195    table.putAll(IntStream.range(COUNT, 2 * COUNT)
196        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
197        .collect(Collectors.toList())).get();
198    List<Result> results = table.getAll(
199      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
200        .get();
201    assertEquals(2 * COUNT, results.size());
202    results.forEach(r -> assertFalse(r.isEmpty()));
203    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
204        .collect(Collectors.toList())).get();
205    results = table.getAll(
206      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
207        .get();
208    assertEquals(2 * COUNT, results.size());
209    results.forEach(r -> assertTrue(r.isEmpty()));
210  }
211
212  @Test
213  public void testMixed() throws InterruptedException, ExecutionException, IOException {
214    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
215    table.putAll(IntStream.range(0, 7)
216        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
217        .collect(Collectors.toList())).get();
218    List<Row> actions = new ArrayList<>();
219    actions.add(new Get(Bytes.toBytes(0)));
220    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
221    actions.add(new Delete(Bytes.toBytes(2)));
222    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
223    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
224    RowMutations rm = new RowMutations(Bytes.toBytes(5));
225    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
226    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
227    actions.add(rm);
228    actions.add(new Get(Bytes.toBytes(6)));
229
230    List<Object> results = table.batchAll(actions).get();
231    assertEquals(7, results.size());
232    Result getResult = (Result) results.get(0);
233    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
234    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
235    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
236    Result incrementResult = (Result) results.get(3);
237    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
238    Result appendResult = (Result) results.get(4);
239    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
240    assertEquals(12, appendValue.length);
241    assertEquals(4, Bytes.toLong(appendValue));
242    assertEquals(4, Bytes.toInt(appendValue, 8));
243    assertEquals(100,
244      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
245    assertEquals(200,
246      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
247    getResult = (Result) results.get(6);
248    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
249  }
250
251  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
252
253    @Override
254    public Optional<RegionObserver> getRegionObserver() {
255      return Optional.of(this);
256    }
257
258    @Override
259    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
260        List<Cell> results) throws IOException {
261      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
262        throw new DoNotRetryRegionException("Inject Error");
263      }
264    }
265  }
266
267  @Test
268  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
269    Admin admin = TEST_UTIL.getAdmin();
270    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
271        .setCoprocessor(ErrorInjectObserver.class.getName()).build();
272    admin.modifyTable(htd);
273    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
274    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
275        .collect(Collectors.toList())).get();
276    List<CompletableFuture<Result>> futures = table
277        .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
278    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
279      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
280    }
281    try {
282      futures.get(SPLIT_KEYS.length - 1).get();
283    } catch (ExecutionException e) {
284      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
285    }
286  }
287}