001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static junit.framework.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CallQueueTooBigException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HColumnDescriptor;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HTableDescriptor;
036import org.apache.hadoop.hbase.MultiActionResultTooLarge;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.RegionTooBusyException;
039import org.apache.hadoop.hbase.RetryImmediatelyException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
042import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
043import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RSRpcServices;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Category({MediumTests.class, ClientTests.class})
065public class TestMetaCache {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069      HBaseClassTestRule.forClass(TestMetaCache.class);
070
071  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
072  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
073  private static final byte[] FAMILY = Bytes.toBytes("fam1");
074  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
075  private static HRegionServer badRS;
076  private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);
077
078  /**
079   * @throws java.lang.Exception
080   */
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    Configuration conf = TEST_UTIL.getConfiguration();
084    conf.setStrings(HConstants.REGION_SERVER_IMPL,
085        RegionServerWithFakeRpcServices.class.getName());
086    TEST_UTIL.startMiniCluster(1);
087    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
088    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
089    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
090    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
091    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
092    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
093    fam.setMaxVersions(2);
094    table.addFamily(fam);
095    TEST_UTIL.createTable(table, null);
096  }
097
098
099  /**
100   * @throws java.lang.Exception
101   */
102  @AfterClass
103  public static void tearDownAfterClass() throws Exception {
104    TEST_UTIL.shutdownMiniCluster();
105  }
106
107  @Test
108  public void testPreserveMetaCacheOnException() throws Exception {
109    ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
110        new RoundRobinExceptionInjector());
111    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
112    conf.set("hbase.client.retries.number", "1");
113    ConnectionImplementation conn =
114        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
115    try {
116      Table table = conn.getTable(TABLE_NAME);
117      byte[] row = Bytes.toBytes("row1");
118
119      Put put = new Put(row);
120      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
121      Get get = new Get(row);
122      Append append = new Append(row);
123      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
124      Increment increment = new Increment(row);
125      increment.addColumn(FAMILY, QUALIFIER, 10);
126      Delete delete = new Delete(row);
127      delete.addColumn(FAMILY, QUALIFIER);
128      RowMutations mutations = new RowMutations(row);
129      mutations.add(put);
130      mutations.add(delete);
131
132      Exception exp;
133      boolean success;
134      for (int i = 0; i < 50; i++) {
135        exp = null;
136        success = false;
137        try {
138          table.put(put);
139          // If at least one operation succeeded, we should have cached the region location.
140          success = true;
141          table.get(get);
142          table.append(append);
143          table.increment(increment);
144          table.delete(delete);
145          table.mutateRow(mutations);
146        } catch (IOException ex) {
147          // Only keep track of the last exception that updated the meta cache
148          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
149            exp = ex;
150          }
151        }
152        // Do not test if we did not touch the meta cache in this iteration.
153        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
154          assertNull(conn.getCachedLocation(TABLE_NAME, row));
155        } else if (success) {
156          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
157        }
158      }
159    } finally {
160      conn.close();
161    }
162  }
163
164  @Test
165  public void testCacheClearingOnCallQueueTooBig() throws Exception {
166    ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
167        new CallQueueTooBigExceptionInjector());
168    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
169    conf.set("hbase.client.retries.number", "2");
170    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
171    ConnectionImplementation conn =
172        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
173    try {
174      Table table = conn.getTable(TABLE_NAME);
175      byte[] row = Bytes.toBytes("row1");
176
177      Put put = new Put(row);
178      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
179      table.put(put);
180
181      // obtain the client metrics
182      MetricsConnection metrics = conn.getConnectionMetrics();
183      long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
184      long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
185
186      // attempt a get on the test table
187      Get get = new Get(row);
188      try {
189        table.get(get);
190        fail("Expected CallQueueTooBigException");
191      } catch (RetriesExhaustedException ree) {
192        // expected
193      }
194
195      // verify that no cache clearing took place
196      long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
197      long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
198      assertEquals(preGetRegionClears, postGetRegionClears);
199      assertEquals(preGetServerClears, postGetServerClears);
200    } finally {
201      conn.close();
202    }
203  }
204
205  public static List<Throwable> metaCachePreservingExceptions() {
206    return new ArrayList<Throwable>() {{
207        add(new RegionOpeningException(" "));
208        add(new RegionTooBusyException("Some old message"));
209        add(new RpcThrottlingException(" "));
210        add(new MultiActionResultTooLarge(" "));
211        add(new RetryImmediatelyException(" "));
212        add(new CallQueueTooBigException());
213    }};
214  }
215
216  public static class RegionServerWithFakeRpcServices extends HRegionServer {
217    private FakeRSRpcServices rsRpcServices;
218
219    public RegionServerWithFakeRpcServices(Configuration conf)
220      throws IOException, InterruptedException {
221      super(conf);
222    }
223
224    @Override
225    protected RSRpcServices createRpcServices() throws IOException {
226      this.rsRpcServices = new FakeRSRpcServices(this);
227      return rsRpcServices;
228    }
229
230    public void setExceptionInjector(ExceptionInjector injector) {
231      rsRpcServices.setExceptionInjector(injector);
232    }
233  }
234
235  public static class FakeRSRpcServices extends RSRpcServices {
236
237    private ExceptionInjector exceptions;
238
239    public FakeRSRpcServices(HRegionServer rs) throws IOException {
240      super(rs);
241      exceptions = new RoundRobinExceptionInjector();
242    }
243
244    public void setExceptionInjector(ExceptionInjector injector) {
245      this.exceptions = injector;
246    }
247
248    @Override
249    public GetResponse get(final RpcController controller,
250                           final ClientProtos.GetRequest request) throws ServiceException {
251      exceptions.throwOnGet(this, request);
252      return super.get(controller, request);
253    }
254
255    @Override
256    public ClientProtos.MutateResponse mutate(final RpcController controller,
257        final ClientProtos.MutateRequest request) throws ServiceException {
258      exceptions.throwOnMutate(this, request);
259      return super.mutate(controller, request);
260    }
261
262    @Override
263    public ClientProtos.ScanResponse scan(final RpcController controller,
264        final ClientProtos.ScanRequest request) throws ServiceException {
265      exceptions.throwOnScan(this, request);
266      return super.scan(controller, request);
267    }
268  }
269
270  public static abstract class ExceptionInjector {
271    protected boolean isTestTable(FakeRSRpcServices rpcServices,
272                                  HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
273      try {
274        return TABLE_NAME.equals(
275            rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
276      } catch (IOException ioe) {
277        throw new ServiceException(ioe);
278      }
279    }
280
281    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
282        throws ServiceException;
283
284    public abstract void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
285        throws ServiceException;
286
287    public abstract void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
288        throws ServiceException;
289  }
290
291  /**
292   * Rotates through the possible cache clearing and non-cache clearing exceptions
293   * for requests.
294   */
295  public static class RoundRobinExceptionInjector extends ExceptionInjector {
296    private int numReqs = -1;
297    private int expCount = -1;
298    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
299
300    @Override
301    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
302        throws ServiceException {
303      throwSomeExceptions(rpcServices, request.getRegion());
304    }
305
306    @Override
307    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
308        throws ServiceException {
309      throwSomeExceptions(rpcServices, request.getRegion());
310    }
311
312    @Override
313    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
314        throws ServiceException {
315      if (!request.hasScannerId()) {
316        // only handle initial scan requests
317        throwSomeExceptions(rpcServices, request.getRegion());
318      }
319    }
320
321    /**
322     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache.
323     * Periodically throw NotSevingRegionException which clears the meta cache.
324     * @throws ServiceException
325     */
326    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
327                                     HBaseProtos.RegionSpecifier regionSpec)
328        throws ServiceException {
329      if (!isTestTable(rpcServices, regionSpec)) {
330        return;
331      }
332
333      numReqs++;
334      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
335      // meta cache preserving exceptions otherwise.
336      if (numReqs % 5 ==0) {
337        return;
338      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
339        throw new ServiceException(new NotServingRegionException());
340      }
341      // Round robin between different special exceptions.
342      // This is not ideal since exception types are not tied to the operation performed here,
343      // But, we don't really care here if we throw MultiActionTooLargeException while doing
344      // single Gets.
345      expCount++;
346      Throwable t = metaCachePreservingExceptions.get(
347          expCount % metaCachePreservingExceptions.size());
348      throw new ServiceException(t);
349    }
350  }
351
352  /**
353   * Throws CallQueueTooBigException for all gets.
354   */
355  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
356    @Override
357    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
358        throws ServiceException {
359      if (isTestTable(rpcServices, request.getRegion())) {
360        throw new ServiceException(new CallQueueTooBigException());
361      }
362    }
363
364    @Override
365    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
366        throws ServiceException {
367    }
368
369    @Override
370    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
371        throws ServiceException {
372    }
373  }
374
375  @Test
376  public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
377    ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
378    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
379    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
380    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
381    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
382
383    try (ConnectionImplementation conn =
384            (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
385      ClientThread client1 = new ClientThread(conn);
386      ClientThread client2 = new ClientThread(conn);
387      client1.start();
388      client2.start();
389      client1.join();
390      client2.join();
391      // One thread will get the lock but will sleep in  LockExceptionInjector#throwOnScan and
392      // eventually fail since the sleep time is more than hbase client scanner timeout period.
393      // Other thread will wait to acquire userRegionLock.
394      // Have no idea which thread will be scheduled first. So need to check both threads.
395
396      // Both the threads will throw exception. One thread will throw exception since after
397      // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
398      // is 2 seconds.
399      // Other thread will throw exception since it was not able to get hold of user region lock
400      // within meta operation timeout period.
401      assertNotNull(client1.getException());
402      assertNotNull(client2.getException());
403
404      assertTrue(client1.getException() instanceof LockTimeoutException
405          ^ client2.getException() instanceof LockTimeoutException);
406    }
407  }
408
409  private final class ClientThread extends Thread {
410    private Exception exception;
411    private ConnectionImplementation connection;
412
413    private ClientThread(ConnectionImplementation connection) {
414      this.connection = connection;
415    }
416    @Override
417    public void run() {
418      byte[] currentKey = HConstants.EMPTY_START_ROW;
419      try {
420        connection.getRegionLocation(TABLE_NAME, currentKey, true);
421      } catch (IOException e) {
422        LOG.error("Thread id: " + this.getId() + "  exception: ", e);
423        this.exception = e;
424      }
425    }
426    public Exception getException() {
427      return exception;
428    }
429  }
430
431  public static class LockSleepInjector extends ExceptionInjector {
432    @Override
433    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
434      try {
435        Thread.sleep(5000);
436      } catch (InterruptedException e) {
437        LOG.info("Interrupted exception", e);
438      }
439    }
440
441    @Override
442    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { }
443
444    @Override
445    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { }
446  }
447}