001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashSet; 023import java.util.Random; 024import java.util.Set; 025import java.util.concurrent.ThreadLocalRandom; 026import java.util.concurrent.atomic.AtomicLong; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HRegionLocation; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Consistency; 031import org.apache.hadoop.hbase.client.Get; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** Creates multiple threads that read and verify previously written data */ 039public class MultiThreadedReader extends MultiThreadedAction { 040 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class); 041 042 protected Set<HBaseReaderThread> readers = new HashSet<>(); 043 private final double verifyPercent; 044 protected volatile boolean aborted; 045 046 protected MultiThreadedWriterBase writer = null; 047 048 /** 049 * The number of keys verified in a sequence. This will never be larger than the total number of 050 * keys in the range. The reader might also verify random keys when it catches up with the writer. 051 */ 052 private final AtomicLong numUniqueKeysVerified = new AtomicLong(); 053 054 /** 055 * Default maximum number of read errors to tolerate before shutting down all readers. 056 */ 057 public static final int DEFAULT_MAX_ERRORS = 10; 058 059 /** 060 * Default "window" size between the last key written by the writer and the key that we attempt to 061 * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to 062 * read the highest key in the contiguous sequence of keys written by the writers. 063 */ 064 public static final int DEFAULT_KEY_WINDOW = 0; 065 066 /** 067 * Default batch size for multigets 068 */ 069 public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET) 070 071 protected AtomicLong numKeysVerified = new AtomicLong(0); 072 protected AtomicLong numReadErrors = new AtomicLong(0); 073 protected AtomicLong numReadFailures = new AtomicLong(0); 074 protected AtomicLong nullResult = new AtomicLong(0); 075 private int maxErrors = DEFAULT_MAX_ERRORS; 076 private int keyWindow = DEFAULT_KEY_WINDOW; 077 private int batchSize = DEFAULT_BATCH_SIZE; 078 private int regionReplicaId = -1; // particular region replica id to do reads against if set 079 080 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, 081 double verifyPercent) throws IOException { 082 super(dataGen, conf, tableName, "R"); 083 this.verifyPercent = verifyPercent; 084 } 085 086 public void linkToWriter(MultiThreadedWriterBase writer) { 087 this.writer = writer; 088 writer.setTrackWroteKeys(true); 089 } 090 091 public void setMaxErrors(int maxErrors) { 092 this.maxErrors = maxErrors; 093 } 094 095 public void setKeyWindow(int keyWindow) { 096 this.keyWindow = keyWindow; 097 } 098 099 public void setMultiGetBatchSize(int batchSize) { 100 this.batchSize = batchSize; 101 } 102 103 public void setRegionReplicaId(int regionReplicaId) { 104 this.regionReplicaId = regionReplicaId; 105 } 106 107 @Override 108 public void start(long startKey, long endKey, int numThreads) throws IOException { 109 super.start(startKey, endKey, numThreads); 110 if (verbose) { 111 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); 112 } 113 114 addReaderThreads(numThreads); 115 startThreads(readers); 116 } 117 118 protected void addReaderThreads(int numThreads) throws IOException { 119 for (int i = 0; i < numThreads; ++i) { 120 HBaseReaderThread reader = createReaderThread(i); 121 readers.add(reader); 122 } 123 } 124 125 protected HBaseReaderThread createReaderThread(int readerId) throws IOException { 126 HBaseReaderThread reader = new HBaseReaderThread(readerId); 127 Threads.setLoggingUncaughtExceptionHandler(reader); 128 return reader; 129 } 130 131 public class HBaseReaderThread extends Thread { 132 protected final int readerId; 133 protected final Table table; 134 135 /** The "current" key being read. Increases from startKey to endKey. */ 136 private long curKey; 137 138 /** Time when the thread started */ 139 protected long startTimeMs; 140 141 /** If we are ahead of the writer and reading a random key. */ 142 private boolean readingRandomKey; 143 144 private boolean printExceptionTrace = true; 145 146 /** 147 * @param readerId only the keys with this remainder from division by {@link #numThreads} will 148 * be read by this thread 149 */ 150 public HBaseReaderThread(int readerId) throws IOException { 151 this.readerId = readerId; 152 table = createTable(); 153 setName(getClass().getSimpleName() + "_" + readerId); 154 } 155 156 protected Table createTable() throws IOException { 157 return connection.getTable(tableName); 158 } 159 160 @Override 161 public void run() { 162 try { 163 runReader(); 164 } finally { 165 closeTable(); 166 numThreadsWorking.decrementAndGet(); 167 } 168 } 169 170 protected void closeTable() { 171 try { 172 if (table != null) { 173 table.close(); 174 } 175 } catch (IOException e) { 176 LOG.error("Error closing table", e); 177 } 178 } 179 180 private void runReader() { 181 if (verbose) { 182 LOG.info("Started thread #" + readerId + " for reads..."); 183 } 184 185 startTimeMs = EnvironmentEdgeManager.currentTime(); 186 curKey = startKey; 187 long[] keysForThisReader = new long[batchSize]; 188 while (curKey < endKey && !aborted) { 189 int readingRandomKeyStartIndex = -1; 190 int numKeys = 0; 191 // if multiGet, loop until we have the number of keys equal to the batch size 192 do { 193 long k = getNextKeyToRead(); 194 if (k < startKey || k >= endKey) { 195 numReadErrors.incrementAndGet(); 196 throw new AssertionError("Load tester logic error: proposed key " + "to read " + k 197 + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")"); 198 } 199 if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) { 200 // Skip keys that this thread should not read, as well as the keys 201 // that we know the writer failed to write. 202 continue; 203 } 204 keysForThisReader[numKeys] = k; 205 if (readingRandomKey && readingRandomKeyStartIndex == -1) { 206 // store the first index of a random read 207 readingRandomKeyStartIndex = numKeys; 208 } 209 numKeys++; 210 } while (numKeys < batchSize && curKey < endKey && !aborted); 211 212 if (numKeys > 0) { // meaning there is some key to read 213 readKey(keysForThisReader); 214 // We have verified some unique key(s). 215 numUniqueKeysVerified 216 .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex); 217 } 218 } 219 } 220 221 /** 222 * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed 223 * to read, subject to the "key window" constraint. 224 */ 225 private long maxKeyWeCanRead() { 226 long insertedUpToKey = writer.wroteUpToKey(); 227 if (insertedUpToKey >= endKey - 1) { 228 // The writer has finished writing our range, so we can read any 229 // key in the range. 230 return endKey - 1; 231 } 232 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); 233 } 234 235 protected long getNextKeyToRead() { 236 readingRandomKey = false; 237 if (writer == null || curKey <= maxKeyWeCanRead()) { 238 return curKey++; 239 } 240 241 // We caught up with the writer. See if we can read any keys at all. 242 long maxKeyToRead; 243 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { 244 // The writer has not written sufficient keys for us to be able to read 245 // anything at all. Sleep a bit. This should only happen in the 246 // beginning of a load test run. 247 Threads.sleepWithoutInterrupt(50); 248 } 249 250 if (curKey <= maxKeyToRead) { 251 // The writer wrote some keys, and we are now allowed to read our 252 // current key. 253 return curKey++; 254 } 255 256 // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. 257 // Don't increment the current key -- we still have to try reading it 258 // later. Set a flag to make sure that we don't count this key towards 259 // the set of unique keys we have verified. 260 readingRandomKey = true; 261 return startKey 262 + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1); 263 } 264 265 private Get[] readKey(long[] keysToRead) { 266 Random rand = ThreadLocalRandom.current(); 267 Get[] gets = new Get[keysToRead.length]; 268 int i = 0; 269 for (long keyToRead : keysToRead) { 270 try { 271 gets[i] = createGet(keyToRead); 272 if (keysToRead.length == 1) { 273 queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead); 274 } 275 i++; 276 } catch (IOException e) { 277 numReadFailures.addAndGet(1); 278 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 279 + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 280 if (printExceptionTrace) { 281 LOG.warn(e.toString(), e); 282 printExceptionTrace = false; 283 } 284 } 285 } 286 if (keysToRead.length > 1) { 287 try { 288 queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead); 289 } catch (IOException e) { 290 numReadFailures.addAndGet(gets.length); 291 for (long keyToRead : keysToRead) { 292 LOG.debug( 293 "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: " 294 + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 295 } 296 if (printExceptionTrace) { 297 LOG.warn(e.toString(), e); 298 printExceptionTrace = false; 299 } 300 } 301 } 302 return gets; 303 } 304 305 protected Get createGet(long keyToRead) throws IOException { 306 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); 307 String cfsString = ""; 308 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 309 for (byte[] cf : columnFamilies) { 310 get.addFamily(cf); 311 if (verbose) { 312 if (cfsString.length() > 0) { 313 cfsString += ", "; 314 } 315 cfsString += "[" + Bytes.toStringBinary(cf) + "]"; 316 } 317 } 318 get = dataGenerator.beforeGet(keyToRead, get); 319 if (regionReplicaId > 0) { 320 get.setReplicaId(regionReplicaId); 321 get.setConsistency(Consistency.TIMELINE); 322 } 323 if (verbose) { 324 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); 325 } 326 return get; 327 } 328 329 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException { 330 // read the data 331 long start = System.nanoTime(); 332 // Uses multi/batch gets 333 Result[] results = table.get(Arrays.asList(gets)); 334 long end = System.nanoTime(); 335 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false); 336 } 337 338 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { 339 // read the data 340 341 long start = System.nanoTime(); 342 // Uses simple get 343 Result result = table.get(get); 344 long end = System.nanoTime(); 345 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false); 346 } 347 348 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, 349 Result[] results, Table table, boolean isNullExpected) throws IOException { 350 totalOpTimeMs.addAndGet(elapsedNano / 1000000); 351 numKeys.addAndGet(gets.length); 352 int i = 0; 353 for (Result result : results) { 354 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table, 355 isNullExpected); 356 } 357 } 358 359 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, 360 Result result, Table table, boolean isNullExpected) throws IOException { 361 verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result }, 362 table, isNullExpected); 363 } 364 365 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result, 366 Table table, boolean isNullExpected) throws IOException { 367 if (!result.isEmpty()) { 368 if (verify) { 369 numKeysVerified.incrementAndGet(); 370 } 371 } else { 372 HRegionLocation hloc = connection.getRegionLocation(tableName, get.getRow(), false); 373 String rowKey = Bytes.toString(get.getRow()); 374 LOG.info("Key = " + rowKey + ", Region location: " + hloc); 375 if (isNullExpected) { 376 nullResult.incrementAndGet(); 377 LOG.debug("Null result obtained for the key =" + rowKey); 378 return; 379 } 380 } 381 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); 382 long numErrorsAfterThis = 0; 383 if (isOk) { 384 long cols = 0; 385 // Count the columns for reporting purposes. 386 for (byte[] cf : result.getMap().keySet()) { 387 cols += result.getFamilyMap(cf).size(); 388 } 389 numCols.addAndGet(cols); 390 } else { 391 if (writer != null) { 392 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys"); 393 } 394 numErrorsAfterThis = numReadErrors.incrementAndGet(); 395 } 396 397 if (numErrorsAfterThis > maxErrors) { 398 LOG.error("Aborting readers -- found more than " + maxErrors + " errors"); 399 aborted = true; 400 } 401 } 402 } 403 404 public long getNumReadFailures() { 405 return numReadFailures.get(); 406 } 407 408 public long getNumReadErrors() { 409 return numReadErrors.get(); 410 } 411 412 public long getNumKeysVerified() { 413 return numKeysVerified.get(); 414 } 415 416 public long getNumUniqueKeysVerified() { 417 return numUniqueKeysVerified.get(); 418 } 419 420 public long getNullResultsCount() { 421 return nullResult.get(); 422 } 423 424 @Override 425 protected String progressInfo() { 426 StringBuilder sb = new StringBuilder(); 427 appendToStatus(sb, "verified", numKeysVerified.get()); 428 appendToStatus(sb, "READ FAILURES", numReadFailures.get()); 429 appendToStatus(sb, "READ ERRORS", numReadErrors.get()); 430 appendToStatus(sb, "NULL RESULT", nullResult.get()); 431 return sb.toString(); 432 } 433}