001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static junit.framework.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CallQueueTooBigException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HColumnDescriptor;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HTableDescriptor;
036import org.apache.hadoop.hbase.MultiActionResultTooLarge;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.RegionTooBusyException;
039import org.apache.hadoop.hbase.RetryImmediatelyException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
042import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
043import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RSRpcServices;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
058import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
063
064@Category({ MediumTests.class, ClientTests.class })
065public class TestMetaCache {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestMetaCache.class);
070
071  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
072  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
073  private static final byte[] FAMILY = Bytes.toBytes("fam1");
074  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
075  private static HRegionServer badRS;
076  private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);
077
078  /**
079   * @throws java.lang.Exception
080   */
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    Configuration conf = TEST_UTIL.getConfiguration();
084    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
085    TEST_UTIL.startMiniCluster(1);
086    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
087    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
088    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
089    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
090    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
091    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
092    fam.setMaxVersions(2);
093    table.addFamily(fam);
094    TEST_UTIL.createTable(table, null);
095  }
096
097  /**
098   * @throws java.lang.Exception
099   */
100  @AfterClass
101  public static void tearDownAfterClass() throws Exception {
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  @Test
106  public void testPreserveMetaCacheOnException() throws Exception {
107    ((FakeRSRpcServices) badRS.getRSRpcServices())
108      .setExceptionInjector(new RoundRobinExceptionInjector());
109    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
110    conf.set("hbase.client.retries.number", "1");
111    ConnectionImplementation conn =
112      (ConnectionImplementation) ConnectionFactory.createConnection(conf);
113    try {
114      Table table = conn.getTable(TABLE_NAME);
115      byte[] row = Bytes.toBytes("row1");
116
117      Put put = new Put(row);
118      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
119      Get get = new Get(row);
120      Append append = new Append(row);
121      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
122      Increment increment = new Increment(row);
123      increment.addColumn(FAMILY, QUALIFIER, 10);
124      Delete delete = new Delete(row);
125      delete.addColumn(FAMILY, QUALIFIER);
126      RowMutations mutations = new RowMutations(row);
127      mutations.add(put);
128      mutations.add(delete);
129
130      Exception exp;
131      boolean success;
132      for (int i = 0; i < 50; i++) {
133        exp = null;
134        success = false;
135        try {
136          table.put(put);
137          // If at least one operation succeeded, we should have cached the region location.
138          success = true;
139          table.get(get);
140          table.append(append);
141          table.increment(increment);
142          table.delete(delete);
143          table.mutateRow(mutations);
144        } catch (IOException ex) {
145          // Only keep track of the last exception that updated the meta cache
146          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
147            exp = ex;
148          }
149        }
150        // Do not test if we did not touch the meta cache in this iteration.
151        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
152          assertNull(conn.getCachedLocation(TABLE_NAME, row));
153        } else if (success) {
154          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
155        }
156      }
157    } finally {
158      conn.close();
159    }
160  }
161
162  @Test
163  public void testClearsCacheOnScanException() throws Exception {
164    ((FakeRSRpcServices) badRS.getRSRpcServices())
165      .setExceptionInjector(new RoundRobinExceptionInjector());
166    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
167    conf.set("hbase.client.retries.number", "1");
168
169    try (
170      ConnectionImplementation conn =
171        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
172      Table table = conn.getTable(TABLE_NAME)) {
173
174      byte[] row = Bytes.toBytes("row2");
175
176      Put put = new Put(row);
177      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
178
179      Scan scan = new Scan();
180      scan.withStartRow(row);
181      scan.setLimit(1);
182      scan.setCaching(1);
183
184      populateCache(table, row);
185      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
186      assertTrue(executeUntilCacheClearingException(table, scan));
187      assertNull(conn.getCachedLocation(TABLE_NAME, row));
188
189      // repopulate cache so we can test with reverse scan too
190      populateCache(table, row);
191      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
192
193      // run with reverse scan
194      scan.setReversed(true);
195      assertTrue(executeUntilCacheClearingException(table, scan));
196      assertNull(conn.getCachedLocation(TABLE_NAME, row));
197    }
198  }
199
200  private void populateCache(Table table, byte[] row) {
201    for (int i = 0; i < 50; i++) {
202      try {
203        table.get(new Get(row));
204        return;
205      } catch (Exception e) {
206        // pass, we just want this to succeed so that region location will be cached
207      }
208    }
209  }
210
211  private boolean executeUntilCacheClearingException(Table table, Scan scan) {
212    for (int i = 0; i < 50; i++) {
213      try {
214        try (ResultScanner scanner = table.getScanner(scan)) {
215          scanner.next();
216        }
217      } catch (Exception ex) {
218        // Only keep track of the last exception that updated the meta cache
219        if (ClientExceptionsUtil.isMetaClearingException(ex)) {
220          return true;
221        }
222      }
223    }
224    return false;
225  }
226
227  @Test
228  public void testCacheClearingOnCallQueueTooBig() throws Exception {
229    ((FakeRSRpcServices) badRS.getRSRpcServices())
230      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
231    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
232    conf.set("hbase.client.retries.number", "2");
233    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
234    ConnectionImplementation conn =
235      (ConnectionImplementation) ConnectionFactory.createConnection(conf);
236    try {
237      Table table = conn.getTable(TABLE_NAME);
238      byte[] row = Bytes.toBytes("row1");
239
240      Put put = new Put(row);
241      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
242      table.put(put);
243
244      // obtain the client metrics
245      MetricsConnection metrics = conn.getConnectionMetrics();
246      long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
247      long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
248
249      // attempt a get on the test table
250      Get get = new Get(row);
251      try {
252        table.get(get);
253        fail("Expected CallQueueTooBigException");
254      } catch (RetriesExhaustedException ree) {
255        // expected
256      }
257
258      // verify that no cache clearing took place
259      long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
260      long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
261      assertEquals(preGetRegionClears, postGetRegionClears);
262      assertEquals(preGetServerClears, postGetServerClears);
263    } finally {
264      conn.close();
265    }
266  }
267
268  public static List<Throwable> metaCachePreservingExceptions() {
269    return new ArrayList<Throwable>() {
270      {
271        add(new RegionOpeningException(" "));
272        add(new RegionTooBusyException("Some old message"));
273        add(new RpcThrottlingException(" "));
274        add(new MultiActionResultTooLarge(" "));
275        add(new RetryImmediatelyException(" "));
276        add(new CallQueueTooBigException());
277      }
278    };
279  }
280
281  public static class RegionServerWithFakeRpcServices extends HRegionServer {
282    private FakeRSRpcServices rsRpcServices;
283
284    public RegionServerWithFakeRpcServices(Configuration conf)
285      throws IOException, InterruptedException {
286      super(conf);
287    }
288
289    @Override
290    protected RSRpcServices createRpcServices() throws IOException {
291      this.rsRpcServices = new FakeRSRpcServices(this);
292      return rsRpcServices;
293    }
294
295    public void setExceptionInjector(ExceptionInjector injector) {
296      rsRpcServices.setExceptionInjector(injector);
297    }
298  }
299
300  public static class FakeRSRpcServices extends RSRpcServices {
301
302    private ExceptionInjector exceptions;
303
304    public FakeRSRpcServices(HRegionServer rs) throws IOException {
305      super(rs);
306      exceptions = new RoundRobinExceptionInjector();
307    }
308
309    public void setExceptionInjector(ExceptionInjector injector) {
310      this.exceptions = injector;
311    }
312
313    @Override
314    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
315      throws ServiceException {
316      exceptions.throwOnGet(this, request);
317      return super.get(controller, request);
318    }
319
320    @Override
321    public ClientProtos.MutateResponse mutate(final RpcController controller,
322      final ClientProtos.MutateRequest request) throws ServiceException {
323      exceptions.throwOnMutate(this, request);
324      return super.mutate(controller, request);
325    }
326
327    @Override
328    public ClientProtos.ScanResponse scan(final RpcController controller,
329      final ClientProtos.ScanRequest request) throws ServiceException {
330      exceptions.throwOnScan(this, request);
331      return super.scan(controller, request);
332    }
333  }
334
335  public static abstract class ExceptionInjector {
336    protected boolean isTestTable(FakeRSRpcServices rpcServices,
337      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
338      try {
339        return TABLE_NAME
340          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
341      } catch (IOException ioe) {
342        throw new ServiceException(ioe);
343      }
344    }
345
346    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
347      throws ServiceException;
348
349    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
350      ClientProtos.MutateRequest request) throws ServiceException;
351
352    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
353      ClientProtos.ScanRequest request) throws ServiceException;
354  }
355
356  /**
357   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
358   */
359  public static class RoundRobinExceptionInjector extends ExceptionInjector {
360    private int numReqs = -1;
361    private int expCount = -1;
362    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
363
364    @Override
365    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
366      throws ServiceException {
367      throwSomeExceptions(rpcServices, request.getRegion());
368    }
369
370    @Override
371    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
372      throws ServiceException {
373      throwSomeExceptions(rpcServices, request.getRegion());
374    }
375
376    @Override
377    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
378      throws ServiceException {
379      if (!request.hasScannerId()) {
380        // only handle initial scan requests
381        throwSomeExceptions(rpcServices, request.getRegion());
382      }
383    }
384
385    /**
386     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
387     * throw NotSevingRegionException which clears the meta cache. n
388     */
389    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
390      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
391      if (!isTestTable(rpcServices, regionSpec)) {
392        return;
393      }
394
395      numReqs++;
396      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
397      // meta cache preserving exceptions otherwise.
398      if (numReqs % 5 == 0) {
399        return;
400      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
401        throw new ServiceException(new NotServingRegionException());
402      }
403      // Round robin between different special exceptions.
404      // This is not ideal since exception types are not tied to the operation performed here,
405      // But, we don't really care here if we throw MultiActionTooLargeException while doing
406      // single Gets.
407      expCount++;
408      Throwable t =
409        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
410      throw new ServiceException(t);
411    }
412  }
413
414  /**
415   * Throws CallQueueTooBigException for all gets.
416   */
417  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
418    @Override
419    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
420      throws ServiceException {
421      if (isTestTable(rpcServices, request.getRegion())) {
422        throw new ServiceException(new CallQueueTooBigException());
423      }
424    }
425
426    @Override
427    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
428      throws ServiceException {
429    }
430
431    @Override
432    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
433      throws ServiceException {
434    }
435  }
436
437  @Test
438  public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
439    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
440    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
441    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
442    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
443    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
444
445    try (ConnectionImplementation conn =
446      (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
447      ClientThread client1 = new ClientThread(conn);
448      ClientThread client2 = new ClientThread(conn);
449      client1.start();
450      client2.start();
451      client1.join();
452      client2.join();
453      // One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and
454      // eventually fail since the sleep time is more than hbase client scanner timeout period.
455      // Other thread will wait to acquire userRegionLock.
456      // Have no idea which thread will be scheduled first. So need to check both threads.
457
458      // Both the threads will throw exception. One thread will throw exception since after
459      // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
460      // is 2 seconds.
461      // Other thread will throw exception since it was not able to get hold of user region lock
462      // within meta operation timeout period.
463      assertNotNull(client1.getException());
464      assertNotNull(client2.getException());
465
466      assertTrue(client1.getException() instanceof LockTimeoutException
467        ^ client2.getException() instanceof LockTimeoutException);
468    }
469  }
470
471  private final class ClientThread extends Thread {
472    private Exception exception;
473    private ConnectionImplementation connection;
474
475    private ClientThread(ConnectionImplementation connection) {
476      this.connection = connection;
477    }
478
479    @Override
480    public void run() {
481      byte[] currentKey = HConstants.EMPTY_START_ROW;
482      try {
483        connection.getRegionLocation(TABLE_NAME, currentKey, true);
484      } catch (IOException e) {
485        LOG.error("Thread id: " + this.getId() + "  exception: ", e);
486        this.exception = e;
487      }
488    }
489
490    public Exception getException() {
491      return exception;
492    }
493  }
494
495  public static class LockSleepInjector extends ExceptionInjector {
496    @Override
497    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
498      try {
499        Thread.sleep(5000);
500      } catch (InterruptedException e) {
501        LOG.info("Interrupted exception", e);
502      }
503    }
504
505    @Override
506    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) {
507    }
508
509    @Override
510    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) {
511    }
512  }
513}