001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.CompactionState; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.HStore; 051import org.apache.hadoop.hbase.regionserver.HStoreFile; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.junit.jupiter.api.AfterEach; 055import org.junit.jupiter.api.BeforeEach; 056import org.junit.jupiter.api.Tag; 057import org.junit.jupiter.api.Test; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs 063 * major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for 064 * a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that 065 * every old MOB file referenced from current RS was archived 066 */ 067@Tag(MediumTests.TAG) 068public class TestRSMobFileCleanerChore { 069 private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class); 070 071 private HBaseTestingUtil HTU; 072 073 private final static String famStr = "f1"; 074 private final static byte[] fam = Bytes.toBytes(famStr); 075 private final static byte[] qualifier = Bytes.toBytes("q1"); 076 private final static long mobLen = 10; 077 private final static byte[] mobVal = Bytes 078 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 079 080 private Configuration conf; 081 private TableDescriptor tableDescriptor; 082 private ColumnFamilyDescriptor familyDescriptor; 083 private Admin admin; 084 private Table table = null; 085 private RSMobFileCleanerChore chore; 086 private long minAgeToArchive = 10000; 087 088 public TestRSMobFileCleanerChore() { 089 } 090 091 @BeforeEach 092 public void setUp() throws Exception { 093 HTU = new HBaseTestingUtil(); 094 conf = HTU.getConfiguration(); 095 096 initConf(); 097 098 HTU.startMiniCluster(); 099 admin = HTU.getAdmin(); 100 familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) 101 .setMobThreshold(mobLen).setMaxVersions(1).build(); 102 tableDescriptor = HTU.createModifyableTableDescriptor("testMobCompactTable") 103 .setColumnFamily(familyDescriptor).build(); 104 table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1")); 105 } 106 107 private void initConf() { 108 109 conf.setInt("hfile.format.version", 3); 110 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 111 conf.setInt("hbase.client.retries.number", 100); 112 conf.setInt("hbase.hregion.max.filesize", 200000000); 113 conf.setInt("hbase.hregion.memstore.flush.size", 800000); 114 conf.setInt("hbase.hstore.blockingStoreFiles", 150); 115 conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); 116 conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); 117 // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 118 // FaultyMobStoreCompactor.class.getName()); 119 // Disable automatic MOB compaction 120 conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); 121 // Disable automatic MOB file cleaner chore 122 conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); 123 // Set minimum age to archive to 10 sec 124 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); 125 // Set compacted file discharger interval to a half minAgeToArchive 126 conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); 127 } 128 129 private void loadData(Table t, int start, int num) { 130 try { 131 132 for (int i = 0; i < num; i++) { 133 Put p = new Put(Bytes.toBytes(start + i)); 134 p.addColumn(fam, qualifier, mobVal); 135 t.put(p); 136 } 137 admin.flush(t.getName()); 138 } catch (Exception e) { 139 LOG.error("MOB file cleaner chore test FAILED", e); 140 assertTrue(false); 141 } 142 } 143 144 @AfterEach 145 public void tearDown() throws Exception { 146 admin.disableTable(tableDescriptor.getTableName()); 147 admin.deleteTable(tableDescriptor.getTableName()); 148 HTU.shutdownMiniCluster(); 149 } 150 151 @Test 152 public void testMobFileCleanerChore() throws InterruptedException, IOException { 153 loadData(table, 0, 10); 154 loadData(table, 10, 10); 155 // loadData(20, 10); 156 long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 157 assertEquals(2, num); 158 // Major compact 159 admin.majorCompact(tableDescriptor.getTableName(), fam); 160 // wait until compaction is complete 161 while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) { 162 Thread.sleep(100); 163 } 164 165 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 166 assertEquals(3, num); 167 // We have guarantee, that compcated file discharger will run during this pause 168 // because it has interval less than this wait time 169 LOG.info("Waiting for {}ms", minAgeToArchive + 1000); 170 171 Thread.sleep(minAgeToArchive + 1000); 172 LOG.info("Cleaning up MOB files"); 173 174 ServerName serverUsed = null; 175 List<RegionInfo> serverRegions = null; 176 for (ServerName sn : admin.getRegionServers()) { 177 serverRegions = admin.getRegions(sn); 178 if (serverRegions != null && serverRegions.size() > 0) { 179 // filtering out non test table regions 180 serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName()) 181 .collect(Collectors.toList()); 182 // if such one is found use this rs 183 if (serverRegions.size() > 0) { 184 serverUsed = sn; 185 } 186 break; 187 } 188 } 189 190 chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore(); 191 192 chore.chore(); 193 194 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 195 assertEquals(3 - serverRegions.size(), num); 196 197 long scanned = scanTable(); 198 assertEquals(20, scanned); 199 200 // creating a MOB file not referenced from the current RS 201 Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), 202 familyDescriptor, "nonExistentRegion"); 203 204 // verifying the new MOBfile is added 205 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 206 assertEquals(4 - serverRegions.size(), num); 207 208 FileSystem fs = FileSystem.get(conf); 209 assertTrue(fs.exists(extraMOBFile)); 210 211 LOG.info("Waiting for {}ms", minAgeToArchive + 1000); 212 213 Thread.sleep(minAgeToArchive + 1000); 214 LOG.info("Cleaning up MOB files"); 215 216 // running chore again 217 chore.chore(); 218 219 // the chore should only archive old MOB files that were referenced from the current RS 220 // the unrelated MOB file is still there 221 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 222 assertEquals(4 - serverRegions.size(), num); 223 224 assertTrue(fs.exists(extraMOBFile)); 225 226 scanned = scanTable(); 227 assertEquals(20, scanned); 228 } 229 230 @Test 231 public void testCleaningAndStoreFileReaderCreatedByOtherThreads() 232 throws IOException, InterruptedException { 233 TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads"); 234 ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam) 235 .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build(); 236 TableDescriptor tDesc = 237 TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build(); 238 admin.createTable(tDesc); 239 assertTrue(admin.tableExists(testTable)); 240 241 // put some data 242 loadData(admin.getConnection().getTable(testTable), 0, 10); 243 244 HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0); 245 HStore store = region.getStore(fam); 246 Collection<HStoreFile> storeFiles = store.getStorefiles(); 247 assertEquals(1, store.getStorefiles().size()); 248 final HStoreFile sf = storeFiles.iterator().next(); 249 assertNotNull(sf); 250 long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam)); 251 assertEquals(1, mobFileNum); 252 253 ServerName serverName = null; 254 for (ServerName sn : admin.getRegionServers()) { 255 boolean flag = admin.getRegions(sn).stream().anyMatch( 256 r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString())); 257 if (flag) { 258 serverName = sn; 259 break; 260 } 261 } 262 assertNotNull(serverName); 263 RSMobFileCleanerChore cleanerChore = 264 HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore(); 265 CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { 266 boolean readerIsNotNull = false; 267 try { 268 sf.initReader(); 269 Thread.sleep(1000 * 10); 270 readerIsNotNull = sf.getReader() != null; 271 sf.closeStoreFile(true); 272 } catch (Exception e) { 273 LOG.error("We occur an exception", e); 274 } 275 return readerIsNotNull; 276 }); 277 Thread.sleep(100); 278 // The StoreFileReader object was created by another thread 279 cleanerChore.chore(); 280 Boolean readerIsNotNull = future.join(); 281 assertTrue(readerIsNotNull); 282 admin.disableTable(testTable); 283 admin.deleteTable(testTable); 284 } 285 286 private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) 287 throws IOException { 288 FileSystem fs = FileSystem.get(conf); 289 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 290 FileStatus[] stat = fs.listStatus(dir); 291 for (FileStatus st : stat) { 292 LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen()); 293 } 294 LOG.debug("MOB Directory content total files: {}", stat.length); 295 296 return stat.length; 297 } 298 299 private long scanTable() { 300 try { 301 302 Result result; 303 ResultScanner scanner = table.getScanner(fam); 304 long counter = 0; 305 while ((result = scanner.next()) != null) { 306 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); 307 counter++; 308 } 309 return counter; 310 } catch (Exception e) { 311 e.printStackTrace(); 312 LOG.error("MOB file cleaner chore test FAILED"); 313 if (HTU != null) { 314 assertTrue(false); 315 } else { 316 System.exit(-1); 317 } 318 } 319 return 0; 320 } 321}