001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CallQueueTooBigException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.MultiActionResultTooLarge;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.RegionTooBusyException;
039import org.apache.hadoop.hbase.RetryImmediatelyException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
042import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
043import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RSRpcServices;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.jupiter.api.AfterAll;
050import org.junit.jupiter.api.AfterEach;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054import org.junit.jupiter.api.function.Executable;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
058import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
063
064@Tag(MediumTests.TAG)
065@Tag(ClientTests.TAG)
066public class TestMetaCache {
067
068  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
069  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
070  private static final byte[] FAMILY = Bytes.toBytes("fam1");
071  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
072
073  private static HRegionServer badRS;
074
075  private Connection conn;
076  private MetricsConnection metrics;
077  private AsyncRegionLocator locator;
078
079  @BeforeAll
080  public static void setUpBeforeClass() throws Exception {
081    Configuration conf = TEST_UTIL.getConfiguration();
082    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
083    TEST_UTIL.startMiniCluster(1);
084    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
085    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
086    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
087    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
088    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
089      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build())
090      .build();
091    TEST_UTIL.createTable(desc, null);
092  }
093
094  @AfterAll
095  public static void tearDownAfterClass() throws Exception {
096    TEST_UTIL.shutdownMiniCluster();
097  }
098
099  @AfterEach
100  public void tearDown() throws IOException {
101    Closeables.close(conn, true);
102  }
103
104  private void setupConnection(int retry) throws IOException {
105    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
106    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry);
107    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
108    conn = ConnectionFactory.createConnection(conf);
109    AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
110    locator = asyncConn.getLocator();
111    metrics = asyncConn.getConnectionMetrics().get();
112  }
113
114  /**
115   * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally
116   * encountered when using floorEntry rather than lowerEntry.
117   */
118  @Test
119  public void testAddToCacheReverse() throws IOException, InterruptedException {
120    setupConnection(1);
121    TableName tableName = TableName.valueOf("testAddToCache");
122    byte[] family = Bytes.toBytes("CF");
123    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
124      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
125    int maxSplits = 10;
126    List<byte[]> splits =
127      IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());
128
129    TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
130    TEST_UTIL.waitTableAvailable(tableName);
131    TEST_UTIL.waitUntilNoRegionsInTransition();
132
133    assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());
134
135    RegionLocator locatorForTable = conn.getRegionLocator(tableName);
136    for (int i = maxSplits; i >= 0; i--) {
137      locatorForTable.getRegionLocation(Bytes.toBytes(i));
138    }
139
140    for (int i = 0; i < maxSplits; i++) {
141      assertNotNull(locator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
142    }
143  }
144
145  @Test
146  public void testMergeEmptyWithMetaCache() throws Throwable {
147    TableName tableName = TableName.valueOf("testMergeEmptyWithMetaCache");
148    byte[] family = Bytes.toBytes("CF");
149    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
150      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
151    TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
152    TEST_UTIL.waitTableAvailable(tableName);
153    TEST_UTIL.waitUntilNoRegionsInTransition();
154    RegionInfo regionA = null;
155    RegionInfo regionB = null;
156    RegionInfo regionC = null;
157    for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
158      if (region.getStartKey().length == 0) {
159        regionA = region;
160      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
161        regionB = region;
162      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
163        regionC = region;
164      }
165    }
166
167    assertNotNull(regionA);
168    assertNotNull(regionB);
169    assertNotNull(regionC);
170
171    TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY,
172      true);
173    try (AsyncConnection asyncConn =
174      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
175      AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;
176
177      MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();
178
179      // warm meta cache
180      asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();
181
182      assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());
183
184      // Merge the 3 regions into one
185      TEST_UTIL.getAdmin().mergeRegionsAsync(
186        new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
187        false).get(30, TimeUnit.SECONDS);
188
189      assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());
190
191      AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
192
193      // This request should cause us to cache the newly merged region.
194      // As part of caching that region, it should clear out any cached merge parent regions which
195      // are overlapped by the new region. That way, subsequent calls below won't fall into the
196      // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached
197      // regionB and we'd continue to see cache misses below.
198      assertTrue(
199        executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics)
200            > 0);
201
202      // We verify no new cache misses here due to above, which proves we've fixed up the cache
203      assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(),
204        asyncMetrics));
205    }
206  }
207
208  private long executeAndGetNewMisses(Executable runnable, MetricsConnection metrics)
209    throws Throwable {
210    long lastVal = metrics.getMetaCacheMisses();
211    runnable.execute();
212    long curVal = metrics.getMetaCacheMisses();
213    return curVal - lastVal;
214  }
215
216  @Test
217  public void testPreserveMetaCacheOnException() throws Exception {
218    ((FakeRSRpcServices) badRS.getRSRpcServices())
219      .setExceptionInjector(new RoundRobinExceptionInjector());
220    setupConnection(1);
221    try (Table table = conn.getTable(TABLE_NAME)) {
222      byte[] row = Bytes.toBytes("row1");
223
224      Put put = new Put(row);
225      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
226      Get get = new Get(row);
227      Append append = new Append(row);
228      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
229      Increment increment = new Increment(row);
230      increment.addColumn(FAMILY, QUALIFIER, 10);
231      Delete delete = new Delete(row);
232      delete.addColumn(FAMILY, QUALIFIER);
233      RowMutations mutations = new RowMutations(row);
234      mutations.add(put);
235      mutations.add(delete);
236
237      Exception exp;
238      boolean success;
239      for (int i = 0; i < 50; i++) {
240        exp = null;
241        success = false;
242        try {
243          table.put(put);
244          // If at least one operation succeeded, we should have cached the region location.
245          success = true;
246          table.get(get);
247          table.append(append);
248          table.increment(increment);
249          table.delete(delete);
250          table.mutateRow(mutations);
251        } catch (IOException ex) {
252          // Only keep track of the last exception that updated the meta cache
253          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
254            exp = ex;
255          }
256        }
257        // Do not test if we did not touch the meta cache in this iteration.
258        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
259          assertNull(locator.getRegionLocationInCache(TABLE_NAME, row));
260        } else if (success) {
261          assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row));
262        }
263      }
264    }
265  }
266
267  @Test
268  public void testCacheClearingOnCallQueueTooBig() throws Exception {
269    ((FakeRSRpcServices) badRS.getRSRpcServices())
270      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
271    setupConnection(2);
272    Table table = conn.getTable(TABLE_NAME);
273    byte[] row = Bytes.toBytes("row1");
274
275    Put put = new Put(row);
276    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
277    table.put(put);
278
279    // obtain the client metrics
280    long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
281    long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
282
283    // attempt a get on the test table
284    Get get = new Get(row);
285    try {
286      table.get(get);
287      fail("Expected CallQueueTooBigException");
288    } catch (RetriesExhaustedException ree) {
289      // expected
290    }
291
292    // verify that no cache clearing took place
293    long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
294    long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
295    assertEquals(preGetRegionClears, postGetRegionClears);
296    assertEquals(preGetServerClears, postGetServerClears);
297  }
298
299  public static List<Throwable> metaCachePreservingExceptions() {
300    return Arrays.asList(new RegionOpeningException(" "),
301      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
302      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
303      new CallQueueTooBigException());
304  }
305
306  public static class RegionServerWithFakeRpcServices extends HRegionServer {
307    private FakeRSRpcServices rsRpcServices;
308
309    public RegionServerWithFakeRpcServices(Configuration conf)
310      throws IOException, InterruptedException {
311      super(conf);
312    }
313
314    @Override
315    protected RSRpcServices createRpcServices() throws IOException {
316      this.rsRpcServices = new FakeRSRpcServices(this);
317      return rsRpcServices;
318    }
319
320    public void setExceptionInjector(ExceptionInjector injector) {
321      rsRpcServices.setExceptionInjector(injector);
322    }
323  }
324
325  public static class FakeRSRpcServices extends RSRpcServices {
326
327    private ExceptionInjector exceptions;
328
329    public FakeRSRpcServices(HRegionServer rs) throws IOException {
330      super(rs);
331      exceptions = new RoundRobinExceptionInjector();
332    }
333
334    public void setExceptionInjector(ExceptionInjector injector) {
335      this.exceptions = injector;
336    }
337
338    @Override
339    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
340      throws ServiceException {
341      exceptions.throwOnGet(this, request);
342      return super.get(controller, request);
343    }
344
345    @Override
346    public ClientProtos.MutateResponse mutate(final RpcController controller,
347      final ClientProtos.MutateRequest request) throws ServiceException {
348      exceptions.throwOnMutate(this, request);
349      return super.mutate(controller, request);
350    }
351
352    @Override
353    public ClientProtos.ScanResponse scan(final RpcController controller,
354      final ClientProtos.ScanRequest request) throws ServiceException {
355      exceptions.throwOnScan(this, request);
356      return super.scan(controller, request);
357    }
358  }
359
360  public static abstract class ExceptionInjector {
361    protected boolean isTestTable(FakeRSRpcServices rpcServices,
362      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
363      try {
364        return TABLE_NAME
365          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
366      } catch (IOException ioe) {
367        throw new ServiceException(ioe);
368      }
369    }
370
371    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
372      throws ServiceException;
373
374    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
375      ClientProtos.MutateRequest request) throws ServiceException;
376
377    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
378      ClientProtos.ScanRequest request) throws ServiceException;
379  }
380
381  /**
382   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
383   */
384  public static class RoundRobinExceptionInjector extends ExceptionInjector {
385    private int numReqs = -1;
386    private int expCount = -1;
387    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
388
389    @Override
390    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
391      throws ServiceException {
392      throwSomeExceptions(rpcServices, request.getRegion());
393    }
394
395    @Override
396    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
397      throws ServiceException {
398      throwSomeExceptions(rpcServices, request.getRegion());
399    }
400
401    @Override
402    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
403      throws ServiceException {
404      if (!request.hasScannerId()) {
405        // only handle initial scan requests
406        throwSomeExceptions(rpcServices, request.getRegion());
407      }
408    }
409
410    /**
411     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
412     * throw NotSevingRegionException which clears the meta cache.
413     */
414    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
415      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
416      if (!isTestTable(rpcServices, regionSpec)) {
417        return;
418      }
419
420      numReqs++;
421      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
422      // meta cache preserving exceptions otherwise.
423      if (numReqs % 5 == 0) {
424        return;
425      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
426        throw new ServiceException(new NotServingRegionException());
427      }
428      // Round robin between different special exceptions.
429      // This is not ideal since exception types are not tied to the operation performed here,
430      // But, we don't really care here if we throw MultiActionTooLargeException while doing
431      // single Gets.
432      expCount++;
433      Throwable t =
434        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
435      throw new ServiceException(t);
436    }
437  }
438
439  /**
440   * Throws CallQueueTooBigException for all gets.
441   */
442  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
443    @Override
444    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
445      throws ServiceException {
446      if (isTestTable(rpcServices, request.getRegion())) {
447        throw new ServiceException(new CallQueueTooBigException());
448      }
449    }
450
451    @Override
452    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
453      throws ServiceException {
454    }
455
456    @Override
457    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
458      throws ServiceException {
459    }
460  }
461}