/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
 */
@InterfaceAudience.Private
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;
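  // A minimal sketch (hypothetical; `columnsTouched` and the surrounding worker
  // loop are not part of this class) of how reader/writer subclasses are
  // expected to feed the shared counters above, which the ProgressReporter
  // below periodically reads:
  //
  //   long opStart = EnvironmentEdgeManager.currentTime();
  //   // ... perform one get/put against the shared connection ...
  //   totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - opStart);
  //   numKeys.incrementAndGet();
  //   numCols.addAndGet(columnsTouched);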
  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a range. The table for it can
   * be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.util.LoadTestUtil#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      // Pick a column count uniformly in [minColumnsPerKey, maxColumnsPerKey].
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }
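  // Illustrative construction only (the family name is hypothetical): the short
  // constructor defaults to 256-1024 byte values and 1-10 columns per key, as
  // set above.
  //
  //   LoadTestDataGenerator dataGen = new DefaultDataGenerator(Bytes.toBytes("test_cf"));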
  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

  /** Formats a millisecond duration as a zero-padded HH:MM:SS string. */
  private static String formatTime(long elapsedTime) {
    String format = String.format("%%0%dd", 2); // i.e. "%02d"
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    String time = hours + ":" + minutes + ":" + seconds;
    return time;
  }
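  // Worked example of the arithmetic above: formatTime(3725000) first reduces
  // to 3725 seconds; 3725 / 3600 = 1 hour, (3725 % 3600) / 60 = 2 minutes and
  // 3725 % 60 = 5 seconds, so the returned string is "01:02:05".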
  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + numKeys + ", cols="
            + Strings.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + (numKeys * 1000.0 / time) + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + (numKeysDelta * 1000.0 / REPORTING_INTERVAL_MS)
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }
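  // With streaming counters enabled, a reader ("R") that covered 1000 more keys
  // in the last interval would emit lines like these (values are illustrative):
  //
  //   reporter:counter:numKeys,R,1000
  //   reporter:counter:numCols,R,54321
  //   reporter:counter:avgKeysPerSecond,R,12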
  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably also used
   * to generate said result).
   * @param verifyValues               verify that the values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work, multiPut should be used, or
   *                                   verification has to happen after writes; otherwise there can
   *                                   be races.
   * @return true if the values in the result make sense for the row/cf/column combination and the
   *         cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from the get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have the correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [{}], bad columns for family [{}]: {}", rowKeyStr,
            cfStr, colsStr.toString());
          printLocations(result);
          return false;
        }
        // See if values check out.
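        // The checks below mirror the value encoding this verifier expects:
        // an APPEND tacks Bytes.toBytes(cfHash + columnHash) onto the value
        // one or more times, while a checkAndPut (taken when that hash code
        // is even) replaces the value with exactly those hash bytes.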
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                // Strip off however many copies of the hash bytes were appended,
                // leaving the original value for the generator check below.
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()), e);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }
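  // The serialized mutate info parsed below is a '#'-separated list of
  // "<column>:<MutationType number>" pairs; for example (column names and type
  // numbers are illustrative), "0:3#2:0" records one mutation against column
  // "0" and another against column "2".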
  // Parse mutate info into a map of <column name> => <update action>
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) continue;
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}