/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-22749. The test creates a MOB-enabled table
 * and runs the following tasks in parallel: loading data, running MOB compactions, and running
 * the MOB cleaning chore. Failure injection into the MOB compaction cycle is implemented via a
 * specific sub-class of DefaultMobStoreCompactor, FaultyMobStoreCompactor. The probability of
 * failure is controlled by the command-line argument 'failprob'.
 * <p>
 * Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestMobCompaction -Dservers=10 -Drows=1000000
 * -Dfailprob=0.2
 * </pre>
 *
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22749">HBASE-22749</a>
 */
@SuppressWarnings("deprecation")
@Category(IntegrationTests.class)
public class IntegrationTestMobCompaction extends IntegrationTestBase {
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMobCompaction.class);

  protected static final String REGIONSERVER_COUNT_KEY = "servers";
  protected static final String ROWS_COUNT_KEY = "rows";
  protected static final String FAILURE_PROB_KEY = "failprob";

  protected static final int DEFAULT_REGIONSERVER_COUNT = 3;
  protected static final int DEFAULT_ROWS_COUNT = 5000000;
  protected static final double DEFAULT_FAILURE_PROB = 0.1;

  protected static int regionServerCount = DEFAULT_REGIONSERVER_COUNT;
  protected static long rowsToLoad = DEFAULT_ROWS_COUNT;
  protected static double failureProb = DEFAULT_FAILURE_PROB;

  protected static String famStr = "f1";
  protected static byte[] fam = Bytes.toBytes(famStr);
  protected static byte[] qualifier = Bytes.toBytes("q1");
  protected static long mobLen = 10;
  protected static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private static Configuration conf;
  private static TableDescriptor tableDescriptor;
  private static ColumnFamilyDescriptor familyDescriptor;
  private static Admin admin;
  private static Table table = null;
  private static MobFileCleanerChore chore;

  private static volatile boolean run = true;

  @Override
  @Before
  public void setUp() throws Exception {
    util = getTestingUtil(getConf());
    conf = util.getConfiguration();
    // Initialize with test-specific configuration values
    initConf(conf);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    admin = util.getAdmin();

    createTestTable();

    LOG.info("Cluster initialized and ready");
  }

  private void createTestTable() throws IOException {
    // Create test table
    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
      .setMobThreshold(mobLen).setMaxVersions(1).build();
    tableDescriptor = util.createModifyableTableDescriptor("testMobCompactTable")
      .setColumnFamily(familyDescriptor).build();
    table = util.createTable(tableDescriptor, null);
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      // TODO
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  @Override
  public void setUpMonkey() throws Exception {
    // Sorry, no Monkey
    String msg = "Chaos monkey is not supported";
    LOG.warn(msg);
    throw new IOException(msg);
  }

  private void deleteTablesIfAny() throws IOException {
    if (table != null) {
      util.deleteTableIfAny(table.getName());
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    testMobCompaction();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(ROWS_COUNT_KEY,
      "Total number of data rows to load. Default: '" + DEFAULT_ROWS_COUNT + "'");
    addOptWithArg(FAILURE_PROB_KEY,
      "Probability of a failure of a region MOB compaction request. Default: '"
        + DEFAULT_FAILURE_PROB + "'");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsToLoad =
      Long.parseLong(cmd.getOptionValue(ROWS_COUNT_KEY, Long.toString(DEFAULT_ROWS_COUNT)));
    failureProb = Double
      .parseDouble(cmd.getOptionValue(FAILURE_PROB_KEY, Double.toString(DEFAULT_FAILURE_PROB)));

    LOG.info(
      MoreObjects.toStringHelper("Parsed Options").add(REGIONSERVER_COUNT_KEY, regionServerCount)
        .add(ROWS_COUNT_KEY, rowsToLoad).add(FAILURE_PROB_KEY, failureProb).toString());
  }
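  /**
   * Applies the test-specific configuration. The intent of these settings, as far as it can be
   * inferred from the values used here: HFile format version 3 is required for MOB; a zero TTL
   * for the archived-HFile cleaner lets archived files be deleted right away; a small memstore
   * flush size and region max filesize force frequent flushes and compactions; a high
   * blockingStoreFiles limit keeps the loader from stalling; FaultyMobStoreCompactor is plugged
   * in as the MOB compactor with the configured failure probability; and MIN_AGE_TO_ARCHIVE of
   * 20 seconds makes MOB files eligible for archiving shortly after they become obsolete.
   */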
  private static void initConf(Configuration conf) {

    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, FaultyMobStoreCompactor.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", false);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 20000);

  }

  static class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(tableDescriptor.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  static class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup chore started ...");
          if (chore == null) {
            chore = new MobFileCleanerChore();
          }
          chore.cleanupObsoleteMobFiles(conf, table.getName());
          LOG.info("MOB cleanup chore finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.warn("Exception in CleanMobAndArchive", e);
        }
      }
    }
  }

  class WriteData implements Runnable {

    private long rows = -1;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {

        // BufferedMutator bm = admin.getConnection().getBufferedMutator(table.getName());
        // Put Operation
        for (int i = 0; i < rows; i++) {
          Put p = new Put(Bytes.toBytes(i));
          p.addColumn(fam, qualifier, mobVal);
          table.put(p);

          // bm.mutate(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ee) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        // bm.flush();
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }
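  /**
   * The actual stress test. A writer thread loads {@code rowsToLoad} MOB rows while a second
   * thread periodically triggers major compactions and a third runs the MOB cleaner chore. The
   * writer flips the shared {@code run} flag to false when it finishes loading, which ends the
   * wait loop below and stops the helper threads; the table is then cleaned up once more and
   * scanned to verify that no data was lost despite the injected compaction failures.
   */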
  @Test
  public void testMobCompaction() throws InterruptedException, IOException {

    try {

      Thread writeData = new Thread(new WriteData(rowsToLoad));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8));
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      chore.cleanupObsoleteMobFiles(conf, table.getName());

      if (util != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        util.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {

      admin.disableTable(tableDescriptor.getTableName());
      admin.deleteTable(tableDescriptor.getTableName());
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(rowsToLoad);

  }
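  /** Logs the contents of the MOB family directory and returns the number of files it contains. */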
  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

  private void scanTable() {
    try {

      Result result;
      ResultScanner scanner = table.getScanner(fam);
      int counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        if (counter % 10000 == 0) {
          LOG.info("GET=" + counter);
        }
        counter++;
      }
      assertEquals(rowsToLoad, counter);
    } catch (Exception e) {
      LOG.error("MOB Stress Test FAILED", e);
      if (util != null) {
        assertTrue(false);
      } else {
        System.exit(-1);
      }
    }
  }
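  /**
   * Command-line entry point: applies the test configuration, marks the run as targeting a
   * distributed cluster, and delegates to ToolRunner.
   */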
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    initConf(conf);
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestMobCompaction(), args);
    System.exit(status);
  }
}