001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HColumnDescriptor; 031import org.apache.hadoop.hbase.HTableDescriptor; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Reproduction for MOB data loss 1. Settings: Region Size 200 MB, Flush threshold 800 KB. 2. Insert 047 * 10 Million records 3. MOB Compaction and Archiver a) Trigger MOB Compaction (every 2 minutes) b) 048 * Trigger major compaction (every 2 minutes) c) Trigger archive cleaner (every 3 minutes) 4. 049 * Validate MOB data after complete data load. This class is used by MobStressTool only. This is not 050 * a unit test 051 */ 052@SuppressWarnings("deprecation") 053public class MobStressToolRunner { 054 private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class); 055 056 private HBaseTestingUtility HTU; 057 058 private final static String famStr = "f1"; 059 private final static byte[] fam = Bytes.toBytes(famStr); 060 private final static byte[] qualifier = Bytes.toBytes("q1"); 061 private final static long mobLen = 10; 062 private final static byte[] mobVal = Bytes 063 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 064 065 private Configuration conf; 066 private HTableDescriptor hdt; 067 private HColumnDescriptor hcd; 068 private Admin admin; 069 private long count = 500000; 070 private double failureProb = 0.1; 071 private Table table = null; 072 private MobFileCleanerChore chore = new MobFileCleanerChore(); 073 074 private static volatile boolean run = true; 075 076 public MobStressToolRunner() { 077 078 } 079 080 public void init(Configuration conf, long numRows) throws IOException { 081 this.conf = conf; 082 this.count = numRows; 083 initConf(); 084 printConf(); 085 hdt = new HTableDescriptor(TableName.valueOf("testMobCompactTable")); 086 Connection conn = ConnectionFactory.createConnection(this.conf); 087 this.admin = conn.getAdmin(); 088 this.hcd = new HColumnDescriptor(fam); 089 this.hcd.setMobEnabled(true); 090 this.hcd.setMobThreshold(mobLen); 091 this.hcd.setMaxVersions(1); 092 this.hdt.addFamily(hcd); 093 if (admin.tableExists(hdt.getTableName())) { 094 admin.disableTable(hdt.getTableName()); 095 admin.deleteTable(hdt.getTableName()); 096 } 097 admin.createTable(hdt); 098 table = conn.getTable(hdt.getTableName()); 099 } 100 101 private void printConf() { 102 LOG.info("Please ensure the following HBase configuration is set:"); 103 LOG.info("hfile.format.version=3"); 104 LOG.info("hbase.master.hfilecleaner.ttl=0"); 105 LOG.info("hbase.hregion.max.filesize=200000000"); 106 LOG.info("hbase.client.retries.number=100"); 107 LOG.info("hbase.hregion.memstore.flush.size=800000"); 108 LOG.info("hbase.hstore.blockingStoreFiles=150"); 109 LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000"); 110 LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000"); 111 LOG.info("hbase.master.mob.cleaner.period=0"); 112 LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor"); 113 LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1."); 114 115 } 116 117 private void initConf() { 118 119 conf.setInt("hfile.format.version", 3); 120 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 121 conf.setInt("hbase.client.retries.number", 100); 122 conf.setInt("hbase.hregion.max.filesize", 200000000); 123 conf.setInt("hbase.hregion.memstore.flush.size", 800000); 124 conf.setInt("hbase.hstore.blockingStoreFiles", 150); 125 conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); 126 conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); 127 conf.setDouble("hbase.mob.compaction.fault.probability", failureProb); 128 // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 129 // FaultyMobStoreCompactor.class.getName()); 130 conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); 131 conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); 132 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000); 133 conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); 134 conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); 135 136 } 137 138 class MajorCompaction implements Runnable { 139 140 @Override 141 public void run() { 142 while (run) { 143 try { 144 admin.majorCompact(hdt.getTableName(), fam); 145 Thread.sleep(120000); 146 } catch (Exception e) { 147 LOG.error("MOB Stress Test FAILED", e); 148 System.exit(-1); 149 } 150 } 151 } 152 } 153 154 class CleanMobAndArchive implements Runnable { 155 156 @Override 157 public void run() { 158 while (run) { 159 try { 160 LOG.info("MOB cleanup chore started ..."); 161 chore.cleanupObsoleteMobFiles(conf, table.getName()); 162 LOG.info("MOB cleanup chore finished"); 163 164 Thread.sleep(130000); 165 } catch (Exception e) { 166 LOG.error("CleanMobAndArchive", e); 167 } 168 } 169 } 170 } 171 172 class WriteData implements Runnable { 173 174 private long rows = -1; 175 176 public WriteData(long rows) { 177 this.rows = rows; 178 } 179 180 @Override 181 public void run() { 182 try { 183 184 // Put Operation 185 for (int i = 0; i < rows; i++) { 186 byte[] key = Bytes.toBytes(i); 187 Put p = new Put(key); 188 p.addColumn(fam, qualifier, Bytes.add(key, mobVal)); 189 table.put(p); 190 if (i % 10000 == 0) { 191 LOG.info("LOADED=" + i); 192 try { 193 Thread.sleep(500); 194 } catch (InterruptedException ee) { 195 } 196 } 197 if (i % 100000 == 0) { 198 printStats(i); 199 } 200 } 201 admin.flush(table.getName()); 202 run = false; 203 } catch (Exception e) { 204 LOG.error("MOB Stress Test FAILED", e); 205 System.exit(-1); 206 } 207 } 208 } 209 210 public void runStressTest() throws InterruptedException, IOException { 211 212 try { 213 214 Thread writeData = new Thread(new WriteData(count)); 215 writeData.start(); 216 217 Thread majorcompact = new Thread(new MajorCompaction()); 218 majorcompact.start(); 219 220 Thread cleaner = new Thread(new CleanMobAndArchive()); 221 cleaner.start(); 222 223 while (run) { 224 Thread.sleep(1000); 225 } 226 227 getNumberOfMobFiles(conf, table.getName(), new String(fam)); 228 LOG.info("Waiting for write thread to finish ..."); 229 writeData.join(); 230 // Cleanup again 231 chore.cleanupObsoleteMobFiles(conf, table.getName()); 232 getNumberOfMobFiles(conf, table.getName(), new String(fam)); 233 234 if (HTU != null) { 235 LOG.info("Archive cleaner started ..."); 236 // Call archive cleaner again 237 HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); 238 LOG.info("Archive cleaner finished"); 239 } 240 241 scanTable(); 242 243 } finally { 244 245 admin.disableTable(hdt.getTableName()); 246 admin.deleteTable(hdt.getTableName()); 247 } 248 LOG.info("MOB Stress Test finished OK"); 249 printStats(count); 250 251 } 252 253 private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) 254 throws IOException { 255 FileSystem fs = FileSystem.get(conf); 256 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 257 FileStatus[] stat = fs.listStatus(dir); 258 long size = 0; 259 for (FileStatus st : stat) { 260 LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen()); 261 size += st.getLen(); 262 } 263 LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size); 264 265 return stat.length; 266 } 267 268 public void printStats(long loaded) { 269 LOG.info("MOB Stress Test: loaded=" + loaded + " compactions=" 270 + FaultyMobStoreCompactor.totalCompactions.get() + " major=" 271 + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob=" 272 + FaultyMobStoreCompactor.mobCounter.get() + " injected failures=" 273 + FaultyMobStoreCompactor.totalFailures.get()); 274 } 275 276 private void scanTable() { 277 try { 278 279 Result result; 280 ResultScanner scanner = table.getScanner(fam); 281 int counter = 0; 282 while ((result = scanner.next()) != null) { 283 byte[] key = result.getRow(); 284 assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal))); 285 if (counter % 10000 == 0) { 286 LOG.info("GET=" + counter + " key=" + Bytes.toInt(key)); 287 } 288 counter++; 289 } 290 291 assertEquals(count, counter); 292 } catch (Exception e) { 293 e.printStackTrace(); 294 LOG.error("MOB Stress Test FAILED"); 295 if (HTU != null) { 296 assertTrue(false); 297 } else { 298 System.exit(-1); 299 } 300 } 301 } 302}