/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Creates multiple threads that read and verify previously written data */
@InterfaceAudience.Private
public class MultiThreadedReader extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);

  protected Set<HBaseReaderThread> readers = new HashSet<>();
  private final double verifyPercent;
  protected volatile boolean aborted;

  /** Optional writer we are chasing; when set, reads stay behind the writer's progress. */
  protected MultiThreadedWriterBase writer = null;

  /**
   * The number of keys verified in a sequence. This will never be larger than the total number of
   * keys in the range. The reader might also verify random keys when it catches up with the writer.
   */
  private final AtomicLong numUniqueKeysVerified = new AtomicLong();

  /**
   * Default maximum number of read errors to tolerate before shutting down all readers.
   */
  public static final int DEFAULT_MAX_ERRORS = 10;

  /**
   * Default "window" size between the last key written by the writer and the key that we attempt to
   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
   * read the highest key in the contiguous sequence of keys written by the writers.
   */
  public static final int DEFAULT_KEY_WINDOW = 0;

  /**
   * Default batch size for multigets
   */
  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)

  protected AtomicLong numKeysVerified = new AtomicLong(0);
  protected AtomicLong numReadErrors = new AtomicLong(0);
  protected AtomicLong numReadFailures = new AtomicLong(0);
  protected AtomicLong nullResult = new AtomicLong(0);
  private int maxErrors = DEFAULT_MAX_ERRORS;
  private int keyWindow = DEFAULT_KEY_WINDOW;
  private int batchSize = DEFAULT_BATCH_SIZE;
  private int regionReplicaId = -1; // particular region replica id to do reads against if set
  private boolean timelineConsistency = false;

  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    double verifyPercent) throws IOException {
    super(dataGen, conf, tableName, "R");
    this.verifyPercent = verifyPercent;
  }

  /**
   * Makes this reader follow the given writer: reads are constrained to keys the writer has
   * already written (minus the key window), and the writer starts tracking written keys.
   */
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  public void setMaxErrors(int maxErrors) {
    this.maxErrors = maxErrors;
  }

  public void setKeyWindow(int keyWindow) {
    this.keyWindow = keyWindow;
  }

  public void setMultiGetBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  public void setRegionReplicaId(int regionReplicaId) {
    this.regionReplicaId = regionReplicaId;
  }

  public void setTimelineConsistency(boolean timelineConsistency) {
    this.timelineConsistency = timelineConsistency;
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    if (verbose) {
      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
    }

    addReaderThreads(numThreads);
    startThreads(readers);
  }

  protected void addReaderThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseReaderThread reader = createReaderThread(i);
      readers.add(reader);
    }
  }

  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    HBaseReaderThread reader = new HBaseReaderThread(readerId);
    Threads.setLoggingUncaughtExceptionHandler(reader);
    return reader;
  }

  public class HBaseReaderThread extends Thread {
    protected final int readerId;
    protected final Table table;

    /** The "current" key being read. Increases from startKey to endKey. */
    private long curKey;

    /** Time when the thread started */
    protected long startTimeMs;

    /** If we are ahead of the writer and reading a random key. */
    private boolean readingRandomKey;

    // Log at most one full stack trace per thread to avoid flooding the log
    // with identical failures.
    private boolean printExceptionTrace = true;

    /**
     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
     *                 be read by this thread
     */
    public HBaseReaderThread(int readerId) throws IOException {
      this.readerId = readerId;
      table = createTable();
      setName(getClass().getSimpleName() + "_" + readerId);
    }

    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }

    @Override
    public void run() {
      try {
        runReader();
      } finally {
        closeTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

    /**
     * Main read loop: walks keys from startKey to endKey, batching up to
     * {@link #batchSize} keys per read and verifying a percentage of results.
     */
    private void runReader() {
      if (verbose) {
        LOG.info("Started thread #" + readerId + " for reads...");
      }

      startTimeMs = EnvironmentEdgeManager.currentTime();
      curKey = startKey;
      long[] keysForThisReader = new long[batchSize];
      while (curKey < endKey && !aborted) {
        int readingRandomKeyStartIndex = -1;
        int numKeys = 0;
        // if multiGet, loop until we have the number of keys equal to the batch size
        do {
          long k = getNextKeyToRead();
          if (k < startKey || k >= endKey) {
            numReadErrors.incrementAndGet();
            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
          }
          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
            // Skip keys that this thread should not read, as well as the keys
            // that we know the writer failed to write.
            continue;
          }
          keysForThisReader[numKeys] = k;
          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
            // store the first index of a random read
            readingRandomKeyStartIndex = numKeys;
          }
          numKeys++;
        } while (numKeys < batchSize && curKey < endKey && !aborted);

        if (numKeys > 0) { // meaning there is some key to read
          // Only hand the filled prefix to readKey: when the batch is cut short
          // (end of range or abort), the tail of the reused array holds stale or
          // zero keys from a previous iteration that must not be re-read.
          long[] batch = numKeys == keysForThisReader.length
            ? keysForThisReader
            : Arrays.copyOf(keysForThisReader, numKeys);
          readKey(batch);
          // We have verified some unique key(s).
          numUniqueKeysVerified
            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
        }
      }
    }

    /**
     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
     * to read, subject to the "key window" constraint.
     */
    private long maxKeyWeCanRead() {
      long insertedUpToKey = writer.wroteUpToKey();
      if (insertedUpToKey >= endKey - 1) {
        // The writer has finished writing our range, so we can read any
        // key in the range.
        return endKey - 1;
      }
      // Reuse the snapshot taken above rather than re-reading the writer's
      // (concurrently advancing) counter, so both branches of this method are
      // consistent within a single call.
      return Math.min(endKey - 1, insertedUpToKey - keyWindow);
    }

    /**
     * Returns the next key this thread should attempt to read. Normally this advances curKey; if
     * we have caught up with the writer, a previously-written key is chosen at random instead and
     * {@link #readingRandomKey} is set so the key is not counted as uniquely verified.
     */
    protected long getNextKeyToRead() {
      readingRandomKey = false;
      if (writer == null || curKey <= maxKeyWeCanRead()) {
        return curKey++;
      }

      // We caught up with the writer. See if we can read any keys at all.
      long maxKeyToRead;
      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
        // The writer has not written sufficient keys for us to be able to read
        // anything at all. Sleep a bit. This should only happen in the
        // beginning of a load test run.
        Threads.sleepWithoutInterrupt(50);
      }

      if (curKey <= maxKeyToRead) {
        // The writer wrote some keys, and we are now allowed to read our
        // current key.
        return curKey++;
      }

      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
      // Don't increment the current key -- we still have to try reading it
      // later. Set a flag to make sure that we don't count this key towards
      // the set of unique keys we have verified.
      readingRandomKey = true;
      // nextLong(bound) yields a uniform value in [0, bound). The previous
      // Math.abs(nextLong()) % bound form was biased and could even return a
      // negative offset when nextLong() produced Long.MIN_VALUE.
      return startKey + ThreadLocalRandom.current().nextLong(maxKeyToRead - startKey + 1);
    }

    /**
     * Reads the given keys (simple GET for a single key, multi-GET otherwise), verifying a
     * percentage of the results. Returns the Get objects that were issued.
     */
    private Get[] readKey(long[] keysToRead) {
      Random rand = ThreadLocalRandom.current();
      Get[] gets = new Get[keysToRead.length];
      int i = 0;
      for (long keyToRead : keysToRead) {
        try {
          gets[i] = createGet(keyToRead);
          if (keysToRead.length == 1) {
            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
          }
          i++;
        } catch (IOException e) {
          numReadFailures.addAndGet(1);
          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }
      if (keysToRead.length > 1) {
        try {
          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
        } catch (IOException e) {
          numReadFailures.addAndGet(gets.length);
          for (long keyToRead : keysToRead) {
            LOG.debug(
              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          }
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }
      return gets;
    }

    /**
     * Builds the Get for the given key: adds all column families from the data generator, applies
     * the generator's per-key customization, and sets replica/consistency options if configured.
     */
    protected Get createGet(long keyToRead) throws IOException {
      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
      StringBuilder cfsString = new StringBuilder();
      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
      for (byte[] cf : columnFamilies) {
        get.addFamily(cf);
        if (verbose) {
          if (cfsString.length() > 0) {
            cfsString.append(", ");
          }
          cfsString.append("[").append(Bytes.toStringBinary(cf)).append("]");
        }
      }
      get = dataGenerator.beforeGet(keyToRead, get);
      if (regionReplicaId > 0) {
        get.setReplicaId(regionReplicaId);
      }
      if (timelineConsistency) {
        get.setConsistency(Consistency.TIMELINE);
      }
      if (verbose) {
        LOG.info("[{}] Querying key {}, cfs {}", readerId, keyToRead, cfsString.toString());
      }
      return get;
    }

    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
      // read the data
      long start = System.nanoTime();
      // Uses multi/batch gets
      Result[] results = table.get(Arrays.asList(gets));
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
    }

    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
      // read the data

      long start = System.nanoTime();
      // Uses simple get
      Result result = table.get(get);
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
    }

    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
      Result[] results, Table table, boolean isNullExpected) throws IOException {
      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
      numKeys.addAndGet(gets.length);
      int i = 0;
      for (Result result : results) {
        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
          isNullExpected);
      }
    }

    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
      Result result, Table table, boolean isNullExpected) throws IOException {
      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
        table, isNullExpected);
    }

    /**
     * Verifies a single Get's result against the data generator, updating counters; aborts all
     * readers once more than {@link #maxErrors} errors have accumulated.
     */
    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
      Table table, boolean isNullExpected) throws IOException {
      if (!result.isEmpty()) {
        if (verify) {
          numKeysVerified.incrementAndGet();
        }
      } else {
        // Empty result: log where the row should live to aid debugging.
        HRegionLocation hloc;
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          hloc = locator.getRegionLocation(get.getRow());
        }
        String rowKey = Bytes.toString(get.getRow());
        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
        if (isNullExpected) {
          nullResult.incrementAndGet();
          LOG.debug("Null result obtained for the key =" + rowKey);
          return;
        }
      }
      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
      long numErrorsAfterThis = 0;
      if (isOk) {
        long cols = 0;
        // Count the columns for reporting purposes.
        for (byte[] cf : result.getMap().keySet()) {
          cols += result.getFamilyMap(cf).size();
        }
        numCols.addAndGet(cols);
      } else {
        if (writer != null) {
          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
        }
        numErrorsAfterThis = numReadErrors.incrementAndGet();
      }

      if (numErrorsAfterThis > maxErrors) {
        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
        aborted = true;
      }
    }
  }

  public long getNumReadFailures() {
    return numReadFailures.get();
  }

  public long getNumReadErrors() {
    return numReadErrors.get();
  }

  public long getNumKeysVerified() {
    return numKeysVerified.get();
  }

  public long getNumUniqueKeysVerified() {
    return numUniqueKeysVerified.get();
  }

  public long getNullResultsCount() {
    return nullResult.get();
  }

  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "verified", numKeysVerified.get());
    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
    appendToStatus(sb, "NULL RESULT", nullResult.get());
    return sb.toString();
  }
}