001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.Mockito.mock;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
045import org.apache.hadoop.hbase.regionserver.HRegion;
046import org.apache.hadoop.hbase.regionserver.HStore;
047import org.apache.hadoop.hbase.regionserver.HStoreFile;
048import org.apache.hadoop.hbase.regionserver.RegionScanner;
049import org.apache.hadoop.hbase.regionserver.RegionServerServices;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.jupiter.api.AfterEach;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057import org.mockito.Mockito;
058
059@Tag(MediumTests.TAG)
060@Tag(RegionServerTests.TAG)
061public class TestCompactedHFilesDischarger {
062
063  private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
064  private HRegion region;
065  private final static byte[] fam = Bytes.toBytes("cf_1");
066  private final static byte[] qual1 = Bytes.toBytes("qf_1");
067  private final static byte[] val = Bytes.toBytes("val");
068  private static CountDownLatch latch = new CountDownLatch(3);
069  private static AtomicInteger counter = new AtomicInteger(0);
070  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
071  private RegionServerServices rss;
072
073  @BeforeEach
074  public void setUp() throws Exception {
075    TableName tableName = TableName.valueOf(getClass().getSimpleName());
076    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
077      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
078    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
079    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
080    region =
081      HBaseTestingUtil.createRegionAndWAL(info, path, testUtil.getConfiguration(), tableDescriptor);
082    rss = mock(RegionServerServices.class);
083    List<HRegion> regions = new ArrayList<>(1);
084    regions.add(region);
085    Mockito.doReturn(regions).when(rss).getRegions();
086  }
087
088  @AfterEach
089  public void tearDown() throws IOException {
090    counter.set(0);
091    scanCompletedCounter.set(0);
092    latch = new CountDownLatch(3);
093    HBaseTestingUtil.closeRegionAndWAL(region);
094    testUtil.cleanupTestDir();
095  }
096
097  @Test
098  public void testCompactedHFilesCleaner() throws Exception {
099    // Create the cleaner object
100    CompactedHFilesDischarger cleaner =
101      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
102    // Add some data to the region and do some flushes
103    for (int i = 1; i < 10; i++) {
104      Put p = new Put(Bytes.toBytes("row" + i));
105      p.addColumn(fam, qual1, val);
106      region.put(p);
107    }
108    // flush them
109    region.flush(true);
110    for (int i = 11; i < 20; i++) {
111      Put p = new Put(Bytes.toBytes("row" + i));
112      p.addColumn(fam, qual1, val);
113      region.put(p);
114    }
115    // flush them
116    region.flush(true);
117    for (int i = 21; i < 30; i++) {
118      Put p = new Put(Bytes.toBytes("row" + i));
119      p.addColumn(fam, qual1, val);
120      region.put(p);
121    }
122    // flush them
123    region.flush(true);
124
125    HStore store = region.getStore(fam);
126    assertEquals(3, store.getStorefilesCount());
127
128    Collection<HStoreFile> storefiles = store.getStorefiles();
129    Collection<HStoreFile> compactedfiles =
130      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
131    // None of the files should be in compacted state.
132    for (HStoreFile file : storefiles) {
133      assertFalse(file.isCompactedAway());
134    }
135    // Try to run the cleaner without compaction. there should not be any change
136    cleaner.chore();
137    storefiles = store.getStorefiles();
138    // None of the files should be in compacted state.
139    for (HStoreFile file : storefiles) {
140      assertFalse(file.isCompactedAway());
141    }
142    // now do some compaction
143    region.compact(true);
144    // Still the flushed files should be present until the cleaner runs. But the state of it should
145    // be in COMPACTED state
146    assertEquals(1, store.getStorefilesCount());
147    assertEquals(3,
148      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
149
150    // Run the cleaner
151    cleaner.chore();
152    assertEquals(1, store.getStorefilesCount());
153    storefiles = store.getStorefiles();
154    for (HStoreFile file : storefiles) {
155      // Should not be in compacted state
156      assertFalse(file.isCompactedAway());
157    }
158    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
159    assertTrue(compactedfiles.isEmpty());
160
161  }
162
163  @Test
164  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
165    // Create the cleaner object
166    CompactedHFilesDischarger cleaner =
167      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
168    // Add some data to the region and do some flushes
169    for (int i = 1; i < 10; i++) {
170      Put p = new Put(Bytes.toBytes("row" + i));
171      p.addColumn(fam, qual1, val);
172      region.put(p);
173    }
174    // flush them
175    region.flush(true);
176    for (int i = 11; i < 20; i++) {
177      Put p = new Put(Bytes.toBytes("row" + i));
178      p.addColumn(fam, qual1, val);
179      region.put(p);
180    }
181    // flush them
182    region.flush(true);
183    for (int i = 21; i < 30; i++) {
184      Put p = new Put(Bytes.toBytes("row" + i));
185      p.addColumn(fam, qual1, val);
186      region.put(p);
187    }
188    // flush them
189    region.flush(true);
190
191    HStore store = region.getStore(fam);
192    assertEquals(3, store.getStorefilesCount());
193
194    Collection<HStoreFile> storefiles = store.getStorefiles();
195    Collection<HStoreFile> compactedfiles =
196      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
197    // None of the files should be in compacted state.
198    for (HStoreFile file : storefiles) {
199      assertFalse(file.isCompactedAway());
200    }
201    // Do compaction
202    region.compact(true);
203    startScannerThreads();
204
205    storefiles = store.getStorefiles();
206    int usedReaderCount = 0;
207    int unusedReaderCount = 0;
208    for (HStoreFile file : storefiles) {
209      if (((HStoreFile) file).getRefCount() == 3) {
210        usedReaderCount++;
211      }
212    }
213    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
214    for (HStoreFile file : compactedfiles) {
215      assertEquals(0, ((HStoreFile) file).getRefCount(), "Refcount should be 3");
216      unusedReaderCount++;
217    }
218    // Though there are files we are not using them for reads
219    assertEquals(3, unusedReaderCount, "unused reader count should be 3");
220    assertEquals(1, usedReaderCount, "used reader count should be 1");
221    // now run the cleaner
222    cleaner.chore();
223    countDown();
224    assertEquals(1, store.getStorefilesCount());
225    storefiles = store.getStorefiles();
226    for (HStoreFile file : storefiles) {
227      // Should not be in compacted state
228      assertFalse(file.isCompactedAway());
229    }
230    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
231    assertTrue(compactedfiles.isEmpty());
232  }
233
234  @Test
235  public void testCleanerWithParallelScanners() throws Exception {
236    // Create the cleaner object
237    CompactedHFilesDischarger cleaner =
238      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
239    // Add some data to the region and do some flushes
240    for (int i = 1; i < 10; i++) {
241      Put p = new Put(Bytes.toBytes("row" + i));
242      p.addColumn(fam, qual1, val);
243      region.put(p);
244    }
245    // flush them
246    region.flush(true);
247    for (int i = 11; i < 20; i++) {
248      Put p = new Put(Bytes.toBytes("row" + i));
249      p.addColumn(fam, qual1, val);
250      region.put(p);
251    }
252    // flush them
253    region.flush(true);
254    for (int i = 21; i < 30; i++) {
255      Put p = new Put(Bytes.toBytes("row" + i));
256      p.addColumn(fam, qual1, val);
257      region.put(p);
258    }
259    // flush them
260    region.flush(true);
261
262    HStore store = region.getStore(fam);
263    assertEquals(3, store.getStorefilesCount());
264
265    Collection<HStoreFile> storefiles = store.getStorefiles();
266    Collection<HStoreFile> compactedfiles =
267      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
268    // None of the files should be in compacted state.
269    for (HStoreFile file : storefiles) {
270      assertFalse(file.isCompactedAway());
271    }
272    startScannerThreads();
273    // Do compaction
274    region.compact(true);
275
276    storefiles = store.getStorefiles();
277    int usedReaderCount = 0;
278    int unusedReaderCount = 0;
279    for (HStoreFile file : storefiles) {
280      if (file.getRefCount() == 0) {
281        unusedReaderCount++;
282      }
283    }
284    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
285    for (HStoreFile file : compactedfiles) {
286      assertEquals(3, ((HStoreFile) file).getRefCount(), "Refcount should be 3");
287      usedReaderCount++;
288    }
289    // The newly compacted file will not be used by any scanner
290    assertEquals(1, unusedReaderCount, "unused reader count should be 1");
291    assertEquals(3, usedReaderCount, "used reader count should be 3");
292    // now run the cleaner
293    cleaner.chore();
294    countDown();
295    // No change in the number of store files as none of the compacted files could be cleaned up
296    assertEquals(1, store.getStorefilesCount());
297    assertEquals(3,
298      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
299    while (scanCompletedCounter.get() != 3) {
300      Thread.sleep(100);
301    }
302    // reset
303    latch = new CountDownLatch(3);
304    scanCompletedCounter.set(0);
305    counter.set(0);
306    // Try creating a new scanner and it should use only the new file created after compaction
307    startScannerThreads();
308    storefiles = store.getStorefiles();
309    usedReaderCount = 0;
310    unusedReaderCount = 0;
311    for (HStoreFile file : storefiles) {
312      if (file.getRefCount() == 3) {
313        usedReaderCount++;
314      }
315    }
316    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
317    for (HStoreFile file : compactedfiles) {
318      assertEquals(0, file.getRefCount(), "Refcount should be 0");
319      unusedReaderCount++;
320    }
321    // Though there are files we are not using them for reads
322    assertEquals(3, unusedReaderCount, "unused reader count should be 3");
323    assertEquals(1, usedReaderCount, "used reader count should be 1");
324    countDown();
325    while (scanCompletedCounter.get() != 3) {
326      Thread.sleep(100);
327    }
328    // Run the cleaner again
329    cleaner.chore();
330    // Now the cleaner should be able to clear it up because there are no active readers
331    assertEquals(1, store.getStorefilesCount());
332    storefiles = store.getStorefiles();
333    for (HStoreFile file : storefiles) {
334      // Should not be in compacted state
335      assertFalse(file.isCompactedAway());
336    }
337    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
338    assertTrue(compactedfiles.isEmpty());
339  }
340
341  @Test
342  public void testStoreFileMissing() throws Exception {
343    // Write 3 records and create 3 store files.
344    write("row1");
345    region.flush(true);
346    write("row2");
347    region.flush(true);
348    write("row3");
349    region.flush(true);
350
351    Scan scan = new Scan();
352    scan.setCaching(1);
353    RegionScanner scanner = region.getScanner(scan);
354    List<Cell> res = new ArrayList<Cell>();
355    // Read first item
356    scanner.next(res);
357    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
358    res.clear();
359    // Create a new file in between scan nexts
360    write("row4");
361    region.flush(true);
362
363    // Compact the table
364    region.compact(true);
365
366    // Create the cleaner object
367    CompactedHFilesDischarger cleaner =
368      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
369    cleaner.chore();
370    // This issues scan next
371    scanner.next(res);
372    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
373
374    scanner.close();
375  }
376
377  private void write(String row1) throws IOException {
378    byte[] row = Bytes.toBytes(row1);
379    Put put = new Put(row);
380    put.addColumn(fam, qual1, row);
381    region.put(put);
382  }
383
384  protected void countDown() {
385    // count down 3 times
386    latch.countDown();
387    latch.countDown();
388    latch.countDown();
389  }
390
391  protected void startScannerThreads() throws InterruptedException {
392    // Start parallel scan threads
393    ScanThread[] scanThreads = new ScanThread[3];
394    for (int i = 0; i < 3; i++) {
395      scanThreads[i] = new ScanThread((HRegion) region);
396    }
397    for (ScanThread thread : scanThreads) {
398      thread.start();
399    }
400    while (counter.get() != 3) {
401      Thread.sleep(100);
402    }
403  }
404
405  private static class ScanThread extends Thread {
406    private final HRegion region;
407
408    public ScanThread(HRegion region) {
409      this.region = region;
410    }
411
412    @Override
413    public void run() {
414      try {
415        initiateScan(region);
416      } catch (IOException e) {
417        e.printStackTrace();
418      }
419    }
420
421    private void initiateScan(HRegion region) throws IOException {
422      Scan scan = new Scan();
423      scan.setCaching(1);
424      RegionScanner resScanner = null;
425      try {
426        resScanner = region.getScanner(scan);
427        List<Cell> results = new ArrayList<>();
428        boolean next = resScanner.next(results);
429        try {
430          counter.incrementAndGet();
431          latch.await();
432        } catch (InterruptedException e) {
433        }
434        while (next) {
435          next = resScanner.next(results);
436        }
437      } finally {
438        scanCompletedCounter.incrementAndGet();
439        resScanner.close();
440      }
441    }
442  }
443}