001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CallQueueTooBigException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MultiActionResultTooLarge;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.RegionTooBusyException;
037import org.apache.hadoop.hbase.RetryImmediatelyException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
040import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
041import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.regionserver.RSRpcServices;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053
054import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
061
062@Category({ MediumTests.class, ClientTests.class })
063public class TestMetaCache {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestMetaCache.class);
068
069  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
070  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
071  private static final byte[] FAMILY = Bytes.toBytes("fam1");
072  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
073
074  private static HRegionServer badRS;
075
076  private Connection conn;
077  private MetricsConnection metrics;
078  private AsyncRegionLocator locator;
079
080  @BeforeClass
081  public static void setUpBeforeClass() throws Exception {
082    Configuration conf = TEST_UTIL.getConfiguration();
083    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
084    TEST_UTIL.startMiniCluster(1);
085    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
086    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
087    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
088    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
089    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
090      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build())
091      .build();
092    TEST_UTIL.createTable(desc, null);
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  @After
101  public void tearDown() throws IOException {
102    Closeables.close(conn, true);
103  }
104
105  private void setupConnection(int retry) throws IOException {
106    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
107    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry);
108    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
109    conn = ConnectionFactory.createConnection(conf);
110    AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
111    locator = asyncConn.getLocator();
112    metrics = asyncConn.getConnectionMetrics().get();
113  }
114
115  @Test
116  public void testPreserveMetaCacheOnException() throws Exception {
117    ((FakeRSRpcServices) badRS.getRSRpcServices())
118      .setExceptionInjector(new RoundRobinExceptionInjector());
119    setupConnection(1);
120    try (Table table = conn.getTable(TABLE_NAME)) {
121      byte[] row = Bytes.toBytes("row1");
122
123      Put put = new Put(row);
124      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
125      Get get = new Get(row);
126      Append append = new Append(row);
127      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
128      Increment increment = new Increment(row);
129      increment.addColumn(FAMILY, QUALIFIER, 10);
130      Delete delete = new Delete(row);
131      delete.addColumn(FAMILY, QUALIFIER);
132      RowMutations mutations = new RowMutations(row);
133      mutations.add(put);
134      mutations.add(delete);
135
136      Exception exp;
137      boolean success;
138      for (int i = 0; i < 50; i++) {
139        exp = null;
140        success = false;
141        try {
142          table.put(put);
143          // If at least one operation succeeded, we should have cached the region location.
144          success = true;
145          table.get(get);
146          table.append(append);
147          table.increment(increment);
148          table.delete(delete);
149          table.mutateRow(mutations);
150        } catch (IOException ex) {
151          // Only keep track of the last exception that updated the meta cache
152          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
153            exp = ex;
154          }
155        }
156        // Do not test if we did not touch the meta cache in this iteration.
157        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
158          assertNull(locator.getRegionLocationInCache(TABLE_NAME, row));
159        } else if (success) {
160          assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row));
161        }
162      }
163    }
164  }
165
166  @Test
167  public void testCacheClearingOnCallQueueTooBig() throws Exception {
168    ((FakeRSRpcServices) badRS.getRSRpcServices())
169      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
170    setupConnection(2);
171    Table table = conn.getTable(TABLE_NAME);
172    byte[] row = Bytes.toBytes("row1");
173
174    Put put = new Put(row);
175    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
176    table.put(put);
177
178    // obtain the client metrics
179    long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
180    long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
181
182    // attempt a get on the test table
183    Get get = new Get(row);
184    try {
185      table.get(get);
186      fail("Expected CallQueueTooBigException");
187    } catch (RetriesExhaustedException ree) {
188      // expected
189    }
190
191    // verify that no cache clearing took place
192    long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
193    long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
194    assertEquals(preGetRegionClears, postGetRegionClears);
195    assertEquals(preGetServerClears, postGetServerClears);
196  }
197
198  public static List<Throwable> metaCachePreservingExceptions() {
199    return Arrays.asList(new RegionOpeningException(" "),
200      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
201      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
202      new CallQueueTooBigException());
203  }
204
205  public static class RegionServerWithFakeRpcServices extends HRegionServer {
206    private FakeRSRpcServices rsRpcServices;
207
208    public RegionServerWithFakeRpcServices(Configuration conf)
209      throws IOException, InterruptedException {
210      super(conf);
211    }
212
213    @Override
214    protected RSRpcServices createRpcServices() throws IOException {
215      this.rsRpcServices = new FakeRSRpcServices(this);
216      return rsRpcServices;
217    }
218
219    public void setExceptionInjector(ExceptionInjector injector) {
220      rsRpcServices.setExceptionInjector(injector);
221    }
222  }
223
224  public static class FakeRSRpcServices extends RSRpcServices {
225
226    private ExceptionInjector exceptions;
227
228    public FakeRSRpcServices(HRegionServer rs) throws IOException {
229      super(rs);
230      exceptions = new RoundRobinExceptionInjector();
231    }
232
233    public void setExceptionInjector(ExceptionInjector injector) {
234      this.exceptions = injector;
235    }
236
237    @Override
238    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
239      throws ServiceException {
240      exceptions.throwOnGet(this, request);
241      return super.get(controller, request);
242    }
243
244    @Override
245    public ClientProtos.MutateResponse mutate(final RpcController controller,
246      final ClientProtos.MutateRequest request) throws ServiceException {
247      exceptions.throwOnMutate(this, request);
248      return super.mutate(controller, request);
249    }
250
251    @Override
252    public ClientProtos.ScanResponse scan(final RpcController controller,
253      final ClientProtos.ScanRequest request) throws ServiceException {
254      exceptions.throwOnScan(this, request);
255      return super.scan(controller, request);
256    }
257  }
258
259  public static abstract class ExceptionInjector {
260    protected boolean isTestTable(FakeRSRpcServices rpcServices,
261      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
262      try {
263        return TABLE_NAME
264          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
265      } catch (IOException ioe) {
266        throw new ServiceException(ioe);
267      }
268    }
269
270    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
271      throws ServiceException;
272
273    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
274      ClientProtos.MutateRequest request) throws ServiceException;
275
276    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
277      ClientProtos.ScanRequest request) throws ServiceException;
278  }
279
280  /**
281   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
282   */
283  public static class RoundRobinExceptionInjector extends ExceptionInjector {
284    private int numReqs = -1;
285    private int expCount = -1;
286    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
287
288    @Override
289    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
290      throws ServiceException {
291      throwSomeExceptions(rpcServices, request.getRegion());
292    }
293
294    @Override
295    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
296      throws ServiceException {
297      throwSomeExceptions(rpcServices, request.getRegion());
298    }
299
300    @Override
301    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
302      throws ServiceException {
303      if (!request.hasScannerId()) {
304        // only handle initial scan requests
305        throwSomeExceptions(rpcServices, request.getRegion());
306      }
307    }
308
309    /**
310     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
311     * throw NotSevingRegionException which clears the meta cache. n
312     */
313    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
314      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
315      if (!isTestTable(rpcServices, regionSpec)) {
316        return;
317      }
318
319      numReqs++;
320      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
321      // meta cache preserving exceptions otherwise.
322      if (numReqs % 5 == 0) {
323        return;
324      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
325        throw new ServiceException(new NotServingRegionException());
326      }
327      // Round robin between different special exceptions.
328      // This is not ideal since exception types are not tied to the operation performed here,
329      // But, we don't really care here if we throw MultiActionTooLargeException while doing
330      // single Gets.
331      expCount++;
332      Throwable t =
333        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
334      throw new ServiceException(t);
335    }
336  }
337
338  /**
339   * Throws CallQueueTooBigException for all gets.
340   */
341  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
342    @Override
343    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
344      throws ServiceException {
345      if (isTestTable(rpcServices, request.getRegion())) {
346        throw new ServiceException(new CallQueueTooBigException());
347      }
348    }
349
350    @Override
351    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
352      throws ServiceException {
353    }
354
355    @Override
356    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
357      throws ServiceException {
358    }
359  }
360}