001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.client.TableDescriptor; 035import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 036import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 037import org.apache.hadoop.hbase.regionserver.HRegion; 038import org.apache.hadoop.hbase.regionserver.HRegionServer; 039import org.apache.hadoop.hbase.regionserver.HStore; 040import org.apache.hadoop.hbase.regionserver.Region; 041import org.apache.hadoop.hbase.regionserver.RegionServerServices; 042import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 043import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 044import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.testclassification.MiscTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 062 063/** 064 * Test for the case where a regionserver going down has enough cycles to do damage to regions that 065 * have actually been assigned elsehwere. 066 * <p> 067 * If we happen to assign a region before it fully done with in its old location -- i.e. it is on 068 * two servers at the same time -- all can work fine until the case where the region on the dying 069 * server decides to compact or otherwise change the region file set. The region in its new location 070 * will then get a surprise when it tries to do something w/ a file removed by the region in its old 071 * location on dying server. 072 * <p> 073 * Making a test for this case is a little tough in that even if a file is deleted up on the 074 * namenode, if the file was opened before the delete, it will continue to let reads happen until 075 * something changes the state of cached blocks in the dfsclient that was already open (a block from 076 * the deleted file is cleaned from the datanode by NN). 077 * <p> 078 * What we will do below is do an explicit check for existence on the files listed in the region 079 * that has had some files removed because of a compaction. This sort of hurry's along and makes 080 * certain what is a chance occurance. 081 */ 082@Category({ MiscTests.class, LargeTests.class }) 083public class TestIOFencing { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestIOFencing.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class); 090 static { 091 // Uncomment the following lines if more verbosity is needed for 092 // debugging (see HBASE-12285 for details). 093 // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); 094 // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); 095 // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); 096 // ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")) 097 // .getLogger().setLevel(Level.ALL); 098 // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); 099 } 100 101 public abstract static class CompactionBlockerRegion extends HRegion { 102 AtomicInteger compactCount = new AtomicInteger(); 103 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0); 104 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0); 105 106 @SuppressWarnings("deprecation") 107 public CompactionBlockerRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, 108 RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) { 109 super(tableDir, log, fs, confParam, info, htd, rsServices); 110 } 111 112 public void stopCompactions() { 113 compactionsBlocked = new CountDownLatch(1); 114 compactionsWaiting = new CountDownLatch(1); 115 } 116 117 public void allowCompactions() { 118 LOG.debug("allowing compactions"); 119 compactionsBlocked.countDown(); 120 } 121 122 public void waitForCompactionToBlock() throws IOException { 123 try { 124 LOG.debug("waiting for compaction to block"); 125 compactionsWaiting.await(); 126 LOG.debug("compaction block reached"); 127 } catch (InterruptedException ex) { 128 throw new IOException(ex); 129 } 130 } 131 132 @Override 133 public boolean compact(CompactionContext compaction, HStore store, 134 ThroughputController throughputController) throws IOException { 135 try { 136 return super.compact(compaction, store, throughputController); 137 } finally { 138 compactCount.getAndIncrement(); 139 } 140 } 141 142 @Override 143 public boolean compact(CompactionContext compaction, HStore store, 144 ThroughputController throughputController, User user) throws IOException { 145 try { 146 return super.compact(compaction, store, throughputController, user); 147 } finally { 148 compactCount.getAndIncrement(); 149 } 150 } 151 152 public int countStoreFiles() { 153 int count = 0; 154 for (HStore store : stores.values()) { 155 count += store.getStorefilesCount(); 156 } 157 return count; 158 } 159 } 160 161 /** 162 * An override of HRegion that allows us park compactions in a holding pattern and then when 163 * appropriate for the test, allow them proceed again. 164 */ 165 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion { 166 167 public BlockCompactionsInPrepRegion(Path tableDir, WAL log, FileSystem fs, 168 Configuration confParam, RegionInfo info, TableDescriptor htd, 169 RegionServerServices rsServices) { 170 super(tableDir, log, fs, confParam, info, htd, rsServices); 171 } 172 173 @Override 174 protected void doRegionCompactionPrep() throws IOException { 175 compactionsWaiting.countDown(); 176 try { 177 compactionsBlocked.await(); 178 } catch (InterruptedException ex) { 179 throw new IOException(); 180 } 181 super.doRegionCompactionPrep(); 182 } 183 } 184 185 /** 186 * An override of HRegion that allows us park compactions in a holding pattern and then when 187 * appropriate for the test, allow them proceed again. This allows the compaction entry to go the 188 * WAL before blocking, but blocks afterwards 189 */ 190 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion { 191 public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, FileSystem fs, 192 Configuration confParam, RegionInfo info, TableDescriptor htd, 193 RegionServerServices rsServices) { 194 super(tableDir, log, fs, confParam, info, htd, rsServices); 195 } 196 197 @Override 198 protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup) 199 throws IOException { 200 return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup); 201 } 202 } 203 204 public static class BlockCompactionsInCompletionHStore extends HStore { 205 CompactionBlockerRegion r; 206 207 protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family, 208 Configuration confParam, boolean warmup) throws IOException { 209 super(region, family, confParam, warmup); 210 r = (CompactionBlockerRegion) region; 211 } 212 213 @Override 214 protected void refreshStoreSizeAndTotalBytes() throws IOException { 215 if (r != null) { 216 try { 217 r.compactionsWaiting.countDown(); 218 r.compactionsBlocked.await(); 219 } catch (InterruptedException ex) { 220 throw new IOException(ex); 221 } 222 } 223 super.refreshStoreSizeAndTotalBytes(); 224 } 225 } 226 227 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 228 private final static TableName TABLE_NAME = TableName.valueOf("tabletest"); 229 private final static byte[] FAMILY = Bytes.toBytes("family"); 230 private static final int FIRST_BATCH_COUNT = 4000; 231 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT; 232 233 /** 234 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 235 * compaction until after we have killed the server and the region has come up on a new 236 * regionserver altogether. This fakes the double assignment case where region in one location 237 * changes the files out from underneath a region being served elsewhere. 238 */ 239 @Test 240 public void testFencingAroundCompaction() throws Exception { 241 for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) { 242 doTest(BlockCompactionsInPrepRegion.class, policy); 243 } 244 } 245 246 /** 247 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 248 * compaction completion until after we have killed the server and the region has come up on a new 249 * regionserver altogether. This fakes the double assignment case where region in one location 250 * changes the files out from underneath a region being served elsewhere. 251 */ 252 @Test 253 public void testFencingAroundCompactionAfterWALSync() throws Exception { 254 for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) { 255 doTest(BlockCompactionsInCompletionRegion.class, policy); 256 } 257 } 258 259 public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception { 260 Configuration c = TEST_UTIL.getConfiguration(); 261 // Insert our custom region 262 c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class); 263 // Encourage plenty of flushes 264 c.setLong("hbase.hregion.memstore.flush.size", 25000); 265 c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); 266 // Only run compaction when we tell it to 267 c.setInt("hbase.hstore.compaction.min", 1); 268 c.setInt("hbase.hstore.compactionThreshold", 1000); 269 c.setLong("hbase.hstore.blockingStoreFiles", 1000); 270 // Compact quickly after we tell it to! 271 c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000); 272 c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy)); 273 LOG.info("Starting mini cluster"); 274 TEST_UTIL.startMiniCluster(1); 275 CompactionBlockerRegion compactingRegion = null; 276 Admin admin = null; 277 try { 278 LOG.info("Creating admin"); 279 admin = TEST_UTIL.getConnection().getAdmin(); 280 LOG.info("Creating table"); 281 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 282 Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME); 283 LOG.info("Loading test table"); 284 // Find the region 285 List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME); 286 assertEquals(1, testRegions.size()); 287 compactingRegion = (CompactionBlockerRegion) testRegions.get(0); 288 LOG.info("Blocking compactions"); 289 compactingRegion.stopCompactions(); 290 long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores(); 291 // Load some rows 292 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); 293 294 // add a compaction from an older (non-existing) region to see whether we successfully skip 295 // those entries 296 HRegionInfo oldHri = 297 new HRegionInfo(table.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); 298 CompactionDescriptor compactionDescriptor = 299 ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")), 300 Lists.newArrayList(new Path("/b")), new Path("store_dir")); 301 WALUtil.writeCompactionMarker(compactingRegion.getWAL(), 302 ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor, 303 compactingRegion.getMVCC()); 304 305 // Wait till flush has happened, otherwise there won't be multiple store files 306 long startWaitTime = EnvironmentEdgeManager.currentTime(); 307 while ( 308 compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime 309 || compactingRegion.countStoreFiles() <= 1 310 ) { 311 LOG.info("Waiting for the region to flush " 312 + compactingRegion.getRegionInfo().getRegionNameAsString()); 313 Thread.sleep(1000); 314 admin.flush(table.getName()); 315 assertTrue("Timed out waiting for the region to flush", 316 EnvironmentEdgeManager.currentTime() - startWaitTime < 30000); 317 } 318 assertTrue(compactingRegion.countStoreFiles() > 1); 319 final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName(); 320 LOG.info("Asking for compaction"); 321 admin.majorCompact(TABLE_NAME); 322 LOG.info("Waiting for compaction to be about to start"); 323 compactingRegion.waitForCompactionToBlock(); 324 LOG.info("Starting a new server"); 325 RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer(); 326 final HRegionServer newServer = newServerThread.getRegionServer(); 327 LOG.info("Killing region server ZK lease"); 328 TEST_UTIL.expireRegionServerSession(0); 329 CompactionBlockerRegion newRegion = null; 330 startWaitTime = EnvironmentEdgeManager.currentTime(); 331 LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); 332 333 // wait for region to be assigned and to go out of log replay if applicable 334 Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() { 335 @Override 336 public boolean evaluate() throws Exception { 337 Region newRegion = newServer.getOnlineRegion(REGION_NAME); 338 return newRegion != null; 339 } 340 }); 341 342 newRegion = (CompactionBlockerRegion) newServer.getOnlineRegion(REGION_NAME); 343 344 // After compaction of old region finishes on the server that was going down, make sure that 345 // all the files we expect are still working when region is up in new location. 346 FileSystem fs = newRegion.getFilesystem(); 347 for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) { 348 assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f))); 349 } 350 LOG.info("Allowing compaction to proceed"); 351 compactingRegion.allowCompactions(); 352 while (compactingRegion.compactCount.get() == 0) { 353 Thread.sleep(1000); 354 } 355 // The server we killed stays up until the compaction that was started before it was killed 356 // completes. In logs you should see the old regionserver now going down. 357 LOG.info("Compaction finished"); 358 359 // If we survive the split keep going... 360 // Now we make sure that the region isn't totally confused. Load up more rows. 361 TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, 362 FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); 363 admin.majorCompact(TABLE_NAME); 364 startWaitTime = EnvironmentEdgeManager.currentTime(); 365 while (newRegion.compactCount.get() == 0) { 366 Thread.sleep(1000); 367 assertTrue("New region never compacted", 368 EnvironmentEdgeManager.currentTime() - startWaitTime < 180000); 369 } 370 int count; 371 for (int i = 0;; i++) { 372 try { 373 count = TEST_UTIL.countRows(table); 374 break; 375 } catch (DoNotRetryIOException e) { 376 // wait up to 30s 377 if (i >= 30 || !e.getMessage().contains("File does not exist")) { 378 throw e; 379 } 380 Thread.sleep(1000); 381 } 382 } 383 if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) { 384 assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count); 385 } else { 386 assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count); 387 } 388 } finally { 389 if (compactingRegion != null) { 390 compactingRegion.allowCompactions(); 391 } 392 admin.close(); 393 TEST_UTIL.shutdownMiniCluster(); 394 } 395 } 396}