001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Random;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.stream.Stream;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
032import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.util.AbstractHBaseTool;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.util.StringUtils;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
055import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
056
/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially-complete writes.
 */
061@InterfaceAudience.Private
062public class AcidGuaranteesTestTool extends AbstractHBaseTool {
063
064  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);
065
066  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
067  public static final byte[] FAMILY_A = Bytes.toBytes("A");
068  public static final byte[] FAMILY_B = Bytes.toBytes("B");
069  public static final byte[] FAMILY_C = Bytes.toBytes("C");
070  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
071
072  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
073
074  public static int NUM_COLS_TO_CHECK = 50;
075
076  private ExecutorService sharedPool;
077
078  private long millisToRun;
079  private int numWriters;
080  private int numGetters;
081  private int numScanners;
082  private int numUniqueRows;
083  private boolean crazyFlush;
084  private boolean useMob;
085
086  private ExecutorService createThreadPool() {
087    int maxThreads = 256;
088    int coreThreads = 128;
089
090    long keepAliveTime = 60;
091    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
092        maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
093
094    ThreadPoolExecutor tpe =
095      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
096        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
097          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
098    tpe.allowCoreThreadTimeOut(true);
099    return tpe;
100  }
101
102  @Override
103  protected void addOptions() {
104    addOptWithArg("millis", "time limit in milliseconds");
105    addOptWithArg("numWriters", "number of write threads");
106    addOptWithArg("numGetters", "number of get threads");
107    addOptWithArg("numScanners", "number of scan threads");
108    addOptWithArg("numUniqueRows", "number of unique rows to test");
109    addOptNoArg("crazyFlush",
110      "if specified we will flush continuously otherwise will flush every minute");
111    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
112  }
113
114  @Override
115  protected void processOptions(CommandLine cmd) {
116    millisToRun = getOptionAsLong(cmd, "millis", 5000);
117    numWriters = getOptionAsInt(cmd, "numWriters", 50);
118    numGetters = getOptionAsInt(cmd, "numGetters", 2);
119    numScanners = getOptionAsInt(cmd, "numScanners", 2);
120    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
121    crazyFlush = cmd.hasOption("crazyFlush");
122    useMob = cmd.hasOption("useMob");
123  }
124
125  @Override
126  protected int doWork() throws Exception {
127    sharedPool = createThreadPool();
128    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
129      runTestAtomicity(conn.getAdmin());
130    } finally {
131      sharedPool.shutdown();
132    }
133    return 0;
134  }
135
136  /**
137   * Thread that does random full-row writes into a table.
138   */
139  public static class AtomicityWriter extends RepeatingTestThread {
140    Random rand = new Random();
141    byte data[] = new byte[10];
142    byte[][] targetRows;
143    byte[][] targetFamilies;
144    Connection connection;
145    Table table;
146    AtomicLong numWritten = new AtomicLong();
147
148    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
149        ExecutorService pool) throws IOException {
150      super(ctx);
151      this.targetRows = targetRows;
152      this.targetFamilies = targetFamilies;
153      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
154      table = connection.getTable(TABLE_NAME);
155    }
156
157    @Override
158    public void doAnAction() throws Exception {
159      // Pick a random row to write into
160      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
161      Put p = new Put(targetRow);
162      rand.nextBytes(data);
163
164      for (byte[] family : targetFamilies) {
165        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
166          byte qualifier[] = Bytes.toBytes("col" + i);
167          p.addColumn(family, qualifier, data);
168        }
169      }
170      table.put(p);
171      numWritten.getAndIncrement();
172    }
173
174    @Override
175    public void workDone() throws IOException {
176      try {
177        table.close();
178      } finally {
179        connection.close();
180      }
181    }
182  }
183
184  /**
185   * Thread that does single-row reads in a table, looking for partially completed rows.
186   */
187  public static class AtomicGetReader extends RepeatingTestThread {
188    byte[] targetRow;
189    byte[][] targetFamilies;
190    Connection connection;
191    Table table;
192    int numVerified = 0;
193    AtomicLong numRead = new AtomicLong();
194
195    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
196        ExecutorService pool) throws IOException {
197      super(ctx);
198      this.targetRow = targetRow;
199      this.targetFamilies = targetFamilies;
200      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
201      table = connection.getTable(TABLE_NAME);
202    }
203
204    @Override
205    public void doAnAction() throws Exception {
206      Get g = new Get(targetRow);
207      Result res = table.get(g);
208      byte[] gotValue = null;
209      if (res.getRow() == null) {
210        // Trying to verify but we didn't find the row - the writing
211        // thread probably just hasn't started writing yet, so we can
212        // ignore this action
213        return;
214      }
215
216      for (byte[] family : targetFamilies) {
217        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
218          byte qualifier[] = Bytes.toBytes("col" + i);
219          byte thisValue[] = res.getValue(family, qualifier);
220          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
221            gotFailure(gotValue, res);
222          }
223          numVerified++;
224          gotValue = thisValue;
225        }
226      }
227      numRead.getAndIncrement();
228    }
229
230    @Override
231    public void workDone() throws IOException {
232      try {
233        table.close();
234      } finally {
235        connection.close();
236      }
237    }
238
239    private void gotFailure(byte[] expected, Result res) {
240      StringBuilder msg = new StringBuilder();
241      msg.append("Failed after ").append(numVerified).append("!");
242      msg.append("Expected=").append(Bytes.toStringBinary(expected));
243      msg.append("Got:\n");
244      for (Cell kv : res.listCells()) {
245        msg.append(kv.toString());
246        msg.append(" val= ");
247        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
248        msg.append("\n");
249      }
250      throw new RuntimeException(msg.toString());
251    }
252  }
253
254  /**
255   * Thread that does full scans of the table looking for any partially completed rows.
256   */
257  public static class AtomicScanReader extends RepeatingTestThread {
258    byte[][] targetFamilies;
259    Table table;
260    Connection connection;
261    AtomicLong numScans = new AtomicLong();
262    AtomicLong numRowsScanned = new AtomicLong();
263
264    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
265        throws IOException {
266      super(ctx);
267      this.targetFamilies = targetFamilies;
268      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
269      table = connection.getTable(TABLE_NAME);
270    }
271
272    @Override
273    public void doAnAction() throws Exception {
274      Scan s = new Scan();
275      for (byte[] family : targetFamilies) {
276        s.addFamily(family);
277      }
278      ResultScanner scanner = table.getScanner(s);
279
280      for (Result res : scanner) {
281        byte[] gotValue = null;
282
283        for (byte[] family : targetFamilies) {
284          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
285            byte qualifier[] = Bytes.toBytes("col" + i);
286            byte thisValue[] = res.getValue(family, qualifier);
287            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
288              gotFailure(gotValue, res);
289            }
290            gotValue = thisValue;
291          }
292        }
293        numRowsScanned.getAndIncrement();
294      }
295      numScans.getAndIncrement();
296    }
297
298    @Override
299    public void workDone() throws IOException {
300      try {
301        table.close();
302      } finally {
303        connection.close();
304      }
305    }
306
307    private void gotFailure(byte[] expected, Result res) {
308      StringBuilder msg = new StringBuilder();
309      msg.append("Failed after ").append(numRowsScanned).append("!");
310      msg.append("Expected=").append(Bytes.toStringBinary(expected));
311      msg.append("Got:\n");
312      for (Cell kv : res.listCells()) {
313        msg.append(kv.toString());
314        msg.append(" val= ");
315        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
316        msg.append("\n");
317      }
318      throw new RuntimeException(msg.toString());
319    }
320  }
321
322  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
323    if (!admin.tableExists(TABLE_NAME)) {
324      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
325      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
326          .forEachOrdered(builder::setColumnFamily);
327      admin.createTable(builder.build());
328    }
329    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
330    if (cfd.isMobEnabled() != useMob) {
331      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
332          .setMobEnabled(useMob).setMobThreshold(4).build());
333    }
334  }
335
336  private void runTestAtomicity(Admin admin) throws Exception {
337    createTableIfMissing(admin, useMob);
338    TestContext ctx = new TestContext(conf);
339
340    byte rows[][] = new byte[numUniqueRows][];
341    for (int i = 0; i < numUniqueRows; i++) {
342      rows[i] = Bytes.toBytes("test_row_" + i);
343    }
344
345    List<AtomicityWriter> writers = Lists.newArrayList();
346    for (int i = 0; i < numWriters; i++) {
347      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
348      writers.add(writer);
349      ctx.addThread(writer);
350    }
351    // Add a flusher
352    ctx.addThread(new RepeatingTestThread(ctx) {
353      @Override
354      public void doAnAction() throws Exception {
355        try {
356          admin.flush(TABLE_NAME);
357        } catch (IOException ioe) {
358          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
359        }
360        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
361        // we would flush as often as possible. On a running cluster, this isn't practical:
362        // (1) we will cause a lot of load due to all the flushing and compacting
363        // (2) we cannot change the flushing/compacting related Configuration options to try to
364        // alleviate this
365        // (3) it is an unrealistic workload, since no one would actually flush that often.
366        // Therefore, let's flush every minute to have more flushes than usual, but not overload
367        // the running cluster.
368        if (!crazyFlush) {
369          Thread.sleep(60000);
370        }
371      }
372    });
373
374    List<AtomicGetReader> getters = Lists.newArrayList();
375    for (int i = 0; i < numGetters; i++) {
376      AtomicGetReader getter =
377          new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
378      getters.add(getter);
379      ctx.addThread(getter);
380    }
381
382    List<AtomicScanReader> scanners = Lists.newArrayList();
383    for (int i = 0; i < numScanners; i++) {
384      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
385      scanners.add(scanner);
386      ctx.addThread(scanner);
387    }
388
389    ctx.startThreads();
390    ctx.waitFor(millisToRun);
391    ctx.stop();
392
393    LOG.info("Finished test. Writers:");
394    for (AtomicityWriter writer : writers) {
395      LOG.info("  wrote " + writer.numWritten.get());
396    }
397    LOG.info("Readers:");
398    for (AtomicGetReader reader : getters) {
399      LOG.info("  read " + reader.numRead.get());
400    }
401    LOG.info("Scanners:");
402    for (AtomicScanReader scanner : scanners) {
403      LOG.info("  scanned " + scanner.numScans.get());
404      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
405    }
406  }
407
408  public static void main(String[] args) {
409    Configuration c = HBaseConfiguration.create();
410    int status;
411    try {
412      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
413      status = ToolRunner.run(c, test, args);
414    } catch (Exception e) {
415      LOG.error("Exiting due to error", e);
416      status = -1;
417    }
418    System.exit(status);
419  }
420}