001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
022import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
023import static org.hamcrest.CoreMatchers.instanceOf;
024import static org.hamcrest.MatcherAssert.assertThat;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.util.Optional;
031import java.util.concurrent.CompletionException;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.coprocessor.ObserverContext;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
043import org.apache.hadoop.hbase.coprocessor.RegionObserver;
044import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
045import org.apache.hadoop.hbase.security.User;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.Threads;
050import org.junit.jupiter.api.AfterAll;
051import org.junit.jupiter.api.AfterEach;
052import org.junit.jupiter.api.BeforeAll;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057
058@Tag(MediumTests.TAG)
059@Tag(ClientTests.TAG)
060public class TestAsyncRegionLocator {
061
062  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
063
064  private static TableName TABLE_NAME = TableName.valueOf("async");
065
066  private static byte[] FAMILY = Bytes.toBytes("cf");
067
068  private static AsyncConnectionImpl CONN;
069
070  private static AsyncRegionLocator LOCATOR;
071
072  private static volatile long SLEEP_MS = 0L;
073
074  public static class SleepRegionObserver implements RegionCoprocessor, RegionObserver {
075    @Override
076    public Optional<RegionObserver> getRegionObserver() {
077      return Optional.of(this);
078    }
079
080    @Override
081    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e, Scan scan)
082      throws IOException {
083      if (SLEEP_MS > 0) {
084        Threads.sleepWithoutInterrupt(SLEEP_MS);
085      }
086    }
087  }
088
089  @BeforeAll
090  public static void setUp() throws Exception {
091    Configuration conf = TEST_UTIL.getConfiguration();
092    conf.set(REGION_COPROCESSOR_CONF_KEY, SleepRegionObserver.class.getName());
093    conf.setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
094    TEST_UTIL.startMiniCluster(1);
095    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
096    TEST_UTIL.waitTableAvailable(TABLE_NAME);
097    ConnectionRegistry registry =
098      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
099    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
100      registry.getClusterId().get(), null, User.getCurrent());
101    LOCATOR = CONN.getLocator();
102  }
103
104  @AfterAll
105  public static void tearDown() throws Exception {
106    Closeables.close(CONN, true);
107    TEST_UTIL.shutdownMiniCluster();
108  }
109
110  @AfterEach
111  public void tearDownAfterTest() {
112    LOCATOR.clearCache();
113  }
114
115  @Test
116  public void testTimeout() throws InterruptedException, ExecutionException {
117    SLEEP_MS = 1000;
118    long startNs = System.nanoTime();
119    try {
120      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT,
121        TimeUnit.MILLISECONDS.toNanos(500)).get();
122      fail();
123    } catch (ExecutionException e) {
124      assertThat(e.getCause(), instanceOf(TimeoutIOException.class));
125    }
126    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
127    assertTrue(costMs >= 500);
128    assertTrue(costMs < 1000);
129    // wait for the background task finish
130    Thread.sleep(2000);
131    // Now the location should be in cache, so we will not visit meta again.
132    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW,
133      RegionLocateType.CURRENT, TimeUnit.MILLISECONDS.toNanos(500)).get();
134    assertEquals(loc.getServerName(),
135      TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
136  }
137
138  @Test
139  public void testNoCompletionException() {
140    // make sure that we do not get CompletionException
141    SLEEP_MS = 0;
142    AtomicReference<Throwable> errorHolder = new AtomicReference<>();
143    try {
144      LOCATOR.getRegionLocation(TableName.valueOf("NotExist"), EMPTY_START_ROW,
145        RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1))
146        .whenComplete((r, e) -> errorHolder.set(e)).join();
147      fail();
148    } catch (CompletionException e) {
149      // join will return a CompletionException, which is OK
150      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
151    }
152    // but we need to make sure that we do not get a CompletionException in the callback
153    assertThat(errorHolder.get(), instanceOf(TableNotFoundException.class));
154  }
155}