001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashSet; 023import java.util.Random; 024import java.util.Set; 025import java.util.concurrent.ThreadLocalRandom; 026import java.util.concurrent.atomic.AtomicLong; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HRegionLocation; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Consistency; 031import org.apache.hadoop.hbase.client.Get; 032import org.apache.hadoop.hbase.client.RegionLocator; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** Creates multiple threads that read and verify previously written data */ 040public class MultiThreadedReader extends MultiThreadedAction { 041 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class); 042 043 protected Set<HBaseReaderThread> readers = new HashSet<>(); 044 private final double verifyPercent; 045 protected volatile boolean aborted; 046 047 protected MultiThreadedWriterBase writer = null; 048 049 /** 050 * The number of keys verified in a sequence. This will never be larger than the total number of 051 * keys in the range. The reader might also verify random keys when it catches up with the writer. 052 */ 053 private final AtomicLong numUniqueKeysVerified = new AtomicLong(); 054 055 /** 056 * Default maximum number of read errors to tolerate before shutting down all readers. 057 */ 058 public static final int DEFAULT_MAX_ERRORS = 10; 059 060 /** 061 * Default "window" size between the last key written by the writer and the key that we attempt to 062 * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to 063 * read the highest key in the contiguous sequence of keys written by the writers. 064 */ 065 public static final int DEFAULT_KEY_WINDOW = 0; 066 067 /** 068 * Default batch size for multigets 069 */ 070 public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET) 071 072 protected AtomicLong numKeysVerified = new AtomicLong(0); 073 protected AtomicLong numReadErrors = new AtomicLong(0); 074 protected AtomicLong numReadFailures = new AtomicLong(0); 075 protected AtomicLong nullResult = new AtomicLong(0); 076 private int maxErrors = DEFAULT_MAX_ERRORS; 077 private int keyWindow = DEFAULT_KEY_WINDOW; 078 private int batchSize = DEFAULT_BATCH_SIZE; 079 private int regionReplicaId = -1; // particular region replica id to do reads against if set 080 081 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, 082 double verifyPercent) throws IOException { 083 super(dataGen, conf, tableName, "R"); 084 this.verifyPercent = verifyPercent; 085 } 086 087 public void linkToWriter(MultiThreadedWriterBase writer) { 088 this.writer = writer; 089 writer.setTrackWroteKeys(true); 090 } 091 092 public void setMaxErrors(int maxErrors) { 093 this.maxErrors = maxErrors; 094 } 095 096 public void setKeyWindow(int keyWindow) { 097 this.keyWindow = keyWindow; 098 } 099 100 public void setMultiGetBatchSize(int batchSize) { 101 this.batchSize = batchSize; 102 } 103 104 public void setRegionReplicaId(int regionReplicaId) { 105 this.regionReplicaId = regionReplicaId; 106 } 107 108 @Override 109 public void start(long startKey, long endKey, int numThreads) throws IOException { 110 super.start(startKey, endKey, numThreads); 111 if (verbose) { 112 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); 113 } 114 115 addReaderThreads(numThreads); 116 startThreads(readers); 117 } 118 119 protected void addReaderThreads(int numThreads) throws IOException { 120 for (int i = 0; i < numThreads; ++i) { 121 HBaseReaderThread reader = createReaderThread(i); 122 readers.add(reader); 123 } 124 } 125 126 protected HBaseReaderThread createReaderThread(int readerId) throws IOException { 127 HBaseReaderThread reader = new HBaseReaderThread(readerId); 128 Threads.setLoggingUncaughtExceptionHandler(reader); 129 return reader; 130 } 131 132 public class HBaseReaderThread extends Thread { 133 protected final int readerId; 134 protected final Table table; 135 136 /** The "current" key being read. Increases from startKey to endKey. */ 137 private long curKey; 138 139 /** Time when the thread started */ 140 protected long startTimeMs; 141 142 /** If we are ahead of the writer and reading a random key. */ 143 private boolean readingRandomKey; 144 145 private boolean printExceptionTrace = true; 146 147 /** 148 * @param readerId only the keys with this remainder from division by {@link #numThreads} will 149 * be read by this thread 150 */ 151 public HBaseReaderThread(int readerId) throws IOException { 152 this.readerId = readerId; 153 table = createTable(); 154 setName(getClass().getSimpleName() + "_" + readerId); 155 } 156 157 protected Table createTable() throws IOException { 158 return connection.getTable(tableName); 159 } 160 161 @Override 162 public void run() { 163 try { 164 runReader(); 165 } finally { 166 closeTable(); 167 numThreadsWorking.decrementAndGet(); 168 } 169 } 170 171 protected void closeTable() { 172 try { 173 if (table != null) { 174 table.close(); 175 } 176 } catch (IOException e) { 177 LOG.error("Error closing table", e); 178 } 179 } 180 181 private void runReader() { 182 if (verbose) { 183 LOG.info("Started thread #" + readerId + " for reads..."); 184 } 185 186 startTimeMs = EnvironmentEdgeManager.currentTime(); 187 curKey = startKey; 188 long[] keysForThisReader = new long[batchSize]; 189 while (curKey < endKey && !aborted) { 190 int readingRandomKeyStartIndex = -1; 191 int numKeys = 0; 192 // if multiGet, loop until we have the number of keys equal to the batch size 193 do { 194 long k = getNextKeyToRead(); 195 if (k < startKey || k >= endKey) { 196 numReadErrors.incrementAndGet(); 197 throw new AssertionError("Load tester logic error: proposed key " + "to read " + k 198 + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")"); 199 } 200 if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) { 201 // Skip keys that this thread should not read, as well as the keys 202 // that we know the writer failed to write. 203 continue; 204 } 205 keysForThisReader[numKeys] = k; 206 if (readingRandomKey && readingRandomKeyStartIndex == -1) { 207 // store the first index of a random read 208 readingRandomKeyStartIndex = numKeys; 209 } 210 numKeys++; 211 } while (numKeys < batchSize && curKey < endKey && !aborted); 212 213 if (numKeys > 0) { // meaning there is some key to read 214 readKey(keysForThisReader); 215 // We have verified some unique key(s). 216 numUniqueKeysVerified 217 .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex); 218 } 219 } 220 } 221 222 /** 223 * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed 224 * to read, subject to the "key window" constraint. 225 */ 226 private long maxKeyWeCanRead() { 227 long insertedUpToKey = writer.wroteUpToKey(); 228 if (insertedUpToKey >= endKey - 1) { 229 // The writer has finished writing our range, so we can read any 230 // key in the range. 231 return endKey - 1; 232 } 233 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); 234 } 235 236 protected long getNextKeyToRead() { 237 readingRandomKey = false; 238 if (writer == null || curKey <= maxKeyWeCanRead()) { 239 return curKey++; 240 } 241 242 // We caught up with the writer. See if we can read any keys at all. 243 long maxKeyToRead; 244 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { 245 // The writer has not written sufficient keys for us to be able to read 246 // anything at all. Sleep a bit. This should only happen in the 247 // beginning of a load test run. 248 Threads.sleepWithoutInterrupt(50); 249 } 250 251 if (curKey <= maxKeyToRead) { 252 // The writer wrote some keys, and we are now allowed to read our 253 // current key. 254 return curKey++; 255 } 256 257 // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. 258 // Don't increment the current key -- we still have to try reading it 259 // later. Set a flag to make sure that we don't count this key towards 260 // the set of unique keys we have verified. 261 readingRandomKey = true; 262 return startKey 263 + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1); 264 } 265 266 private Get[] readKey(long[] keysToRead) { 267 Random rand = ThreadLocalRandom.current(); 268 Get[] gets = new Get[keysToRead.length]; 269 int i = 0; 270 for (long keyToRead : keysToRead) { 271 try { 272 gets[i] = createGet(keyToRead); 273 if (keysToRead.length == 1) { 274 queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead); 275 } 276 i++; 277 } catch (IOException e) { 278 numReadFailures.addAndGet(1); 279 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 280 + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 281 if (printExceptionTrace) { 282 LOG.warn(e.toString(), e); 283 printExceptionTrace = false; 284 } 285 } 286 } 287 if (keysToRead.length > 1) { 288 try { 289 queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead); 290 } catch (IOException e) { 291 numReadFailures.addAndGet(gets.length); 292 for (long keyToRead : keysToRead) { 293 LOG.debug( 294 "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: " 295 + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 296 } 297 if (printExceptionTrace) { 298 LOG.warn(e.toString(), e); 299 printExceptionTrace = false; 300 } 301 } 302 } 303 return gets; 304 } 305 306 protected Get createGet(long keyToRead) throws IOException { 307 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); 308 String cfsString = ""; 309 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 310 for (byte[] cf : columnFamilies) { 311 get.addFamily(cf); 312 if (verbose) { 313 if (cfsString.length() > 0) { 314 cfsString += ", "; 315 } 316 cfsString += "[" + Bytes.toStringBinary(cf) + "]"; 317 } 318 } 319 get = dataGenerator.beforeGet(keyToRead, get); 320 if (regionReplicaId > 0) { 321 get.setReplicaId(regionReplicaId); 322 get.setConsistency(Consistency.TIMELINE); 323 } 324 if (verbose) { 325 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); 326 } 327 return get; 328 } 329 330 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException { 331 // read the data 332 long start = System.nanoTime(); 333 // Uses multi/batch gets 334 Result[] results = table.get(Arrays.asList(gets)); 335 long end = System.nanoTime(); 336 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false); 337 } 338 339 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { 340 // read the data 341 342 long start = System.nanoTime(); 343 // Uses simple get 344 Result result = table.get(get); 345 long end = System.nanoTime(); 346 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false); 347 } 348 349 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, 350 Result[] results, Table table, boolean isNullExpected) throws IOException { 351 totalOpTimeMs.addAndGet(elapsedNano / 1000000); 352 numKeys.addAndGet(gets.length); 353 int i = 0; 354 for (Result result : results) { 355 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table, 356 isNullExpected); 357 } 358 } 359 360 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, 361 Result result, Table table, boolean isNullExpected) throws IOException { 362 verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result }, 363 table, isNullExpected); 364 } 365 366 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result, 367 Table table, boolean isNullExpected) throws IOException { 368 if (!result.isEmpty()) { 369 if (verify) { 370 numKeysVerified.incrementAndGet(); 371 } 372 } else { 373 HRegionLocation hloc; 374 try (RegionLocator locator = connection.getRegionLocator(tableName)) { 375 hloc = locator.getRegionLocation(get.getRow()); 376 } 377 String rowKey = Bytes.toString(get.getRow()); 378 LOG.info("Key = " + rowKey + ", Region location: " + hloc); 379 if (isNullExpected) { 380 nullResult.incrementAndGet(); 381 LOG.debug("Null result obtained for the key =" + rowKey); 382 return; 383 } 384 } 385 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); 386 long numErrorsAfterThis = 0; 387 if (isOk) { 388 long cols = 0; 389 // Count the columns for reporting purposes. 390 for (byte[] cf : result.getMap().keySet()) { 391 cols += result.getFamilyMap(cf).size(); 392 } 393 numCols.addAndGet(cols); 394 } else { 395 if (writer != null) { 396 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys"); 397 } 398 numErrorsAfterThis = numReadErrors.incrementAndGet(); 399 } 400 401 if (numErrorsAfterThis > maxErrors) { 402 LOG.error("Aborting readers -- found more than " + maxErrors + " errors"); 403 aborted = true; 404 } 405 } 406 } 407 408 public long getNumReadFailures() { 409 return numReadFailures.get(); 410 } 411 412 public long getNumReadErrors() { 413 return numReadErrors.get(); 414 } 415 416 public long getNumKeysVerified() { 417 return numKeysVerified.get(); 418 } 419 420 public long getNumUniqueKeysVerified() { 421 return numUniqueKeysVerified.get(); 422 } 423 424 public long getNullResultsCount() { 425 return nullResult.get(); 426 } 427 428 @Override 429 protected String progressInfo() { 430 StringBuilder sb = new StringBuilder(); 431 appendToStatus(sb, "verified", numKeysVerified.get()); 432 appendToStatus(sb, "READ FAILURES", numReadFailures.get()); 433 appendToStatus(sb, "READ ERRORS", numReadErrors.get()); 434 appendToStatus(sb, "NULL RESULT", nullResult.get()); 435 return sb.toString(); 436 } 437}