/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-22749. Test creates MOB-enabled table, and
 * runs in parallel, the following tasks: loads data, runs MOB compactions, runs MOB cleaning chore.
 * The failure injections into MOB compaction cycle is implemented via specific sub-class of
 * DefaultMobStoreCompactor - FaultyMobStoreCompactor. The probability of failure is controlled by
 * command-line argument 'failprob'.
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22749">HBASE-22749</a>
 * <p>
 * Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestMobCompaction -Dservers=10 -Drows=1000000
 * -Dfailprob=0.2
 * </pre>
 */
@SuppressWarnings("deprecation")

@Category(IntegrationTests.class)
public class IntegrationTestMobCompaction extends IntegrationTestBase {
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMobCompaction.class);

  // Command-line option names.
  protected static final String REGIONSERVER_COUNT_KEY = "servers";
  protected static final String ROWS_COUNT_KEY = "rows";
  protected static final String FAILURE_PROB_KEY = "failprob";

  protected static final int DEFAULT_REGIONSERVER_COUNT = 3;
  protected static final int DEFAULT_ROWS_COUNT = 5000000;
  protected static final double DEFAULT_FAILURE_PROB = 0.1;

  // Effective settings, overridden from the command line in processOptions()/setUp().
  protected static int regionServerCount = DEFAULT_REGIONSERVER_COUNT;
  protected static long rowsToLoad = DEFAULT_ROWS_COUNT;
  protected static double failureProb = DEFAULT_FAILURE_PROB;

  // Single column family / qualifier used by the test table.
  protected static String famStr = "f1";
  protected static byte[] fam = Bytes.toBytes(famStr);
  protected static byte[] qualifier = Bytes.toBytes("q1");
  // MOB threshold (bytes): values longer than this are stored as MOB.
  protected static long mobLen = 10;
  // 80-byte value, above the MOB threshold, so every cell goes to MOB storage.
  protected static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private static Configuration conf;
  private static HTableDescriptor hdt;
  private static HColumnDescriptor hcd;
  private static Admin admin;
  private static Table table = null;
  private static MobFileCleanerChore chore;

  // Shared shutdown flag: flipped to false by WriteData once loading completes;
  // the compaction and cleaner threads poll it to know when to stop.
  private static volatile boolean run = true;

  /**
   * Initializes the (mini or distributed) cluster, applies the test configuration and creates the
   * MOB-enabled test table.
   */
  @Override
  @Before
  public void setUp() throws Exception {
    util = getTestingUtil(getConf());
    conf = util.getConfiguration();
    // Initialize with test-specific configuration values
    initConf(conf);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    admin = util.getAdmin();

    createTestTable();

    LOG.info("Cluster initialized and ready");
  }

  /** Creates the single-family, MOB-enabled, max-1-version test table. */
  private void createTestTable() throws IOException {
    // Create test table
    hdt = util.createTableDescriptor("testMobCompactTable");
    hcd = new HColumnDescriptor(fam);
    hcd.setMobEnabled(true);
    hcd.setMobThreshold(mobLen);
    hcd.setMaxVersions(1);
    hdt.addFamily(hcd);
    table = util.createTable(hdt, null);
  }

  /** Drops the test table (distributed cluster only) and restores the cluster. */
  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      // TODO
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  /**
   * Chaos monkey is deliberately unsupported for this test; the failure injection happens inside
   * the MOB compactor instead (see FaultyMobStoreCompactor).
   * @throws IOException always, to signal that monkeys must not be configured
   */
  @Override
  public void setUpMonkey() throws Exception {
    // Sorry, no Monkey
    String msg = "Chaos monkey is not supported";
    LOG.warn(msg);
    throw new IOException(msg);
  }

  private void deleteTablesIfAny() throws IOException {
    if (table != null) {
      util.deleteTableIfAny(table.getName());
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    testMobCompaction();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(ROWS_COUNT_KEY,
      "Total number of data rows to load. Default: '" + DEFAULT_ROWS_COUNT + "'");
    addOptWithArg(FAILURE_PROB_KEY,
      "Probability of a failure of a region MOB compaction request. Default: '"
        + DEFAULT_FAILURE_PROB + "'");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsToLoad =
      Long.parseLong(cmd.getOptionValue(ROWS_COUNT_KEY, Long.toString(DEFAULT_ROWS_COUNT)));
    failureProb = Double
      .parseDouble(cmd.getOptionValue(FAILURE_PROB_KEY, Double.toString(DEFAULT_FAILURE_PROB)));

    LOG.info(
      MoreObjects.toStringHelper("Parsed Options").add(REGIONSERVER_COUNT_KEY, regionServerCount)
        .add(ROWS_COUNT_KEY, rowsToLoad).add(FAILURE_PROB_KEY, failureProb).toString());
  }

  /**
   * Applies the stress-test configuration: aggressive flush/compaction settings, zero archive TTL,
   * the fault-injecting MOB compactor, and the failure probability read from the command line.
   */
  private static void initConf(Configuration conf) {

    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, FaultyMobStoreCompactor.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", false);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 20000);

  }

  /**
   * Worker that requests a major compaction of the test family every 2 minutes until the shared
   * {@code run} flag goes false. Any failure aborts the whole test via System.exit.
   */
  static class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(hdt.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  /**
   * Worker that runs the MOB file cleaner chore every ~130s until the shared {@code run} flag goes
   * false. Exceptions are logged and the loop continues (cleanup failures are non-fatal here).
   */
  static class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup chore started ...");
          if (chore == null) {
            chore = new MobFileCleanerChore();
          }
          chore.cleanupObsoleteMobFiles(conf, table.getName());
          LOG.info("MOB cleanup chore finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.warn("Exception in CleanMobAndArchive", e);
        }
      }
    }
  }

  /**
   * Worker that loads {@code rows} MOB cells (one per row), flushes the table when done, then
   * flips the shared {@code run} flag to stop the other workers.
   */
  class WriteData implements Runnable {

    private long rows = -1;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {

        // BufferedMutator bm = admin.getConnection().getBufferedMutator(table.getName());
        // Put Operation
        // Loop index is long to match 'rows'; an int index would overflow (and loop forever)
        // for row counts above Integer.MAX_VALUE.
        for (long i = 0; i < rows; i++) {
          Put p = new Put(Bytes.toBytes(i));
          p.addColumn(fam, qualifier, mobVal);
          table.put(p);

          // bm.mutate(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              // Brief pause to avoid overrunning the region servers.
              Thread.sleep(500);
            } catch (InterruptedException ee) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        // bm.flush();
        admin.flush(table.getName());
        // Signal the compaction and cleaner threads to stop.
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

  /**
   * Main scenario: runs loading, major compactions and MOB cleanup concurrently, then verifies
   * that every loaded row is still readable with the expected value (i.e. no data was lost to
   * faulty MOB compactions).
   */
  @Test
  public void testMobCompaction() throws InterruptedException, IOException {

    try {

      Thread writeData = new Thread(new WriteData(rowsToLoad));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      // Wait until the loader flips the flag after the last flush.
      while (run) {
        Thread.sleep(1000);
      }

      getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8));
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again. The cleaner thread normally created 'chore' already, but guard
      // against the race where loading finished before its first iteration.
      if (chore == null) {
        chore = new MobFileCleanerChore();
      }
      chore.cleanupObsoleteMobFiles(conf, table.getName());

      if (util != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        util.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {

      admin.disableTable(hdt.getTableName());
      admin.deleteTable(hdt.getTableName());
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(rowsToLoad);

  }

  /**
   * Logs and returns the number of files currently in the MOB family directory.
   * @return number of entries under the MOB family path
   */
  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

  /** Logs progress counters collected by the fault-injecting compactor. */
  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

  /**
   * Full-table scan verifying each row still carries the expected MOB value and that the total
   * row count equals the number of rows loaded.
   */
  private void scanTable() {
    // try-with-resources guarantees the scanner is closed even on assertion failure.
    try (ResultScanner scanner = table.getScanner(fam)) {

      Result result;
      long counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        if (counter % 10000 == 0) {
          LOG.info("GET=" + counter);
        }
        counter++;
      }
      assertEquals(rowsToLoad, counter);
    } catch (Exception e) {
      // Log with the exception attached so the stack trace lands in the test log.
      LOG.error("MOB Stress Test FAILED", e);
      if (util != null) {
        fail("MOB Stress Test FAILED: " + e);
      } else {
        System.exit(-1);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    initConf(conf);
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestMobCompaction(), args);
    System.exit(status);
  }
}