001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.nio.charset.StandardCharsets; 025import java.util.Arrays; 026import java.util.Set; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.ResultScanner; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptor; 039import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 040import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor; 041import org.apache.hadoop.hbase.mob.MobConstants; 042import org.apache.hadoop.hbase.mob.MobFileCleanupUtil; 043import org.apache.hadoop.hbase.mob.MobStoreEngine; 044import org.apache.hadoop.hbase.mob.MobUtils; 045import org.apache.hadoop.hbase.testclassification.IntegrationTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.ToolRunner; 048import org.junit.jupiter.api.AfterEach; 049import org.junit.jupiter.api.BeforeEach; 050import org.junit.jupiter.api.Tag; 051import org.junit.jupiter.api.Test; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 057 058/** 059 * An integration test to detect regressions in HBASE-22749. Test creates MOB-enabled table, and 060 * runs in parallel, the following tasks: loads data, runs MOB compactions, runs MOB cleaning chore. 061 * The failure injections into MOB compaction cycle is implemented via specific sub-class of 062 * DefaultMobStoreCompactor - FaultyMobStoreCompactor. The probability of failure is controlled by 063 * command-line argument 'failprob'. 064 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22749">HBASE-22749</a> 065 * <p> 066 * Sample usage: 067 * 068 * <pre> 069 * hbase org.apache.hadoop.hbase.IntegrationTestMobCompaction -Dservers=10 -Drows=1000000 070 * -Dfailprob=0.2 071 * </pre> 072 */ 073@SuppressWarnings("deprecation") 074@Tag(IntegrationTests.TAG) 075public class IntegrationTestMobCompaction extends IntegrationTestBase { 076 protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMobCompaction.class); 077 078 protected static final String REGIONSERVER_COUNT_KEY = "servers"; 079 protected static final String ROWS_COUNT_KEY = "rows"; 080 protected static final String FAILURE_PROB_KEY = "failprob"; 081 082 protected static final int DEFAULT_REGIONSERVER_COUNT = 3; 083 protected static final int DEFAULT_ROWS_COUNT = 5000000; 084 protected static final double DEFAULT_FAILURE_PROB = 0.1; 085 086 protected static int regionServerCount = DEFAULT_REGIONSERVER_COUNT; 087 protected static long rowsToLoad = DEFAULT_ROWS_COUNT; 088 protected static double failureProb = DEFAULT_FAILURE_PROB; 089 090 protected static String famStr = "f1"; 091 protected static byte[] fam = Bytes.toBytes(famStr); 092 protected static byte[] qualifier = Bytes.toBytes("q1"); 093 protected static long mobLen = 10; 094 protected static byte[] mobVal = Bytes 095 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 096 097 private static Configuration conf; 098 private static TableDescriptor tableDescriptor; 099 private static ColumnFamilyDescriptor familyDescriptor; 100 private static Admin admin; 101 private static Table table = null; 102 103 private static volatile boolean run = true; 104 105 @Override 106 @BeforeEach 107 public void setUp() throws Exception { 108 util = getTestingUtil(getConf()); 109 conf = util.getConfiguration(); 110 // Initialize with test-specific configuration values 111 initConf(conf); 112 regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); 113 LOG.info("Initializing cluster with {} region servers.", regionServerCount); 114 util.initializeCluster(regionServerCount); 115 admin = util.getAdmin(); 116 117 createTestTable(); 118 119 LOG.info("Cluster initialized and ready"); 120 } 121 122 private void createTestTable() throws IOException { 123 // Create test table 124 familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) 125 .setMobThreshold(mobLen).setMaxVersions(1).build(); 126 tableDescriptor = util.createModifyableTableDescriptor("testMobCompactTable") 127 .setColumnFamily(familyDescriptor).build(); 128 table = util.createTable(tableDescriptor, null); 129 } 130 131 @AfterEach 132 public void tearDown() throws IOException { 133 LOG.info("Cleaning up after test."); 134 if (util.isDistributedCluster()) { 135 deleteTablesIfAny(); 136 // TODO 137 } 138 LOG.info("Restoring cluster."); 139 util.restoreCluster(); 140 LOG.info("Cluster restored."); 141 } 142 143 @Override 144 public void setUpMonkey() throws Exception { 145 // Sorry, no Monkey 146 String msg = "Chaos monkey is not supported"; 147 LOG.warn(msg); 148 throw new IOException(msg); 149 } 150 151 private void deleteTablesIfAny() throws IOException { 152 if (table != null) { 153 util.deleteTableIfAny(table.getName()); 154 } 155 } 156 157 @Override 158 public void setUpCluster() throws Exception { 159 util = getTestingUtil(getConf()); 160 LOG.debug("Initializing/checking cluster has {} servers", regionServerCount); 161 util.initializeCluster(regionServerCount); 162 LOG.debug("Done initializing/checking cluster"); 163 } 164 165 /** Returns status of CLI execution */ 166 @Override 167 public int runTestFromCommandLine() throws Exception { 168 testMobCompaction(); 169 return 0; 170 } 171 172 @Override 173 public TableName getTablename() { 174 // That is only valid when Monkey is CALM (no monkey) 175 return null; 176 } 177 178 @Override 179 protected Set<String> getColumnFamilies() { 180 // That is only valid when Monkey is CALM (no monkey) 181 return null; 182 } 183 184 @Override 185 protected void addOptions() { 186 addOptWithArg(REGIONSERVER_COUNT_KEY, 187 "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); 188 addOptWithArg(ROWS_COUNT_KEY, 189 "Total number of data rows to load. Default: '" + DEFAULT_ROWS_COUNT + "'"); 190 addOptWithArg(FAILURE_PROB_KEY, 191 "Probability of a failure of a region MOB compaction request. Default: '" 192 + DEFAULT_FAILURE_PROB + "'"); 193 } 194 195 @Override 196 protected void processOptions(CommandLine cmd) { 197 super.processOptions(cmd); 198 199 regionServerCount = Integer.parseInt( 200 cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); 201 rowsToLoad = 202 Long.parseLong(cmd.getOptionValue(ROWS_COUNT_KEY, Long.toString(DEFAULT_ROWS_COUNT))); 203 failureProb = Double 204 .parseDouble(cmd.getOptionValue(FAILURE_PROB_KEY, Double.toString(DEFAULT_FAILURE_PROB))); 205 206 LOG.info( 207 MoreObjects.toStringHelper("Parsed Options").add(REGIONSERVER_COUNT_KEY, regionServerCount) 208 .add(ROWS_COUNT_KEY, rowsToLoad).add(FAILURE_PROB_KEY, failureProb).toString()); 209 } 210 211 private static void initConf(Configuration conf) { 212 213 conf.setInt("hfile.format.version", 3); 214 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 215 conf.setInt("hbase.client.retries.number", 100); 216 conf.setInt("hbase.hregion.max.filesize", 200000000); 217 conf.setInt("hbase.hregion.memstore.flush.size", 800000); 218 conf.setInt("hbase.hstore.blockingStoreFiles", 150); 219 conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); 220 conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); 221 conf.setDouble("hbase.mob.compaction.fault.probability", failureProb); 222 conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, FaultyMobStoreCompactor.class.getName()); 223 conf.setBoolean("hbase.table.sanity.checks", false); 224 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 20000); 225 226 } 227 228 static class MajorCompaction implements Runnable { 229 230 @Override 231 public void run() { 232 while (run) { 233 try { 234 admin.majorCompact(tableDescriptor.getTableName(), fam); 235 Thread.sleep(120000); 236 } catch (Exception e) { 237 LOG.error("MOB Stress Test FAILED", e); 238 System.exit(-1); 239 } 240 } 241 } 242 } 243 244 static class CleanMobAndArchive implements Runnable { 245 246 @Override 247 public void run() { 248 while (run) { 249 try { 250 LOG.info("MOB cleanup started ..."); 251 MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin); 252 LOG.info("MOB cleanup finished"); 253 254 Thread.sleep(130000); 255 } catch (Exception e) { 256 LOG.warn("Exception in CleanMobAndArchive", e); 257 } 258 } 259 } 260 } 261 262 class WriteData implements Runnable { 263 264 private long rows = -1; 265 266 public WriteData(long rows) { 267 this.rows = rows; 268 } 269 270 @Override 271 public void run() { 272 try { 273 274 // BufferedMutator bm = admin.getConnection().getBufferedMutator(table.getName()); 275 // Put Operation 276 for (int i = 0; i < rows; i++) { 277 Put p = new Put(Bytes.toBytes(i)); 278 p.addColumn(fam, qualifier, mobVal); 279 table.put(p); 280 281 // bm.mutate(p); 282 if (i % 10000 == 0) { 283 LOG.info("LOADED=" + i); 284 try { 285 Thread.sleep(500); 286 } catch (InterruptedException ee) { 287 // Restore interrupt status 288 Thread.currentThread().interrupt(); 289 } 290 } 291 if (i % 100000 == 0) { 292 printStats(i); 293 } 294 } 295 // bm.flush(); 296 admin.flush(table.getName()); 297 run = false; 298 } catch (Exception e) { 299 LOG.error("MOB Stress Test FAILED", e); 300 System.exit(-1); 301 } 302 } 303 } 304 305 @Test 306 public void testMobCompaction() throws InterruptedException, IOException { 307 308 try { 309 310 Thread writeData = new Thread(new WriteData(rowsToLoad)); 311 writeData.start(); 312 313 Thread majorcompact = new Thread(new MajorCompaction()); 314 majorcompact.start(); 315 316 Thread cleaner = new Thread(new CleanMobAndArchive()); 317 cleaner.start(); 318 319 while (run) { 320 Thread.sleep(1000); 321 } 322 323 getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8)); 324 LOG.info("Waiting for write thread to finish ..."); 325 writeData.join(); 326 // Cleanup again 327 MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin); 328 329 if (util != null) { 330 LOG.info("Archive cleaner started ..."); 331 // Call archive cleaner again 332 util.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); 333 LOG.info("Archive cleaner finished"); 334 } 335 336 scanTable(); 337 338 } finally { 339 340 admin.disableTable(tableDescriptor.getTableName()); 341 admin.deleteTable(tableDescriptor.getTableName()); 342 } 343 LOG.info("MOB Stress Test finished OK"); 344 printStats(rowsToLoad); 345 346 } 347 348 private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) 349 throws IOException { 350 FileSystem fs = FileSystem.get(conf); 351 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 352 FileStatus[] stat = fs.listStatus(dir); 353 for (FileStatus st : stat) { 354 LOG.debug("MOB Directory content: {}", st.getPath()); 355 } 356 LOG.debug("MOB Directory content total files: {}", stat.length); 357 358 return stat.length; 359 } 360 361 public void printStats(long loaded) { 362 LOG.info("MOB Stress Test: loaded=" + loaded + " compactions=" 363 + FaultyMobStoreCompactor.totalCompactions.get() + " major=" 364 + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob=" 365 + FaultyMobStoreCompactor.mobCounter.get() + " injected failures=" 366 + FaultyMobStoreCompactor.totalFailures.get()); 367 } 368 369 private void scanTable() { 370 try { 371 372 Result result; 373 ResultScanner scanner = table.getScanner(fam); 374 int counter = 0; 375 while ((result = scanner.next()) != null) { 376 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); 377 if (counter % 10000 == 0) { 378 LOG.info("GET=" + counter); 379 } 380 counter++; 381 } 382 assertEquals(rowsToLoad, counter); 383 } catch (Exception e) { 384 e.printStackTrace(); 385 LOG.error("MOB Stress Test FAILED"); 386 if (util != null) { 387 assertTrue(false); 388 } else { 389 System.exit(-1); 390 } 391 } 392 } 393 394 public static void main(String[] args) throws Exception { 395 Configuration conf = HBaseConfiguration.create(); 396 initConf(conf); 397 IntegrationTestingUtility.setUseDistributedCluster(conf); 398 int status = ToolRunner.run(conf, new IntegrationTestMobCompaction(), args); 399 System.exit(status); 400 } 401}