/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially-complete writes.
 * <p>
 * Writers store one random value into every checked column of every family of a row with a single
 * {@link Put}; get and scan readers then verify that all checked columns of a row hold the same
 * value.
 */
@InterfaceAudience.Private
public class AcidGuaranteesTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);

  /** Table that all writer, getter and scanner threads operate on. */
  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
  public static final byte[] FAMILY_A = Bytes.toBytes("A");
  public static final byte[] FAMILY_B = Bytes.toBytes("B");
  public static final byte[] FAMILY_C = Bytes.toBytes("C");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");

  /** Column families created on the test table and covered by every write and check. */
  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };

  /** Number of columns ("col0" .. "col(N-1)") written and verified per family. */
  public static int NUM_COLS_TO_CHECK = 50;

  /** Executor passed to every client connection opened by the test threads. */
  private ExecutorService sharedPool;

  private long millisToRun;
  private int numWriters;
  private int numGetters;
  private int numScanners;
  private int numUniqueRows;
  private boolean crazyFlush;
  private boolean useMob;

  /**
   * Builds the executor that is shared by the connections of all test threads.
   * <p>
   * The pool uses 128 core / 256 maximum daemon threads with a 60 second keep-alive, lets core
   * threads time out as well, and queues work in a bounded {@link LinkedBlockingQueue}.
   * @return a newly created thread pool; the caller is responsible for shutting it down
   */
  private ExecutorService createThreadPool() {
    int maxThreads = 256;
    int coreThreads = 128;

    long keepAliveTime = 60;
    BlockingQueue<Runnable> workQueue =
      new LinkedBlockingQueue<>(maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);

    ThreadPoolExecutor tpe =
      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  /** Registers the command line options understood by this tool. */
  @Override
  protected void addOptions() {
    addOptWithArg("millis", "time limit in milliseconds");
    addOptWithArg("numWriters", "number of write threads");
    addOptWithArg("numGetters", "number of get threads");
    addOptWithArg("numScanners", "number of scan threads");
    addOptWithArg("numUniqueRows", "number of unique rows to test");
    addOptNoArg("crazyFlush",
      "if specified we will flush continuously otherwise will flush every minute");
    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
  }

  /**
   * Reads the parsed command line into the tool's fields, applying the defaults (5000 ms, 50
   * writers, 2 getters, 2 scanners, 3 unique rows) for options that were not given.
   * @param cmd the parsed command line
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    millisToRun = getOptionAsLong(cmd, "millis", 5000);
    numWriters = getOptionAsInt(cmd, "numWriters", 50);
    numGetters = getOptionAsInt(cmd, "numGetters", 2);
    numScanners = getOptionAsInt(cmd, "numScanners", 2);
    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
    crazyFlush = cmd.hasOption("crazyFlush");
    useMob = cmd.hasOption("useMob");
  }

  /**
   * Creates the shared pool, runs the atomicity test against the configured cluster and releases
   * the connection, the admin handle and the pool afterwards.
   * @return 0 when the test completes without throwing
   * @throws Exception if the test detects a failure or any HBase operation fails
   */
  @Override
  protected int doWork() throws Exception {
    sharedPool = createThreadPool();
    // The Admin is a closeable resource of its own, so it is closed explicitly here rather than
    // relying on the connection being closed.
    try (Connection conn = ConnectionFactory.createConnection(getConf());
      Admin admin = conn.getAdmin()) {
      runTestAtomicity(admin);
    } finally {
      sharedPool.shutdown();
    }
    return 0;
  }

  /**
   * Builds the exception thrown by the readers when a row contains differing column values.
   * @param checkedBefore counter reported in the message (columns or rows checked so far)
   * @param expected the value seen in the previously checked column
   * @param res the offending row; all of its cells are dumped into the message
   * @return the exception to throw; never {@code null}
   */
  private static RuntimeException atomicityViolation(long checkedBefore, byte[] expected,
    Result res) {
    StringBuilder msg = new StringBuilder();
    msg.append("Failed after ").append(checkedBefore).append("!");
    msg.append("Expected=").append(Bytes.toStringBinary(expected));
    msg.append("Got:\n");
    for (Cell kv : res.listCells()) {
      msg.append(kv.toString());
      msg.append(" val= ");
      msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
      msg.append("\n");
    }
    return new RuntimeException(msg.toString());
  }

  /**
   * Thread that does random full-row writes into a table.
   */
  public static class AtomicityWriter extends RepeatingTestThread {
    Random rand = new Random();
    /** Buffer refilled with random bytes before each put and used as the value of every column. */
    byte[] data = new byte[10];
    byte[][] targetRows;
    byte[][] targetFamilies;
    Connection connection;
    Table table;
    /** Number of puts completed by this writer. */
    AtomicLong numWritten = new AtomicLong();

    /**
     * @param ctx test context this thread belongs to; also supplies the configuration
     * @param targetRows candidate row keys, one of which is picked at random for each write
     * @param targetFamilies families written by every put
     * @param pool executor handed to the connection opened by this writer
     * @throws IOException if the connection or table cannot be obtained
     */
    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
      ExecutorService pool) throws IOException {
      super(ctx);
      this.targetRows = targetRows;
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Writes one fresh random value into all checked columns of all families of a randomly chosen
     * row, using a single {@link Put}.
     */
    @Override
    public void doAnAction() throws Exception {
      // Pick a random row to write into
      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
      Put p = new Put(targetRow);
      rand.nextBytes(data);

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte[] qualifier = Bytes.toBytes("col" + i);
          p.addColumn(family, qualifier, data);
        }
      }
      table.put(p);
      numWritten.getAndIncrement();
    }

    /** Closes the table and then the connection, even if closing the table fails. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }
  }

  /**
   * Thread that does single-row reads in a table, looking for partially completed rows.
   */
  public static class AtomicGetReader extends RepeatingTestThread {
    byte[] targetRow;
    byte[][] targetFamilies;
    Connection connection;
    Table table;
    /** Number of columns checked so far; reported in the failure message. */
    int numVerified = 0;
    /** Number of rows read and fully verified by this reader. */
    AtomicLong numRead = new AtomicLong();

    /**
     * @param ctx test context this thread belongs to; also supplies the configuration
     * @param targetRow the single row key this reader fetches
     * @param targetFamilies families whose columns are verified
     * @param pool executor handed to the connection opened by this reader
     * @throws IOException if the connection or table cannot be obtained
     */
    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
      ExecutorService pool) throws IOException {
      super(ctx);
      this.targetRow = targetRow;
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Fetches the target row and checks that each checked column holds the same value as the
     * column checked before it.
     * @throws RuntimeException if two consecutive columns of the row differ
     */
    @Override
    public void doAnAction() throws Exception {
      Get g = new Get(targetRow);
      Result res = table.get(g);
      byte[] gotValue = null;
      if (res.getRow() == null) {
        // Trying to verify but we didn't find the row - the writing
        // thread probably just hasn't started writing yet, so we can
        // ignore this action
        return;
      }

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte[] qualifier = Bytes.toBytes("col" + i);
          byte[] thisValue = res.getValue(family, qualifier);
          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
            throw atomicityViolation(numVerified, gotValue, res);
          }
          numVerified++;
          gotValue = thisValue;
        }
      }
      numRead.getAndIncrement();
    }

    /** Closes the table and then the connection, even if closing the table fails. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte[][] targetFamilies;
    Table table;
    Connection connection;
    /** Number of complete table scans finished by this reader. */
    AtomicLong numScans = new AtomicLong();
    /** Number of rows verified across all scans. */
    AtomicLong numRowsScanned = new AtomicLong();

    /**
     * @param ctx test context this thread belongs to; also supplies the configuration
     * @param targetFamilies families that are scanned and verified
     * @param pool executor handed to the connection opened by this reader
     * @throws IOException if the connection or table cannot be obtained
     */
    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
      throws IOException {
      super(ctx);
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Scans the target families of the whole table and, for every returned row, checks that each
     * checked column holds the same value as the column checked before it.
     * @throws RuntimeException if two consecutive columns of a row differ
     */
    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }

      // try-with-resources so the scanner is released on every path, including the one where a
      // verification failure aborts the scan half way through.
      try (ResultScanner scanner = table.getScanner(s)) {
        for (Result res : scanner) {
          byte[] gotValue = null;

          for (byte[] family : targetFamilies) {
            for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
              byte[] qualifier = Bytes.toBytes("col" + i);
              byte[] thisValue = res.getValue(family, qualifier);
              if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
                throw atomicityViolation(numRowsScanned.get(), gotValue, res);
              }
              gotValue = thisValue;
            }
          }
          numRowsScanned.getAndIncrement();
        }
      }
      numScans.getAndIncrement();
    }

    /** Closes the table and then the connection, even if closing the table fails. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }
  }

  /**
   * Creates the test table with all {@link #FAMILIES} if it does not exist yet and makes the MOB
   * setting of the first column family returned by the table descriptor match {@code useMob}.
   * @param admin admin handle used for all table operations
   * @param useMob whether MOB should be enabled on that column family
   * @throws IOException if any admin operation fails
   */
  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
    if (!admin.tableExists(TABLE_NAME)) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
        .forEachOrdered(builder::setColumnFamily);
      admin.createTable(builder.build());
    }
    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
    if (cfd.isMobEnabled() != useMob) {
      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
        .setMobEnabled(useMob).setMobThreshold(4).build());
    }
  }

  /**
   * Sets up the table, starts the configured number of writer, getter and scanner threads plus
   * one flusher thread, lets them run for {@code millisToRun} milliseconds, stops them and logs
   * the per-thread counters.
   * @param admin admin handle used to create the table and to flush it while the test runs
   * @throws Exception if table setup fails or the test context reports a failure
   */
  private void runTestAtomicity(Admin admin) throws Exception {
    createTableIfMissing(admin, useMob);
    TestContext ctx = new TestContext(conf);

    byte[][] rows = new byte[numUniqueRows][];
    for (int i = 0; i < numUniqueRows; i++) {
      rows[i] = Bytes.toBytes("test_row_" + i);
    }

    List<AtomicityWriter> writers = Lists.newArrayList();
    for (int i = 0; i < numWriters; i++) {
      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
      writers.add(writer);
      ctx.addThread(writer);
    }
    // Add a flusher
    ctx.addThread(new RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        try {
          admin.flush(TABLE_NAME);
        } catch (IOException ioe) {
          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
        }
        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
        // we would flush as often as possible. On a running cluster, this isn't practical:
        // (1) we will cause a lot of load due to all the flushing and compacting
        // (2) we cannot change the flushing/compacting related Configuration options to try to
        // alleviate this
        // (3) it is an unrealistic workload, since no one would actually flush that often.
        // Therefore, let's flush every minute to have more flushes than usual, but not overload
        // the running cluster.
        if (!crazyFlush) {
          Thread.sleep(60000);
        }
      }
    });

    List<AtomicGetReader> getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
      // Spread the getters round-robin over the available rows.
      AtomicGetReader getter =
        new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
      getters.add(getter);
      ctx.addThread(getter);
    }

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Finished test. Writers:");
    for (AtomicityWriter writer : writers) {
      LOG.info(" wrote {}", writer.numWritten.get());
    }
    LOG.info("Readers:");
    for (AtomicGetReader reader : getters) {
      LOG.info(" read {}", reader.numRead.get());
    }
    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info(" scanned {}", scanner.numScans.get());
      LOG.info(" verified {} rows", scanner.numRowsScanned.get());
    }
  }

  /**
   * Command line entry point: runs the tool through {@link ToolRunner} and exits the JVM with the
   * resulting status, or with -1 if the run throws.
   * @param args command line arguments, see {@link #addOptions()}
   */
  public static void main(String[] args) {
    Configuration c = HBaseConfiguration.create();
    int status;
    try {
      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
      status = ToolRunner.run(c, test, args);
    } catch (Exception e) {
      LOG.error("Exiting due to error", e);
      status = -1;
    }
    System.exit(status);
  }
}