001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.mockito.Mockito.mock;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.Stoppable;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
043import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
044import org.apache.hadoop.hbase.testclassification.RegionServerTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.wal.WALFactory;
049import org.junit.jupiter.api.AfterEach;
050import org.junit.jupiter.api.BeforeEach;
051import org.junit.jupiter.api.Tag;
052import org.junit.jupiter.api.Test;
053import org.junit.jupiter.api.TestInfo;
054import org.mockito.Mockito;
055
056/**
057 * Tests a race condition between archiving of compacted files in CompactedHFilesDischarger chore
058 * and HRegion.close();
059 */
060@Tag(RegionServerTests.TAG)
061@Tag(SmallTests.TAG)
062public class TestCompactionArchiveConcurrentClose {
063
064  private HBaseTestingUtil testUtil;
065
066  private Path testDir;
067  private AtomicBoolean archived = new AtomicBoolean();
068
069  // Static field to track archived state for the static inner class
070  private static final AtomicBoolean STATIC_ARCHIVED = new AtomicBoolean();
071  private String name;
072
073  @BeforeEach
074  public void setup(TestInfo testInfo) throws Exception {
075    this.name = testInfo.getTestMethod().get().getName();
076    testUtil = new HBaseTestingUtil();
077    testDir = testUtil.getDataTestDir("TestStoreFileRefresherChore");
078    CommonFSUtils.setRootDir(testUtil.getConfiguration(), testDir);
079    // Configure the test to use our custom WaitingStoreFileTracker
080    testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
081      WaitingStoreFileTracker.class.getName());
082    // Reset the static archived flag
083    STATIC_ARCHIVED.set(false);
084  }
085
086  @AfterEach
087  public void tearDown() throws Exception {
088    testUtil.cleanupTestDir();
089  }
090
091  @Test
092  public void testStoreCloseAndDischargeRunningInParallel() throws Exception {
093    byte[] fam = Bytes.toBytes("f");
094    byte[] col = Bytes.toBytes("c");
095    byte[] val = Bytes.toBytes("val");
096
097    TableName tableName = TableName.valueOf(name);
098    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
099      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
100    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
101    HRegion region = initHRegion(htd, info);
102    RegionServerServices rss = mock(RegionServerServices.class);
103    List<HRegion> regions = new ArrayList<>();
104    regions.add(region);
105    Mockito.doReturn(regions).when(rss).getRegions();
106
107    // Create the cleaner object
108    CompactedHFilesDischarger cleaner =
109      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
110    // Add some data to the region and do some flushes
111    int batchSize = 10;
112    int fileCount = 10;
113    for (int f = 0; f < fileCount; f++) {
114      int start = f * batchSize;
115      for (int i = start; i < start + batchSize; i++) {
116        Put p = new Put(Bytes.toBytes("row" + i));
117        p.addColumn(fam, col, val);
118        region.put(p);
119      }
120      // flush them
121      region.flush(true);
122    }
123
124    HStore store = region.getStore(fam);
125    assertEquals(fileCount, store.getStorefilesCount());
126
127    Collection<HStoreFile> storefiles = store.getStorefiles();
128    // None of the files should be in compacted state.
129    for (HStoreFile file : storefiles) {
130      assertFalse(file.isCompactedAway());
131    }
132    System.out.println("Finished compaction ");
133    // Do compaction
134    region.compact(true);
135
136    // now run the cleaner with a concurrent close
137    Thread cleanerThread = new Thread() {
138      @Override
139      public void run() {
140        cleaner.chore();
141      }
142    };
143    cleanerThread.start();
144    // wait for cleaner to pause
145    synchronized (STATIC_ARCHIVED) {
146      if (!STATIC_ARCHIVED.get()) {
147        STATIC_ARCHIVED.wait();
148      }
149    }
150    final AtomicReference<Exception> closeException = new AtomicReference<>();
151    Thread closeThread = new Thread() {
152      @Override
153      public void run() {
154        // wait for the chore to complete and call close
155        try {
156          ((HRegion) region).close();
157        } catch (IOException e) {
158          closeException.set(e);
159        }
160      }
161    };
162    closeThread.start();
163    // no error should occur after the execution of the test
164    closeThread.join();
165    cleanerThread.join();
166
167    if (closeException.get() != null) {
168      throw closeException.get();
169    }
170  }
171
172  private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOException {
173    Configuration conf = testUtil.getConfiguration();
174    Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());
175
176    HRegionFileSystem fs =
177      new HRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info);
178    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
179      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
180    final Configuration walConf = new Configuration(conf);
181    CommonFSUtils.setRootDir(walConf, tableDir);
182    final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName());
183    HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null);
184    Path regionDir = new Path(tableDir, info.getEncodedName());
185    if (!fs.getFileSystem().exists(regionDir)) {
186      fs.getFileSystem().mkdirs(regionDir);
187    }
188    region.initialize();
189    return region;
190  }
191
192  public static class WaitingStoreFileTracker extends StoreFileTrackerForTest {
193
194    public WaitingStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
195      super(conf, isPrimaryReplica, ctx);
196    }
197
198    @Override
199    public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
200      super.removeStoreFiles(storeFiles);
201      STATIC_ARCHIVED.set(true);
202      synchronized (STATIC_ARCHIVED) {
203        STATIC_ARCHIVED.notifyAll();
204      }
205      try {
206        // unfortunately we can't use a stronger barrier here as the fix synchronizing
207        // the race condition will then block
208        Thread.sleep(100);
209      } catch (InterruptedException ie) {
210        throw new InterruptedIOException("Interrupted waiting for latch");
211      }
212    }
213  }
214}