001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.stream.Stream;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
032import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.util.AbstractHBaseTool;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.util.StringUtils;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
057
058/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially complete writes.
061 */
062@InterfaceAudience.Private
063public class AcidGuaranteesTestTool extends AbstractHBaseTool {
064
065  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);
066
067  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
068  public static final byte[] FAMILY_A = Bytes.toBytes("A");
069  public static final byte[] FAMILY_B = Bytes.toBytes("B");
070  public static final byte[] FAMILY_C = Bytes.toBytes("C");
071  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
072
073  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
074
075  public static int NUM_COLS_TO_CHECK = 50;
076
077  private ExecutorService sharedPool;
078
079  private long millisToRun;
080  private int numWriters;
081  private int numGetters;
082  private int numScanners;
083  private int numUniqueRows;
084  private boolean crazyFlush;
085  private boolean useMob;
086
087  private ExecutorService createThreadPool() {
088    int maxThreads = 256;
089    int coreThreads = 128;
090
091    long keepAliveTime = 60;
092    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
093      maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
094
095    ThreadPoolExecutor tpe =
096      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
097        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
098          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
099    tpe.allowCoreThreadTimeOut(true);
100    return tpe;
101  }
102
103  @Override
104  protected void addOptions() {
105    addOptWithArg("millis", "time limit in milliseconds");
106    addOptWithArg("numWriters", "number of write threads");
107    addOptWithArg("numGetters", "number of get threads");
108    addOptWithArg("numScanners", "number of scan threads");
109    addOptWithArg("numUniqueRows", "number of unique rows to test");
110    addOptNoArg("crazyFlush",
111      "if specified we will flush continuously otherwise will flush every minute");
112    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
113  }
114
115  @Override
116  protected void processOptions(CommandLine cmd) {
117    millisToRun = getOptionAsLong(cmd, "millis", 5000);
118    numWriters = getOptionAsInt(cmd, "numWriters", 50);
119    numGetters = getOptionAsInt(cmd, "numGetters", 2);
120    numScanners = getOptionAsInt(cmd, "numScanners", 2);
121    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
122    crazyFlush = cmd.hasOption("crazyFlush");
123    useMob = cmd.hasOption("useMob");
124  }
125
126  @Override
127  protected int doWork() throws Exception {
128    sharedPool = createThreadPool();
129    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
130      runTestAtomicity(conn.getAdmin());
131    } finally {
132      sharedPool.shutdown();
133    }
134    return 0;
135  }
136
137  /**
138   * Thread that does random full-row writes into a table.
139   */
140  public static class AtomicityWriter extends RepeatingTestThread {
141    byte data[] = new byte[10];
142    byte[][] targetRows;
143    byte[][] targetFamilies;
144    Connection connection;
145    Table table;
146    AtomicLong numWritten = new AtomicLong();
147
148    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
149      ExecutorService pool) throws IOException {
150      super(ctx);
151      this.targetRows = targetRows;
152      this.targetFamilies = targetFamilies;
153      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
154      table = connection.getTable(TABLE_NAME);
155    }
156
157    @Override
158    public void doAnAction() throws Exception {
159      // Pick a random row to write into
160      byte[] targetRow = targetRows[ThreadLocalRandom.current().nextInt(targetRows.length)];
161      Put p = new Put(targetRow);
162      Bytes.random(data);
163      for (byte[] family : targetFamilies) {
164        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
165          byte qualifier[] = Bytes.toBytes("col" + i);
166          p.addColumn(family, qualifier, data);
167        }
168      }
169      table.put(p);
170      numWritten.getAndIncrement();
171    }
172
173    @Override
174    public void workDone() throws IOException {
175      try {
176        table.close();
177      } finally {
178        connection.close();
179      }
180    }
181  }
182
183  /**
184   * Thread that does single-row reads in a table, looking for partially completed rows.
185   */
186  public static class AtomicGetReader extends RepeatingTestThread {
187    byte[] targetRow;
188    byte[][] targetFamilies;
189    Connection connection;
190    Table table;
191    int numVerified = 0;
192    AtomicLong numRead = new AtomicLong();
193
194    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
195      ExecutorService pool) throws IOException {
196      super(ctx);
197      this.targetRow = targetRow;
198      this.targetFamilies = targetFamilies;
199      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
200      table = connection.getTable(TABLE_NAME);
201    }
202
203    @Override
204    public void doAnAction() throws Exception {
205      Get g = new Get(targetRow);
206      Result res = table.get(g);
207      byte[] gotValue = null;
208      if (res.getRow() == null) {
209        // Trying to verify but we didn't find the row - the writing
210        // thread probably just hasn't started writing yet, so we can
211        // ignore this action
212        return;
213      }
214
215      for (byte[] family : targetFamilies) {
216        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
217          byte qualifier[] = Bytes.toBytes("col" + i);
218          byte thisValue[] = res.getValue(family, qualifier);
219          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
220            gotFailure(gotValue, res);
221          }
222          numVerified++;
223          gotValue = thisValue;
224        }
225      }
226      numRead.getAndIncrement();
227    }
228
229    @Override
230    public void workDone() throws IOException {
231      try {
232        table.close();
233      } finally {
234        connection.close();
235      }
236    }
237
238    private void gotFailure(byte[] expected, Result res) {
239      StringBuilder msg = new StringBuilder();
240      msg.append("Failed after ").append(numVerified).append("!");
241      msg.append("Expected=").append(Bytes.toStringBinary(expected));
242      msg.append("Got:\n");
243      for (Cell kv : res.listCells()) {
244        msg.append(kv.toString());
245        msg.append(" val= ");
246        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
247        msg.append("\n");
248      }
249      throw new RuntimeException(msg.toString());
250    }
251  }
252
253  /**
254   * Thread that does full scans of the table looking for any partially completed rows.
255   */
256  public static class AtomicScanReader extends RepeatingTestThread {
257    byte[][] targetFamilies;
258    Table table;
259    Connection connection;
260    AtomicLong numScans = new AtomicLong();
261    AtomicLong numRowsScanned = new AtomicLong();
262
263    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
264      throws IOException {
265      super(ctx);
266      this.targetFamilies = targetFamilies;
267      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
268      table = connection.getTable(TABLE_NAME);
269    }
270
271    @Override
272    public void doAnAction() throws Exception {
273      Scan s = new Scan();
274      for (byte[] family : targetFamilies) {
275        s.addFamily(family);
276      }
277      ResultScanner scanner = table.getScanner(s);
278
279      for (Result res : scanner) {
280        byte[] gotValue = null;
281
282        for (byte[] family : targetFamilies) {
283          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
284            byte qualifier[] = Bytes.toBytes("col" + i);
285            byte thisValue[] = res.getValue(family, qualifier);
286            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
287              gotFailure(gotValue, res);
288            }
289            gotValue = thisValue;
290          }
291        }
292        numRowsScanned.getAndIncrement();
293      }
294      numScans.getAndIncrement();
295    }
296
297    @Override
298    public void workDone() throws IOException {
299      try {
300        table.close();
301      } finally {
302        connection.close();
303      }
304    }
305
306    private void gotFailure(byte[] expected, Result res) {
307      StringBuilder msg = new StringBuilder();
308      msg.append("Failed after ").append(numRowsScanned).append("!");
309      msg.append("Expected=").append(Bytes.toStringBinary(expected));
310      msg.append("Got:\n");
311      for (Cell kv : res.listCells()) {
312        msg.append(kv.toString());
313        msg.append(" val= ");
314        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
315        msg.append("\n");
316      }
317      throw new RuntimeException(msg.toString());
318    }
319  }
320
321  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
322    if (!admin.tableExists(TABLE_NAME)) {
323      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
324      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
325        .forEachOrdered(builder::setColumnFamily);
326      admin.createTable(builder.build());
327    }
328    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
329    if (cfd.isMobEnabled() != useMob) {
330      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
331        .setMobEnabled(useMob).setMobThreshold(4).build());
332    }
333  }
334
335  private void runTestAtomicity(Admin admin) throws Exception {
336    createTableIfMissing(admin, useMob);
337    TestContext ctx = new TestContext(conf);
338
339    byte rows[][] = new byte[numUniqueRows][];
340    for (int i = 0; i < numUniqueRows; i++) {
341      rows[i] = Bytes.toBytes("test_row_" + i);
342    }
343
344    List<AtomicityWriter> writers = Lists.newArrayList();
345    for (int i = 0; i < numWriters; i++) {
346      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
347      writers.add(writer);
348      ctx.addThread(writer);
349    }
350    // Add a flusher
351    ctx.addThread(new RepeatingTestThread(ctx) {
352      @Override
353      public void doAnAction() throws Exception {
354        try {
355          admin.flush(TABLE_NAME);
356        } catch (IOException ioe) {
357          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
358        }
359        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
360        // we would flush as often as possible. On a running cluster, this isn't practical:
361        // (1) we will cause a lot of load due to all the flushing and compacting
362        // (2) we cannot change the flushing/compacting related Configuration options to try to
363        // alleviate this
364        // (3) it is an unrealistic workload, since no one would actually flush that often.
365        // Therefore, let's flush every minute to have more flushes than usual, but not overload
366        // the running cluster.
367        if (!crazyFlush) {
368          Thread.sleep(60000);
369        }
370      }
371    });
372
373    List<AtomicGetReader> getters = Lists.newArrayList();
374    for (int i = 0; i < numGetters; i++) {
375      AtomicGetReader getter =
376        new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
377      getters.add(getter);
378      ctx.addThread(getter);
379    }
380
381    List<AtomicScanReader> scanners = Lists.newArrayList();
382    for (int i = 0; i < numScanners; i++) {
383      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
384      scanners.add(scanner);
385      ctx.addThread(scanner);
386    }
387
388    ctx.startThreads();
389    ctx.waitFor(millisToRun);
390    ctx.stop();
391
392    LOG.info("Finished test. Writers:");
393    for (AtomicityWriter writer : writers) {
394      LOG.info("  wrote " + writer.numWritten.get());
395    }
396    LOG.info("Readers:");
397    for (AtomicGetReader reader : getters) {
398      LOG.info("  read " + reader.numRead.get());
399    }
400    LOG.info("Scanners:");
401    for (AtomicScanReader scanner : scanners) {
402      LOG.info("  scanned " + scanner.numScans.get());
403      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
404    }
405  }
406
407  public static void main(String[] args) {
408    Configuration c = HBaseConfiguration.create();
409    int status;
410    try {
411      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
412      status = ToolRunner.run(c, test, args);
413    } catch (Exception e) {
414      LOG.error("Exiting due to error", e);
415      status = -1;
416    }
417    System.exit(status);
418  }
419}