/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
 */
@InterfaceAudience.Private
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a given range.
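   * <p>
   * A minimal construction sketch (the family name {@code "cf"} is illustrative, not required;
   * the sizes and column counts shown are the same defaults the no-size constructor uses):
   *
   * <pre>
   * LoadTestDataGenerator dataGen =
   *   new MultiThreadedAction.DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("cf"));
   * </pre>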
   * The table for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.util.LoadTestUtil#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }
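  // Worked example: formatTime(3725000) -> 3725 s elapsed -> hours = 1, minutes = 2,
  // seconds = 5, i.e. "01:02:05".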
  private static String formatTime(long elapsedTime) {
    final String format = "%02d";
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + numKeys + ", cols="
            + StringUtils.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + (numKeys * 1000.0 / time) + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + (numKeysDelta * 1000.0 / REPORTING_INTERVAL_MS)
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }
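  // Typical driver flow (a sketch; the concrete reader/writer subclasses live alongside
  // LoadTestTool):
  //   MultiThreadedAction action = ...; // e.g. a writer subclass
  //   action.start(0, 1000000, 20);     // keys [0, 1000000), 20 worker threads
  //   action.waitForFinish();           // blocks until numThreadsWorking reaches 0, then close()s
  // Subclasses are expected to register their workers via startThreads() below and to decrement
  // numThreadsWorking as each worker exits.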
  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (which was presumably also used
   * to generate said result).
   * @param verifyValues               verify that the values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that this requires writes to use multi-put, or
   *                                   verification to happen after all writes complete; otherwise
   *                                   there can be races.
   * @return true if the values of the row result make sense for the row/cf/column combination and
   *         the cf/column set in the result is complete; false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from the get in the result.
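    // In addition to its data columns, each family is expected to carry two special columns:
    // MUTATE_INFO, a '#'-separated log of mutations applied to the row (see parseMutateInfo), and
    // INCREMENT, a counter seeded from the row key's hash. Both are consumed below.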
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns; re-add a placeholder for each verified delete so the
        // column-count check below still sees the expected number of columns.
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have the correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [{}], bad columns for family [{}]: {}", rowKeyStr,
            cfStr, colsStr.toString());
          printLocations(result);
          return false;
        }
        // See if the values check out.
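        // For columns that saw an APPEND, the stored value ends with one or more copies of an
        // 8-byte (cf + column) hash suffix; the loop below strips those copies before handing
        // the base value to the generator's verify().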
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()), e);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
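      // Expected serialized form (inferred from the parsing below): '#'-separated
      // "column:typeCode" pairs, e.g. "0:3#2:0" (names and codes illustrative), where typeCode
      // is the numeric value of a MutationProto.MutationType.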
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}