001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Mockito.mock; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HColumnDescriptor; 037import org.apache.hadoop.hbase.HRegionInfo; 038import org.apache.hadoop.hbase.HTableDescriptor; 039import org.apache.hadoop.hbase.Stoppable; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger; 044import org.apache.hadoop.hbase.regionserver.HRegion; 045import org.apache.hadoop.hbase.regionserver.HStore; 046import org.apache.hadoop.hbase.regionserver.HStoreFile; 047import org.apache.hadoop.hbase.regionserver.RegionScanner; 048import org.apache.hadoop.hbase.regionserver.RegionServerServices; 049import org.apache.hadoop.hbase.testclassification.MediumTests; 050import org.apache.hadoop.hbase.testclassification.RegionServerTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.junit.After; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.mockito.Mockito; 058 059@Category({ MediumTests.class, RegionServerTests.class }) 060public class TestCompactedHFilesDischarger { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestCompactedHFilesDischarger.class); 065 066 private final HBaseTestingUtility testUtil = new HBaseTestingUtility(); 067 private HRegion region; 068 private final static byte[] fam = Bytes.toBytes("cf_1"); 069 private final static byte[] qual1 = Bytes.toBytes("qf_1"); 070 private final static byte[] val = Bytes.toBytes("val"); 071 private static CountDownLatch latch = new CountDownLatch(3); 072 private static AtomicInteger counter = new AtomicInteger(0); 073 private static AtomicInteger scanCompletedCounter = new AtomicInteger(0); 074 private RegionServerServices rss; 075 076 @Before 077 public void setUp() throws Exception { 078 TableName tableName = TableName.valueOf(getClass().getSimpleName()); 079 HTableDescriptor htd = new HTableDescriptor(tableName); 080 htd.addFamily(new HColumnDescriptor(fam)); 081 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 082 Path path = testUtil.getDataTestDir(getClass().getSimpleName()); 083 region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd); 084 rss = mock(RegionServerServices.class); 085 List<HRegion> regions = new ArrayList<>(1); 086 regions.add(region); 087 Mockito.doReturn(regions).when(rss).getRegions(); 088 } 089 090 @After 091 public void tearDown() throws IOException { 092 counter.set(0); 093 scanCompletedCounter.set(0); 094 latch = new CountDownLatch(3); 095 HBaseTestingUtility.closeRegionAndWAL(region); 096 testUtil.cleanupTestDir(); 097 } 098 099 @Test 100 public void testCompactedHFilesCleaner() throws Exception { 101 // Create the cleaner object 102 CompactedHFilesDischarger cleaner = 103 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 104 // Add some data to the region and do some flushes 105 for (int i = 1; i < 10; i++) { 106 Put p = new Put(Bytes.toBytes("row" + i)); 107 p.addColumn(fam, qual1, val); 108 region.put(p); 109 } 110 // flush them 111 region.flush(true); 112 for (int i = 11; i < 20; i++) { 113 Put p = new Put(Bytes.toBytes("row" + i)); 114 p.addColumn(fam, qual1, val); 115 region.put(p); 116 } 117 // flush them 118 region.flush(true); 119 for (int i = 21; i < 30; i++) { 120 Put p = new Put(Bytes.toBytes("row" + i)); 121 p.addColumn(fam, qual1, val); 122 region.put(p); 123 } 124 // flush them 125 region.flush(true); 126 127 HStore store = region.getStore(fam); 128 assertEquals(3, store.getStorefilesCount()); 129 130 Collection<HStoreFile> storefiles = store.getStorefiles(); 131 Collection<HStoreFile> compactedfiles = 132 store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 133 // None of the files should be in compacted state. 134 for (HStoreFile file : storefiles) { 135 assertFalse(file.isCompactedAway()); 136 } 137 // Try to run the cleaner without compaction. there should not be any change 138 cleaner.chore(); 139 storefiles = store.getStorefiles(); 140 // None of the files should be in compacted state. 141 for (HStoreFile file : storefiles) { 142 assertFalse(file.isCompactedAway()); 143 } 144 // now do some compaction 145 region.compact(true); 146 // Still the flushed files should be present until the cleaner runs. But the state of it should 147 // be in COMPACTED state 148 assertEquals(1, store.getStorefilesCount()); 149 assertEquals(3, 150 ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size()); 151 152 // Run the cleaner 153 cleaner.chore(); 154 assertEquals(1, store.getStorefilesCount()); 155 storefiles = store.getStorefiles(); 156 for (HStoreFile file : storefiles) { 157 // Should not be in compacted state 158 assertFalse(file.isCompactedAway()); 159 } 160 compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); 161 assertTrue(compactedfiles.isEmpty()); 162 163 } 164 165 @Test 166 public void testCleanerWithParallelScannersAfterCompaction() throws Exception { 167 // Create the cleaner object 168 CompactedHFilesDischarger cleaner = 169 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 170 // Add some data to the region and do some flushes 171 for (int i = 1; i < 10; i++) { 172 Put p = new Put(Bytes.toBytes("row" + i)); 173 p.addColumn(fam, qual1, val); 174 region.put(p); 175 } 176 // flush them 177 region.flush(true); 178 for (int i = 11; i < 20; i++) { 179 Put p = new Put(Bytes.toBytes("row" + i)); 180 p.addColumn(fam, qual1, val); 181 region.put(p); 182 } 183 // flush them 184 region.flush(true); 185 for (int i = 21; i < 30; i++) { 186 Put p = new Put(Bytes.toBytes("row" + i)); 187 p.addColumn(fam, qual1, val); 188 region.put(p); 189 } 190 // flush them 191 region.flush(true); 192 193 HStore store = region.getStore(fam); 194 assertEquals(3, store.getStorefilesCount()); 195 196 Collection<HStoreFile> storefiles = store.getStorefiles(); 197 Collection<HStoreFile> compactedfiles = 198 store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 199 // None of the files should be in compacted state. 200 for (HStoreFile file : storefiles) { 201 assertFalse(file.isCompactedAway()); 202 } 203 // Do compaction 204 region.compact(true); 205 startScannerThreads(); 206 207 storefiles = store.getStorefiles(); 208 int usedReaderCount = 0; 209 int unusedReaderCount = 0; 210 for (HStoreFile file : storefiles) { 211 if (((HStoreFile) file).getRefCount() == 3) { 212 usedReaderCount++; 213 } 214 } 215 compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); 216 for (HStoreFile file : compactedfiles) { 217 assertEquals("Refcount should be 3", 0, ((HStoreFile) file).getRefCount()); 218 unusedReaderCount++; 219 } 220 // Though there are files we are not using them for reads 221 assertEquals("unused reader count should be 3", 3, unusedReaderCount); 222 assertEquals("used reader count should be 1", 1, usedReaderCount); 223 // now run the cleaner 224 cleaner.chore(); 225 countDown(); 226 assertEquals(1, store.getStorefilesCount()); 227 storefiles = store.getStorefiles(); 228 for (HStoreFile file : storefiles) { 229 // Should not be in compacted state 230 assertFalse(file.isCompactedAway()); 231 } 232 compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); 233 assertTrue(compactedfiles.isEmpty()); 234 } 235 236 @Test 237 public void testCleanerWithParallelScanners() throws Exception { 238 // Create the cleaner object 239 CompactedHFilesDischarger cleaner = 240 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 241 // Add some data to the region and do some flushes 242 for (int i = 1; i < 10; i++) { 243 Put p = new Put(Bytes.toBytes("row" + i)); 244 p.addColumn(fam, qual1, val); 245 region.put(p); 246 } 247 // flush them 248 region.flush(true); 249 for (int i = 11; i < 20; i++) { 250 Put p = new Put(Bytes.toBytes("row" + i)); 251 p.addColumn(fam, qual1, val); 252 region.put(p); 253 } 254 // flush them 255 region.flush(true); 256 for (int i = 21; i < 30; i++) { 257 Put p = new Put(Bytes.toBytes("row" + i)); 258 p.addColumn(fam, qual1, val); 259 region.put(p); 260 } 261 // flush them 262 region.flush(true); 263 264 HStore store = region.getStore(fam); 265 assertEquals(3, store.getStorefilesCount()); 266 267 Collection<HStoreFile> storefiles = store.getStorefiles(); 268 Collection<HStoreFile> compactedfiles = 269 store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 270 // None of the files should be in compacted state. 271 for (HStoreFile file : storefiles) { 272 assertFalse(file.isCompactedAway()); 273 } 274 startScannerThreads(); 275 // Do compaction 276 region.compact(true); 277 278 storefiles = store.getStorefiles(); 279 int usedReaderCount = 0; 280 int unusedReaderCount = 0; 281 for (HStoreFile file : storefiles) { 282 if (file.getRefCount() == 0) { 283 unusedReaderCount++; 284 } 285 } 286 compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 287 for (HStoreFile file : compactedfiles) { 288 assertEquals("Refcount should be 3", 3, ((HStoreFile) file).getRefCount()); 289 usedReaderCount++; 290 } 291 // The newly compacted file will not be used by any scanner 292 assertEquals("unused reader count should be 1", 1, unusedReaderCount); 293 assertEquals("used reader count should be 3", 3, usedReaderCount); 294 // now run the cleaner 295 cleaner.chore(); 296 countDown(); 297 // No change in the number of store files as none of the compacted files could be cleaned up 298 assertEquals(1, store.getStorefilesCount()); 299 assertEquals(3, 300 ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size()); 301 while (scanCompletedCounter.get() != 3) { 302 Thread.sleep(100); 303 } 304 // reset 305 latch = new CountDownLatch(3); 306 scanCompletedCounter.set(0); 307 counter.set(0); 308 // Try creating a new scanner and it should use only the new file created after compaction 309 startScannerThreads(); 310 storefiles = store.getStorefiles(); 311 usedReaderCount = 0; 312 unusedReaderCount = 0; 313 for (HStoreFile file : storefiles) { 314 if (file.getRefCount() == 3) { 315 usedReaderCount++; 316 } 317 } 318 compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); 319 for (HStoreFile file : compactedfiles) { 320 assertEquals("Refcount should be 0", 0, file.getRefCount()); 321 unusedReaderCount++; 322 } 323 // Though there are files we are not using them for reads 324 assertEquals("unused reader count should be 3", 3, unusedReaderCount); 325 assertEquals("used reader count should be 1", 1, usedReaderCount); 326 countDown(); 327 while (scanCompletedCounter.get() != 3) { 328 Thread.sleep(100); 329 } 330 // Run the cleaner again 331 cleaner.chore(); 332 // Now the cleaner should be able to clear it up because there are no active readers 333 assertEquals(1, store.getStorefilesCount()); 334 storefiles = store.getStorefiles(); 335 for (HStoreFile file : storefiles) { 336 // Should not be in compacted state 337 assertFalse(file.isCompactedAway()); 338 } 339 compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); 340 assertTrue(compactedfiles.isEmpty()); 341 } 342 343 @Test 344 public void testStoreFileMissing() throws Exception { 345 // Write 3 records and create 3 store files. 346 write("row1"); 347 region.flush(true); 348 write("row2"); 349 region.flush(true); 350 write("row3"); 351 region.flush(true); 352 353 Scan scan = new Scan(); 354 scan.setCaching(1); 355 RegionScanner scanner = region.getScanner(scan); 356 List<Cell> res = new ArrayList<Cell>(); 357 // Read first item 358 scanner.next(res); 359 assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0)))); 360 res.clear(); 361 // Create a new file in between scan nexts 362 write("row4"); 363 region.flush(true); 364 365 // Compact the table 366 region.compact(true); 367 368 // Create the cleaner object 369 CompactedHFilesDischarger cleaner = 370 new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); 371 cleaner.chore(); 372 // This issues scan next 373 scanner.next(res); 374 assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0)))); 375 376 scanner.close(); 377 } 378 379 private void write(String row1) throws IOException { 380 byte[] row = Bytes.toBytes(row1); 381 Put put = new Put(row); 382 put.addColumn(fam, qual1, row); 383 region.put(put); 384 } 385 386 protected void countDown() { 387 // count down 3 times 388 latch.countDown(); 389 latch.countDown(); 390 latch.countDown(); 391 } 392 393 protected void startScannerThreads() throws InterruptedException { 394 // Start parallel scan threads 395 ScanThread[] scanThreads = new ScanThread[3]; 396 for (int i = 0; i < 3; i++) { 397 scanThreads[i] = new ScanThread((HRegion) region); 398 } 399 for (ScanThread thread : scanThreads) { 400 thread.start(); 401 } 402 while (counter.get() != 3) { 403 Thread.sleep(100); 404 } 405 } 406 407 private static class ScanThread extends Thread { 408 private final HRegion region; 409 410 public ScanThread(HRegion region) { 411 this.region = region; 412 } 413 414 @Override 415 public void run() { 416 try { 417 initiateScan(region); 418 } catch (IOException e) { 419 e.printStackTrace(); 420 } 421 } 422 423 private void initiateScan(HRegion region) throws IOException { 424 Scan scan = new Scan(); 425 scan.setCaching(1); 426 RegionScanner resScanner = null; 427 try { 428 resScanner = region.getScanner(scan); 429 List<Cell> results = new ArrayList<>(); 430 boolean next = resScanner.next(results); 431 try { 432 counter.incrementAndGet(); 433 latch.await(); 434 } catch (InterruptedException e) { 435 } 436 while (next) { 437 next = resScanner.next(results); 438 } 439 } finally { 440 scanCompletedCounter.incrementAndGet(); 441 resScanner.close(); 442 } 443 } 444 } 445}