001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CallQueueTooBigException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MultiActionResultTooLarge;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.RegionTooBusyException;
037import org.apache.hadoop.hbase.RetryImmediatelyException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
040import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
041import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.regionserver.RSRpcServices;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053
054import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
061
062@Category({MediumTests.class, ClientTests.class})
063public class TestMetaCache {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestMetaCache.class);
068
069  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
070  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
071  private static final byte[] FAMILY = Bytes.toBytes("fam1");
072  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
073
074  private static HRegionServer badRS;
075
076  private Connection conn;
077  private MetricsConnection metrics;
078  private AsyncRegionLocator locator;
079
080  @BeforeClass
081  public static void setUpBeforeClass() throws Exception {
082    Configuration conf = TEST_UTIL.getConfiguration();
083    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
084    TEST_UTIL.startMiniCluster(1);
085    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
086    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
087    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
088    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
089    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
090      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build())
091      .build();
092    TEST_UTIL.createTable(desc, null);
093  }
094
  /**
   * Shuts down the mini cluster that {@link #setUpBeforeClass()} started, once all tests in
   * this class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
099
  /**
   * Closes the per-test connection created by {@link #setupConnection(int)}.
   */
  @After
  public void tearDown() throws IOException {
    // Per Guava's documented contract, Closeables.close tolerates a null argument (a test may
    // fail before the connection is created) and, with the flag set to true, swallows an
    // IOException thrown while closing instead of propagating it.
    Closeables.close(conn, true);
  }
104
105  private void setupConnection(int retry) throws IOException {
106    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
107    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry);
108    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
109    conn = ConnectionFactory.createConnection(conf);
110    AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
111    locator = asyncConn.getLocator();
112    metrics = asyncConn.getConnectionMetrics().get();
113  }
114
115  @Test
116  public void testPreserveMetaCacheOnException() throws Exception {
117    ((FakeRSRpcServices) badRS.getRSRpcServices())
118      .setExceptionInjector(new RoundRobinExceptionInjector());
119    setupConnection(1);
120    try (Table table = conn.getTable(TABLE_NAME)){
121      byte[] row = Bytes.toBytes("row1");
122
123      Put put = new Put(row);
124      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
125      Get get = new Get(row);
126      Append append = new Append(row);
127      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
128      Increment increment = new Increment(row);
129      increment.addColumn(FAMILY, QUALIFIER, 10);
130      Delete delete = new Delete(row);
131      delete.addColumn(FAMILY, QUALIFIER);
132      RowMutations mutations = new RowMutations(row);
133      mutations.add(put);
134      mutations.add(delete);
135
136      Exception exp;
137      boolean success;
138      for (int i = 0; i < 50; i++) {
139        exp = null;
140        success = false;
141        try {
142          table.put(put);
143          // If at least one operation succeeded, we should have cached the region location.
144          success = true;
145          table.get(get);
146          table.append(append);
147          table.increment(increment);
148          table.delete(delete);
149          table.mutateRow(mutations);
150        } catch (IOException ex) {
151          // Only keep track of the last exception that updated the meta cache
152          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
153            exp = ex;
154          }
155        }
156        // Do not test if we did not touch the meta cache in this iteration.
157        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
158          assertNull(locator.getRegionLocationInCache(TABLE_NAME, row));
159        } else if (success) {
160          assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row));
161        }
162      }
163    }
164  }
165
166  @Test
167  public void testCacheClearingOnCallQueueTooBig() throws Exception {
168    ((FakeRSRpcServices) badRS.getRSRpcServices())
169      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
170    setupConnection(2);
171    Table table = conn.getTable(TABLE_NAME);
172    byte[] row = Bytes.toBytes("row1");
173
174    Put put = new Put(row);
175    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
176    table.put(put);
177
178    // obtain the client metrics
179    long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
180    long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
181
182    // attempt a get on the test table
183    Get get = new Get(row);
184    try {
185      table.get(get);
186      fail("Expected CallQueueTooBigException");
187    } catch (RetriesExhaustedException ree) {
188      // expected
189    }
190
191    // verify that no cache clearing took place
192    long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
193    long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
194    assertEquals(preGetRegionClears, postGetRegionClears);
195    assertEquals(preGetServerClears, postGetServerClears);
196  }
197
198  public static List<Throwable> metaCachePreservingExceptions() {
199    return Arrays.asList(new RegionOpeningException(" "),
200      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
201      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
202      new CallQueueTooBigException());
203  }
204
205  public static class RegionServerWithFakeRpcServices extends HRegionServer {
206    private FakeRSRpcServices rsRpcServices;
207
208    public RegionServerWithFakeRpcServices(Configuration conf)
209      throws IOException, InterruptedException {
210      super(conf);
211    }
212
213    @Override
214    protected RSRpcServices createRpcServices() throws IOException {
215      this.rsRpcServices = new FakeRSRpcServices(this);
216      return rsRpcServices;
217    }
218
219    public void setExceptionInjector(ExceptionInjector injector) {
220      rsRpcServices.setExceptionInjector(injector);
221    }
222  }
223
224  public static class FakeRSRpcServices extends RSRpcServices {
225
226    private ExceptionInjector exceptions;
227
228    public FakeRSRpcServices(HRegionServer rs) throws IOException {
229      super(rs);
230      exceptions = new RoundRobinExceptionInjector();
231    }
232
233    public void setExceptionInjector(ExceptionInjector injector) {
234      this.exceptions = injector;
235    }
236
237    @Override
238    public GetResponse get(final RpcController controller,
239                           final ClientProtos.GetRequest request) throws ServiceException {
240      exceptions.throwOnGet(this, request);
241      return super.get(controller, request);
242    }
243
244    @Override
245    public ClientProtos.MutateResponse mutate(final RpcController controller,
246        final ClientProtos.MutateRequest request) throws ServiceException {
247      exceptions.throwOnMutate(this, request);
248      return super.mutate(controller, request);
249    }
250
251    @Override
252    public ClientProtos.ScanResponse scan(final RpcController controller,
253        final ClientProtos.ScanRequest request) throws ServiceException {
254      exceptions.throwOnScan(this, request);
255      return super.scan(controller, request);
256    }
257  }
258
259  public static abstract class ExceptionInjector {
260    protected boolean isTestTable(FakeRSRpcServices rpcServices,
261                                  HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
262      try {
263        return TABLE_NAME.equals(
264            rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
265      } catch (IOException ioe) {
266        throw new ServiceException(ioe);
267      }
268    }
269
270    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
271        throws ServiceException;
272
273    public abstract void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
274        throws ServiceException;
275
276    public abstract void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
277        throws ServiceException;
278  }
279
280  /**
281   * Rotates through the possible cache clearing and non-cache clearing exceptions
282   * for requests.
283   */
284  public static class RoundRobinExceptionInjector extends ExceptionInjector {
285    private int numReqs = -1;
286    private int expCount = -1;
287    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
288
289    @Override
290    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
291        throws ServiceException {
292      throwSomeExceptions(rpcServices, request.getRegion());
293    }
294
295    @Override
296    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
297        throws ServiceException {
298      throwSomeExceptions(rpcServices, request.getRegion());
299    }
300
301    @Override
302    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
303        throws ServiceException {
304      if (!request.hasScannerId()) {
305        // only handle initial scan requests
306        throwSomeExceptions(rpcServices, request.getRegion());
307      }
308    }
309
310    /**
311     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache.
312     * Periodically throw NotSevingRegionException which clears the meta cache.
313     * @throws ServiceException
314     */
315    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
316                                     HBaseProtos.RegionSpecifier regionSpec)
317        throws ServiceException {
318      if (!isTestTable(rpcServices, regionSpec)) {
319        return;
320      }
321
322      numReqs++;
323      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
324      // meta cache preserving exceptions otherwise.
325      if (numReqs % 5 ==0) {
326        return;
327      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
328        throw new ServiceException(new NotServingRegionException());
329      }
330      // Round robin between different special exceptions.
331      // This is not ideal since exception types are not tied to the operation performed here,
332      // But, we don't really care here if we throw MultiActionTooLargeException while doing
333      // single Gets.
334      expCount++;
335      Throwable t = metaCachePreservingExceptions.get(
336          expCount % metaCachePreservingExceptions.size());
337      throw new ServiceException(t);
338    }
339  }
340
341  /**
342   * Throws CallQueueTooBigException for all gets.
343   */
344  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
345    @Override
346    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
347        throws ServiceException {
348      if (isTestTable(rpcServices, request.getRegion())) {
349        throw new ServiceException(new CallQueueTooBigException());
350      }
351    }
352
353    @Override
354    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
355        throws ServiceException {
356    }
357
358    @Override
359    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
360        throws ServiceException {
361    }
362  }
363}