001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
022import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
023import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
024import static org.hamcrest.CoreMatchers.instanceOf;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.junit.Assert.assertArrayEquals;
027import static org.junit.Assert.assertEquals;
028import static org.junit.Assert.assertSame;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Collection;
033import java.util.List;
034import java.util.concurrent.CompletableFuture;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.stream.IntStream;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.NotServingRegionException;
043import org.apache.hadoop.hbase.RegionLocations;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.TableNotFoundException;
047import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
048import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
054import org.junit.After;
055import org.junit.AfterClass;
056import org.junit.Before;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064
065import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
066
067@Category({ MediumTests.class, ClientTests.class })
068@RunWith(Parameterized.class)
069public class TestAsyncNonMetaRegionLocator {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
074
075  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
076
077  private static final TableName TABLE_NAME = TableName.valueOf("async");
078
079  private static byte[] FAMILY = Bytes.toBytes("cf");
080  private static final int NB_SERVERS = 4;
081  private static final int NUM_OF_META_REPLICA = NB_SERVERS - 1;
082
083  private static byte[][] SPLIT_KEYS;
084
085  private AsyncConnectionImpl conn;
086  private AsyncNonMetaRegionLocator locator;
087
088  @Parameter
089  public CatalogReplicaMode metaReplicaMode;
090
091  @BeforeClass
092  public static void setUp() throws Exception {
093    Configuration conf = TEST_UTIL.getConfiguration();
094    // Enable hbase:meta replication.
095    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
096    conf.setLong("replication.source.sleepforretries", 10); // 10 ms
097
098    TEST_UTIL.startMiniCluster(NB_SERVERS);
099    Admin admin = TEST_UTIL.getAdmin();
100    admin.balancerSwitch(false, true);
101
102    // Enable hbase:meta replication.
103    HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, NUM_OF_META_REPLICA);
104    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster()
105      .getRegions(TableName.META_TABLE_NAME).size() >= NUM_OF_META_REPLICA);
106
107    SPLIT_KEYS = new byte[8][];
108    for (int i = 111; i < 999; i += 111) {
109      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
110    }
111  }
112
113  @AfterClass
114  public static void tearDown() throws Exception {
115    TEST_UTIL.shutdownMiniCluster();
116  }
117
118  @Before
119  public void setUpBeforeTest() throws InterruptedException, ExecutionException, IOException {
120    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
121    // Enable meta replica LoadBalance mode for this connection.
122    c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
123    ConnectionRegistry registry =
124      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
125    conn =
126      new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent());
127    locator = new AsyncNonMetaRegionLocator(conn);
128  }
129
130  @After
131  public void tearDownAfterTest() throws IOException {
132    Admin admin = TEST_UTIL.getAdmin();
133    if (admin.tableExists(TABLE_NAME)) {
134      if (admin.isTableEnabled(TABLE_NAME)) {
135        admin.disableTable(TABLE_NAME);
136      }
137      admin.deleteTable(TABLE_NAME);
138    }
139    Closeables.close(conn, true);
140  }
141
142  @Parameterized.Parameters
143  public static Collection<Object[]> parameters() {
144    return Arrays
145      .asList(new Object[][] { { CatalogReplicaMode.NONE }, { CatalogReplicaMode.LOAD_BALANCE } });
146  }
147
148  private void createSingleRegionTable() throws IOException, InterruptedException {
149    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
150    TEST_UTIL.waitTableAvailable(TABLE_NAME);
151  }
152
153  private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
154    byte[] row, RegionLocateType locateType, boolean reload) {
155    return locator
156      .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
157      .thenApply(RegionLocations::getDefaultRegionLocation);
158  }
159
160  @Test
161  public void testNoTable() throws InterruptedException {
162    for (RegionLocateType locateType : RegionLocateType.values()) {
163      try {
164        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
165      } catch (ExecutionException e) {
166        assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
167      }
168    }
169  }
170
171  @Test
172  public void testDisableTable() throws IOException, InterruptedException {
173    createSingleRegionTable();
174    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
175    for (RegionLocateType locateType : RegionLocateType.values()) {
176      try {
177        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
178      } catch (ExecutionException e) {
179        assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
180      }
181    }
182  }
183
184  private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
185    HRegionLocation loc) {
186    RegionInfo info = loc.getRegion();
187    assertEquals(TABLE_NAME, info.getTable());
188    assertArrayEquals(startKey, info.getStartKey());
189    assertArrayEquals(endKey, info.getEndKey());
190    assertEquals(serverName, loc.getServerName());
191  }
192
193  @Test
194  public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
195    createSingleRegionTable();
196    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
197    for (RegionLocateType locateType : RegionLocateType.values()) {
198      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
199        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
200    }
201    byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
202    ThreadLocalRandom.current().nextBytes(randKey);
203    for (RegionLocateType locateType : RegionLocateType.values()) {
204      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
205        getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
206    }
207  }
208
209  private void createMultiRegionTable() throws IOException, InterruptedException {
210    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
211    TEST_UTIL.waitTableAvailable(TABLE_NAME);
212  }
213
214  private static byte[][] getStartKeys() {
215    byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
216    startKeys[0] = EMPTY_START_ROW;
217    System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
218    return startKeys;
219  }
220
221  private static byte[][] getEndKeys() {
222    byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
223    endKeys[endKeys.length - 1] = EMPTY_START_ROW;
224    return endKeys;
225  }
226
227  private ServerName[] getLocations(byte[][] startKeys) {
228    ServerName[] serverNames = new ServerName[startKeys.length];
229    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
230      .forEach(rs -> {
231        rs.getRegions(TABLE_NAME).forEach(r -> {
232          serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
233            Bytes::compareTo)] = rs.getServerName();
234        });
235      });
236    return serverNames;
237  }
238
239  @Test
240  public void testMultiRegionTable() throws IOException, InterruptedException {
241    createMultiRegionTable();
242    byte[][] startKeys = getStartKeys();
243    ServerName[] serverNames = getLocations(startKeys);
244    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
245      try {
246        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
247          serverNames[i],
248          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
249            .get());
250      } catch (InterruptedException | ExecutionException e) {
251        throw new RuntimeException(e);
252      }
253    }));
254
255    locator.clearCache(TABLE_NAME);
256    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
257      try {
258        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
259          serverNames[i],
260          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
261      } catch (InterruptedException | ExecutionException e) {
262        throw new RuntimeException(e);
263      }
264    }));
265
266    locator.clearCache(TABLE_NAME);
267    byte[][] endKeys = getEndKeys();
268    IntStream.range(0, 2).forEach(
269      n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
270        try {
271          assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
272            getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
273        } catch (InterruptedException | ExecutionException e) {
274          throw new RuntimeException(e);
275        }
276      }));
277  }
278
279  @Test
280  public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
281    createSingleRegionTable();
282    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
283    HRegionLocation loc =
284      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
285    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
286    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
287      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
288      .get();
289
290    TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()), newServerName);
291    while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
292      .equals(newServerName)) {
293      Thread.sleep(100);
294    }
295    // Should be same as it is in cache
296    assertSame(loc,
297      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
298    locator.updateCachedLocationOnError(loc, null);
299    // null error will not trigger a cache cleanup
300    assertSame(loc,
301      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
302    locator.updateCachedLocationOnError(loc, new NotServingRegionException());
303    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
304      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
305  }
306
307  // usually locate after will return the same result, so we add a test to make it return different
308  // result.
309  @Test
310  public void testLocateAfter() throws IOException, InterruptedException, ExecutionException {
311    byte[] row = Bytes.toBytes("1");
312    byte[] splitKey = Arrays.copyOf(row, 2);
313    TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
314    TEST_UTIL.waitTableAvailable(TABLE_NAME);
315    HRegionLocation currentLoc =
316      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
317    ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
318    assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
319
320    HRegionLocation afterLoc =
321      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
322    ServerName afterServerName =
323      TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
324        .filter(rs -> rs.getRegions(TABLE_NAME).stream()
325          .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
326        .findAny().get().getServerName();
327    assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
328
329    assertSame(afterLoc,
330      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
331  }
332
333  // For HBASE-17402
334  @Test
335  public void testConcurrentLocate() throws IOException, InterruptedException, ExecutionException {
336    createMultiRegionTable();
337    byte[][] startKeys = getStartKeys();
338    byte[][] endKeys = getEndKeys();
339    ServerName[] serverNames = getLocations(startKeys);
340    for (int i = 0; i < 100; i++) {
341      locator.clearCache(TABLE_NAME);
342      List<CompletableFuture<HRegionLocation>> futures =
343        IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
344          .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
345          .collect(toList());
346      for (int j = 0; j < 1000; j++) {
347        int index = Math.min(8, j / 111);
348        assertLocEquals(startKeys[index], endKeys[index], serverNames[index], futures.get(j).get());
349      }
350    }
351  }
352
353  @Test
354  public void testReload() throws Exception {
355    createSingleRegionTable();
356    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
357    for (RegionLocateType locateType : RegionLocateType.values()) {
358      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
359        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
360    }
361    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
362      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
363      .get();
364    Admin admin = TEST_UTIL.getAdmin();
365    RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
366    admin.move(region.getEncodedNameAsBytes(), newServerName);
367    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
368
369      @Override
370      public boolean evaluate() throws Exception {
371        ServerName newServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
372        return newServerName != null && !newServerName.equals(serverName);
373      }
374
375      @Override
376      public String explainFailure() throws Exception {
377        return region.getRegionNameAsString() + " is still on " + serverName;
378      }
379
380    });
381    // The cached location will not change
382    for (RegionLocateType locateType : RegionLocateType.values()) {
383      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
384        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
385    }
386    // should get the new location when reload = true
387    // when meta replica LoadBalance mode is enabled, it may delay a bit.
388    TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
389      @Override
390      public boolean evaluate() throws Exception {
391        HRegionLocation loc =
392          getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true)
393            .get();
394        return newServerName.equals(loc.getServerName());
395      }
396
397      @Override
398      public String explainFailure() throws Exception {
399        return "New location does not show up in meta (replica) region";
400      }
401    });
402
403    // the cached location should be replaced
404    for (RegionLocateType locateType : RegionLocateType.values()) {
405      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
406        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
407    }
408  }
409
410  // Testcase for HBASE-20822
411  @Test
412  public void testLocateBeforeLastRegion()
413    throws IOException, InterruptedException, ExecutionException {
414    createMultiRegionTable();
415    getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
416    HRegionLocation loc =
417      getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
418    // should locate to the last region
419    assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
420  }
421
422  @Test
423  public void testRegionReplicas() throws Exception {
424    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
425      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
426    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
427    testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
428
429      @Override
430      public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
431        throws Exception {
432        locator.updateCachedLocationOnError(loc, error);
433      }
434
435      @Override
436      public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
437        throws Exception {
438        return locator.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
439          RegionLocateType.CURRENT, reload).get();
440      }
441    });
442  }
443
444  // Testcase for HBASE-21961
445  @Test
446  public void testLocateBeforeInOnlyRegion() throws IOException, InterruptedException {
447    createSingleRegionTable();
448    HRegionLocation loc =
449      getDefaultRegionLocation(TABLE_NAME, Bytes.toBytes(1), RegionLocateType.BEFORE, false).join();
450    // should locate to the only region
451    assertArrayEquals(loc.getRegion().getStartKey(), EMPTY_START_ROW);
452    assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
453  }
454
455  @Test
456  public void testConcurrentUpdateCachedLocationOnError() throws Exception {
457    createSingleRegionTable();
458    HRegionLocation loc =
459      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
460    IntStream.range(0, 100).parallel()
461      .forEach(i -> locator.updateCachedLocationOnError(loc, new NotServingRegionException()));
462  }
463}