001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.mock;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HRegionInfo;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.Stoppable;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.regionserver.HStore;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.regionserver.RegionScanner;
048import org.apache.hadoop.hbase.regionserver.RegionServerServices;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.testclassification.RegionServerTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.junit.After;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.mockito.Mockito;
058
059@Category({ MediumTests.class, RegionServerTests.class })
060public class TestCompactedHFilesDischarger {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestCompactedHFilesDischarger.class);
065
066  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
067  private HRegion region;
068  private final static byte[] fam = Bytes.toBytes("cf_1");
069  private final static byte[] qual1 = Bytes.toBytes("qf_1");
070  private final static byte[] val = Bytes.toBytes("val");
071  private static CountDownLatch latch = new CountDownLatch(3);
072  private static AtomicInteger counter = new AtomicInteger(0);
073  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
074  private RegionServerServices rss;
075
076  @Before
077  public void setUp() throws Exception {
078    TableName tableName = TableName.valueOf(getClass().getSimpleName());
079    HTableDescriptor htd = new HTableDescriptor(tableName);
080    htd.addFamily(new HColumnDescriptor(fam));
081    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
082    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
083    region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
084    rss = mock(RegionServerServices.class);
085    List<HRegion> regions = new ArrayList<>(1);
086    regions.add(region);
087    Mockito.doReturn(regions).when(rss).getRegions();
088  }
089
090  @After
091  public void tearDown() throws IOException {
092    counter.set(0);
093    scanCompletedCounter.set(0);
094    latch = new CountDownLatch(3);
095    HBaseTestingUtility.closeRegionAndWAL(region);
096    testUtil.cleanupTestDir();
097  }
098
099  @Test
100  public void testCompactedHFilesCleaner() throws Exception {
101    // Create the cleaner object
102    CompactedHFilesDischarger cleaner =
103      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
104    // Add some data to the region and do some flushes
105    for (int i = 1; i < 10; i++) {
106      Put p = new Put(Bytes.toBytes("row" + i));
107      p.addColumn(fam, qual1, val);
108      region.put(p);
109    }
110    // flush them
111    region.flush(true);
112    for (int i = 11; i < 20; i++) {
113      Put p = new Put(Bytes.toBytes("row" + i));
114      p.addColumn(fam, qual1, val);
115      region.put(p);
116    }
117    // flush them
118    region.flush(true);
119    for (int i = 21; i < 30; i++) {
120      Put p = new Put(Bytes.toBytes("row" + i));
121      p.addColumn(fam, qual1, val);
122      region.put(p);
123    }
124    // flush them
125    region.flush(true);
126
127    HStore store = region.getStore(fam);
128    assertEquals(3, store.getStorefilesCount());
129
130    Collection<HStoreFile> storefiles = store.getStorefiles();
131    Collection<HStoreFile> compactedfiles =
132      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
133    // None of the files should be in compacted state.
134    for (HStoreFile file : storefiles) {
135      assertFalse(file.isCompactedAway());
136    }
137    // Try to run the cleaner without compaction. there should not be any change
138    cleaner.chore();
139    storefiles = store.getStorefiles();
140    // None of the files should be in compacted state.
141    for (HStoreFile file : storefiles) {
142      assertFalse(file.isCompactedAway());
143    }
144    // now do some compaction
145    region.compact(true);
146    // Still the flushed files should be present until the cleaner runs. But the state of it should
147    // be in COMPACTED state
148    assertEquals(1, store.getStorefilesCount());
149    assertEquals(3,
150      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
151
152    // Run the cleaner
153    cleaner.chore();
154    assertEquals(1, store.getStorefilesCount());
155    storefiles = store.getStorefiles();
156    for (HStoreFile file : storefiles) {
157      // Should not be in compacted state
158      assertFalse(file.isCompactedAway());
159    }
160    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
161    assertTrue(compactedfiles.isEmpty());
162
163  }
164
165  @Test
166  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
167    // Create the cleaner object
168    CompactedHFilesDischarger cleaner =
169      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
170    // Add some data to the region and do some flushes
171    for (int i = 1; i < 10; i++) {
172      Put p = new Put(Bytes.toBytes("row" + i));
173      p.addColumn(fam, qual1, val);
174      region.put(p);
175    }
176    // flush them
177    region.flush(true);
178    for (int i = 11; i < 20; i++) {
179      Put p = new Put(Bytes.toBytes("row" + i));
180      p.addColumn(fam, qual1, val);
181      region.put(p);
182    }
183    // flush them
184    region.flush(true);
185    for (int i = 21; i < 30; i++) {
186      Put p = new Put(Bytes.toBytes("row" + i));
187      p.addColumn(fam, qual1, val);
188      region.put(p);
189    }
190    // flush them
191    region.flush(true);
192
193    HStore store = region.getStore(fam);
194    assertEquals(3, store.getStorefilesCount());
195
196    Collection<HStoreFile> storefiles = store.getStorefiles();
197    Collection<HStoreFile> compactedfiles =
198      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
199    // None of the files should be in compacted state.
200    for (HStoreFile file : storefiles) {
201      assertFalse(file.isCompactedAway());
202    }
203    // Do compaction
204    region.compact(true);
205    startScannerThreads();
206
207    storefiles = store.getStorefiles();
208    int usedReaderCount = 0;
209    int unusedReaderCount = 0;
210    for (HStoreFile file : storefiles) {
211      if (((HStoreFile) file).getRefCount() == 3) {
212        usedReaderCount++;
213      }
214    }
215    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
216    for (HStoreFile file : compactedfiles) {
217      assertEquals("Refcount should be 3", 0, ((HStoreFile) file).getRefCount());
218      unusedReaderCount++;
219    }
220    // Though there are files we are not using them for reads
221    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
222    assertEquals("used reader count should be 1", 1, usedReaderCount);
223    // now run the cleaner
224    cleaner.chore();
225    countDown();
226    assertEquals(1, store.getStorefilesCount());
227    storefiles = store.getStorefiles();
228    for (HStoreFile file : storefiles) {
229      // Should not be in compacted state
230      assertFalse(file.isCompactedAway());
231    }
232    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
233    assertTrue(compactedfiles.isEmpty());
234  }
235
236  @Test
237  public void testCleanerWithParallelScanners() throws Exception {
238    // Create the cleaner object
239    CompactedHFilesDischarger cleaner =
240      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
241    // Add some data to the region and do some flushes
242    for (int i = 1; i < 10; i++) {
243      Put p = new Put(Bytes.toBytes("row" + i));
244      p.addColumn(fam, qual1, val);
245      region.put(p);
246    }
247    // flush them
248    region.flush(true);
249    for (int i = 11; i < 20; i++) {
250      Put p = new Put(Bytes.toBytes("row" + i));
251      p.addColumn(fam, qual1, val);
252      region.put(p);
253    }
254    // flush them
255    region.flush(true);
256    for (int i = 21; i < 30; i++) {
257      Put p = new Put(Bytes.toBytes("row" + i));
258      p.addColumn(fam, qual1, val);
259      region.put(p);
260    }
261    // flush them
262    region.flush(true);
263
264    HStore store = region.getStore(fam);
265    assertEquals(3, store.getStorefilesCount());
266
267    Collection<HStoreFile> storefiles = store.getStorefiles();
268    Collection<HStoreFile> compactedfiles =
269      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
270    // None of the files should be in compacted state.
271    for (HStoreFile file : storefiles) {
272      assertFalse(file.isCompactedAway());
273    }
274    startScannerThreads();
275    // Do compaction
276    region.compact(true);
277
278    storefiles = store.getStorefiles();
279    int usedReaderCount = 0;
280    int unusedReaderCount = 0;
281    for (HStoreFile file : storefiles) {
282      if (file.getRefCount() == 0) {
283        unusedReaderCount++;
284      }
285    }
286    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
287    for (HStoreFile file : compactedfiles) {
288      assertEquals("Refcount should be 3", 3, ((HStoreFile) file).getRefCount());
289      usedReaderCount++;
290    }
291    // The newly compacted file will not be used by any scanner
292    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
293    assertEquals("used reader count should be 3", 3, usedReaderCount);
294    // now run the cleaner
295    cleaner.chore();
296    countDown();
297    // No change in the number of store files as none of the compacted files could be cleaned up
298    assertEquals(1, store.getStorefilesCount());
299    assertEquals(3,
300      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
301    while (scanCompletedCounter.get() != 3) {
302      Thread.sleep(100);
303    }
304    // reset
305    latch = new CountDownLatch(3);
306    scanCompletedCounter.set(0);
307    counter.set(0);
308    // Try creating a new scanner and it should use only the new file created after compaction
309    startScannerThreads();
310    storefiles = store.getStorefiles();
311    usedReaderCount = 0;
312    unusedReaderCount = 0;
313    for (HStoreFile file : storefiles) {
314      if (file.getRefCount() == 3) {
315        usedReaderCount++;
316      }
317    }
318    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
319    for (HStoreFile file : compactedfiles) {
320      assertEquals("Refcount should be 0", 0, file.getRefCount());
321      unusedReaderCount++;
322    }
323    // Though there are files we are not using them for reads
324    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
325    assertEquals("used reader count should be 1", 1, usedReaderCount);
326    countDown();
327    while (scanCompletedCounter.get() != 3) {
328      Thread.sleep(100);
329    }
330    // Run the cleaner again
331    cleaner.chore();
332    // Now the cleaner should be able to clear it up because there are no active readers
333    assertEquals(1, store.getStorefilesCount());
334    storefiles = store.getStorefiles();
335    for (HStoreFile file : storefiles) {
336      // Should not be in compacted state
337      assertFalse(file.isCompactedAway());
338    }
339    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
340    assertTrue(compactedfiles.isEmpty());
341  }
342
343  @Test
344  public void testStoreFileMissing() throws Exception {
345    // Write 3 records and create 3 store files.
346    write("row1");
347    region.flush(true);
348    write("row2");
349    region.flush(true);
350    write("row3");
351    region.flush(true);
352
353    Scan scan = new Scan();
354    scan.setCaching(1);
355    RegionScanner scanner = region.getScanner(scan);
356    List<Cell> res = new ArrayList<Cell>();
357    // Read first item
358    scanner.next(res);
359    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
360    res.clear();
361    // Create a new file in between scan nexts
362    write("row4");
363    region.flush(true);
364
365    // Compact the table
366    region.compact(true);
367
368    // Create the cleaner object
369    CompactedHFilesDischarger cleaner =
370      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
371    cleaner.chore();
372    // This issues scan next
373    scanner.next(res);
374    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
375
376    scanner.close();
377  }
378
379  private void write(String row1) throws IOException {
380    byte[] row = Bytes.toBytes(row1);
381    Put put = new Put(row);
382    put.addColumn(fam, qual1, row);
383    region.put(put);
384  }
385
386  protected void countDown() {
387    // count down 3 times
388    latch.countDown();
389    latch.countDown();
390    latch.countDown();
391  }
392
393  protected void startScannerThreads() throws InterruptedException {
394    // Start parallel scan threads
395    ScanThread[] scanThreads = new ScanThread[3];
396    for (int i = 0; i < 3; i++) {
397      scanThreads[i] = new ScanThread((HRegion) region);
398    }
399    for (ScanThread thread : scanThreads) {
400      thread.start();
401    }
402    while (counter.get() != 3) {
403      Thread.sleep(100);
404    }
405  }
406
407  private static class ScanThread extends Thread {
408    private final HRegion region;
409
410    public ScanThread(HRegion region) {
411      this.region = region;
412    }
413
414    @Override
415    public void run() {
416      try {
417        initiateScan(region);
418      } catch (IOException e) {
419        e.printStackTrace();
420      }
421    }
422
423    private void initiateScan(HRegion region) throws IOException {
424      Scan scan = new Scan();
425      scan.setCaching(1);
426      RegionScanner resScanner = null;
427      try {
428        resScanner = region.getScanner(scan);
429        List<Cell> results = new ArrayList<>();
430        boolean next = resScanner.next(results);
431        try {
432          counter.incrementAndGet();
433          latch.await();
434        } catch (InterruptedException e) {
435        }
436        while (next) {
437          next = resScanner.next(results);
438        }
439      } finally {
440        scanCompletedCounter.incrementAndGet();
441        resScanner.close();
442      }
443    }
444  }
445}