/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests that the {@link CompactedHFilesDischarger} chore archives compacted-away store files, and
 * that it does so only once no scanner holds a reference to them.
 */
@Tag(MediumTests.TAG)
@Tag(RegionServerTests.TAG)
public class TestCompactedHFilesDischarger {

  private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
  private HRegion region;
  private final static byte[] fam = Bytes.toBytes("cf_1");
  private final static byte[] qual1 = Bytes.toBytes("qf_1");
  private final static byte[] val = Bytes.toBytes("val");
  // Gates the scanner threads between their first next() call and the rest of the scan, so the
  // test can control how long the old store file readers stay referenced.
  private static CountDownLatch latch = new CountDownLatch(3);
  // Number of scanner threads that have completed their first next() call.
  private static AtomicInteger counter = new AtomicInteger(0);
  // Number of scanner threads that have finished their scan entirely.
  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
  private RegionServerServices rss;

  @BeforeEach
  public void setUp() throws Exception {
    TableName tableName = TableName.valueOf(getClass().getSimpleName());
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
    region =
      HBaseTestingUtil.createRegionAndWAL(info, path, testUtil.getConfiguration(), tableDescriptor);
    // The discharger discovers regions through the region server services, so hand it a mock
    // that returns only the test region.
    rss = mock(RegionServerServices.class);
    List<HRegion> regions = new ArrayList<>(1);
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
  }

  @AfterEach
  public void tearDown() throws IOException {
    counter.set(0);
    scanCompletedCounter.set(0);
    latch = new CountDownLatch(3);
    HBaseTestingUtil.closeRegionAndWAL(region);
    testUtil.cleanupTestDir();
  }

  /** Writes three batches of rows, flushing after each batch, to create three store files. */
  private void createThreeStoreFiles() throws IOException {
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    region.flush(true);
  }

  @Test
  public void testCompactedHFilesCleaner() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    createThreeStoreFiles();

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Try to run the cleaner without compaction. there should not be any change
    cleaner.chore();
    storefiles = store.getStorefiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // now do some compaction
    region.compact(true);
    // Still the flushed files should be present until the cleaner runs. But the state of it should
    // be in COMPACTED state
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());

    // Run the cleaner
    cleaner.chore();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    createThreeStoreFiles();

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Do compaction
    region.compact(true);
    // Scanners are opened AFTER the compaction, so they only reference the new store file.
    startScannerThreads();

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      // The compacted-away files must not be referenced by the post-compaction scanners.
      assertEquals(0, file.getRefCount(), "Refcount should be 0");
      unusedReaderCount++;
    }
    // Though there are files we are not using them for reads
    assertEquals(3, unusedReaderCount, "unused reader count should be 3");
    assertEquals(1, usedReaderCount, "used reader count should be 1");
    // now run the cleaner
    cleaner.chore();
    countDown();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testCleanerWithParallelScanners() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    createThreeStoreFiles();

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Scanners are opened BEFORE the compaction, so they pin the old store file readers.
    startScannerThreads();
    // Do compaction
    region.compact(true);

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 0) {
        unusedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals(3, file.getRefCount(), "Refcount should be 3");
      usedReaderCount++;
    }
    // The newly compacted file will not be used by any scanner
    assertEquals(1, unusedReaderCount, "unused reader count should be 1");
    assertEquals(3, usedReaderCount, "used reader count should be 3");
    // now run the cleaner
    cleaner.chore();
    countDown();
    // No change in the number of store files as none of the compacted files could be cleaned up
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // reset
    latch = new CountDownLatch(3);
    scanCompletedCounter.set(0);
    counter.set(0);
    // Try creating a new scanner and it should use only the new file created after compaction
    startScannerThreads();
    storefiles = store.getStorefiles();
    usedReaderCount = 0;
    unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals(0, file.getRefCount(), "Refcount should be 0");
      unusedReaderCount++;
    }
    // Though there are files we are not using them for reads
    assertEquals(3, unusedReaderCount, "unused reader count should be 3");
    assertEquals(1, usedReaderCount, "used reader count should be 1");
    countDown();
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // Run the cleaner again
    cleaner.chore();
    // Now the cleaner should be able to clear it up because there are no active readers
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testStoreFileMissing() throws Exception {
    // Write 3 records and create 3 store files.
    write("row1");
    region.flush(true);
    write("row2");
    region.flush(true);
    write("row3");
    region.flush(true);

    Scan scan = new Scan();
    scan.setCaching(1);
    RegionScanner scanner = region.getScanner(scan);
    List<Cell> res = new ArrayList<>();
    // Read first item
    scanner.next(res);
    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
    res.clear();
    // Create a new file in between scan nexts
    write("row4");
    region.flush(true);

    // Compact the table
    region.compact(true);

    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    cleaner.chore();
    // This issues scan next
    scanner.next(res);
    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));

    scanner.close();
  }

  /** Puts a single cell whose row key and value are both {@code row1}. */
  private void write(String row1) throws IOException {
    byte[] row = Bytes.toBytes(row1);
    Put put = new Put(row);
    put.addColumn(fam, qual1, row);
    region.put(put);
  }

  /** Releases all three scanner threads blocked on the latch. */
  protected void countDown() {
    // count down 3 times
    latch.countDown();
    latch.countDown();
    latch.countDown();
  }

  /**
   * Starts three scanner threads and waits until each has completed its first next() call, i.e.
   * each holds an open reference on the store files.
   */
  protected void startScannerThreads() throws InterruptedException {
    // Start parallel scan threads
    ScanThread[] scanThreads = new ScanThread[3];
    for (int i = 0; i < 3; i++) {
      scanThreads[i] = new ScanThread(region);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    while (counter.get() != 3) {
      Thread.sleep(100);
    }
  }

  /**
   * Scans the region with caching=1, pausing on the shared latch after the first next() so the
   * test controls how long the scanner keeps its store file references.
   */
  private static class ScanThread extends Thread {
    private final HRegion region;

    public ScanThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        initiateScan(region);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    private void initiateScan(HRegion region) throws IOException {
      Scan scan = new Scan();
      scan.setCaching(1);
      RegionScanner resScanner = null;
      try {
        resScanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean next = resScanner.next(results);
        try {
          // Report the first next() as done, then wait for the test to release us.
          counter.incrementAndGet();
          latch.await();
        } catch (InterruptedException e) {
          // Restore the interrupt status instead of swallowing it.
          Thread.currentThread().interrupt();
        }
        while (next) {
          next = resScanner.next(results);
        }
      } finally {
        scanCompletedCounter.incrementAndGet();
        // Guard against an NPE here when getScanner() itself threw.
        if (resScanner != null) {
          resScanner.close();
        }
      }
    }
  }
}