/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially-complete writes
 */
@InterfaceAudience.Private
public class AcidGuaranteesTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);

  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
  public static final byte[] FAMILY_A = Bytes.toBytes("A");
  public static final byte[] FAMILY_B = Bytes.toBytes("B");
  public static final byte[] FAMILY_C = Bytes.toBytes("C");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");

  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };

  /** Number of columns ("col0", "col1", ...) written to and verified in every family of a row. */
  public static int NUM_COLS_TO_CHECK = 50;

  /** Executor handed to every connection opened by the writer, getter and scanner threads. */
  private ExecutorService sharedPool;

  private long millisToRun;
  private int numWriters;
  private int numGetters;
  private int numScanners;
  private int numUniqueRows;
  private boolean crazyFlush;
  private boolean useMob;

  /**
   * Builds the executor shared by all client connections created by this tool: daemon threads,
   * 128 core / 256 max, a 60 second keep-alive that also applies to core threads, and a bounded
   * work queue.
   * @return the newly created executor; the caller is responsible for shutting it down
   */
  private ExecutorService createThreadPool() {
    int maxThreads = 256;
    int coreThreads = 128;

    long keepAliveTime = 60;
    BlockingQueue<Runnable> workQueue =
      new LinkedBlockingQueue<Runnable>(maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
      TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  /** Registers the command line options understood by this tool. */
  @Override
  protected void addOptions() {
    addOptWithArg("millis", "time limit in milliseconds");
    addOptWithArg("numWriters", "number of write threads");
    addOptWithArg("numGetters", "number of get threads");
    addOptWithArg("numScanners", "number of scan threads");
    addOptWithArg("numUniqueRows", "number of unique rows to test");
    addOptNoArg("crazyFlush",
      "if specified we will flush continuously otherwise will flush every minute");
    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
  }

  /**
   * Reads the parsed options into fields. Defaults when an option is absent: 5000 ms run time, 50
   * writers, 2 getters, 2 scanners and 3 unique rows; both flags default to off.
   * @param cmd the parsed command line
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    millisToRun = getOptionAsLong(cmd, "millis", 5000);
    numWriters = getOptionAsInt(cmd, "numWriters", 50);
    numGetters = getOptionAsInt(cmd, "numGetters", 2);
    numScanners = getOptionAsInt(cmd, "numScanners", 2);
    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
    crazyFlush = cmd.hasOption("crazyFlush");
    useMob = cmd.hasOption("useMob");
  }

  /**
   * Creates the shared pool, opens a connection and its {@link Admin}, and runs the atomicity test.
   * The admin and the connection are closed, and the pool is shut down, whether or not the test
   * succeeds.
   * @return 0 when the test completes without throwing
   * @throws Exception if setup fails or the test run throws
   */
  @Override
  protected int doWork() throws Exception {
    sharedPool = createThreadPool();
    // Admin is Closeable as well, so manage it together with the connection instead of leaking it.
    try (Connection conn = ConnectionFactory.createConnection(getConf());
      Admin admin = conn.getAdmin()) {
      runTestAtomicity(admin);
    } finally {
      sharedPool.shutdown();
    }
    return 0;
  }

  /**
   * Thread that does random full-row writes into a table.
   */
  public static class AtomicityWriter extends RepeatingTestThread {
    Random rand = new Random();
    byte[] data = new byte[10];
    byte[][] targetRows;
    byte[][] targetFamilies;
    Connection connection;
    Table table;
    AtomicLong numWritten = new AtomicLong();

    /**
     * Opens a dedicated connection (backed by {@code pool}) and a table handle on
     * {@link #TABLE_NAME}.
     * @param ctx            test context this thread belongs to; also supplies the configuration
     * @param targetRows     candidate row keys, one of which is picked at random for every write
     * @param targetFamilies families that receive the same value in every column
     * @param pool           executor passed to the connection
     * @throws IOException if the connection or the table cannot be obtained
     */
    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
      ExecutorService pool) throws IOException {
      super(ctx);
      this.targetRows = targetRows;
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Writes one {@link Put} that stores the same freshly generated random value into every checked
     * column of every target family of a randomly chosen row.
     */
    @Override
    public void doAnAction() throws Exception {
      // Pick a random row to write into
      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
      Put p = new Put(targetRow);
      rand.nextBytes(data);

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte[] qualifier = Bytes.toBytes("col" + i);
          p.addColumn(family, qualifier, data);
        }
      }
      table.put(p);
      numWritten.getAndIncrement();
    }

    /** Closes the table and then, even if that fails, the connection. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }
  }

  /**
   * Thread that does single-row reads in a table, looking for partially completed rows.
   */
  public static class AtomicGetReader extends RepeatingTestThread {
    byte[] targetRow;
    byte[][] targetFamilies;
    Connection connection;
    Table table;
    int numVerified = 0;
    AtomicLong numRead = new AtomicLong();

    /**
     * Opens a dedicated connection (backed by {@code pool}) and a table handle on
     * {@link #TABLE_NAME}.
     * @param ctx            test context this thread belongs to; also supplies the configuration
     * @param targetRow      the single row this reader fetches over and over
     * @param targetFamilies families whose columns are compared against each other
     * @param pool           executor passed to the connection
     * @throws IOException if the connection or the table cannot be obtained
     */
    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
      ExecutorService pool) throws IOException {
      super(ctx);
      this.targetRow = targetRow;
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Fetches the target row and compares every checked column value with the one read just before
     * it; a difference is reported by throwing a {@link RuntimeException}.
     */
    @Override
    public void doAnAction() throws Exception {
      Get g = new Get(targetRow);
      Result res = table.get(g);
      byte[] gotValue = null;
      if (res.getRow() == null) {
        // Trying to verify but we didn't find the row - the writing
        // thread probably just hasn't started writing yet, so we can
        // ignore this action
        return;
      }

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte[] qualifier = Bytes.toBytes("col" + i);
          byte[] thisValue = res.getValue(family, qualifier);
          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
            gotFailure(gotValue, res);
          }
          numVerified++;
          gotValue = thisValue;
        }
      }
      numRead.getAndIncrement();
    }

    /** Closes the table and then, even if that fails, the connection. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }

    /**
     * Always throws a {@link RuntimeException} whose message lists the expected value and every
     * cell of the offending result.
     * @param expected the value seen in the previously inspected column
     * @param res      the result that contained a different value
     */
    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numVerified).append("!");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append("Got:\n");
      for (Cell kv : res.listCells()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte[][] targetFamilies;
    Table table;
    Connection connection;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();

    /**
     * Opens a dedicated connection (backed by {@code pool}) and a table handle on
     * {@link #TABLE_NAME}.
     * @param ctx            test context this thread belongs to; also supplies the configuration
     * @param targetFamilies families to scan and whose columns are compared against each other
     * @param pool           executor passed to the connection
     * @throws IOException if the connection or the table cannot be obtained
     */
    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
      throws IOException {
      super(ctx);
      this.targetFamilies = targetFamilies;
      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
      table = connection.getTable(TABLE_NAME);
    }

    /**
     * Scans the target families once and, for every returned row, compares every checked column
     * value with the one read just before it; a difference is reported by throwing a
     * {@link RuntimeException}.
     */
    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }

      // Close the scanner on every exit path, including when gotFailure() throws mid-scan.
      try (ResultScanner scanner = table.getScanner(s)) {
        for (Result res : scanner) {
          byte[] gotValue = null;

          for (byte[] family : targetFamilies) {
            for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
              byte[] qualifier = Bytes.toBytes("col" + i);
              byte[] thisValue = res.getValue(family, qualifier);
              if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
                gotFailure(gotValue, res);
              }
              gotValue = thisValue;
            }
          }
          numRowsScanned.getAndIncrement();
        }
      }
      numScans.getAndIncrement();
    }

    /** Closes the table and then, even if that fails, the connection. */
    @Override
    public void workDone() throws IOException {
      try {
        table.close();
      } finally {
        connection.close();
      }
    }

    /**
     * Always throws a {@link RuntimeException} whose message lists the expected value and every
     * cell of the offending result.
     * @param expected the value seen in the previously inspected column
     * @param res      the result that contained a different value
     */
    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numRowsScanned).append("!");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append("Got:\n");
      for (Cell kv : res.listCells()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }

  /**
   * Creates {@link #TABLE_NAME} with all of {@link #FAMILIES} when it does not exist yet, then
   * makes the MOB setting of the first column family in the table descriptor match
   * {@code useMob}.
   * @param admin  admin used for all table operations
   * @param useMob whether MOB should be enabled on the first column family
   * @throws IOException if any admin operation fails
   */
  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
    if (!admin.tableExists(TABLE_NAME)) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
        .forEachOrdered(builder::setColumnFamily);
      admin.createTable(builder.build());
    }
    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
    if (cfd.isMobEnabled() != useMob) {
      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
        .setMobEnabled(useMob).setMobThreshold(4).build());
    }
  }

  /**
   * Prepares the table, starts the configured writer, flusher, getter and scanner threads, lets
   * them run for {@code millisToRun} milliseconds, stops them and logs per-thread counters.
   * @param admin admin used to prepare the table and to flush it while the test runs
   * @throws Exception if table setup fails or the test context reports a failure
   */
  private void runTestAtomicity(Admin admin) throws Exception {
    createTableIfMissing(admin, useMob);
    TestContext ctx = new TestContext(conf);

    byte[][] rows = new byte[numUniqueRows][];
    for (int i = 0; i < numUniqueRows; i++) {
      rows[i] = Bytes.toBytes("test_row_" + i);
    }

    List<AtomicityWriter> writers = Lists.newArrayList();
    for (int i = 0; i < numWriters; i++) {
      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
      writers.add(writer);
      ctx.addThread(writer);
    }
    // Add a flusher
    ctx.addThread(new RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        try {
          admin.flush(TABLE_NAME);
        } catch (IOException ioe) {
          LOG.warn("Ignoring exception while flushing: {}", StringUtils.stringifyException(ioe));
        }
        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
        // we would flush as often as possible. On a running cluster, this isn't practical:
        // (1) we will cause a lot of load due to all the flushing and compacting
        // (2) we cannot change the flushing/compacting related Configuration options to try to
        // alleviate this
        // (3) it is an unrealistic workload, since no one would actually flush that often.
        // Therefore, let's flush every minute to have more flushes than usual, but not overload
        // the running cluster.
        if (!crazyFlush) {
          Thread.sleep(60000);
        }
      }
    });

    List<AtomicGetReader> getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
      // Spread the getters round-robin over the available rows.
      AtomicGetReader getter =
        new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
      getters.add(getter);
      ctx.addThread(getter);
    }

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Finished test. Writers:");
    for (AtomicityWriter writer : writers) {
      LOG.info(" wrote {}", writer.numWritten.get());
    }
    LOG.info("Readers:");
    for (AtomicGetReader reader : getters) {
      LOG.info(" read {}", reader.numRead.get());
    }
    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info(" scanned {}", scanner.numScans.get());
      LOG.info(" verified {} rows", scanner.numRowsScanned.get());
    }
  }

  /**
   * Command line entry point: runs the tool through {@link ToolRunner} and exits the JVM with the
   * resulting status, or with -1 when running the tool throws.
   * @param args command line arguments
   */
  public static void main(String[] args) {
    Configuration c = HBaseConfiguration.create();
    int status;
    try {
      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
      status = ToolRunner.run(c, test, args);
    } catch (Exception e) {
      LOG.error("Exiting due to error", e);
      status = -1;
    }
    System.exit(status);
  }
}