001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.ThreadLocalRandom; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import java.util.stream.Stream; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 032import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.Connection; 037import org.apache.hadoop.hbase.client.ConnectionFactory; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.ResultScanner; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.util.AbstractHBaseTool; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.Threads; 048import org.apache.hadoop.util.StringUtils; 049import org.apache.hadoop.util.ToolRunner; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 057 058/** 059 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying 060 * that reads never see partially-complete writes 061 */ 062@InterfaceAudience.Private 063public class AcidGuaranteesTestTool extends AbstractHBaseTool { 064 065 private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class); 066 067 public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees"); 068 public static final byte[] FAMILY_A = Bytes.toBytes("A"); 069 public static final byte[] FAMILY_B = Bytes.toBytes("B"); 070 public static final byte[] FAMILY_C = Bytes.toBytes("C"); 071 public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); 072 073 public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C }; 074 075 public static int NUM_COLS_TO_CHECK = 50; 076 077 private ExecutorService sharedPool; 078 079 private long millisToRun; 080 private int numWriters; 081 private int numGetters; 082 private int numScanners; 083 private int numUniqueRows; 084 private boolean crazyFlush; 085 private boolean useMob; 086 087 private ExecutorService createThreadPool() { 088 int maxThreads = 256; 089 int coreThreads = 128; 090 091 long keepAliveTime = 60; 092 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>( 093 maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); 094 095 ThreadPoolExecutor tpe = 096 new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, 097 new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true) 098 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 099 tpe.allowCoreThreadTimeOut(true); 100 return tpe; 101 } 102 103 @Override 104 protected void addOptions() { 105 addOptWithArg("millis", "time limit in milliseconds"); 106 addOptWithArg("numWriters", "number of write threads"); 107 addOptWithArg("numGetters", "number of get threads"); 108 addOptWithArg("numScanners", "number of scan threads"); 109 addOptWithArg("numUniqueRows", "number of unique rows to test"); 110 addOptNoArg("crazyFlush", 111 "if specified we will flush continuously otherwise will flush every minute"); 112 addOptNoArg("useMob", "if specified we will enable mob on the first column family"); 113 } 114 115 @Override 116 protected void processOptions(CommandLine cmd) { 117 millisToRun = getOptionAsLong(cmd, "millis", 5000); 118 numWriters = getOptionAsInt(cmd, "numWriters", 50); 119 numGetters = getOptionAsInt(cmd, "numGetters", 2); 120 numScanners = getOptionAsInt(cmd, "numScanners", 2); 121 numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3); 122 crazyFlush = cmd.hasOption("crazyFlush"); 123 useMob = cmd.hasOption("useMob"); 124 } 125 126 @Override 127 protected int doWork() throws Exception { 128 sharedPool = createThreadPool(); 129 try (Connection conn = ConnectionFactory.createConnection(getConf())) { 130 runTestAtomicity(conn.getAdmin()); 131 } finally { 132 sharedPool.shutdown(); 133 } 134 return 0; 135 } 136 137 /** 138 * Thread that does random full-row writes into a table. 139 */ 140 public static class AtomicityWriter extends RepeatingTestThread { 141 byte data[] = new byte[10]; 142 byte[][] targetRows; 143 byte[][] targetFamilies; 144 Connection connection; 145 Table table; 146 AtomicLong numWritten = new AtomicLong(); 147 148 public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies, 149 ExecutorService pool) throws IOException { 150 super(ctx); 151 this.targetRows = targetRows; 152 this.targetFamilies = targetFamilies; 153 connection = ConnectionFactory.createConnection(ctx.getConf(), pool); 154 table = connection.getTable(TABLE_NAME); 155 } 156 157 @Override 158 public void doAnAction() throws Exception { 159 // Pick a random row to write into 160 byte[] targetRow = targetRows[ThreadLocalRandom.current().nextInt(targetRows.length)]; 161 Put p = new Put(targetRow); 162 Bytes.random(data); 163 for (byte[] family : targetFamilies) { 164 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { 165 byte qualifier[] = Bytes.toBytes("col" + i); 166 p.addColumn(family, qualifier, data); 167 } 168 } 169 table.put(p); 170 numWritten.getAndIncrement(); 171 } 172 173 @Override 174 public void workDone() throws IOException { 175 try { 176 table.close(); 177 } finally { 178 connection.close(); 179 } 180 } 181 } 182 183 /** 184 * Thread that does single-row reads in a table, looking for partially completed rows. 185 */ 186 public static class AtomicGetReader extends RepeatingTestThread { 187 byte[] targetRow; 188 byte[][] targetFamilies; 189 Connection connection; 190 Table table; 191 int numVerified = 0; 192 AtomicLong numRead = new AtomicLong(); 193 194 public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies, 195 ExecutorService pool) throws IOException { 196 super(ctx); 197 this.targetRow = targetRow; 198 this.targetFamilies = targetFamilies; 199 connection = ConnectionFactory.createConnection(ctx.getConf(), pool); 200 table = connection.getTable(TABLE_NAME); 201 } 202 203 @Override 204 public void doAnAction() throws Exception { 205 Get g = new Get(targetRow); 206 Result res = table.get(g); 207 byte[] gotValue = null; 208 if (res.getRow() == null) { 209 // Trying to verify but we didn't find the row - the writing 210 // thread probably just hasn't started writing yet, so we can 211 // ignore this action 212 return; 213 } 214 215 for (byte[] family : targetFamilies) { 216 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { 217 byte qualifier[] = Bytes.toBytes("col" + i); 218 byte thisValue[] = res.getValue(family, qualifier); 219 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { 220 gotFailure(gotValue, res); 221 } 222 numVerified++; 223 gotValue = thisValue; 224 } 225 } 226 numRead.getAndIncrement(); 227 } 228 229 @Override 230 public void workDone() throws IOException { 231 try { 232 table.close(); 233 } finally { 234 connection.close(); 235 } 236 } 237 238 private void gotFailure(byte[] expected, Result res) { 239 StringBuilder msg = new StringBuilder(); 240 msg.append("Failed after ").append(numVerified).append("!"); 241 msg.append("Expected=").append(Bytes.toStringBinary(expected)); 242 msg.append("Got:\n"); 243 for (Cell kv : res.listCells()) { 244 msg.append(kv.toString()); 245 msg.append(" val= "); 246 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); 247 msg.append("\n"); 248 } 249 throw new RuntimeException(msg.toString()); 250 } 251 } 252 253 /** 254 * Thread that does full scans of the table looking for any partially completed rows. 255 */ 256 public static class AtomicScanReader extends RepeatingTestThread { 257 byte[][] targetFamilies; 258 Table table; 259 Connection connection; 260 AtomicLong numScans = new AtomicLong(); 261 AtomicLong numRowsScanned = new AtomicLong(); 262 263 public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool) 264 throws IOException { 265 super(ctx); 266 this.targetFamilies = targetFamilies; 267 connection = ConnectionFactory.createConnection(ctx.getConf(), pool); 268 table = connection.getTable(TABLE_NAME); 269 } 270 271 @Override 272 public void doAnAction() throws Exception { 273 Scan s = new Scan(); 274 for (byte[] family : targetFamilies) { 275 s.addFamily(family); 276 } 277 ResultScanner scanner = table.getScanner(s); 278 279 for (Result res : scanner) { 280 byte[] gotValue = null; 281 282 for (byte[] family : targetFamilies) { 283 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { 284 byte qualifier[] = Bytes.toBytes("col" + i); 285 byte thisValue[] = res.getValue(family, qualifier); 286 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { 287 gotFailure(gotValue, res); 288 } 289 gotValue = thisValue; 290 } 291 } 292 numRowsScanned.getAndIncrement(); 293 } 294 numScans.getAndIncrement(); 295 } 296 297 @Override 298 public void workDone() throws IOException { 299 try { 300 table.close(); 301 } finally { 302 connection.close(); 303 } 304 } 305 306 private void gotFailure(byte[] expected, Result res) { 307 StringBuilder msg = new StringBuilder(); 308 msg.append("Failed after ").append(numRowsScanned).append("!"); 309 msg.append("Expected=").append(Bytes.toStringBinary(expected)); 310 msg.append("Got:\n"); 311 for (Cell kv : res.listCells()) { 312 msg.append(kv.toString()); 313 msg.append(" val= "); 314 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); 315 msg.append("\n"); 316 } 317 throw new RuntimeException(msg.toString()); 318 } 319 } 320 321 private void createTableIfMissing(Admin admin, boolean useMob) throws IOException { 322 if (!admin.tableExists(TABLE_NAME)) { 323 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 324 Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of) 325 .forEachOrdered(builder::setColumnFamily); 326 admin.createTable(builder.build()); 327 } 328 ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0]; 329 if (cfd.isMobEnabled() != useMob) { 330 admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd) 331 .setMobEnabled(useMob).setMobThreshold(4).build()); 332 } 333 } 334 335 private void runTestAtomicity(Admin admin) throws Exception { 336 createTableIfMissing(admin, useMob); 337 TestContext ctx = new TestContext(conf); 338 339 byte rows[][] = new byte[numUniqueRows][]; 340 for (int i = 0; i < numUniqueRows; i++) { 341 rows[i] = Bytes.toBytes("test_row_" + i); 342 } 343 344 List<AtomicityWriter> writers = Lists.newArrayList(); 345 for (int i = 0; i < numWriters; i++) { 346 AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool); 347 writers.add(writer); 348 ctx.addThread(writer); 349 } 350 // Add a flusher 351 ctx.addThread(new RepeatingTestThread(ctx) { 352 @Override 353 public void doAnAction() throws Exception { 354 try { 355 admin.flush(TABLE_NAME); 356 } catch (IOException ioe) { 357 LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe)); 358 } 359 // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally, 360 // we would flush as often as possible. On a running cluster, this isn't practical: 361 // (1) we will cause a lot of load due to all the flushing and compacting 362 // (2) we cannot change the flushing/compacting related Configuration options to try to 363 // alleviate this 364 // (3) it is an unrealistic workload, since no one would actually flush that often. 365 // Therefore, let's flush every minute to have more flushes than usual, but not overload 366 // the running cluster. 367 if (!crazyFlush) { 368 Thread.sleep(60000); 369 } 370 } 371 }); 372 373 List<AtomicGetReader> getters = Lists.newArrayList(); 374 for (int i = 0; i < numGetters; i++) { 375 AtomicGetReader getter = 376 new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool); 377 getters.add(getter); 378 ctx.addThread(getter); 379 } 380 381 List<AtomicScanReader> scanners = Lists.newArrayList(); 382 for (int i = 0; i < numScanners; i++) { 383 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool); 384 scanners.add(scanner); 385 ctx.addThread(scanner); 386 } 387 388 ctx.startThreads(); 389 ctx.waitFor(millisToRun); 390 ctx.stop(); 391 392 LOG.info("Finished test. Writers:"); 393 for (AtomicityWriter writer : writers) { 394 LOG.info(" wrote " + writer.numWritten.get()); 395 } 396 LOG.info("Readers:"); 397 for (AtomicGetReader reader : getters) { 398 LOG.info(" read " + reader.numRead.get()); 399 } 400 LOG.info("Scanners:"); 401 for (AtomicScanReader scanner : scanners) { 402 LOG.info(" scanned " + scanner.numScans.get()); 403 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); 404 } 405 } 406 407 public static void main(String[] args) { 408 Configuration c = HBaseConfiguration.create(); 409 int status; 410 try { 411 AcidGuaranteesTestTool test = new AcidGuaranteesTestTool(); 412 status = ToolRunner.run(c, test, args); 413 } catch (Exception e) { 414 LOG.error("Exiting due to error", e); 415 status = -1; 416 } 417 System.exit(status); 418 } 419}