001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.mockito.Mockito.mock; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicReference; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.Stoppable; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 043import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest; 044import org.apache.hadoop.hbase.testclassification.RegionServerTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.wal.WALFactory; 049import org.junit.jupiter.api.AfterEach; 050import org.junit.jupiter.api.BeforeEach; 051import org.junit.jupiter.api.Tag; 052import org.junit.jupiter.api.Test; 053import org.junit.jupiter.api.TestInfo; 054import org.mockito.Mockito; 055 056/** 057 * Tests a race condition between archiving of compacted files in CompactedHFilesDischarger chore 058 * and HRegion.close(); 059 */ 060@Tag(RegionServerTests.TAG) 061@Tag(SmallTests.TAG) 062public class TestCompactionArchiveConcurrentClose { 063 064 private HBaseTestingUtil testUtil; 065 066 private Path testDir; 067 private AtomicBoolean archived = new AtomicBoolean(); 068 069 // Static field to track archived state for the static inner class 070 private static final AtomicBoolean STATIC_ARCHIVED = new AtomicBoolean(); 071 private String name; 072 073 @BeforeEach 074 public void setup(TestInfo testInfo) throws Exception { 075 this.name = testInfo.getTestMethod().get().getName(); 076 testUtil = new HBaseTestingUtil(); 077 testDir = testUtil.getDataTestDir("TestStoreFileRefresherChore"); 078 CommonFSUtils.setRootDir(testUtil.getConfiguration(), testDir); 079 // Configure the test to use our custom WaitingStoreFileTracker 080 testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, 081 WaitingStoreFileTracker.class.getName()); 082 // Reset the static archived flag 083 STATIC_ARCHIVED.set(false); 084 } 085 086 @AfterEach 087 public void tearDown() throws Exception { 088 testUtil.cleanupTestDir(); 089 } 090 091 @Test 092 public void testStoreCloseAndDischargeRunningInParallel() throws Exception { 093 byte[] fam = Bytes.toBytes("f"); 094 byte[] col = Bytes.toBytes("c"); 095 byte[] val = Bytes.toBytes("val"); 096 097 TableName tableName = TableName.valueOf(name); 098 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 099 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build(); 100 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 101 HRegion region = initHRegion(htd, info); 102 RegionServerServices rss = mock(RegionServerServices.class); 103 List<HRegion> regions = new ArrayList<>(); 104 regions.add(region); 105 Mockito.doReturn(regions).when(rss).getRegions(); 106 107 // Create the cleaner object 108 CompactedHFilesDischarger cleaner = 109 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 110 // Add some data to the region and do some flushes 111 int batchSize = 10; 112 int fileCount = 10; 113 for (int f = 0; f < fileCount; f++) { 114 int start = f * batchSize; 115 for (int i = start; i < start + batchSize; i++) { 116 Put p = new Put(Bytes.toBytes("row" + i)); 117 p.addColumn(fam, col, val); 118 region.put(p); 119 } 120 // flush them 121 region.flush(true); 122 } 123 124 HStore store = region.getStore(fam); 125 assertEquals(fileCount, store.getStorefilesCount()); 126 127 Collection<HStoreFile> storefiles = store.getStorefiles(); 128 // None of the files should be in compacted state. 129 for (HStoreFile file : storefiles) { 130 assertFalse(file.isCompactedAway()); 131 } 132 System.out.println("Finished compaction "); 133 // Do compaction 134 region.compact(true); 135 136 // now run the cleaner with a concurrent close 137 Thread cleanerThread = new Thread() { 138 @Override 139 public void run() { 140 cleaner.chore(); 141 } 142 }; 143 cleanerThread.start(); 144 // wait for cleaner to pause 145 synchronized (STATIC_ARCHIVED) { 146 if (!STATIC_ARCHIVED.get()) { 147 STATIC_ARCHIVED.wait(); 148 } 149 } 150 final AtomicReference<Exception> closeException = new AtomicReference<>(); 151 Thread closeThread = new Thread() { 152 @Override 153 public void run() { 154 // wait for the chore to complete and call close 155 try { 156 ((HRegion) region).close(); 157 } catch (IOException e) { 158 closeException.set(e); 159 } 160 } 161 }; 162 closeThread.start(); 163 // no error should occur after the execution of the test 164 closeThread.join(); 165 cleanerThread.join(); 166 167 if (closeException.get() != null) { 168 throw closeException.get(); 169 } 170 } 171 172 private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOException { 173 Configuration conf = testUtil.getConfiguration(); 174 Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName()); 175 176 HRegionFileSystem fs = 177 new HRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); 178 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 179 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 180 final Configuration walConf = new Configuration(conf); 181 CommonFSUtils.setRootDir(walConf, tableDir); 182 final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); 183 HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); 184 Path regionDir = new Path(tableDir, info.getEncodedName()); 185 if (!fs.getFileSystem().exists(regionDir)) { 186 fs.getFileSystem().mkdirs(regionDir); 187 } 188 region.initialize(); 189 return region; 190 } 191 192 public static class WaitingStoreFileTracker extends StoreFileTrackerForTest { 193 194 public WaitingStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 195 super(conf, isPrimaryReplica, ctx); 196 } 197 198 @Override 199 public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException { 200 super.removeStoreFiles(storeFiles); 201 STATIC_ARCHIVED.set(true); 202 synchronized (STATIC_ARCHIVED) { 203 STATIC_ARCHIVED.notifyAll(); 204 } 205 try { 206 // unfortunately we can't use a stronger barrier here as the fix synchronizing 207 // the race condition will then block 208 Thread.sleep(100); 209 } catch (InterruptedException ie) { 210 throw new InterruptedIOException("Interrupted waiting for latch"); 211 } 212 } 213 } 214}