001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Random;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.stream.Stream;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
032import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.util.AbstractHBaseTool;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.util.StringUtils;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
055
056/**
057 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
058 * that reads never see partially-complete writes
059 */
060@InterfaceAudience.Private
061public class AcidGuaranteesTestTool extends AbstractHBaseTool {
062
063  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);
064
065  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
066  public static final byte[] FAMILY_A = Bytes.toBytes("A");
067  public static final byte[] FAMILY_B = Bytes.toBytes("B");
068  public static final byte[] FAMILY_C = Bytes.toBytes("C");
069  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
070
071  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
072
073  public static int NUM_COLS_TO_CHECK = 50;
074
075  private ExecutorService sharedPool;
076
077  private long millisToRun;
078  private int numWriters;
079  private int numGetters;
080  private int numScanners;
081  private int numUniqueRows;
082  private boolean crazyFlush;
083  private boolean useMob;
084
085  private ExecutorService createThreadPool() {
086    int maxThreads = 256;
087    int coreThreads = 128;
088
089    long keepAliveTime = 60;
090    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
091        maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
092
093    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
094        TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
095    tpe.allowCoreThreadTimeOut(true);
096    return tpe;
097  }
098
099  @Override
100  protected void addOptions() {
101    addOptWithArg("millis", "time limit in milliseconds");
102    addOptWithArg("numWriters", "number of write threads");
103    addOptWithArg("numGetters", "number of get threads");
104    addOptWithArg("numScanners", "number of scan threads");
105    addOptWithArg("numUniqueRows", "number of unique rows to test");
106    addOptNoArg("crazyFlush",
107      "if specified we will flush continuously otherwise will flush every minute");
108    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
109  }
110
111  @Override
112  protected void processOptions(CommandLine cmd) {
113    millisToRun = getOptionAsLong(cmd, "millis", 5000);
114    numWriters = getOptionAsInt(cmd, "numWriters", 50);
115    numGetters = getOptionAsInt(cmd, "numGetters", 2);
116    numScanners = getOptionAsInt(cmd, "numScanners", 2);
117    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
118    crazyFlush = cmd.hasOption("crazyFlush");
119    useMob = cmd.hasOption("useMob");
120  }
121
122  @Override
123  protected int doWork() throws Exception {
124    sharedPool = createThreadPool();
125    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
126      runTestAtomicity(conn.getAdmin());
127    } finally {
128      sharedPool.shutdown();
129    }
130    return 0;
131  }
132
133  /**
134   * Thread that does random full-row writes into a table.
135   */
136  public static class AtomicityWriter extends RepeatingTestThread {
137    Random rand = new Random();
138    byte data[] = new byte[10];
139    byte[][] targetRows;
140    byte[][] targetFamilies;
141    Connection connection;
142    Table table;
143    AtomicLong numWritten = new AtomicLong();
144
145    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
146        ExecutorService pool) throws IOException {
147      super(ctx);
148      this.targetRows = targetRows;
149      this.targetFamilies = targetFamilies;
150      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
151      table = connection.getTable(TABLE_NAME);
152    }
153
154    @Override
155    public void doAnAction() throws Exception {
156      // Pick a random row to write into
157      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
158      Put p = new Put(targetRow);
159      rand.nextBytes(data);
160
161      for (byte[] family : targetFamilies) {
162        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
163          byte qualifier[] = Bytes.toBytes("col" + i);
164          p.addColumn(family, qualifier, data);
165        }
166      }
167      table.put(p);
168      numWritten.getAndIncrement();
169    }
170
171    @Override
172    public void workDone() throws IOException {
173      try {
174        table.close();
175      } finally {
176        connection.close();
177      }
178    }
179  }
180
181  /**
182   * Thread that does single-row reads in a table, looking for partially completed rows.
183   */
184  public static class AtomicGetReader extends RepeatingTestThread {
185    byte[] targetRow;
186    byte[][] targetFamilies;
187    Connection connection;
188    Table table;
189    int numVerified = 0;
190    AtomicLong numRead = new AtomicLong();
191
192    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
193        ExecutorService pool) throws IOException {
194      super(ctx);
195      this.targetRow = targetRow;
196      this.targetFamilies = targetFamilies;
197      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
198      table = connection.getTable(TABLE_NAME);
199    }
200
201    @Override
202    public void doAnAction() throws Exception {
203      Get g = new Get(targetRow);
204      Result res = table.get(g);
205      byte[] gotValue = null;
206      if (res.getRow() == null) {
207        // Trying to verify but we didn't find the row - the writing
208        // thread probably just hasn't started writing yet, so we can
209        // ignore this action
210        return;
211      }
212
213      for (byte[] family : targetFamilies) {
214        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
215          byte qualifier[] = Bytes.toBytes("col" + i);
216          byte thisValue[] = res.getValue(family, qualifier);
217          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
218            gotFailure(gotValue, res);
219          }
220          numVerified++;
221          gotValue = thisValue;
222        }
223      }
224      numRead.getAndIncrement();
225    }
226
227    @Override
228    public void workDone() throws IOException {
229      try {
230        table.close();
231      } finally {
232        connection.close();
233      }
234    }
235
236    private void gotFailure(byte[] expected, Result res) {
237      StringBuilder msg = new StringBuilder();
238      msg.append("Failed after ").append(numVerified).append("!");
239      msg.append("Expected=").append(Bytes.toStringBinary(expected));
240      msg.append("Got:\n");
241      for (Cell kv : res.listCells()) {
242        msg.append(kv.toString());
243        msg.append(" val= ");
244        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
245        msg.append("\n");
246      }
247      throw new RuntimeException(msg.toString());
248    }
249  }
250
251  /**
252   * Thread that does full scans of the table looking for any partially completed rows.
253   */
254  public static class AtomicScanReader extends RepeatingTestThread {
255    byte[][] targetFamilies;
256    Table table;
257    Connection connection;
258    AtomicLong numScans = new AtomicLong();
259    AtomicLong numRowsScanned = new AtomicLong();
260
261    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
262        throws IOException {
263      super(ctx);
264      this.targetFamilies = targetFamilies;
265      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
266      table = connection.getTable(TABLE_NAME);
267    }
268
269    @Override
270    public void doAnAction() throws Exception {
271      Scan s = new Scan();
272      for (byte[] family : targetFamilies) {
273        s.addFamily(family);
274      }
275      ResultScanner scanner = table.getScanner(s);
276
277      for (Result res : scanner) {
278        byte[] gotValue = null;
279
280        for (byte[] family : targetFamilies) {
281          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
282            byte qualifier[] = Bytes.toBytes("col" + i);
283            byte thisValue[] = res.getValue(family, qualifier);
284            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
285              gotFailure(gotValue, res);
286            }
287            gotValue = thisValue;
288          }
289        }
290        numRowsScanned.getAndIncrement();
291      }
292      numScans.getAndIncrement();
293    }
294
295    @Override
296    public void workDone() throws IOException {
297      try {
298        table.close();
299      } finally {
300        connection.close();
301      }
302    }
303
304    private void gotFailure(byte[] expected, Result res) {
305      StringBuilder msg = new StringBuilder();
306      msg.append("Failed after ").append(numRowsScanned).append("!");
307      msg.append("Expected=").append(Bytes.toStringBinary(expected));
308      msg.append("Got:\n");
309      for (Cell kv : res.listCells()) {
310        msg.append(kv.toString());
311        msg.append(" val= ");
312        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
313        msg.append("\n");
314      }
315      throw new RuntimeException(msg.toString());
316    }
317  }
318
319  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
320    if (!admin.tableExists(TABLE_NAME)) {
321      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
322      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
323          .forEachOrdered(builder::setColumnFamily);
324      admin.createTable(builder.build());
325    }
326    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
327    if (cfd.isMobEnabled() != useMob) {
328      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
329          .setMobEnabled(useMob).setMobThreshold(4).build());
330    }
331  }
332
333  private void runTestAtomicity(Admin admin) throws Exception {
334    createTableIfMissing(admin, useMob);
335    TestContext ctx = new TestContext(conf);
336
337    byte rows[][] = new byte[numUniqueRows][];
338    for (int i = 0; i < numUniqueRows; i++) {
339      rows[i] = Bytes.toBytes("test_row_" + i);
340    }
341
342    List<AtomicityWriter> writers = Lists.newArrayList();
343    for (int i = 0; i < numWriters; i++) {
344      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
345      writers.add(writer);
346      ctx.addThread(writer);
347    }
348    // Add a flusher
349    ctx.addThread(new RepeatingTestThread(ctx) {
350      @Override
351      public void doAnAction() throws Exception {
352        try {
353          admin.flush(TABLE_NAME);
354        } catch (IOException ioe) {
355          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
356        }
357        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
358        // we would flush as often as possible. On a running cluster, this isn't practical:
359        // (1) we will cause a lot of load due to all the flushing and compacting
360        // (2) we cannot change the flushing/compacting related Configuration options to try to
361        // alleviate this
362        // (3) it is an unrealistic workload, since no one would actually flush that often.
363        // Therefore, let's flush every minute to have more flushes than usual, but not overload
364        // the running cluster.
365        if (!crazyFlush) {
366          Thread.sleep(60000);
367        }
368      }
369    });
370
371    List<AtomicGetReader> getters = Lists.newArrayList();
372    for (int i = 0; i < numGetters; i++) {
373      AtomicGetReader getter =
374          new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
375      getters.add(getter);
376      ctx.addThread(getter);
377    }
378
379    List<AtomicScanReader> scanners = Lists.newArrayList();
380    for (int i = 0; i < numScanners; i++) {
381      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
382      scanners.add(scanner);
383      ctx.addThread(scanner);
384    }
385
386    ctx.startThreads();
387    ctx.waitFor(millisToRun);
388    ctx.stop();
389
390    LOG.info("Finished test. Writers:");
391    for (AtomicityWriter writer : writers) {
392      LOG.info("  wrote " + writer.numWritten.get());
393    }
394    LOG.info("Readers:");
395    for (AtomicGetReader reader : getters) {
396      LOG.info("  read " + reader.numRead.get());
397    }
398    LOG.info("Scanners:");
399    for (AtomicScanReader scanner : scanners) {
400      LOG.info("  scanned " + scanner.numScans.get());
401      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
402    }
403  }
404
405  public static void main(String[] args) {
406    Configuration c = HBaseConfiguration.create();
407    int status;
408    try {
409      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
410      status = ToolRunner.run(c, test, args);
411    } catch (Exception e) {
412      LOG.error("Exiting due to error", e);
413      status = -1;
414    }
415    System.exit(status);
416  }
417}