001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HColumnDescriptor;
031import org.apache.hadoop.hbase.HTableDescriptor;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Reproduction for MOB data loss 1. Settings: Region Size 200 MB, Flush threshold 800 KB. 2. Insert
047 * 10 Million records 3. MOB Compaction and Archiver a) Trigger MOB Compaction (every 2 minutes) b)
048 * Trigger major compaction (every 2 minutes) c) Trigger archive cleaner (every 3 minutes) 4.
049 * Validate MOB data after complete data load. This class is used by MobStressTool only. This is not
050 * a unit test
051 */
052@SuppressWarnings("deprecation")
053public class MobStressToolRunner {
054  private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class);
055
056  private HBaseTestingUtility HTU;
057
058  private final static String famStr = "f1";
059  private final static byte[] fam = Bytes.toBytes(famStr);
060  private final static byte[] qualifier = Bytes.toBytes("q1");
061  private final static long mobLen = 10;
062  private final static byte[] mobVal = Bytes
063    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
064
065  private Configuration conf;
066  private HTableDescriptor hdt;
067  private HColumnDescriptor hcd;
068  private Admin admin;
069  private long count = 500000;
070  private double failureProb = 0.1;
071  private Table table = null;
072  private MobFileCleanerChore chore = new MobFileCleanerChore();
073
074  private static volatile boolean run = true;
075
076  public MobStressToolRunner() {
077
078  }
079
080  public void init(Configuration conf, long numRows) throws IOException {
081    this.conf = conf;
082    this.count = numRows;
083    initConf();
084    printConf();
085    hdt = new HTableDescriptor(TableName.valueOf("testMobCompactTable"));
086    Connection conn = ConnectionFactory.createConnection(this.conf);
087    this.admin = conn.getAdmin();
088    this.hcd = new HColumnDescriptor(fam);
089    this.hcd.setMobEnabled(true);
090    this.hcd.setMobThreshold(mobLen);
091    this.hcd.setMaxVersions(1);
092    this.hdt.addFamily(hcd);
093    if (admin.tableExists(hdt.getTableName())) {
094      admin.disableTable(hdt.getTableName());
095      admin.deleteTable(hdt.getTableName());
096    }
097    admin.createTable(hdt);
098    table = conn.getTable(hdt.getTableName());
099  }
100
101  private void printConf() {
102    LOG.info("Please ensure the following HBase configuration is set:");
103    LOG.info("hfile.format.version=3");
104    LOG.info("hbase.master.hfilecleaner.ttl=0");
105    LOG.info("hbase.hregion.max.filesize=200000000");
106    LOG.info("hbase.client.retries.number=100");
107    LOG.info("hbase.hregion.memstore.flush.size=800000");
108    LOG.info("hbase.hstore.blockingStoreFiles=150");
109    LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
110    LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
111    LOG.info("hbase.master.mob.cleaner.period=0");
112    LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
113    LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1.");
114
115  }
116
117  private void initConf() {
118
119    conf.setInt("hfile.format.version", 3);
120    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
121    conf.setInt("hbase.client.retries.number", 100);
122    conf.setInt("hbase.hregion.max.filesize", 200000000);
123    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
124    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
125    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
126    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
127    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
128    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
129    // FaultyMobStoreCompactor.class.getName());
130    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
131    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
132    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000);
133    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
134    conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
135
136  }
137
138  class MajorCompaction implements Runnable {
139
140    @Override
141    public void run() {
142      while (run) {
143        try {
144          admin.majorCompact(hdt.getTableName(), fam);
145          Thread.sleep(120000);
146        } catch (Exception e) {
147          LOG.error("MOB Stress Test FAILED", e);
148          System.exit(-1);
149        }
150      }
151    }
152  }
153
154  class CleanMobAndArchive implements Runnable {
155
156    @Override
157    public void run() {
158      while (run) {
159        try {
160          LOG.info("MOB cleanup chore started ...");
161          chore.cleanupObsoleteMobFiles(conf, table.getName());
162          LOG.info("MOB cleanup chore finished");
163
164          Thread.sleep(130000);
165        } catch (Exception e) {
166          LOG.error("CleanMobAndArchive", e);
167        }
168      }
169    }
170  }
171
172  class WriteData implements Runnable {
173
174    private long rows = -1;
175
176    public WriteData(long rows) {
177      this.rows = rows;
178    }
179
180    @Override
181    public void run() {
182      try {
183
184        // Put Operation
185        for (int i = 0; i < rows; i++) {
186          byte[] key = Bytes.toBytes(i);
187          Put p = new Put(key);
188          p.addColumn(fam, qualifier, Bytes.add(key, mobVal));
189          table.put(p);
190          if (i % 10000 == 0) {
191            LOG.info("LOADED=" + i);
192            try {
193              Thread.sleep(500);
194            } catch (InterruptedException ee) {
195            }
196          }
197          if (i % 100000 == 0) {
198            printStats(i);
199          }
200        }
201        admin.flush(table.getName());
202        run = false;
203      } catch (Exception e) {
204        LOG.error("MOB Stress Test FAILED", e);
205        System.exit(-1);
206      }
207    }
208  }
209
210  public void runStressTest() throws InterruptedException, IOException {
211
212    try {
213
214      Thread writeData = new Thread(new WriteData(count));
215      writeData.start();
216
217      Thread majorcompact = new Thread(new MajorCompaction());
218      majorcompact.start();
219
220      Thread cleaner = new Thread(new CleanMobAndArchive());
221      cleaner.start();
222
223      while (run) {
224        Thread.sleep(1000);
225      }
226
227      getNumberOfMobFiles(conf, table.getName(), new String(fam));
228      LOG.info("Waiting for write thread to finish ...");
229      writeData.join();
230      // Cleanup again
231      chore.cleanupObsoleteMobFiles(conf, table.getName());
232      getNumberOfMobFiles(conf, table.getName(), new String(fam));
233
234      if (HTU != null) {
235        LOG.info("Archive cleaner started ...");
236        // Call archive cleaner again
237        HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
238        LOG.info("Archive cleaner finished");
239      }
240
241      scanTable();
242
243    } finally {
244
245      admin.disableTable(hdt.getTableName());
246      admin.deleteTable(hdt.getTableName());
247    }
248    LOG.info("MOB Stress Test finished OK");
249    printStats(count);
250
251  }
252
253  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
254    throws IOException {
255    FileSystem fs = FileSystem.get(conf);
256    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
257    FileStatus[] stat = fs.listStatus(dir);
258    long size = 0;
259    for (FileStatus st : stat) {
260      LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen());
261      size += st.getLen();
262    }
263    LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size);
264
265    return stat.length;
266  }
267
268  public void printStats(long loaded) {
269    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
270      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
271      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
272      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
273      + FaultyMobStoreCompactor.totalFailures.get());
274  }
275
276  private void scanTable() {
277    try {
278
279      Result result;
280      ResultScanner scanner = table.getScanner(fam);
281      int counter = 0;
282      while ((result = scanner.next()) != null) {
283        byte[] key = result.getRow();
284        assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal)));
285        if (counter % 10000 == 0) {
286          LOG.info("GET=" + counter + " key=" + Bytes.toInt(key));
287        }
288        counter++;
289      }
290
291      assertEquals(count, counter);
292    } catch (Exception e) {
293      e.printStackTrace();
294      LOG.error("MOB Stress Test FAILED");
295      if (HTU != null) {
296        assertTrue(false);
297      } else {
298        System.exit(-1);
299      }
300    }
301  }
302}