001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CallQueueTooBigException;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.MultiActionResultTooLarge;
038import org.apache.hadoop.hbase.NotServingRegionException;
039import org.apache.hadoop.hbase.RegionTooBusyException;
040import org.apache.hadoop.hbase.RetryImmediatelyException;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
043import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
044import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.regionserver.RSRpcServices;
047import org.apache.hadoop.hbase.testclassification.ClientTests;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.function.ThrowingRunnable;
057
058import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
059import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
060import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
065
066@Category({ MediumTests.class, ClientTests.class })
067public class TestMetaCache {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestMetaCache.class);
072
073  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
074  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
075  private static final byte[] FAMILY = Bytes.toBytes("fam1");
076  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
077
078  private static HRegionServer badRS;
079
080  private Connection conn;
081  private MetricsConnection metrics;
082  private AsyncRegionLocator locator;
083
084  @BeforeClass
085  public static void setUpBeforeClass() throws Exception {
086    Configuration conf = TEST_UTIL.getConfiguration();
087    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
088    TEST_UTIL.startMiniCluster(1);
089    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
090    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
091    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
092    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
093    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
094      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build())
095      .build();
096    TEST_UTIL.createTable(desc, null);
097  }
098
099  @AfterClass
100  public static void tearDownAfterClass() throws Exception {
101    TEST_UTIL.shutdownMiniCluster();
102  }
103
104  @After
105  public void tearDown() throws IOException {
106    Closeables.close(conn, true);
107  }
108
109  private void setupConnection(int retry) throws IOException {
110    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
111    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry);
112    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
113    conn = ConnectionFactory.createConnection(conf);
114    AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
115    locator = asyncConn.getLocator();
116    metrics = asyncConn.getConnectionMetrics().get();
117  }
118
119  /**
120   * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally
121   * encountered when using floorEntry rather than lowerEntry.
122   */
123  @Test
124  public void testAddToCacheReverse() throws IOException, InterruptedException {
125    setupConnection(1);
126    TableName tableName = TableName.valueOf("testAddToCache");
127    byte[] family = Bytes.toBytes("CF");
128    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
129      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
130    int maxSplits = 10;
131    List<byte[]> splits =
132      IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());
133
134    TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
135    TEST_UTIL.waitTableAvailable(tableName);
136    TEST_UTIL.waitUntilNoRegionsInTransition();
137
138    assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());
139
140    RegionLocator locatorForTable = conn.getRegionLocator(tableName);
141    for (int i = maxSplits; i >= 0; i--) {
142      locatorForTable.getRegionLocation(Bytes.toBytes(i));
143    }
144
145    for (int i = 0; i < maxSplits; i++) {
146      assertNotNull(locator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
147    }
148  }
149
150  @Test
151  public void testMergeEmptyWithMetaCache() throws Throwable {
152    TableName tableName = TableName.valueOf("testMergeEmptyWithMetaCache");
153    byte[] family = Bytes.toBytes("CF");
154    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
155      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
156    TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
157    TEST_UTIL.waitTableAvailable(tableName);
158    TEST_UTIL.waitUntilNoRegionsInTransition();
159    RegionInfo regionA = null;
160    RegionInfo regionB = null;
161    RegionInfo regionC = null;
162    for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
163      if (region.getStartKey().length == 0) {
164        regionA = region;
165      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
166        regionB = region;
167      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
168        regionC = region;
169      }
170    }
171
172    assertNotNull(regionA);
173    assertNotNull(regionB);
174    assertNotNull(regionC);
175
176    TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY,
177      true);
178    try (AsyncConnection asyncConn =
179      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
180      AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;
181
182      MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();
183
184      // warm meta cache
185      asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();
186
187      assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());
188
189      // Merge the 3 regions into one
190      TEST_UTIL.getAdmin().mergeRegionsAsync(
191        new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
192        false).get(30, TimeUnit.SECONDS);
193
194      assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());
195
196      AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
197
198      // This request should cause us to cache the newly merged region.
199      // As part of caching that region, it should clear out any cached merge parent regions which
200      // are overlapped by the new region. That way, subsequent calls below won't fall into the
201      // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached
202      // regionB and we'd continue to see cache misses below.
203      assertTrue(
204        executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics)
205            > 0);
206
207      // We verify no new cache misses here due to above, which proves we've fixed up the cache
208      assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(),
209        asyncMetrics));
210    }
211  }
212
213  private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics)
214    throws Throwable {
215    long lastVal = metrics.getMetaCacheMisses();
216    runnable.run();
217    long curVal = metrics.getMetaCacheMisses();
218    return curVal - lastVal;
219  }
220
221  @Test
222  public void testPreserveMetaCacheOnException() throws Exception {
223    ((FakeRSRpcServices) badRS.getRSRpcServices())
224      .setExceptionInjector(new RoundRobinExceptionInjector());
225    setupConnection(1);
226    try (Table table = conn.getTable(TABLE_NAME)) {
227      byte[] row = Bytes.toBytes("row1");
228
229      Put put = new Put(row);
230      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
231      Get get = new Get(row);
232      Append append = new Append(row);
233      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
234      Increment increment = new Increment(row);
235      increment.addColumn(FAMILY, QUALIFIER, 10);
236      Delete delete = new Delete(row);
237      delete.addColumn(FAMILY, QUALIFIER);
238      RowMutations mutations = new RowMutations(row);
239      mutations.add(put);
240      mutations.add(delete);
241
242      Exception exp;
243      boolean success;
244      for (int i = 0; i < 50; i++) {
245        exp = null;
246        success = false;
247        try {
248          table.put(put);
249          // If at least one operation succeeded, we should have cached the region location.
250          success = true;
251          table.get(get);
252          table.append(append);
253          table.increment(increment);
254          table.delete(delete);
255          table.mutateRow(mutations);
256        } catch (IOException ex) {
257          // Only keep track of the last exception that updated the meta cache
258          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
259            exp = ex;
260          }
261        }
262        // Do not test if we did not touch the meta cache in this iteration.
263        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
264          assertNull(locator.getRegionLocationInCache(TABLE_NAME, row));
265        } else if (success) {
266          assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row));
267        }
268      }
269    }
270  }
271
272  @Test
273  public void testCacheClearingOnCallQueueTooBig() throws Exception {
274    ((FakeRSRpcServices) badRS.getRSRpcServices())
275      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
276    setupConnection(2);
277    Table table = conn.getTable(TABLE_NAME);
278    byte[] row = Bytes.toBytes("row1");
279
280    Put put = new Put(row);
281    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
282    table.put(put);
283
284    // obtain the client metrics
285    long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
286    long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
287
288    // attempt a get on the test table
289    Get get = new Get(row);
290    try {
291      table.get(get);
292      fail("Expected CallQueueTooBigException");
293    } catch (RetriesExhaustedException ree) {
294      // expected
295    }
296
297    // verify that no cache clearing took place
298    long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
299    long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
300    assertEquals(preGetRegionClears, postGetRegionClears);
301    assertEquals(preGetServerClears, postGetServerClears);
302  }
303
304  public static List<Throwable> metaCachePreservingExceptions() {
305    return Arrays.asList(new RegionOpeningException(" "),
306      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
307      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
308      new CallQueueTooBigException());
309  }
310
311  public static class RegionServerWithFakeRpcServices extends HRegionServer {
312    private FakeRSRpcServices rsRpcServices;
313
314    public RegionServerWithFakeRpcServices(Configuration conf)
315      throws IOException, InterruptedException {
316      super(conf);
317    }
318
319    @Override
320    protected RSRpcServices createRpcServices() throws IOException {
321      this.rsRpcServices = new FakeRSRpcServices(this);
322      return rsRpcServices;
323    }
324
325    public void setExceptionInjector(ExceptionInjector injector) {
326      rsRpcServices.setExceptionInjector(injector);
327    }
328  }
329
330  public static class FakeRSRpcServices extends RSRpcServices {
331
332    private ExceptionInjector exceptions;
333
334    public FakeRSRpcServices(HRegionServer rs) throws IOException {
335      super(rs);
336      exceptions = new RoundRobinExceptionInjector();
337    }
338
339    public void setExceptionInjector(ExceptionInjector injector) {
340      this.exceptions = injector;
341    }
342
343    @Override
344    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
345      throws ServiceException {
346      exceptions.throwOnGet(this, request);
347      return super.get(controller, request);
348    }
349
350    @Override
351    public ClientProtos.MutateResponse mutate(final RpcController controller,
352      final ClientProtos.MutateRequest request) throws ServiceException {
353      exceptions.throwOnMutate(this, request);
354      return super.mutate(controller, request);
355    }
356
357    @Override
358    public ClientProtos.ScanResponse scan(final RpcController controller,
359      final ClientProtos.ScanRequest request) throws ServiceException {
360      exceptions.throwOnScan(this, request);
361      return super.scan(controller, request);
362    }
363  }
364
365  public static abstract class ExceptionInjector {
366    protected boolean isTestTable(FakeRSRpcServices rpcServices,
367      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
368      try {
369        return TABLE_NAME
370          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
371      } catch (IOException ioe) {
372        throw new ServiceException(ioe);
373      }
374    }
375
376    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
377      throws ServiceException;
378
379    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
380      ClientProtos.MutateRequest request) throws ServiceException;
381
382    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
383      ClientProtos.ScanRequest request) throws ServiceException;
384  }
385
386  /**
387   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
388   */
389  public static class RoundRobinExceptionInjector extends ExceptionInjector {
390    private int numReqs = -1;
391    private int expCount = -1;
392    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
393
394    @Override
395    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
396      throws ServiceException {
397      throwSomeExceptions(rpcServices, request.getRegion());
398    }
399
400    @Override
401    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
402      throws ServiceException {
403      throwSomeExceptions(rpcServices, request.getRegion());
404    }
405
406    @Override
407    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
408      throws ServiceException {
409      if (!request.hasScannerId()) {
410        // only handle initial scan requests
411        throwSomeExceptions(rpcServices, request.getRegion());
412      }
413    }
414
415    /**
416     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
417     * throw NotSevingRegionException which clears the meta cache.
418     */
419    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
420      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
421      if (!isTestTable(rpcServices, regionSpec)) {
422        return;
423      }
424
425      numReqs++;
426      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
427      // meta cache preserving exceptions otherwise.
428      if (numReqs % 5 == 0) {
429        return;
430      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
431        throw new ServiceException(new NotServingRegionException());
432      }
433      // Round robin between different special exceptions.
434      // This is not ideal since exception types are not tied to the operation performed here,
435      // But, we don't really care here if we throw MultiActionTooLargeException while doing
436      // single Gets.
437      expCount++;
438      Throwable t =
439        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
440      throw new ServiceException(t);
441    }
442  }
443
444  /**
445   * Throws CallQueueTooBigException for all gets.
446   */
447  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
448    @Override
449    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
450      throws ServiceException {
451      if (isTestTable(rpcServices, request.getRegion())) {
452        throw new ServiceException(new CallQueueTooBigException());
453      }
454    }
455
456    @Override
457    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
458      throws ServiceException {
459    }
460
461    @Override
462    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
463      throws ServiceException {
464    }
465  }
466}