001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Random;
024import java.util.Set;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Consistency;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/** Creates multiple threads that read and verify previously written data */
041@InterfaceAudience.Private
042public class MultiThreadedReader extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);

  /** Reader threads created by {@link #addReaderThreads(int)} and started in {@link #start}. */
  protected Set<HBaseReaderThread> readers = new HashSet<>();
  /** Percentage (0-100) of reads whose results are verified against the data generator. */
  private final double verifyPercent;
  /** Set once more than {@link #maxErrors} read errors accumulate; makes all readers exit. */
  protected volatile boolean aborted;

  /** Optional concurrent writer; when set, readers only attempt keys the writer has written. */
  protected MultiThreadedWriterBase writer = null;

  /**
   * The number of keys verified in a sequence. This will never be larger than the total number of
   * keys in the range. The reader might also verify random keys when it catches up with the writer.
   */
  private final AtomicLong numUniqueKeysVerified = new AtomicLong();

  /**
   * Default maximum number of read errors to tolerate before shutting down all readers.
   */
  public static final int DEFAULT_MAX_ERRORS = 10;

  /**
   * Default "window" size between the last key written by the writer and the key that we attempt to
   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
   * read the highest key in the contiguous sequence of keys written by the writers.
   */
  public static final int DEFAULT_KEY_WINDOW = 0;

  /**
   * Default batch size for multigets
   */
  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)

  /** Number of reads whose contents were verified against the data generator. */
  protected AtomicLong numKeysVerified = new AtomicLong(0);
  /** Number of read errors: failed verifications or out-of-range keys proposed. */
  protected AtomicLong numReadErrors = new AtomicLong(0);
  /** Number of reads that threw an exception. */
  protected AtomicLong numReadFailures = new AtomicLong(0);
  /** Number of empty results seen when an empty (null) result was expected. */
  protected AtomicLong nullResult = new AtomicLong(0);
  private int maxErrors = DEFAULT_MAX_ERRORS;
  private int keyWindow = DEFAULT_KEY_WINDOW;
  private int batchSize = DEFAULT_BATCH_SIZE;
  private int regionReplicaId = -1; // particular region replica id to do reads against if set
  private boolean timelineConsistency = false;
083
  /**
   * @param dataGen       generator of keys and expected row contents to verify against
   * @param conf          cluster configuration
   * @param tableName     table to read from
   * @param verifyPercent percentage (0-100) of reads whose results should be fully verified
   */
  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    double verifyPercent) throws IOException {
    super(dataGen, conf, tableName, "R");
    this.verifyPercent = verifyPercent;
  }
089
  /**
   * Links this reader to a concurrent writer so readers stay behind the writer's progress, and
   * asks the writer to track which keys it wrote (used to skip failed writes).
   */
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  /** Sets the number of read errors tolerated before all readers are aborted. */
  public void setMaxErrors(int maxErrors) {
    this.maxErrors = maxErrors;
  }

  /** Sets the lag (in keys) to keep behind the writer's highest contiguous written key. */
  public void setKeyWindow(int keyWindow) {
    this.keyWindow = keyWindow;
  }

  /** Sets the number of keys read per multi-get; 1 means plain single GETs. */
  public void setMultiGetBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Sets a specific region replica id to read from; values of 0 or less leave the default. */
  public void setRegionReplicaId(int regionReplicaId) {
    this.regionReplicaId = regionReplicaId;
  }

  /** Enables TIMELINE consistency on the issued Gets. */
  public void setTimelineConsistency(boolean timelineConsistency) {
    this.timelineConsistency = timelineConsistency;
  }
114
115  @Override
116  public void start(long startKey, long endKey, int numThreads) throws IOException {
117    super.start(startKey, endKey, numThreads);
118    if (verbose) {
119      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
120    }
121
122    addReaderThreads(numThreads);
123    startThreads(readers);
124  }
125
126  protected void addReaderThreads(int numThreads) throws IOException {
127    for (int i = 0; i < numThreads; ++i) {
128      HBaseReaderThread reader = createReaderThread(i);
129      readers.add(reader);
130    }
131  }
132
  /**
   * Creates (but does not start) a reader thread with the given id, installing a logging
   * uncaught-exception handler on it. Subclasses may override to supply a different thread type.
   */
  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    HBaseReaderThread reader = new HBaseReaderThread(readerId);
    Threads.setLoggingUncaughtExceptionHandler(reader);
    return reader;
  }
138
  /** One reader thread; it reads only keys where {@code key % numThreads == readerId}. */
  public class HBaseReaderThread extends Thread {
    protected final int readerId;
    /** Per-thread table handle, created in the constructor and closed when the thread exits. */
    protected final Table table;

    /** The "current" key being read. Increases from startKey to endKey. */
    private long curKey;

    /** Time when the thread started */
    protected long startTimeMs;

    /** If we are ahead of the writer and reading a random key. */
    private boolean readingRandomKey;

    /** Dump only the first read-failure stack trace, to avoid flooding the log. */
    private boolean printExceptionTrace = true;
153
    /**
     * Creates the reader thread and its dedicated table handle.
     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
     *                 be read by this thread
     */
    public HBaseReaderThread(int readerId) throws IOException {
      this.readerId = readerId;
      table = createTable();
      setName(getClass().getSimpleName() + "_" + readerId);
    }
163
    /** @return the table handle this thread reads from; subclasses may override */
    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }
167
    @Override
    public void run() {
      try {
        runReader();
      } finally {
        // Always release the table and report this thread as finished, even on abort/error.
        closeTable();
        numThreadsWorking.decrementAndGet();
      }
    }
177
178    protected void closeTable() {
179      try {
180        if (table != null) {
181          table.close();
182        }
183      } catch (IOException e) {
184        LOG.error("Error closing table", e);
185      }
186    }
187
188    private void runReader() {
189      if (verbose) {
190        LOG.info("Started thread #" + readerId + " for reads...");
191      }
192
193      startTimeMs = EnvironmentEdgeManager.currentTime();
194      curKey = startKey;
195      long[] keysForThisReader = new long[batchSize];
196      while (curKey < endKey && !aborted) {
197        int readingRandomKeyStartIndex = -1;
198        int numKeys = 0;
199        // if multiGet, loop until we have the number of keys equal to the batch size
200        do {
201          long k = getNextKeyToRead();
202          if (k < startKey || k >= endKey) {
203            numReadErrors.incrementAndGet();
204            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
205              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
206          }
207          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
208            // Skip keys that this thread should not read, as well as the keys
209            // that we know the writer failed to write.
210            continue;
211          }
212          keysForThisReader[numKeys] = k;
213          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
214            // store the first index of a random read
215            readingRandomKeyStartIndex = numKeys;
216          }
217          numKeys++;
218        } while (numKeys < batchSize && curKey < endKey && !aborted);
219
220        if (numKeys > 0) { // meaning there is some key to read
221          readKey(keysForThisReader);
222          // We have verified some unique key(s).
223          numUniqueKeysVerified
224            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
225        }
226      }
227    }
228
229    /**
230     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
231     * to read, subject to the "key window" constraint.
232     */
233    private long maxKeyWeCanRead() {
234      long insertedUpToKey = writer.wroteUpToKey();
235      if (insertedUpToKey >= endKey - 1) {
236        // The writer has finished writing our range, so we can read any
237        // key in the range.
238        return endKey - 1;
239      }
240      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241    }
242
243    protected long getNextKeyToRead() {
244      readingRandomKey = false;
245      if (writer == null || curKey <= maxKeyWeCanRead()) {
246        return curKey++;
247      }
248
249      // We caught up with the writer. See if we can read any keys at all.
250      long maxKeyToRead;
251      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
252        // The writer has not written sufficient keys for us to be able to read
253        // anything at all. Sleep a bit. This should only happen in the
254        // beginning of a load test run.
255        Threads.sleepWithoutInterrupt(50);
256      }
257
258      if (curKey <= maxKeyToRead) {
259        // The writer wrote some keys, and we are now allowed to read our
260        // current key.
261        return curKey++;
262      }
263
264      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
265      // Don't increment the current key -- we still have to try reading it
266      // later. Set a flag to make sure that we don't count this key towards
267      // the set of unique keys we have verified.
268      readingRandomKey = true;
269      return startKey
270        + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1);
271    }
272
273    private Get[] readKey(long[] keysToRead) {
274      Random rand = ThreadLocalRandom.current();
275      Get[] gets = new Get[keysToRead.length];
276      int i = 0;
277      for (long keyToRead : keysToRead) {
278        try {
279          gets[i] = createGet(keyToRead);
280          if (keysToRead.length == 1) {
281            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
282          }
283          i++;
284        } catch (IOException e) {
285          numReadFailures.addAndGet(1);
286          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
287            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
288          if (printExceptionTrace) {
289            LOG.warn(e.toString(), e);
290            printExceptionTrace = false;
291          }
292        }
293      }
294      if (keysToRead.length > 1) {
295        try {
296          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
297        } catch (IOException e) {
298          numReadFailures.addAndGet(gets.length);
299          for (long keyToRead : keysToRead) {
300            LOG.debug(
301              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
302                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
303          }
304          if (printExceptionTrace) {
305            LOG.warn(e.toString(), e);
306            printExceptionTrace = false;
307          }
308        }
309      }
310      return gets;
311    }
312
313    protected Get createGet(long keyToRead) throws IOException {
314      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
315      StringBuilder cfsString = new StringBuilder();
316      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
317      for (byte[] cf : columnFamilies) {
318        get.addFamily(cf);
319        if (verbose) {
320          if (cfsString.length() > 0) {
321            cfsString.append(", ");
322          }
323          cfsString.append("[").append(Bytes.toStringBinary(cf)).append("]");
324        }
325      }
326      get = dataGenerator.beforeGet(keyToRead, get);
327      if (regionReplicaId > 0) {
328        get.setReplicaId(regionReplicaId);
329      }
330      if (timelineConsistency) {
331        get.setConsistency(Consistency.TIMELINE);
332      }
333      if (verbose) {
334        LOG.info("[{}] Querying key {}, cfs {}", readerId, keyToRead, cfsString.toString());
335      }
336      return get;
337    }
338
339    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
340      // read the data
341      long start = System.nanoTime();
342      // Uses multi/batch gets
343      Result[] results = table.get(Arrays.asList(gets));
344      long end = System.nanoTime();
345      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
346    }
347
348    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
349      // read the data
350
351      long start = System.nanoTime();
352      // Uses simple get
353      Result result = table.get(get);
354      long end = System.nanoTime();
355      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
356    }
357
358    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
359      Result[] results, Table table, boolean isNullExpected) throws IOException {
360      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
361      numKeys.addAndGet(gets.length);
362      int i = 0;
363      for (Result result : results) {
364        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
365          isNullExpected);
366      }
367    }
368
369    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
370      Result result, Table table, boolean isNullExpected) throws IOException {
371      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
372        table, isNullExpected);
373    }
374
    /**
     * Verifies one get/result pair and updates counters. An empty result triggers a region
     * location lookup for diagnostics; if {@code isNullExpected} is set, an empty result only
     * counts as a null result and skips content verification. A failed verification increments
     * {@link #numReadErrors} and, once more than {@link #maxErrors} have accumulated, aborts all
     * readers.
     */
    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
      Table table, boolean isNullExpected) throws IOException {
      if (!result.isEmpty()) {
        if (verify) {
          numKeysVerified.incrementAndGet();
        }
      } else {
        // Empty result: log where the row should live to help debug missing data.
        HRegionLocation hloc;
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          hloc = locator.getRegionLocation(get.getRow());
        }
        String rowKey = Bytes.toString(get.getRow());
        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
        if (isNullExpected) {
          nullResult.incrementAndGet();
          LOG.debug("Null result obtained for the key =" + rowKey);
          // Expected absence: do not run content verification below.
          return;
        }
      }
      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
      long numErrorsAfterThis = 0;
      if (isOk) {
        long cols = 0;
        // Count the columns for reporting purposes.
        for (byte[] cf : result.getMap().keySet()) {
          cols += result.getFamilyMap(cf).size();
        }
        numCols.addAndGet(cols);
      } else {
        if (writer != null) {
          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
        }
        numErrorsAfterThis = numReadErrors.incrementAndGet();
      }

      if (numErrorsAfterThis > maxErrors) {
        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
        aborted = true;
      }
    }
415  }
416
  /** @return number of reads that threw an exception */
  public long getNumReadFailures() {
    return numReadFailures.get();
  }

  /** @return number of read errors (failed verifications or out-of-range keys proposed) */
  public long getNumReadErrors() {
    return numReadErrors.get();
  }

  /** @return number of keys whose contents were verified against the data generator */
  public long getNumKeysVerified() {
    return numKeysVerified.get();
  }

  /** @return number of keys verified in sequence (random catch-up re-reads are excluded) */
  public long getNumUniqueKeysVerified() {
    return numUniqueKeysVerified.get();
  }

  /** @return number of expected-null (empty) results observed */
  public long getNullResultsCount() {
    return nullResult.get();
  }
436
  /** Builds the counter summary appended to the periodic progress report. */
  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "verified", numKeysVerified.get());
    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
    appendToStatus(sb, "NULL RESULT", nullResult.get());
    return sb.toString();
  }
446}