001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.Arrays; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CallQueueTooBigException; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.MultiActionResultTooLarge; 035import org.apache.hadoop.hbase.NotServingRegionException; 036import org.apache.hadoop.hbase.RegionTooBusyException; 037import org.apache.hadoop.hbase.RetryImmediatelyException; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 040import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 041import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.regionserver.RSRpcServices; 044import org.apache.hadoop.hbase.testclassification.ClientTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 057 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 061 062@Category({ MediumTests.class, ClientTests.class }) 063public class TestMetaCache { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestMetaCache.class); 068 069 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 070 private static final TableName TABLE_NAME = TableName.valueOf("test_table"); 071 private static final byte[] FAMILY = Bytes.toBytes("fam1"); 072 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 073 074 private static HRegionServer badRS; 075 076 private Connection conn; 077 private MetricsConnection metrics; 078 private AsyncRegionLocator locator; 079 080 @BeforeClass 081 public static void setUpBeforeClass() throws Exception { 082 Configuration conf = TEST_UTIL.getConfiguration(); 083 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName()); 084 TEST_UTIL.startMiniCluster(1); 085 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 086 TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 087 badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0); 088 assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices); 089 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 090 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build()) 091 .build(); 092 TEST_UTIL.createTable(desc, null); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 @After 101 public void tearDown() throws IOException { 102 Closeables.close(conn, true); 103 } 104 105 private void setupConnection(int retry) throws IOException { 106 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 107 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry); 108 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 109 conn = ConnectionFactory.createConnection(conf); 110 AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection(); 111 locator = asyncConn.getLocator(); 112 metrics = asyncConn.getConnectionMetrics().get(); 113 } 114 115 @Test 116 public void testPreserveMetaCacheOnException() throws Exception { 117 ((FakeRSRpcServices) badRS.getRSRpcServices()) 118 .setExceptionInjector(new RoundRobinExceptionInjector()); 119 setupConnection(1); 120 try (Table table = conn.getTable(TABLE_NAME)) { 121 byte[] row = Bytes.toBytes("row1"); 122 123 Put put = new Put(row); 124 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 125 Get get = new Get(row); 126 Append append = new Append(row); 127 append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11)); 128 Increment increment = new Increment(row); 129 increment.addColumn(FAMILY, QUALIFIER, 10); 130 Delete delete = new Delete(row); 131 delete.addColumn(FAMILY, QUALIFIER); 132 RowMutations mutations = new RowMutations(row); 133 mutations.add(put); 134 mutations.add(delete); 135 136 Exception exp; 137 boolean success; 138 for (int i = 0; i < 50; i++) { 139 exp = null; 140 success = false; 141 try { 142 table.put(put); 143 // If at least one operation succeeded, we should have cached the region location. 144 success = true; 145 table.get(get); 146 table.append(append); 147 table.increment(increment); 148 table.delete(delete); 149 table.mutateRow(mutations); 150 } catch (IOException ex) { 151 // Only keep track of the last exception that updated the meta cache 152 if (ClientExceptionsUtil.isMetaClearingException(ex) || success) { 153 exp = ex; 154 } 155 } 156 // Do not test if we did not touch the meta cache in this iteration. 157 if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) { 158 assertNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 159 } else if (success) { 160 assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 161 } 162 } 163 } 164 } 165 166 @Test 167 public void testCacheClearingOnCallQueueTooBig() throws Exception { 168 ((FakeRSRpcServices) badRS.getRSRpcServices()) 169 .setExceptionInjector(new CallQueueTooBigExceptionInjector()); 170 setupConnection(2); 171 Table table = conn.getTable(TABLE_NAME); 172 byte[] row = Bytes.toBytes("row1"); 173 174 Put put = new Put(row); 175 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 176 table.put(put); 177 178 // obtain the client metrics 179 long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 180 long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 181 182 // attempt a get on the test table 183 Get get = new Get(row); 184 try { 185 table.get(get); 186 fail("Expected CallQueueTooBigException"); 187 } catch (RetriesExhaustedException ree) { 188 // expected 189 } 190 191 // verify that no cache clearing took place 192 long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 193 long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 194 assertEquals(preGetRegionClears, postGetRegionClears); 195 assertEquals(preGetServerClears, postGetServerClears); 196 } 197 198 public static List<Throwable> metaCachePreservingExceptions() { 199 return Arrays.asList(new RegionOpeningException(" "), 200 new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "), 201 new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "), 202 new CallQueueTooBigException()); 203 } 204 205 public static class RegionServerWithFakeRpcServices extends HRegionServer { 206 private FakeRSRpcServices rsRpcServices; 207 208 public RegionServerWithFakeRpcServices(Configuration conf) 209 throws IOException, InterruptedException { 210 super(conf); 211 } 212 213 @Override 214 protected RSRpcServices createRpcServices() throws IOException { 215 this.rsRpcServices = new FakeRSRpcServices(this); 216 return rsRpcServices; 217 } 218 219 public void setExceptionInjector(ExceptionInjector injector) { 220 rsRpcServices.setExceptionInjector(injector); 221 } 222 } 223 224 public static class FakeRSRpcServices extends RSRpcServices { 225 226 private ExceptionInjector exceptions; 227 228 public FakeRSRpcServices(HRegionServer rs) throws IOException { 229 super(rs); 230 exceptions = new RoundRobinExceptionInjector(); 231 } 232 233 public void setExceptionInjector(ExceptionInjector injector) { 234 this.exceptions = injector; 235 } 236 237 @Override 238 public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request) 239 throws ServiceException { 240 exceptions.throwOnGet(this, request); 241 return super.get(controller, request); 242 } 243 244 @Override 245 public ClientProtos.MutateResponse mutate(final RpcController controller, 246 final ClientProtos.MutateRequest request) throws ServiceException { 247 exceptions.throwOnMutate(this, request); 248 return super.mutate(controller, request); 249 } 250 251 @Override 252 public ClientProtos.ScanResponse scan(final RpcController controller, 253 final ClientProtos.ScanRequest request) throws ServiceException { 254 exceptions.throwOnScan(this, request); 255 return super.scan(controller, request); 256 } 257 } 258 259 public static abstract class ExceptionInjector { 260 protected boolean isTestTable(FakeRSRpcServices rpcServices, 261 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 262 try { 263 return TABLE_NAME 264 .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName()); 265 } catch (IOException ioe) { 266 throw new ServiceException(ioe); 267 } 268 } 269 270 public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 271 throws ServiceException; 272 273 public abstract void throwOnMutate(FakeRSRpcServices rpcServices, 274 ClientProtos.MutateRequest request) throws ServiceException; 275 276 public abstract void throwOnScan(FakeRSRpcServices rpcServices, 277 ClientProtos.ScanRequest request) throws ServiceException; 278 } 279 280 /** 281 * Rotates through the possible cache clearing and non-cache clearing exceptions for requests. 282 */ 283 public static class RoundRobinExceptionInjector extends ExceptionInjector { 284 private int numReqs = -1; 285 private int expCount = -1; 286 private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions(); 287 288 @Override 289 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 290 throws ServiceException { 291 throwSomeExceptions(rpcServices, request.getRegion()); 292 } 293 294 @Override 295 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 296 throws ServiceException { 297 throwSomeExceptions(rpcServices, request.getRegion()); 298 } 299 300 @Override 301 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 302 throws ServiceException { 303 if (!request.hasScannerId()) { 304 // only handle initial scan requests 305 throwSomeExceptions(rpcServices, request.getRegion()); 306 } 307 } 308 309 /** 310 * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically 311 * throw NotSevingRegionException which clears the meta cache. 312 */ 313 private void throwSomeExceptions(FakeRSRpcServices rpcServices, 314 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 315 if (!isTestTable(rpcServices, regionSpec)) { 316 return; 317 } 318 319 numReqs++; 320 // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw 321 // meta cache preserving exceptions otherwise. 322 if (numReqs % 5 == 0) { 323 return; 324 } else if (numReqs % 5 == 1 || numReqs % 5 == 2) { 325 throw new ServiceException(new NotServingRegionException()); 326 } 327 // Round robin between different special exceptions. 328 // This is not ideal since exception types are not tied to the operation performed here, 329 // But, we don't really care here if we throw MultiActionTooLargeException while doing 330 // single Gets. 331 expCount++; 332 Throwable t = 333 metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size()); 334 throw new ServiceException(t); 335 } 336 } 337 338 /** 339 * Throws CallQueueTooBigException for all gets. 340 */ 341 public static class CallQueueTooBigExceptionInjector extends ExceptionInjector { 342 @Override 343 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 344 throws ServiceException { 345 if (isTestTable(rpcServices, request.getRegion())) { 346 throw new ServiceException(new CallQueueTooBigException()); 347 } 348 } 349 350 @Override 351 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 352 throws ServiceException { 353 } 354 355 @Override 356 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 357 throws ServiceException { 358 } 359 } 360}