001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.RegionInfoBuilder; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 037import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.regionserver.HStore; 041import 
org.apache.hadoop.hbase.regionserver.Region; 042import org.apache.hadoop.hbase.regionserver.RegionServerServices; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 045import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 046import org.apache.hadoop.hbase.security.User; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.testclassification.MiscTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 052import org.apache.hadoop.hbase.wal.WAL; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 062 063/** 064 * Test for the case where a regionserver going down has enough cycles to do damage to regions that 065 * have actually been assigned elsehwere. 066 * <p> 067 * If we happen to assign a region before it fully done with in its old location -- i.e. it is on 068 * two servers at the same time -- all can work fine until the case where the region on the dying 069 * server decides to compact or otherwise change the region file set. The region in its new location 070 * will then get a surprise when it tries to do something w/ a file removed by the region in its old 071 * location on dying server. 
072 * <p> 073 * Making a test for this case is a little tough in that even if a file is deleted up on the 074 * namenode, if the file was opened before the delete, it will continue to let reads happen until 075 * something changes the state of cached blocks in the dfsclient that was already open (a block from 076 * the deleted file is cleaned from the datanode by NN). 077 * <p> 078 * What we will do below is do an explicit check for existence on the files listed in the region 079 * that has had some files removed because of a compaction. This sort of hurry's along and makes 080 * certain what is a chance occurance. 081 */ 082@Tag(MiscTests.TAG) 083@Tag(LargeTests.TAG) 084public class TestIOFencing { 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class); 087 static { 088 // Uncomment the following lines if more verbosity is needed for 089 // debugging (see HBASE-12285 for details). 090 // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); 091 // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); 092 // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); 093 // ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")) 094 // .getLogger().setLevel(Level.ALL); 095 // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); 096 } 097 098 public abstract static class CompactionBlockerRegion extends HRegion { 099 AtomicInteger compactCount = new AtomicInteger(); 100 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0); 101 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0); 102 103 @SuppressWarnings("deprecation") 104 public CompactionBlockerRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, 105 RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) { 106 super(tableDir, log, fs, confParam, info, htd, rsServices); 107 } 108 109 public void stopCompactions() { 110 compactionsBlocked = new 
CountDownLatch(1); 111 compactionsWaiting = new CountDownLatch(1); 112 } 113 114 public void allowCompactions() { 115 LOG.debug("allowing compactions"); 116 compactionsBlocked.countDown(); 117 } 118 119 public void waitForCompactionToBlock() throws IOException { 120 try { 121 LOG.debug("waiting for compaction to block"); 122 compactionsWaiting.await(); 123 LOG.debug("compaction block reached"); 124 } catch (InterruptedException ex) { 125 throw new IOException(ex); 126 } 127 } 128 129 @Override 130 public boolean compact(CompactionContext compaction, HStore store, 131 ThroughputController throughputController) throws IOException { 132 try { 133 return super.compact(compaction, store, throughputController); 134 } finally { 135 compactCount.getAndIncrement(); 136 } 137 } 138 139 @Override 140 public boolean compact(CompactionContext compaction, HStore store, 141 ThroughputController throughputController, User user) throws IOException { 142 try { 143 return super.compact(compaction, store, throughputController, user); 144 } finally { 145 compactCount.getAndIncrement(); 146 } 147 } 148 149 public int countStoreFiles() { 150 int count = 0; 151 for (HStore store : stores.values()) { 152 count += store.getStorefilesCount(); 153 } 154 return count; 155 } 156 } 157 158 /** 159 * An override of HRegion that allows us park compactions in a holding pattern and then when 160 * appropriate for the test, allow them proceed again. 
161 */ 162 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion { 163 164 public BlockCompactionsInPrepRegion(Path tableDir, WAL log, FileSystem fs, 165 Configuration confParam, RegionInfo info, TableDescriptor htd, 166 RegionServerServices rsServices) { 167 super(tableDir, log, fs, confParam, info, htd, rsServices); 168 } 169 170 @Override 171 protected void doRegionCompactionPrep() throws IOException { 172 compactionsWaiting.countDown(); 173 try { 174 compactionsBlocked.await(); 175 } catch (InterruptedException ex) { 176 throw new IOException(); 177 } 178 super.doRegionCompactionPrep(); 179 } 180 } 181 182 /** 183 * An override of HRegion that allows us park compactions in a holding pattern and then when 184 * appropriate for the test, allow them proceed again. This allows the compaction entry to go the 185 * WAL before blocking, but blocks afterwards 186 */ 187 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion { 188 public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, FileSystem fs, 189 Configuration confParam, RegionInfo info, TableDescriptor htd, 190 RegionServerServices rsServices) { 191 super(tableDir, log, fs, confParam, info, htd, rsServices); 192 } 193 194 @Override 195 protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup) 196 throws IOException { 197 return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup); 198 } 199 } 200 201 public static class BlockCompactionsInCompletionHStore extends HStore { 202 CompactionBlockerRegion r; 203 204 protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family, 205 Configuration confParam, boolean warmup) throws IOException { 206 super(region, family, confParam, warmup); 207 r = (CompactionBlockerRegion) region; 208 } 209 210 @Override 211 protected void refreshStoreSizeAndTotalBytes() throws IOException { 212 if (r != null) { 213 try { 214 
r.compactionsWaiting.countDown(); 215 r.compactionsBlocked.await(); 216 } catch (InterruptedException ex) { 217 throw new IOException(ex); 218 } 219 } 220 super.refreshStoreSizeAndTotalBytes(); 221 } 222 } 223 224 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 225 private final static TableName TABLE_NAME = TableName.valueOf("tabletest"); 226 private final static byte[] FAMILY = Bytes.toBytes("family"); 227 private static final int FIRST_BATCH_COUNT = 4000; 228 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT; 229 230 /** 231 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 232 * compaction until after we have killed the server and the region has come up on a new 233 * regionserver altogether. This fakes the double assignment case where region in one location 234 * changes the files out from underneath a region being served elsewhere. 235 */ 236 @Test 237 public void testFencingAroundCompaction() throws Exception { 238 for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) { 239 doTest(BlockCompactionsInPrepRegion.class, policy); 240 } 241 } 242 243 /** 244 * Test that puts up a regionserver, starts a compaction on a loaded region but holds the 245 * compaction completion until after we have killed the server and the region has come up on a new 246 * regionserver altogether. This fakes the double assignment case where region in one location 247 * changes the files out from underneath a region being served elsewhere. 
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

  /**
   * Drives the double-assignment scenario: loads a region, parks a compaction on it using the
   * given blocking-region implementation, kills the hosting server's ZK lease so the region is
   * reassigned, then releases the compaction and verifies the region in its new location still
   * sees a consistent file set and row count.
   * @param regionClass which {@link CompactionBlockerRegion} subclass to install via
   *          {@code hbase.hregion.impl} (blocks either before or after the WAL marker)
   * @param policy the in-memory compaction policy to run the scenario under
   */
  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion) testRegions.get(0);
      LOG.info("Blocking compactions");
      // Arm the latches BEFORE loading rows so the major compaction we request later parks.
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      RegionInfo oldHri = RegionInfoBuilder.newBuilder(table.getName()).build();
      CompactionDescriptor compactionDescriptor =
        ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")),
          Lists.newArrayList(new Path("/b")), new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
        compactingRegion.getMVCC(), null);

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = EnvironmentEdgeManager.currentTime();
      while (
        compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime
          || compactingRegion.countStoreFiles() <= 1
      ) {
        LOG.info("Waiting for the region to flush "
          + compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue(EnvironmentEdgeManager.currentTime() - startWaitTime < 30000,
          "Timed out waiting for the region to flush");
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      // Blocks until the region subclass reports a compaction has reached its parking point.
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      // Expire the first server's ZK session: the region gets reassigned to the new server
      // while the parked compaction still holds file references on the dying one.
      TEST_UTIL.expireRegionServerSession(0);
      CompactionBlockerRegion newRegion = null;
      startWaitTime = EnvironmentEdgeManager.currentTime();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null;
        }
      });

      newRegion = (CompactionBlockerRegion) newServer.getOnlineRegion(REGION_NAME);

      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
        assertTrue(fs.exists(new Path(f)), "After compaction, does not exist: " + f);
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused. Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME);
      startWaitTime = EnvironmentEdgeManager.currentTime();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue(EnvironmentEdgeManager.currentTime() - startWaitTime < 180000,
          "New region never compacted");
      }
      int count;
      // Counting may transiently fail while the old server's compaction output settles; retry.
      for (int i = 0;; i++) {
        try {
          count = HBaseTestingUtil.countRows(table);
          break;
        } catch (DoNotRetryIOException e) {
          // wait up to 30s
          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
            throw e;
          }
          Thread.sleep(1000);
        }
      }
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        // Eager/adaptive in-memory compaction may merge duplicate cells, so allow fewer rows.
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      // NOTE(review): if getAdmin() threw, admin is still null here and close() would NPE,
      // masking the original failure — consider a null guard. Left as-is (docs-only change).
      admin.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}