001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static junit.framework.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CallQueueTooBigException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HColumnDescriptor;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HTableDescriptor;
036import org.apache.hadoop.hbase.MultiActionResultTooLarge;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.RegionTooBusyException;
039import org.apache.hadoop.hbase.RetryImmediatelyException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
042import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
043import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RSRpcServices;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
061
062@Category({MediumTests.class, ClientTests.class})
063public class TestMetaCache {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestMetaCache.class);
068
069  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
070  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
071  private static final byte[] FAMILY = Bytes.toBytes("fam1");
072  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
073
074  private static HRegionServer badRS;
075
076  /**
077   * @throws java.lang.Exception
078   */
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    Configuration conf = TEST_UTIL.getConfiguration();
082    conf.setStrings(HConstants.REGION_SERVER_IMPL,
083        RegionServerWithFakeRpcServices.class.getName());
084    TEST_UTIL.startMiniCluster(1);
085    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
086    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
087    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
088    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
089    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
090    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
091    fam.setMaxVersions(2);
092    table.addFamily(fam);
093    TEST_UTIL.createTable(table, null);
094  }
095
096
097  /**
098   * @throws java.lang.Exception
099   */
100  @AfterClass
101  public static void tearDownAfterClass() throws Exception {
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  @Test
106  public void testPreserveMetaCacheOnException() throws Exception {
107    ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
108        new RoundRobinExceptionInjector());
109    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
110    conf.set("hbase.client.retries.number", "1");
111    ConnectionImplementation conn =
112        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
113    try {
114      Table table = conn.getTable(TABLE_NAME);
115      byte[] row = Bytes.toBytes("row1");
116
117      Put put = new Put(row);
118      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
119      Get get = new Get(row);
120      Append append = new Append(row);
121      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
122      Increment increment = new Increment(row);
123      increment.addColumn(FAMILY, QUALIFIER, 10);
124      Delete delete = new Delete(row);
125      delete.addColumn(FAMILY, QUALIFIER);
126      RowMutations mutations = new RowMutations(row);
127      mutations.add(put);
128      mutations.add(delete);
129
130      Exception exp;
131      boolean success;
132      for (int i = 0; i < 50; i++) {
133        exp = null;
134        success = false;
135        try {
136          table.put(put);
137          // If at least one operation succeeded, we should have cached the region location.
138          success = true;
139          table.get(get);
140          table.append(append);
141          table.increment(increment);
142          table.delete(delete);
143          table.mutateRow(mutations);
144        } catch (IOException ex) {
145          // Only keep track of the last exception that updated the meta cache
146          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
147            exp = ex;
148          }
149        }
150        // Do not test if we did not touch the meta cache in this iteration.
151        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
152          assertNull(conn.getCachedLocation(TABLE_NAME, row));
153        } else if (success) {
154          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
155        }
156      }
157    } finally {
158      conn.close();
159    }
160  }
161
162  @Test
163  public void testCacheClearingOnCallQueueTooBig() throws Exception {
164    ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
165        new CallQueueTooBigExceptionInjector());
166    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
167    conf.set("hbase.client.retries.number", "2");
168    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
169    ConnectionImplementation conn =
170        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
171    try {
172      Table table = conn.getTable(TABLE_NAME);
173      byte[] row = Bytes.toBytes("row1");
174
175      Put put = new Put(row);
176      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
177      table.put(put);
178
179      // obtain the client metrics
180      MetricsConnection metrics = conn.getConnectionMetrics();
181      long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
182      long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
183
184      // attempt a get on the test table
185      Get get = new Get(row);
186      try {
187        table.get(get);
188        fail("Expected CallQueueTooBigException");
189      } catch (RetriesExhaustedException ree) {
190        // expected
191      }
192
193      // verify that no cache clearing took place
194      long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
195      long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
196      assertEquals(preGetRegionClears, postGetRegionClears);
197      assertEquals(preGetServerClears, postGetServerClears);
198    } finally {
199      conn.close();
200    }
201  }
202
203  public static List<Throwable> metaCachePreservingExceptions() {
204    return new ArrayList<Throwable>() {{
205        add(new RegionOpeningException(" "));
206        add(new RegionTooBusyException("Some old message"));
207        add(new RpcThrottlingException(" "));
208        add(new MultiActionResultTooLarge(" "));
209        add(new RetryImmediatelyException(" "));
210        add(new CallQueueTooBigException());
211    }};
212  }
213
214  public static class RegionServerWithFakeRpcServices extends HRegionServer {
215    private FakeRSRpcServices rsRpcServices;
216
217    public RegionServerWithFakeRpcServices(Configuration conf)
218      throws IOException, InterruptedException {
219      super(conf);
220    }
221
222    @Override
223    protected RSRpcServices createRpcServices() throws IOException {
224      this.rsRpcServices = new FakeRSRpcServices(this);
225      return rsRpcServices;
226    }
227
228    public void setExceptionInjector(ExceptionInjector injector) {
229      rsRpcServices.setExceptionInjector(injector);
230    }
231  }
232
233  public static class FakeRSRpcServices extends RSRpcServices {
234
235    private ExceptionInjector exceptions;
236
237    public FakeRSRpcServices(HRegionServer rs) throws IOException {
238      super(rs);
239      exceptions = new RoundRobinExceptionInjector();
240    }
241
242    public void setExceptionInjector(ExceptionInjector injector) {
243      this.exceptions = injector;
244    }
245
246    @Override
247    public GetResponse get(final RpcController controller,
248                           final ClientProtos.GetRequest request) throws ServiceException {
249      exceptions.throwOnGet(this, request);
250      return super.get(controller, request);
251    }
252
253    @Override
254    public ClientProtos.MutateResponse mutate(final RpcController controller,
255        final ClientProtos.MutateRequest request) throws ServiceException {
256      exceptions.throwOnMutate(this, request);
257      return super.mutate(controller, request);
258    }
259
260    @Override
261    public ClientProtos.ScanResponse scan(final RpcController controller,
262        final ClientProtos.ScanRequest request) throws ServiceException {
263      exceptions.throwOnScan(this, request);
264      return super.scan(controller, request);
265    }
266  }
267
268  public static abstract class ExceptionInjector {
269    protected boolean isTestTable(FakeRSRpcServices rpcServices,
270                                  HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
271      try {
272        return TABLE_NAME.equals(
273            rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
274      } catch (IOException ioe) {
275        throw new ServiceException(ioe);
276      }
277    }
278
279    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
280        throws ServiceException;
281
282    public abstract void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
283        throws ServiceException;
284
285    public abstract void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
286        throws ServiceException;
287  }
288
289  /**
290   * Rotates through the possible cache clearing and non-cache clearing exceptions
291   * for requests.
292   */
293  public static class RoundRobinExceptionInjector extends ExceptionInjector {
294    private int numReqs = -1;
295    private int expCount = -1;
296    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
297
298    @Override
299    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
300        throws ServiceException {
301      throwSomeExceptions(rpcServices, request.getRegion());
302    }
303
304    @Override
305    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
306        throws ServiceException {
307      throwSomeExceptions(rpcServices, request.getRegion());
308    }
309
310    @Override
311    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
312        throws ServiceException {
313      if (!request.hasScannerId()) {
314        // only handle initial scan requests
315        throwSomeExceptions(rpcServices, request.getRegion());
316      }
317    }
318
319    /**
320     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache.
321     * Periodically throw NotSevingRegionException which clears the meta cache.
322     * @throws ServiceException
323     */
324    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
325                                     HBaseProtos.RegionSpecifier regionSpec)
326        throws ServiceException {
327      if (!isTestTable(rpcServices, regionSpec)) {
328        return;
329      }
330
331      numReqs++;
332      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
333      // meta cache preserving exceptions otherwise.
334      if (numReqs % 5 ==0) {
335        return;
336      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
337        throw new ServiceException(new NotServingRegionException());
338      }
339      // Round robin between different special exceptions.
340      // This is not ideal since exception types are not tied to the operation performed here,
341      // But, we don't really care here if we throw MultiActionTooLargeException while doing
342      // single Gets.
343      expCount++;
344      Throwable t = metaCachePreservingExceptions.get(
345          expCount % metaCachePreservingExceptions.size());
346      throw new ServiceException(t);
347    }
348  }
349
350  /**
351   * Throws CallQueueTooBigException for all gets.
352   */
353  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
354    @Override
355    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
356        throws ServiceException {
357      if (isTestTable(rpcServices, request.getRegion())) {
358        throw new ServiceException(new CallQueueTooBigException());
359      }
360    }
361
362    @Override
363    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
364        throws ServiceException {
365    }
366
367    @Override
368    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
369        throws ServiceException {
370    }
371  }
372}