001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 037import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.regionserver.HStore; 041import org.apache.hadoop.hbase.regionserver.HStoreFile; 042import 
org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before it is fully done with in its old location -- i.e. it is
 * on two servers at the same time -- all can work fine until the case where the region on the
 * dying server decides to compact or otherwise change the region file set. The region in its new
 * location will then get a surprise when it tries to do something w/ a file removed by the region
 * in its old location on dying server.
073 * <p> 074 * Making a test for this case is a little tough in that even if a file is deleted up on the 075 * namenode, if the file was opened before the delete, it will continue to let reads happen until 076 * something changes the state of cached blocks in the dfsclient that was already open (a block from 077 * the deleted file is cleaned from the datanode by NN). 078 * <p> 079 * What we will do below is do an explicit check for existence on the files listed in the region 080 * that has had some files removed because of a compaction. This sort of hurry's along and makes 081 * certain what is a chance occurance. 082 */ 083@Category({MiscTests.class, LargeTests.class}) 084public class TestIOFencing { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestIOFencing.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class); 091 static { 092 // Uncomment the following lines if more verbosity is needed for 093 // debugging (see HBASE-12285 for details). 
094 //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); 095 //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); 096 //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); 097 //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")) 098 // .getLogger().setLevel(Level.ALL); 099 //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); 100 } 101 102 public abstract static class CompactionBlockerRegion extends HRegion { 103 AtomicInteger compactCount = new AtomicInteger(); 104 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0); 105 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0); 106 107 @SuppressWarnings("deprecation") 108 public CompactionBlockerRegion(Path tableDir, WAL log, 109 FileSystem fs, Configuration confParam, RegionInfo info, 110 TableDescriptor htd, RegionServerServices rsServices) { 111 super(tableDir, log, fs, confParam, info, htd, rsServices); 112 } 113 114 public void stopCompactions() { 115 compactionsBlocked = new CountDownLatch(1); 116 compactionsWaiting = new CountDownLatch(1); 117 } 118 119 public void allowCompactions() { 120 LOG.debug("allowing compactions"); 121 compactionsBlocked.countDown(); 122 } 123 public void waitForCompactionToBlock() throws IOException { 124 try { 125 LOG.debug("waiting for compaction to block"); 126 compactionsWaiting.await(); 127 LOG.debug("compaction block reached"); 128 } catch (InterruptedException ex) { 129 throw new IOException(ex); 130 } 131 } 132 133 @Override 134 public boolean compact(CompactionContext compaction, HStore store, 135 ThroughputController throughputController) throws IOException { 136 try { 137 return super.compact(compaction, store, throughputController); 138 } finally { 139 compactCount.getAndIncrement(); 140 } 141 } 142 143 @Override 144 public boolean compact(CompactionContext compaction, HStore store, 145 ThroughputController throughputController, User user) throws IOException { 
146 try { 147 return super.compact(compaction, store, throughputController, user); 148 } finally { 149 compactCount.getAndIncrement(); 150 } 151 } 152 153 public int countStoreFiles() { 154 int count = 0; 155 for (HStore store : stores.values()) { 156 count += store.getStorefilesCount(); 157 } 158 return count; 159 } 160 } 161 162 /** 163 * An override of HRegion that allows us park compactions in a holding pattern and 164 * then when appropriate for the test, allow them proceed again. 165 */ 166 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion { 167 168 public BlockCompactionsInPrepRegion(Path tableDir, WAL log, 169 FileSystem fs, Configuration confParam, RegionInfo info, 170 TableDescriptor htd, RegionServerServices rsServices) { 171 super(tableDir, log, fs, confParam, info, htd, rsServices); 172 } 173 @Override 174 protected void doRegionCompactionPrep() throws IOException { 175 compactionsWaiting.countDown(); 176 try { 177 compactionsBlocked.await(); 178 } catch (InterruptedException ex) { 179 throw new IOException(); 180 } 181 super.doRegionCompactionPrep(); 182 } 183 } 184 185 /** 186 * An override of HRegion that allows us park compactions in a holding pattern and 187 * then when appropriate for the test, allow them proceed again. 
This allows the compaction 188 * entry to go the WAL before blocking, but blocks afterwards 189 */ 190 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion { 191 public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, 192 FileSystem fs, Configuration confParam, RegionInfo info, 193 TableDescriptor htd, RegionServerServices rsServices) { 194 super(tableDir, log, fs, confParam, info, htd, rsServices); 195 } 196 @Override 197 protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException { 198 return new BlockCompactionsInCompletionHStore(this, family, this.conf); 199 } 200 } 201 202 public static class BlockCompactionsInCompletionHStore extends HStore { 203 CompactionBlockerRegion r; 204 protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family, 205 Configuration confParam) throws IOException { 206 super(region, family, confParam); 207 r = (CompactionBlockerRegion) region; 208 } 209 210 @Override 211 protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException { 212 try { 213 r.compactionsWaiting.countDown(); 214 r.compactionsBlocked.await(); 215 } catch (InterruptedException ex) { 216 throw new IOException(ex); 217 } 218 super.completeCompaction(compactedFiles); 219 } 220 } 221 222 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 223 private final static TableName TABLE_NAME = 224 TableName.valueOf("tabletest"); 225 private final static byte[] FAMILY = Bytes.toBytes("family"); 226 private static final int FIRST_BATCH_COUNT = 4000; 227 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT; 228 229 /** 230 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 231 * compaction until after we have killed the server and the region has come up on 232 * a new regionserver altogether. 
This fakes the double assignment case where region in one 233 * location changes the files out from underneath a region being served elsewhere. 234 */ 235 @Test 236 public void testFencingAroundCompaction() throws Exception { 237 for(MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) { 238 doTest(BlockCompactionsInPrepRegion.class, policy); 239 } 240 } 241 242 /** 243 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 244 * compaction completion until after we have killed the server and the region has come up on 245 * a new regionserver altogether. This fakes the double assignment case where region in one 246 * location changes the files out from underneath a region being served elsewhere. 247 */ 248 @Test 249 public void testFencingAroundCompactionAfterWALSync() throws Exception { 250 for(MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) { 251 doTest(BlockCompactionsInCompletionRegion.class, policy); 252 } 253 } 254 255 public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception { 256 Configuration c = TEST_UTIL.getConfiguration(); 257 // Insert our custom region 258 c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class); 259 // Encourage plenty of flushes 260 c.setLong("hbase.hregion.memstore.flush.size", 25000); 261 c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); 262 // Only run compaction when we tell it to 263 c.setInt("hbase.hstore.compaction.min",1); 264 c.setInt("hbase.hstore.compactionThreshold", 1000); 265 c.setLong("hbase.hstore.blockingStoreFiles", 1000); 266 // Compact quickly after we tell it to! 
267 c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000); 268 c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy)); 269 LOG.info("Starting mini cluster"); 270 TEST_UTIL.startMiniCluster(1); 271 CompactionBlockerRegion compactingRegion = null; 272 Admin admin = null; 273 try { 274 LOG.info("Creating admin"); 275 admin = TEST_UTIL.getConnection().getAdmin(); 276 LOG.info("Creating table"); 277 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 278 Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME); 279 LOG.info("Loading test table"); 280 // Find the region 281 List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME); 282 assertEquals(1, testRegions.size()); 283 compactingRegion = (CompactionBlockerRegion)testRegions.get(0); 284 LOG.info("Blocking compactions"); 285 compactingRegion.stopCompactions(); 286 long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores(); 287 // Load some rows 288 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); 289 290 // add a compaction from an older (non-existing) region to see whether we successfully skip 291 // those entries 292 HRegionInfo oldHri = new HRegionInfo(table.getName(), 293 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); 294 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri, 295 FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")), 296 new Path("store_dir")); 297 WALUtil.writeCompactionMarker(compactingRegion.getWAL(), 298 ((HRegion)compactingRegion).getReplicationScope(), 299 oldHri, compactionDescriptor, compactingRegion.getMVCC()); 300 301 // Wait till flush has happened, otherwise there won't be multiple store files 302 long startWaitTime = System.currentTimeMillis(); 303 while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime || 304 compactingRegion.countStoreFiles() <= 1) { 305 LOG.info("Waiting for the region to flush " + 
306 compactingRegion.getRegionInfo().getRegionNameAsString()); 307 Thread.sleep(1000); 308 admin.flush(table.getName()); 309 assertTrue("Timed out waiting for the region to flush", 310 System.currentTimeMillis() - startWaitTime < 30000); 311 } 312 assertTrue(compactingRegion.countStoreFiles() > 1); 313 final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName(); 314 LOG.info("Asking for compaction"); 315 admin.majorCompact(TABLE_NAME); 316 LOG.info("Waiting for compaction to be about to start"); 317 compactingRegion.waitForCompactionToBlock(); 318 LOG.info("Starting a new server"); 319 RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer(); 320 final HRegionServer newServer = newServerThread.getRegionServer(); 321 LOG.info("Killing region server ZK lease"); 322 TEST_UTIL.expireRegionServerSession(0); 323 CompactionBlockerRegion newRegion = null; 324 startWaitTime = System.currentTimeMillis(); 325 LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); 326 327 // wait for region to be assigned and to go out of log replay if applicable 328 Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() { 329 @Override 330 public boolean evaluate() throws Exception { 331 Region newRegion = newServer.getOnlineRegion(REGION_NAME); 332 return newRegion != null; 333 } 334 }); 335 336 newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME); 337 338 // After compaction of old region finishes on the server that was going down, make sure that 339 // all the files we expect are still working when region is up in new location. 
340 FileSystem fs = newRegion.getFilesystem(); 341 for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) { 342 assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f))); 343 } 344 LOG.info("Allowing compaction to proceed"); 345 compactingRegion.allowCompactions(); 346 while (compactingRegion.compactCount.get() == 0) { 347 Thread.sleep(1000); 348 } 349 // The server we killed stays up until the compaction that was started before it was killed 350 // completes. In logs you should see the old regionserver now going down. 351 LOG.info("Compaction finished"); 352 353 // If we survive the split keep going... 354 // Now we make sure that the region isn't totally confused. Load up more rows. 355 TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, 356 FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); 357 admin.majorCompact(TABLE_NAME); 358 startWaitTime = System.currentTimeMillis(); 359 while (newRegion.compactCount.get() == 0) { 360 Thread.sleep(1000); 361 assertTrue("New region never compacted", 362 System.currentTimeMillis() - startWaitTime < 180000); 363 } 364 int count; 365 for (int i = 0;; i++) { 366 try { 367 count = TEST_UTIL.countRows(table); 368 break; 369 } catch (DoNotRetryIOException e) { 370 // wait up to 30s 371 if (i >= 30 || !e.getMessage().contains("File does not exist")) { 372 throw e; 373 } 374 Thread.sleep(1000); 375 } 376 } 377 if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) { 378 assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count); 379 } else { 380 assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count); 381 } 382 } finally { 383 if (compactingRegion != null) { 384 compactingRegion.allowCompactions(); 385 } 386 admin.close(); 387 TEST_UTIL.shutdownMiniCluster(); 388 } 389 } 390}