001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Random;
024import java.util.Set;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Consistency;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/** Creates multiple threads that read and verify previously written data */
039public class MultiThreadedReader extends MultiThreadedAction {
040  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
041
042  protected Set<HBaseReaderThread> readers = new HashSet<>();
043  private final double verifyPercent;
044  protected volatile boolean aborted;
045
046  protected MultiThreadedWriterBase writer = null;
047
048  /**
049   * The number of keys verified in a sequence. This will never be larger than the total number of
050   * keys in the range. The reader might also verify random keys when it catches up with the writer.
051   */
052  private final AtomicLong numUniqueKeysVerified = new AtomicLong();
053
054  /**
055   * Default maximum number of read errors to tolerate before shutting down all readers.
056   */
057  public static final int DEFAULT_MAX_ERRORS = 10;
058
059  /**
060   * Default "window" size between the last key written by the writer and the key that we attempt to
061   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
062   * read the highest key in the contiguous sequence of keys written by the writers.
063   */
064  public static final int DEFAULT_KEY_WINDOW = 0;
065
066  /**
067   * Default batch size for multigets
068   */
069  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)
070
071  protected AtomicLong numKeysVerified = new AtomicLong(0);
072  protected AtomicLong numReadErrors = new AtomicLong(0);
073  protected AtomicLong numReadFailures = new AtomicLong(0);
074  protected AtomicLong nullResult = new AtomicLong(0);
075  private int maxErrors = DEFAULT_MAX_ERRORS;
076  private int keyWindow = DEFAULT_KEY_WINDOW;
077  private int batchSize = DEFAULT_BATCH_SIZE;
078  private int regionReplicaId = -1; // particular region replica id to do reads against if set
079
080  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
081    double verifyPercent) throws IOException {
082    super(dataGen, conf, tableName, "R");
083    this.verifyPercent = verifyPercent;
084  }
085
086  public void linkToWriter(MultiThreadedWriterBase writer) {
087    this.writer = writer;
088    writer.setTrackWroteKeys(true);
089  }
090
091  public void setMaxErrors(int maxErrors) {
092    this.maxErrors = maxErrors;
093  }
094
095  public void setKeyWindow(int keyWindow) {
096    this.keyWindow = keyWindow;
097  }
098
099  public void setMultiGetBatchSize(int batchSize) {
100    this.batchSize = batchSize;
101  }
102
103  public void setRegionReplicaId(int regionReplicaId) {
104    this.regionReplicaId = regionReplicaId;
105  }
106
107  @Override
108  public void start(long startKey, long endKey, int numThreads) throws IOException {
109    super.start(startKey, endKey, numThreads);
110    if (verbose) {
111      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
112    }
113
114    addReaderThreads(numThreads);
115    startThreads(readers);
116  }
117
118  protected void addReaderThreads(int numThreads) throws IOException {
119    for (int i = 0; i < numThreads; ++i) {
120      HBaseReaderThread reader = createReaderThread(i);
121      readers.add(reader);
122    }
123  }
124
125  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
126    HBaseReaderThread reader = new HBaseReaderThread(readerId);
127    Threads.setLoggingUncaughtExceptionHandler(reader);
128    return reader;
129  }
130
131  public class HBaseReaderThread extends Thread {
132    protected final int readerId;
133    protected final Table table;
134
135    /** The "current" key being read. Increases from startKey to endKey. */
136    private long curKey;
137
138    /** Time when the thread started */
139    protected long startTimeMs;
140
141    /** If we are ahead of the writer and reading a random key. */
142    private boolean readingRandomKey;
143
144    private boolean printExceptionTrace = true;
145
146    /**
147     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
148     *                 be read by this thread
149     */
150    public HBaseReaderThread(int readerId) throws IOException {
151      this.readerId = readerId;
152      table = createTable();
153      setName(getClass().getSimpleName() + "_" + readerId);
154    }
155
156    protected Table createTable() throws IOException {
157      return connection.getTable(tableName);
158    }
159
160    @Override
161    public void run() {
162      try {
163        runReader();
164      } finally {
165        closeTable();
166        numThreadsWorking.decrementAndGet();
167      }
168    }
169
170    protected void closeTable() {
171      try {
172        if (table != null) {
173          table.close();
174        }
175      } catch (IOException e) {
176        LOG.error("Error closing table", e);
177      }
178    }
179
180    private void runReader() {
181      if (verbose) {
182        LOG.info("Started thread #" + readerId + " for reads...");
183      }
184
185      startTimeMs = EnvironmentEdgeManager.currentTime();
186      curKey = startKey;
187      long[] keysForThisReader = new long[batchSize];
188      while (curKey < endKey && !aborted) {
189        int readingRandomKeyStartIndex = -1;
190        int numKeys = 0;
191        // if multiGet, loop until we have the number of keys equal to the batch size
192        do {
193          long k = getNextKeyToRead();
194          if (k < startKey || k >= endKey) {
195            numReadErrors.incrementAndGet();
196            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
197              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
198          }
199          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
200            // Skip keys that this thread should not read, as well as the keys
201            // that we know the writer failed to write.
202            continue;
203          }
204          keysForThisReader[numKeys] = k;
205          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
206            // store the first index of a random read
207            readingRandomKeyStartIndex = numKeys;
208          }
209          numKeys++;
210        } while (numKeys < batchSize && curKey < endKey && !aborted);
211
212        if (numKeys > 0) { // meaning there is some key to read
213          readKey(keysForThisReader);
214          // We have verified some unique key(s).
215          numUniqueKeysVerified
216            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
217        }
218      }
219    }
220
221    /**
222     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
223     * to read, subject to the "key window" constraint.
224     */
225    private long maxKeyWeCanRead() {
226      long insertedUpToKey = writer.wroteUpToKey();
227      if (insertedUpToKey >= endKey - 1) {
228        // The writer has finished writing our range, so we can read any
229        // key in the range.
230        return endKey - 1;
231      }
232      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
233    }
234
235    protected long getNextKeyToRead() {
236      readingRandomKey = false;
237      if (writer == null || curKey <= maxKeyWeCanRead()) {
238        return curKey++;
239      }
240
241      // We caught up with the writer. See if we can read any keys at all.
242      long maxKeyToRead;
243      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
244        // The writer has not written sufficient keys for us to be able to read
245        // anything at all. Sleep a bit. This should only happen in the
246        // beginning of a load test run.
247        Threads.sleepWithoutInterrupt(50);
248      }
249
250      if (curKey <= maxKeyToRead) {
251        // The writer wrote some keys, and we are now allowed to read our
252        // current key.
253        return curKey++;
254      }
255
256      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
257      // Don't increment the current key -- we still have to try reading it
258      // later. Set a flag to make sure that we don't count this key towards
259      // the set of unique keys we have verified.
260      readingRandomKey = true;
261      return startKey
262        + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1);
263    }
264
265    private Get[] readKey(long[] keysToRead) {
266      Random rand = ThreadLocalRandom.current();
267      Get[] gets = new Get[keysToRead.length];
268      int i = 0;
269      for (long keyToRead : keysToRead) {
270        try {
271          gets[i] = createGet(keyToRead);
272          if (keysToRead.length == 1) {
273            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
274          }
275          i++;
276        } catch (IOException e) {
277          numReadFailures.addAndGet(1);
278          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
279            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
280          if (printExceptionTrace) {
281            LOG.warn(e.toString(), e);
282            printExceptionTrace = false;
283          }
284        }
285      }
286      if (keysToRead.length > 1) {
287        try {
288          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
289        } catch (IOException e) {
290          numReadFailures.addAndGet(gets.length);
291          for (long keyToRead : keysToRead) {
292            LOG.debug(
293              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
294                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
295          }
296          if (printExceptionTrace) {
297            LOG.warn(e.toString(), e);
298            printExceptionTrace = false;
299          }
300        }
301      }
302      return gets;
303    }
304
305    protected Get createGet(long keyToRead) throws IOException {
306      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
307      String cfsString = "";
308      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
309      for (byte[] cf : columnFamilies) {
310        get.addFamily(cf);
311        if (verbose) {
312          if (cfsString.length() > 0) {
313            cfsString += ", ";
314          }
315          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
316        }
317      }
318      get = dataGenerator.beforeGet(keyToRead, get);
319      if (regionReplicaId > 0) {
320        get.setReplicaId(regionReplicaId);
321        get.setConsistency(Consistency.TIMELINE);
322      }
323      if (verbose) {
324        LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
325      }
326      return get;
327    }
328
329    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
330      // read the data
331      long start = System.nanoTime();
332      // Uses multi/batch gets
333      Result[] results = table.get(Arrays.asList(gets));
334      long end = System.nanoTime();
335      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
336    }
337
338    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
339      // read the data
340
341      long start = System.nanoTime();
342      // Uses simple get
343      Result result = table.get(get);
344      long end = System.nanoTime();
345      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
346    }
347
348    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
349      Result[] results, Table table, boolean isNullExpected) throws IOException {
350      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
351      numKeys.addAndGet(gets.length);
352      int i = 0;
353      for (Result result : results) {
354        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
355          isNullExpected);
356      }
357    }
358
359    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
360      Result result, Table table, boolean isNullExpected) throws IOException {
361      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
362        table, isNullExpected);
363    }
364
365    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
366      Table table, boolean isNullExpected) throws IOException {
367      if (!result.isEmpty()) {
368        if (verify) {
369          numKeysVerified.incrementAndGet();
370        }
371      } else {
372        HRegionLocation hloc = connection.getRegionLocation(tableName, get.getRow(), false);
373        String rowKey = Bytes.toString(get.getRow());
374        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
375        if (isNullExpected) {
376          nullResult.incrementAndGet();
377          LOG.debug("Null result obtained for the key =" + rowKey);
378          return;
379        }
380      }
381      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
382      long numErrorsAfterThis = 0;
383      if (isOk) {
384        long cols = 0;
385        // Count the columns for reporting purposes.
386        for (byte[] cf : result.getMap().keySet()) {
387          cols += result.getFamilyMap(cf).size();
388        }
389        numCols.addAndGet(cols);
390      } else {
391        if (writer != null) {
392          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
393        }
394        numErrorsAfterThis = numReadErrors.incrementAndGet();
395      }
396
397      if (numErrorsAfterThis > maxErrors) {
398        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
399        aborted = true;
400      }
401    }
402  }
403
404  public long getNumReadFailures() {
405    return numReadFailures.get();
406  }
407
408  public long getNumReadErrors() {
409    return numReadErrors.get();
410  }
411
412  public long getNumKeysVerified() {
413    return numKeysVerified.get();
414  }
415
416  public long getNumUniqueKeysVerified() {
417    return numUniqueKeysVerified.get();
418  }
419
420  public long getNullResultsCount() {
421    return nullResult.get();
422  }
423
424  @Override
425  protected String progressInfo() {
426    StringBuilder sb = new StringBuilder();
427    appendToStatus(sb, "verified", numKeysVerified.get());
428    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
429    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
430    appendToStatus(sb, "NULL RESULT", nullResult.get());
431    return sb.toString();
432  }
433}