001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.util; 018 019import java.io.IOException; 020import java.util.Arrays; 021import java.util.HashSet; 022import java.util.Set; 023import java.util.concurrent.atomic.AtomicLong; 024 025import org.apache.commons.lang3.RandomUtils; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HRegionLocation; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Get; 030 031import org.apache.hadoop.hbase.client.Consistency; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** Creates multiple threads that read and verify previously written data */ 039public class MultiThreadedReader extends MultiThreadedAction 040{ 041 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class); 042 043 protected Set<HBaseReaderThread> readers = new HashSet<>(); 044 private final double verifyPercent; 045 protected volatile boolean aborted; 046 047 protected MultiThreadedWriterBase writer = null; 048 049 /** 050 * The number of keys verified in a sequence. This will never be larger than 051 * the total number of keys in the range. The reader might also verify 052 * random keys when it catches up with the writer. 053 */ 054 private final AtomicLong numUniqueKeysVerified = new AtomicLong(); 055 056 /** 057 * Default maximum number of read errors to tolerate before shutting down all 058 * readers. 059 */ 060 public static final int DEFAULT_MAX_ERRORS = 10; 061 062 /** 063 * Default "window" size between the last key written by the writer and the 064 * key that we attempt to read. The lower this number, the stricter our 065 * testing is. If this is zero, we always attempt to read the highest key 066 * in the contiguous sequence of keys written by the writers. 067 */ 068 public static final int DEFAULT_KEY_WINDOW = 0; 069 070 /** 071 * Default batch size for multigets 072 */ 073 public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET) 074 075 protected AtomicLong numKeysVerified = new AtomicLong(0); 076 protected AtomicLong numReadErrors = new AtomicLong(0); 077 protected AtomicLong numReadFailures = new AtomicLong(0); 078 protected AtomicLong nullResult = new AtomicLong(0); 079 private int maxErrors = DEFAULT_MAX_ERRORS; 080 private int keyWindow = DEFAULT_KEY_WINDOW; 081 private int batchSize = DEFAULT_BATCH_SIZE; 082 private int regionReplicaId = -1; // particular region replica id to do reads against if set 083 084 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, 085 TableName tableName, double verifyPercent) throws IOException { 086 super(dataGen, conf, tableName, "R"); 087 this.verifyPercent = verifyPercent; 088 } 089 090 public void linkToWriter(MultiThreadedWriterBase writer) { 091 this.writer = writer; 092 writer.setTrackWroteKeys(true); 093 } 094 095 public void setMaxErrors(int maxErrors) { 096 this.maxErrors = maxErrors; 097 } 098 099 public void setKeyWindow(int keyWindow) { 100 this.keyWindow = keyWindow; 101 } 102 103 public void setMultiGetBatchSize(int batchSize) { 104 this.batchSize = batchSize; 105 } 106 107 public void setRegionReplicaId(int regionReplicaId) { 108 this.regionReplicaId = regionReplicaId; 109 } 110 111 @Override 112 public void start(long startKey, long endKey, int numThreads) throws IOException { 113 super.start(startKey, endKey, numThreads); 114 if (verbose) { 115 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); 116 } 117 118 addReaderThreads(numThreads); 119 startThreads(readers); 120 } 121 122 protected void addReaderThreads(int numThreads) throws IOException { 123 for (int i = 0; i < numThreads; ++i) { 124 HBaseReaderThread reader = createReaderThread(i); 125 readers.add(reader); 126 } 127 } 128 129 protected HBaseReaderThread createReaderThread(int readerId) throws IOException { 130 HBaseReaderThread reader = new HBaseReaderThread(readerId); 131 Threads.setLoggingUncaughtExceptionHandler(reader); 132 return reader; 133 } 134 135 public class HBaseReaderThread extends Thread { 136 protected final int readerId; 137 protected final Table table; 138 139 /** The "current" key being read. Increases from startKey to endKey. */ 140 private long curKey; 141 142 /** Time when the thread started */ 143 protected long startTimeMs; 144 145 /** If we are ahead of the writer and reading a random key. */ 146 private boolean readingRandomKey; 147 148 private boolean printExceptionTrace = true; 149 150 /** 151 * @param readerId only the keys with this remainder from division by 152 * {@link #numThreads} will be read by this thread 153 */ 154 public HBaseReaderThread(int readerId) throws IOException { 155 this.readerId = readerId; 156 table = createTable(); 157 setName(getClass().getSimpleName() + "_" + readerId); 158 } 159 160 protected Table createTable() throws IOException { 161 return connection.getTable(tableName); 162 } 163 164 @Override 165 public void run() { 166 try { 167 runReader(); 168 } finally { 169 closeTable(); 170 numThreadsWorking.decrementAndGet(); 171 } 172 } 173 174 protected void closeTable() { 175 try { 176 if (table != null) { 177 table.close(); 178 } 179 } catch (IOException e) { 180 LOG.error("Error closing table", e); 181 } 182 } 183 184 private void runReader() { 185 if (verbose) { 186 LOG.info("Started thread #" + readerId + " for reads..."); 187 } 188 189 startTimeMs = System.currentTimeMillis(); 190 curKey = startKey; 191 long [] keysForThisReader = new long[batchSize]; 192 while (curKey < endKey && !aborted) { 193 int readingRandomKeyStartIndex = -1; 194 int numKeys = 0; 195 // if multiGet, loop until we have the number of keys equal to the batch size 196 do { 197 long k = getNextKeyToRead(); 198 if (k < startKey || k >= endKey) { 199 numReadErrors.incrementAndGet(); 200 throw new AssertionError("Load tester logic error: proposed key " + 201 "to read " + k + " is out of range (startKey=" + startKey + 202 ", endKey=" + endKey + ")"); 203 } 204 if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) { 205 // Skip keys that this thread should not read, as well as the keys 206 // that we know the writer failed to write. 207 continue; 208 } 209 keysForThisReader[numKeys] = k; 210 if (readingRandomKey && readingRandomKeyStartIndex == -1) { 211 //store the first index of a random read 212 readingRandomKeyStartIndex = numKeys; 213 } 214 numKeys++; 215 } while (numKeys < batchSize && curKey < endKey && !aborted); 216 217 if (numKeys > 0) { //meaning there is some key to read 218 readKey(keysForThisReader); 219 // We have verified some unique key(s). 220 numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ? 221 numKeys : readingRandomKeyStartIndex); 222 } 223 } 224 } 225 226 /** 227 * Should only be used for the concurrent writer/reader workload. The 228 * maximum key we are allowed to read, subject to the "key window" 229 * constraint. 230 */ 231 private long maxKeyWeCanRead() { 232 long insertedUpToKey = writer.wroteUpToKey(); 233 if (insertedUpToKey >= endKey - 1) { 234 // The writer has finished writing our range, so we can read any 235 // key in the range. 236 return endKey - 1; 237 } 238 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); 239 } 240 241 protected long getNextKeyToRead() { 242 readingRandomKey = false; 243 if (writer == null || curKey <= maxKeyWeCanRead()) { 244 return curKey++; 245 } 246 247 // We caught up with the writer. See if we can read any keys at all. 248 long maxKeyToRead; 249 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { 250 // The writer has not written sufficient keys for us to be able to read 251 // anything at all. Sleep a bit. This should only happen in the 252 // beginning of a load test run. 253 Threads.sleepWithoutInterrupt(50); 254 } 255 256 if (curKey <= maxKeyToRead) { 257 // The writer wrote some keys, and we are now allowed to read our 258 // current key. 259 return curKey++; 260 } 261 262 // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. 263 // Don't increment the current key -- we still have to try reading it 264 // later. Set a flag to make sure that we don't count this key towards 265 // the set of unique keys we have verified. 266 readingRandomKey = true; 267 return startKey + Math.abs(RandomUtils.nextLong()) 268 % (maxKeyToRead - startKey + 1); 269 } 270 271 private Get[] readKey(long[] keysToRead) { 272 Get [] gets = new Get[keysToRead.length]; 273 int i = 0; 274 for (long keyToRead : keysToRead) { 275 try { 276 gets[i] = createGet(keyToRead); 277 if (keysToRead.length == 1) { 278 queryKey(gets[i], RandomUtils.nextInt(0, 100) < verifyPercent, keyToRead); 279 } 280 i++; 281 } catch (IOException e) { 282 numReadFailures.addAndGet(1); 283 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 284 + ", time from start: " 285 + (System.currentTimeMillis() - startTimeMs) + " ms"); 286 if (printExceptionTrace) { 287 LOG.warn(e.toString(), e); 288 printExceptionTrace = false; 289 } 290 } 291 } 292 if (keysToRead.length > 1) { 293 try { 294 queryKey(gets, RandomUtils.nextInt(0, 100) < verifyPercent, keysToRead); 295 } catch (IOException e) { 296 numReadFailures.addAndGet(gets.length); 297 for (long keyToRead : keysToRead) { 298 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 299 + ", time from start: " 300 + (System.currentTimeMillis() - startTimeMs) + " ms"); 301 } 302 if (printExceptionTrace) { 303 LOG.warn(e.toString(), e); 304 printExceptionTrace = false; 305 } 306 } 307 } 308 return gets; 309 } 310 311 protected Get createGet(long keyToRead) throws IOException { 312 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); 313 String cfsString = ""; 314 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 315 for (byte[] cf : columnFamilies) { 316 get.addFamily(cf); 317 if (verbose) { 318 if (cfsString.length() > 0) { 319 cfsString += ", "; 320 } 321 cfsString += "[" + Bytes.toStringBinary(cf) + "]"; 322 } 323 } 324 get = dataGenerator.beforeGet(keyToRead, get); 325 if (regionReplicaId > 0) { 326 get.setReplicaId(regionReplicaId); 327 get.setConsistency(Consistency.TIMELINE); 328 } 329 if (verbose) { 330 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); 331 } 332 return get; 333 } 334 335 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException { 336 // read the data 337 long start = System.nanoTime(); 338 // Uses multi/batch gets 339 Result[] results = table.get(Arrays.asList(gets)); 340 long end = System.nanoTime(); 341 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false); 342 } 343 344 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { 345 // read the data 346 347 long start = System.nanoTime(); 348 // Uses simple get 349 Result result = table.get(get); 350 long end = System.nanoTime(); 351 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false); 352 } 353 354 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, 355 Result[] results, Table table, boolean isNullExpected) 356 throws IOException { 357 totalOpTimeMs.addAndGet(elapsedNano / 1000000); 358 numKeys.addAndGet(gets.length); 359 int i = 0; 360 for (Result result : results) { 361 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table, 362 isNullExpected); 363 } 364 } 365 366 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, 367 Result result, Table table, boolean isNullExpected) 368 throws IOException { 369 verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano, 370 new Result[]{result}, table, isNullExpected); 371 } 372 373 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, 374 Result result, Table table, boolean isNullExpected) throws IOException { 375 if (!result.isEmpty()) { 376 if (verify) { 377 numKeysVerified.incrementAndGet(); 378 } 379 } else { 380 HRegionLocation hloc = connection.getRegionLocation(tableName, 381 get.getRow(), false); 382 String rowKey = Bytes.toString(get.getRow()); 383 LOG.info("Key = " + rowKey + ", Region location: " + hloc); 384 if(isNullExpected) { 385 nullResult.incrementAndGet(); 386 LOG.debug("Null result obtained for the key ="+rowKey); 387 return; 388 } 389 } 390 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); 391 long numErrorsAfterThis = 0; 392 if (isOk) { 393 long cols = 0; 394 // Count the columns for reporting purposes. 395 for (byte[] cf : result.getMap().keySet()) { 396 cols += result.getFamilyMap(cf).size(); 397 } 398 numCols.addAndGet(cols); 399 } else { 400 if (writer != null) { 401 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys"); 402 } 403 numErrorsAfterThis = numReadErrors.incrementAndGet(); 404 } 405 406 if (numErrorsAfterThis > maxErrors) { 407 LOG.error("Aborting readers -- found more than " + maxErrors + " errors"); 408 aborted = true; 409 } 410 } 411 } 412 413 public long getNumReadFailures() { 414 return numReadFailures.get(); 415 } 416 417 public long getNumReadErrors() { 418 return numReadErrors.get(); 419 } 420 421 public long getNumKeysVerified() { 422 return numKeysVerified.get(); 423 } 424 425 public long getNumUniqueKeysVerified() { 426 return numUniqueKeysVerified.get(); 427 } 428 429 public long getNullResultsCount() { 430 return nullResult.get(); 431 } 432 433 @Override 434 protected String progressInfo() { 435 StringBuilder sb = new StringBuilder(); 436 appendToStatus(sb, "verified", numKeysVerified.get()); 437 appendToStatus(sb, "READ FAILURES", numReadFailures.get()); 438 appendToStatus(sb, "READ ERRORS", numReadErrors.get()); 439 appendToStatus(sb, "NULL RESULT", nullResult.get()); 440 return sb.toString(); 441 } 442}