001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.mockito.Mockito.mock; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicReference; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.Stoppable; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 044import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CommonFSUtils; 049import org.apache.hadoop.hbase.wal.WALFactory; 050import org.junit.After; 051import org.junit.Before; 052import org.junit.ClassRule; 053import org.junit.Rule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.rules.TestName; 057import org.mockito.Mockito; 058 059/** 060 * Tests a race condition between archiving of compacted files in CompactedHFilesDischarger chore 061 * and HRegion.close(); 062 */ 063@Category({ RegionServerTests.class, SmallTests.class }) 064public class TestCompactionArchiveConcurrentClose { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestCompactionArchiveConcurrentClose.class); 069 070 private HBaseTestingUtil testUtil; 071 072 private Path testDir; 073 private AtomicBoolean archived = new AtomicBoolean(); 074 075 // Static field to track archived state for the static inner class 076 private static final AtomicBoolean STATIC_ARCHIVED = new AtomicBoolean(); 077 078 @Rule 079 public TestName name = new TestName(); 080 081 @Before 082 public void setup() throws Exception { 083 testUtil = new HBaseTestingUtil(); 084 testDir = testUtil.getDataTestDir("TestStoreFileRefresherChore"); 085 CommonFSUtils.setRootDir(testUtil.getConfiguration(), testDir); 086 // Configure the test to use our custom WaitingStoreFileTracker 087 testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, 088 WaitingStoreFileTracker.class.getName()); 089 // Reset the static archived flag 090 STATIC_ARCHIVED.set(false); 091 } 092 093 @After 094 public void tearDown() throws Exception { 095 testUtil.cleanupTestDir(); 096 } 097 098 @Test 099 public void testStoreCloseAndDischargeRunningInParallel() throws Exception { 100 byte[] fam = Bytes.toBytes("f"); 101 byte[] col = Bytes.toBytes("c"); 102 byte[] val = Bytes.toBytes("val"); 103 104 TableName tableName = TableName.valueOf(name.getMethodName()); 105 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 106 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build(); 107 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 108 HRegion region = initHRegion(htd, info); 109 RegionServerServices rss = mock(RegionServerServices.class); 110 List<HRegion> regions = new ArrayList<>(); 111 regions.add(region); 112 Mockito.doReturn(regions).when(rss).getRegions(); 113 114 // Create the cleaner object 115 CompactedHFilesDischarger cleaner = 116 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 117 // Add some data to the region and do some flushes 118 int batchSize = 10; 119 int fileCount = 10; 120 for (int f = 0; f < fileCount; f++) { 121 int start = f * batchSize; 122 for (int i = start; i < start + batchSize; i++) { 123 Put p = new Put(Bytes.toBytes("row" + i)); 124 p.addColumn(fam, col, val); 125 region.put(p); 126 } 127 // flush them 128 region.flush(true); 129 } 130 131 HStore store = region.getStore(fam); 132 assertEquals(fileCount, store.getStorefilesCount()); 133 134 Collection<HStoreFile> storefiles = store.getStorefiles(); 135 // None of the files should be in compacted state. 136 for (HStoreFile file : storefiles) { 137 assertFalse(file.isCompactedAway()); 138 } 139 System.out.println("Finished compaction "); 140 // Do compaction 141 region.compact(true); 142 143 // now run the cleaner with a concurrent close 144 Thread cleanerThread = new Thread() { 145 @Override 146 public void run() { 147 cleaner.chore(); 148 } 149 }; 150 cleanerThread.start(); 151 // wait for cleaner to pause 152 synchronized (STATIC_ARCHIVED) { 153 if (!STATIC_ARCHIVED.get()) { 154 STATIC_ARCHIVED.wait(); 155 } 156 } 157 final AtomicReference<Exception> closeException = new AtomicReference<>(); 158 Thread closeThread = new Thread() { 159 @Override 160 public void run() { 161 // wait for the chore to complete and call close 162 try { 163 ((HRegion) region).close(); 164 } catch (IOException e) { 165 closeException.set(e); 166 } 167 } 168 }; 169 closeThread.start(); 170 // no error should occur after the execution of the test 171 closeThread.join(); 172 cleanerThread.join(); 173 174 if (closeException.get() != null) { 175 throw closeException.get(); 176 } 177 } 178 179 private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOException { 180 Configuration conf = testUtil.getConfiguration(); 181 Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName()); 182 183 HRegionFileSystem fs = 184 new HRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); 185 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 186 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 187 final Configuration walConf = new Configuration(conf); 188 CommonFSUtils.setRootDir(walConf, tableDir); 189 final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); 190 HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); 191 192 region.initialize(); 193 194 return region; 195 } 196 197 public static class WaitingStoreFileTracker extends StoreFileTrackerForTest { 198 199 public WaitingStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 200 super(conf, isPrimaryReplica, ctx); 201 } 202 203 @Override 204 public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException { 205 super.removeStoreFiles(storeFiles); 206 STATIC_ARCHIVED.set(true); 207 synchronized (STATIC_ARCHIVED) { 208 STATIC_ARCHIVED.notifyAll(); 209 } 210 try { 211 // unfortunately we can't use a stronger barrier here as the fix synchronizing 212 // the race condition will then block 213 Thread.sleep(100); 214 } catch (InterruptedException ie) { 215 throw new InterruptedIOException("Interrupted waiting for latch"); 216 } 217 } 218 } 219}