/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load
 * test (see LoadTestTool).
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final ClusterConnection connection; // all reader/writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed
   * set of column families, and a random number of columns per key within the given range.
   * The table for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtility#createPreSplitLoadTestTable(Configuration, TableName, byte[],
   * org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;
    private final Random random = new Random();

    public DefaultDataGenerator(int minValueSize, int maxValueSize,
        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }
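  // A minimal usage sketch (the values and the "cf" family name are illustrative,
  // not mandated anywhere): generate 256-1024 byte values and 1-10 columns per row
  // for a single column family.
  //
  //   LoadTestDataGenerator dataGen =
  //       new DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("cf"));
  //   byte[] row = dataGen.getDeterministicUniqueKey(42L); // MD5-prefixed, deterministic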

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
        "MultiThreadedAction-ProgressReporter-" + System.currentTimeMillis())).start();
  }

  private static String formatTime(long elapsedTime) {
    String format = String.format("%%0%dd", 2);
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }
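  // For example (elapsedTime is in milliseconds): formatTime(3723000) renders
  // 3723 seconds as "01:02:03" (1 hour, 2 minutes, 3 seconds).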

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = System.currentTimeMillis();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = System.currentTimeMillis() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000 / time) : 0;

          LOG.info(threadsLeft
              + "Keys="
              + numKeys
              + ", cols="
              + StringUtils.humanReadableInt(numCols.get())
              + ", time="
              + formatTime(time)
              + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
                  + numKeys * 1000 / time + ", latency="
                  + String.format("%.2f", (double) totalOpTime / (double) numKeys)
                  + " ms]") : "")
              + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
                  + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
                  + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta)
                  + " ms]") : "")
              + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
                averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }
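    // Each counter below is one line on stderr in the Hadoop Streaming counter
    // protocol, "reporter:counter:<group>,<counter>,<amount>". For a reporterId
    // of "W" the output might look like this (amounts are illustrative):
    //
    //   reporter:counter:numKeys,W,1523
    //   reporter:counter:numCols,W,15230
    //   reporter:counter:avgKeysPerSecond,W,304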
    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + ","
          + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }
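  // For example, given an empty StringBuilder, appendToStatus(sb, "failures", 3)
  // leaves sb containing ", failures=3"; the long overload skips zero values
  // entirely so quiet counters don't clutter the status line.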

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
   * Does not verify cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably
   * also used to generate said result).
   * @param verifyValues verify that the values in the result make sense for the
   *                     row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work the writer needs to use
   *                                   multiPut, or verification has to happen after all writes
   *                                   complete; otherwise there can be races.
   * @return true if the values of the row result make sense for the row/cf/column combination
   *         and the cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
      boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr
          + "], bad family count: " + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from the get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error("Error checking data for key [" + rowKeyStr
            + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                    + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have the correct columns.
        if (verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
          String colsStr = "";
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr += ", ";
            }
            colsStr += "[" + Bytes.toString(col) + "]";
          }
          LOG.error("Error checking data for key [" + rowKeyStr
              + "], bad columns for family [" + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
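        // (The checks below assume the writer encoded mutations the way the companion
        // load-test updater does: an APPEND tacks one or more copies of
        // Bytes.toBytes(cfHash + columnHash) onto the end of the value, and a
        // checkAndPut-style mutation replaces the value with that hash outright when
        // the hash is even. The verifier only reconstructs and checks those shapes.)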
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                    0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                        hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr
                        + "], column family [" + cfStr + "], column [" + column
                        + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                    + "], mutation checking failed for column family [" + cfStr + "], column ["
                    + column + "]; mutation [" + mutation + "], hashCode ["
                    + hashCode + "], verificationNeeded ["
                    + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (verificationNeeded &&
                !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                  + cfStr + "], column [" + column + "], mutation [" + mutation
                  + "]; value of length " + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    RegionLocations rl = null;
    try {
      rl = connection.locateRegion(tableName, r.getRow(), true, true);
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
      return; // rl is still null here; bail out rather than NPE below.
    }
    HRegionLocation[] locations = rl.getRegionLocations();
    for (HRegionLocation h : locations) {
      LOG.info("LOCATION " + h);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }
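  // A worked example of the serialized mutate info parsed below. Entries are
  // "#"-separated "column:typeCode" pairs, where the code is the numeric value of a
  // ClientProtos.MutationProto.MutationType (e.g., with the stock protobuf numbering
  // where APPEND = 0 and DELETE = 3, "0:3#5:0" parses to {"0" -> DELETE, "5" -> APPEND}).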
  // Parse mutate info into a map of <column name> => <update action>.
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}