/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  // All reader/writer threads share this connection.
  protected final ClusterConnection connection;

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a given range. The table for it
   * can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtility#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that did not care to provide their own.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }
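  // A minimal sketch of how a load test might wire this generator into a writer subclass
  // such as MultiThreadedWriter; the column family, key range, and thread count below are
  // illustrative only:
  //
  //   LoadTestDataGenerator dataGen = new DefaultDataGenerator(Bytes.toBytes("test_cf"));
  //   MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
  //   writer.start(0, 100000, 8); // write keys [0, 100000) with 8 threads
  //   writer.waitForFinish();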
  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

  /** Formats an elapsed time given in milliseconds as zero-padded "HH:mm:ss". */
  private static String formatTime(long elapsedTime) {
    String format = String.format("%%0%dd", 2);
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }
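  // A worked example of the formatter above: formatTime(3723000) truncates to 3723 seconds,
  // which splits into 1 hour, 2 minutes, and 3 seconds, so the method returns "01:02:03".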
  /** Asynchronously reports progress. */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          // Use floating-point math so the average is not truncated by integer division.
          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + numKeys + ", cols="
            + StringUtils.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + numKeys * 1000 / time + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + numKeysDelta * 1000 / REPORTING_INTERVAL_MS
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }
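  // A minimal sketch of how a subclass might implement progressInfo() using the
  // appendToStatus() helpers; the counters named here are illustrative only:
  //
  //   @Override
  //   protected String progressInfo() {
  //     StringBuilder sb = new StringBuilder();
  //     appendToStatus(sb, "verified", numKeysVerified.get());
  //     appendToStatus(sb, "failures", numReadFailures.get());
  //     return sb.toString(); // e.g. ", verified=12345, failures=2"
  //   }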
  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably also used
   * to generate said result).
   * @param verifyValues               verify that the values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work, multi-put should be used, or
   *                                   verification has to happen after writes; otherwise there can
   *                                   be races.
   * @return true if the values in the result make sense for the row/cf/column combination and, if
   *         requested, the cf/column set in the result is complete; false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // As long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from the get in the result.
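    // Each family is expected to carry two bookkeeping columns, presumably written by the
    // updater side of the load test: MUTATE_INFO (a log of per-column mutations) and
    // INCREMENT (a counter seeded from the hash of the row key). Both are removed from
    // columnValues below so that only the generator's own columns remain for the integrity
    // and value checks.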
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have the correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [" + rowKeyStr + "], bad columns for family ["
            + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                // The appended suffix is the hash code bytes; strip every appended copy off
                // the tail so the original value can be verified below.
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }
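  // A minimal sketch of how a reader might drive the verification above; the Get-based read
  // and the key variable are illustrative only:
  //
  //   try (Table table = connection.getTable(tableName)) {
  //     Result res = table.get(new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)));
  //     if (!verifyResultAgainstDataGenerator(res, true, true)) {
  //       LOG.error("Verification failed for key " + keyToRead);
  //     }
  //   }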
  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    RegionLocations rl;
    try {
      rl = connection.locateRegion(tableName, r.getRow(), true, true);
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
      // Without locations there is nothing more to print.
      return;
    }
    HRegionLocation[] locations = rl.getRegionLocations();
    for (HRegionLocation h : locations) {
      LOG.info("LOCATION " + h);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
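  // For example, assuming the MutationType numbering from the ClientProtos proto definition
  // (APPEND = 0, INCREMENT = 1, PUT = 2, DELETE = 3), a MUTATE_INFO value of "0:3#5:0"
  // parses to {"0" -> DELETE, "5" -> APPEND}; the column names here are illustrative.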
}