001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Reproduction for MOB data loss 1. Settings: Region Size 200 MB, Flush threshold 800 KB. 2. Insert
049 * 10 Million records 3. MOB Compaction and Archiver a) Trigger MOB Compaction (every 2 minutes) b)
050 * Trigger major compaction (every 2 minutes) c) Trigger archive cleaner (every 3 minutes) 4.
051 * Validate MOB data after complete data load. This class is used by MobStressTool only. This is not
052 * a unit test
053 */
054public class MobStressToolRunner {
055  private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class);
056
057  private HBaseTestingUtil HTU;
058
059  private final static String famStr = "f1";
060  private final static byte[] fam = Bytes.toBytes(famStr);
061  private final static byte[] qualifier = Bytes.toBytes("q1");
062  private final static long mobLen = 10;
063  private final static byte[] mobVal = Bytes
064    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
065
066  private Configuration conf;
067  private TableDescriptor tableDescriptor;
068  private ColumnFamilyDescriptor familyDescriptor;
069  private Admin admin;
070  private long count = 500000;
071  private double failureProb = 0.1;
072  private Table table = null;
073
074  private static volatile boolean run = true;
075
076  public MobStressToolRunner() {
077  }
078
079  public void init(Configuration conf, long numRows) throws IOException {
080    this.conf = conf;
081    this.count = numRows;
082    initConf();
083    printConf();
084    Connection conn = ConnectionFactory.createConnection(this.conf);
085    this.admin = conn.getAdmin();
086    this.familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
087      .setMobThreshold(mobLen).setMaxVersions(1).build();
088    this.tableDescriptor =
089      TableDescriptorBuilder.newBuilder(TableName.valueOf("testMobCompactTable"))
090        .setColumnFamily(familyDescriptor).build();
091    if (admin.tableExists(tableDescriptor.getTableName())) {
092      admin.disableTable(tableDescriptor.getTableName());
093      admin.deleteTable(tableDescriptor.getTableName());
094    }
095    admin.createTable(tableDescriptor);
096    table = conn.getTable(tableDescriptor.getTableName());
097  }
098
099  private void printConf() {
100    LOG.info("Please ensure the following HBase configuration is set:");
101    LOG.info("hfile.format.version=3");
102    LOG.info("hbase.master.hfilecleaner.ttl=0");
103    LOG.info("hbase.hregion.max.filesize=200000000");
104    LOG.info("hbase.client.retries.number=100");
105    LOG.info("hbase.hregion.memstore.flush.size=800000");
106    LOG.info("hbase.hstore.blockingStoreFiles=150");
107    LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
108    LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
109    LOG.info("hbase.master.mob.cleaner.period=0");
110    LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
111    LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1.");
112
113  }
114
115  private void initConf() {
116
117    conf.setInt("hfile.format.version", 3);
118    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
119    conf.setInt("hbase.client.retries.number", 100);
120    conf.setInt("hbase.hregion.max.filesize", 200000000);
121    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
122    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
123    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
124    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
125    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
126    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
127    // FaultyMobStoreCompactor.class.getName());
128    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
129    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
130    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000);
131    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
132    conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
133
134  }
135
136  class MajorCompaction implements Runnable {
137
138    @Override
139    public void run() {
140      while (run) {
141        try {
142          admin.majorCompact(tableDescriptor.getTableName(), fam);
143          Thread.sleep(120000);
144        } catch (Exception e) {
145          LOG.error("MOB Stress Test FAILED", e);
146          System.exit(-1);
147        }
148      }
149    }
150  }
151
152  class CleanMobAndArchive implements Runnable {
153
154    @Override
155    public void run() {
156      while (run) {
157        try {
158          LOG.info("MOB cleanup started ...");
159          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
160          LOG.info("MOB cleanup finished");
161
162          Thread.sleep(130000);
163        } catch (Exception e) {
164          LOG.error("CleanMobAndArchive", e);
165        }
166      }
167    }
168  }
169
170  class WriteData implements Runnable {
171
172    private long rows = -1;
173
174    public WriteData(long rows) {
175      this.rows = rows;
176    }
177
178    @Override
179    public void run() {
180      try {
181
182        // Put Operation
183        for (int i = 0; i < rows; i++) {
184          byte[] key = Bytes.toBytes(i);
185          Put p = new Put(key);
186          p.addColumn(fam, qualifier, Bytes.add(key, mobVal));
187          table.put(p);
188          if (i % 10000 == 0) {
189            LOG.info("LOADED=" + i);
190            try {
191              Thread.sleep(500);
192            } catch (InterruptedException ee) {
193            }
194          }
195          if (i % 100000 == 0) {
196            printStats(i);
197          }
198        }
199        admin.flush(table.getName());
200        run = false;
201      } catch (Exception e) {
202        LOG.error("MOB Stress Test FAILED", e);
203        System.exit(-1);
204      }
205    }
206  }
207
208  public void runStressTest() throws InterruptedException, IOException {
209
210    try {
211
212      Thread writeData = new Thread(new WriteData(count));
213      writeData.start();
214
215      Thread majorcompact = new Thread(new MajorCompaction());
216      majorcompact.start();
217
218      Thread cleaner = new Thread(new CleanMobAndArchive());
219      cleaner.start();
220
221      while (run) {
222        Thread.sleep(1000);
223      }
224
225      getNumberOfMobFiles(conf, table.getName(), new String(fam));
226      LOG.info("Waiting for write thread to finish ...");
227      writeData.join();
228      // Cleanup again
229      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
230      getNumberOfMobFiles(conf, table.getName(), new String(fam));
231
232      if (HTU != null) {
233        LOG.info("Archive cleaner started ...");
234        // Call archive cleaner again
235        HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
236        LOG.info("Archive cleaner finished");
237      }
238
239      scanTable();
240
241    } finally {
242
243      admin.disableTable(tableDescriptor.getTableName());
244      admin.deleteTable(tableDescriptor.getTableName());
245    }
246    LOG.info("MOB Stress Test finished OK");
247    printStats(count);
248
249  }
250
251  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
252    throws IOException {
253    FileSystem fs = FileSystem.get(conf);
254    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
255    FileStatus[] stat = fs.listStatus(dir);
256    long size = 0;
257    for (FileStatus st : stat) {
258      LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen());
259      size += st.getLen();
260    }
261    LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size);
262
263    return stat.length;
264  }
265
266  public void printStats(long loaded) {
267    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
268      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
269      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
270      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
271      + FaultyMobStoreCompactor.totalFailures.get());
272  }
273
274  private void scanTable() {
275    try {
276
277      Result result;
278      ResultScanner scanner = table.getScanner(fam);
279      int counter = 0;
280      while ((result = scanner.next()) != null) {
281        byte[] key = result.getRow();
282        assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal)));
283        if (counter % 10000 == 0) {
284          LOG.info("GET=" + counter + " key=" + Bytes.toInt(key));
285        }
286        counter++;
287      }
288
289      assertEquals(count, counter);
290    } catch (Exception e) {
291      e.printStackTrace();
292      LOG.error("MOB Stress Test FAILED");
293      if (HTU != null) {
294        assertTrue(false);
295      } else {
296        System.exit(-1);
297      }
298    }
299  }
300}