/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduction for MOB data loss.
 * <ol>
 * <li>Settings: Region Size 200 MB, Flush threshold 800 KB.</li>
 * <li>Insert 10 Million records.</li>
 * <li>MOB Compaction and Archiver:
 * <ul>
 * <li>Trigger MOB Compaction (every 2 minutes)</li>
 * <li>Trigger major compaction (every 2 minutes)</li>
 * <li>Trigger archive cleaner (every 3 minutes; the cleaner thread in this class actually sleeps
 * 130 seconds between passes)</li>
 * </ul>
 * </li>
 * <li>Validate MOB data after complete data load.</li>
 * </ol>
 * This class is used by MobStressTool only. This is not a unit test.
 * <p>
 * Usage: call {@link #init(Configuration, long)} once, then {@link #runStressTest()} once. The
 * stop flag shared by the worker threads is static and is never reset, so a second run in the same
 * JVM is not supported.
 */
public class MobStressToolRunner {
  private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class);

  // NOTE(review): never assigned anywhere in this file, so the "HTU != null" branches below are
  // only reachable if something outside this file populates it - confirm.
  private HBaseTestingUtil HTU;

  private final static String famStr = "f1";
  private final static byte[] fam = Bytes.toBytes(famStr);
  private final static byte[] qualifier = Bytes.toBytes("q1");
  private final static long mobLen = 10;
  private final static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private Configuration conf;
  private TableDescriptor tableDescriptor;
  private ColumnFamilyDescriptor familyDescriptor;
  // Kept as a field so it can be closed once the test is over (it used to be a leaked local).
  private Connection connection;
  private Admin admin;
  private long count = 500000;
  private double failureProb = 0.1;
  private Table table = null;
  private MobFileCleanerChore chore = new MobFileCleanerChore();

  // Shared stop flag: flipped to false by the writer thread after the final flush.
  private static volatile boolean run = true;

  public MobStressToolRunner() {
  }

  /**
   * Applies the stress-test settings to {@code conf}, connects to the cluster and (re)creates the
   * MOB-enabled test table {@code testMobCompactTable}.
   * @param conf    configuration to use; it is modified in place by this method
   * @param numRows number of rows to load and later verify
   * @throws IllegalArgumentException if {@code numRows} exceeds {@link Integer#MAX_VALUE}; row
   *                                  keys and the verification counter are ints, so a larger
   *                                  value would make the writer loop overflow and never finish
   * @throws IOException              if connecting or table setup fails
   */
  public void init(Configuration conf, long numRows) throws IOException {
    if (numRows > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("numRows must not exceed " + Integer.MAX_VALUE
        + " because row keys are 4-byte ints: " + numRows);
    }
    this.conf = conf;
    this.count = numRows;
    initConf();
    printConf();
    this.connection = ConnectionFactory.createConnection(this.conf);
    try {
      this.admin = connection.getAdmin();
      this.familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
        .setMobThreshold(mobLen).setMaxVersions(1).build();
      this.tableDescriptor =
        TableDescriptorBuilder.newBuilder(TableName.valueOf("testMobCompactTable"))
          .setColumnFamily(familyDescriptor).build();
      // Start from a clean slate: drop any table left over from a previous run.
      if (admin.tableExists(tableDescriptor.getTableName())) {
        admin.disableTable(tableDescriptor.getTableName());
        admin.deleteTable(tableDescriptor.getTableName());
      }
      admin.createTable(tableDescriptor);
      table = connection.getTable(tableDescriptor.getTableName());
    } catch (IOException | RuntimeException e) {
      // Do not leak the connection when table setup fails half-way.
      closeResources();
      throw e;
    }
  }

  /**
   * Logs the configuration the operator is expected to have in place.
   */
  private void printConf() {
    // NOTE(review): the throughput bounds logged here (50000000 / 100000000) differ from the
    // values initConf() sets (52428800 / 104857600), and the compactor advertised below is
    // commented out in initConf() - confirm which is intended.
    LOG.info("Please ensure the following HBase configuration is set:");
    LOG.info("hfile.format.version=3");
    LOG.info("hbase.master.hfilecleaner.ttl=0");
    LOG.info("hbase.hregion.max.filesize=200000000");
    LOG.info("hbase.client.retries.number=100");
    LOG.info("hbase.hregion.memstore.flush.size=800000");
    LOG.info("hbase.hstore.blockingStoreFiles=150");
    LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
    LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
    LOG.info("hbase.master.mob.cleaner.period=0");
    LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
    LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1.");
  }

  /**
   * Writes the stress-test settings into {@link #conf}.
   */
  private void initConf() {
    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    // The fault-injecting compactor is deliberately left disabled here:
    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
    // FaultyMobStoreCompactor.class.getName());
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000);
    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
    conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
  }

  /**
   * Requests a major compaction of the test family every 120 seconds until the stop flag is
   * cleared. Any exception (including an interrupt) terminates the JVM with exit code -1.
   */
  class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(tableDescriptor.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  /**
   * Runs the obsolete-MOB-file cleanup every 130 seconds until the stop flag is cleared. Failures
   * are logged and the loop carries on.
   */
  class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup chore started ...");
          chore.cleanupObsoleteMobFiles(conf, table.getName());
          LOG.info("MOB cleanup chore finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.error("CleanMobAndArchive", e);
        }
      }
    }
  }

  /**
   * Loads {@code rows} rows keyed by the 4-byte encoding of their index, flushes the table and
   * then clears the stop flag. Any exception terminates the JVM with exit code -1.
   */
  class WriteData implements Runnable {

    private final long rows;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {
        // Put Operation. init() guarantees rows fits in an int, so the index cannot overflow.
        for (int i = 0; i < rows; i++) {
          byte[] key = Bytes.toBytes(i);
          Put p = new Put(key);
          // Value is key + fixed payload so scanTable() can verify each row on its own.
          p.addColumn(fam, qualifier, Bytes.add(key, mobVal));
          table.put(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ignored) {
              // Best-effort pause only; an interrupt here just shortens the pause and the load
              // continues.
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

  /**
   * Starts the writer, major-compaction and cleaner threads, waits for the load to complete, runs
   * a final cleanup and verifies the table content. The test table is always dropped afterwards
   * and the table, admin and connection handles are closed.
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @throws IOException          if cleanup, file listing or table removal fails
   */
  public void runStressTest() throws InterruptedException, IOException {
    try {
      Thread writeData = new Thread(new WriteData(count));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      // famStr is the source of the fam bytes; using it avoids a platform-charset decode.
      getNumberOfMobFiles(conf, table.getName(), famStr);
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      chore.cleanupObsoleteMobFiles(conf, table.getName());
      getNumberOfMobFiles(conf, table.getName(), famStr);

      if (HTU != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {
      try {
        admin.disableTable(tableDescriptor.getTableName());
        admin.deleteTable(tableDescriptor.getTableName());
      } finally {
        // Release client resources even if dropping the table failed.
        closeResources();
      }
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(count);
  }

  /** Closes the table, admin and connection handles, logging (not propagating) any failure. */
  private void closeResources() {
    closeQuietly("table", table);
    closeQuietly("admin", admin);
    closeQuietly("connection", connection);
  }

  /** Closes {@code resource} if it is non-null; a failure to close is logged at WARN. */
  private static void closeQuietly(String what, AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.warn("Failed to close {}", what, e);
    }
  }

  /**
   * Lists the MOB directory of the given family and logs each entry at DEBUG.
   * @return the number of entries in the MOB family directory
   * @throws IOException if the directory cannot be listed
   */
  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    long size = 0;
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen());
      size += st.getLen();
    }
    LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size);

    return stat.length;
  }

  /**
   * Logs the number of rows loaded so far together with the counters exposed by
   * {@link FaultyMobStoreCompactor}.
   * @param loaded number of rows loaded so far
   */
  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

  /**
   * Scans the whole table and verifies that every value equals key + payload and that the number
   * of rows matches the number loaded. On any failure - including a failed assertion, which is an
   * {@link AssertionError} rather than an {@link Exception} - the failure is logged and then
   * either an {@link AssertionError} is thrown (when {@code HTU} is set) or the JVM exits with
   * code -1.
   */
  private void scanTable() {
    // try-with-resources: the scanner used to be left open.
    try (ResultScanner scanner = table.getScanner(fam)) {
      Result result;
      int counter = 0;
      while ((result = scanner.next()) != null) {
        byte[] key = result.getRow();
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal)));
        if (counter % 10000 == 0) {
          LOG.info("GET=" + counter + " key=" + Bytes.toInt(key));
        }
        counter++;
      }

      assertEquals(count, counter);
    } catch (Exception | AssertionError e) {
      LOG.error("MOB Stress Test FAILED", e);
      if (HTU != null) {
        throw new AssertionError("MOB Stress Test FAILED", e);
      } else {
        System.exit(-1);
      }
    }
  }
}