/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before it is fully done within its old location -- i.e. it is on
 * two servers at the same time -- all can work fine until the case where the region on the dying
 * server decides to compact or otherwise change the region file set. The region in its new location
 * will then get a surprise when it tries to do something w/ a file removed by the region in its old
 * location on dying server.
 * <p>
 * Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete, it will continue to let reads happen until
 * something changes the state of cached blocks in the dfsclient that was already open (a block from
 * the deleted file is cleaned from the datanode by NN).
 * <p>
 * What we will do below is do an explicit check for existence on the files listed in the region
 * that has had some files removed because of a compaction. This sort of hurries along and makes
 * certain what is a chance occurrence.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestIOFencing {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    //    .getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  /**
   * Base {@link HRegion} for this test. It carries the two latches that subclasses use to park a
   * compaction, and counts how many {@code compact} calls have finished.
   */
  public abstract static class CompactionBlockerRegion extends HRegion {
    /** Incremented each time one of the {@code compact} overrides exits, normally or not. */
    final AtomicInteger compactCount = new AtomicInteger();
    /** Latch a parked compaction awaits; released by {@link #allowCompactions()}. */
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    /** Latch counted down by a compaction when it reaches the point where it parks. */
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    /** Installs fresh single-count latches so that the next compaction parks. */
    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    /** Releases any compaction parked on {@link #compactionsBlocked}. */
    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    /**
     * Blocks until a compaction has signalled, via {@link #compactionsWaiting}, that it reached the
     * point where it parks.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        // Count the attempt even when the compaction throws.
        compactCount.getAndIncrement();
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        // Count the attempt even when the compaction throws.
        compactCount.getAndIncrement();
      }
    }

    /**
     * @return the sum of {@link HStore#getStorefilesCount()} over all stores of this region
     */
    public int countStoreFiles() {
      int count = 0;
      for (HStore store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    /**
     * Signals that a compaction has arrived, parks until compactions are allowed again and only
     * then delegates to the superclass.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        // Keep the cause, matching the other latch waits in this file.
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again. This allows the compaction
   * entry to go to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
        throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
    }
  }

  /**
   * An {@link HStore} that parks in {@link #completeCompaction(Collection)} on the latches of its
   * owning {@link CompactionBlockerRegion} before delegating to the superclass.
   */
  public static class BlockCompactionsInCompletionHStore extends HStore {
    /** The owning region, whose latches control when compaction completion may proceed. */
    final CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
        Configuration confParam, boolean warmup) throws IOException {
      super(region, family, confParam, warmup);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME =
      TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double assignment case where region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInPrepRegion.class, policy);
    }
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double assignment case where region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

  /**
   * Runs one fencing scenario on a fresh single-server mini cluster: parks a major compaction on
   * the original server, expires that server's session so the region moves to a newly started
   * server, verifies the new region's store files exist, releases the parked compaction and then
   * checks the table can still be loaded, compacted and counted.
   * <p>
   * The mini cluster is always shut down on exit, even if closing the client handles fails.
   *
   * @param regionClass the {@link CompactionBlockerRegion} subclass to install as region
   *          implementation
   * @param policy the in-memory compaction policy to configure
   */
  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    Table table = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion) testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
          FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
          new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
          compactingRegion.getReplicationScope(),
          oldHri, compactionDescriptor, compactingRegion.getMVCC());

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush {}",
            compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue("Timed out waiting for the region to flush",
            System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] regionName = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      LOG.info("Waiting for the new server to pick up the region {}", Bytes.toString(regionName));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region onlineRegion = newServer.getOnlineRegion(regionName);
          return onlineRegion != null;
        }
      });

      CompactionBlockerRegion newRegion =
          (CompactionBlockerRegion) newServer.getOnlineRegion(regionName);

      // While the compaction on the old server is still parked, make sure that all the files the
      // region lists in its new location exist on the filesystem.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes.  In logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we got this far, keep going...
      // Now we make sure that the region isn't totally confused.  Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
          FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME);
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
            System.currentTimeMillis() - startWaitTime < 180000);
      }
      int count = countRowsWithRetry(table);
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      try {
        if (compactingRegion != null) {
          compactingRegion.allowCompactions();
        }
        if (table != null) {
          table.close();
        }
        // admin is still null when getAdmin() itself failed; closing it unguarded would throw a
        // NullPointerException that hides the real failure.
        if (admin != null) {
          admin.close();
        }
      } finally {
        // Always tear the cluster down so the next policy iteration starts clean.
        TEST_UTIL.shutdownMiniCluster();
      }
    }
  }

  /**
   * Counts the rows of {@code table}, retrying once a second while the count fails with a
   * {@link DoNotRetryIOException} whose message contains "File does not exist".
   * Any other failure, or a failure after 30 retries, is rethrown.
   */
  private static int countRowsWithRetry(Table table) throws Exception {
    for (int attempt = 0;; attempt++) {
      try {
        return TEST_UTIL.countRows(table);
      } catch (DoNotRetryIOException e) {
        // getMessage() may be null; treat that as "not the error we retry on".
        String message = e.getMessage();
        boolean missingFile = message != null && message.contains("File does not exist");
        // wait up to 30s
        if (attempt >= 30 || !missingFile) {
          throw e;
        }
        Thread.sleep(1000);
      }
    }
  }
}