001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Random;
024import java.util.Set;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Consistency;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/** Creates multiple threads that read and verify previously written data */
040public class MultiThreadedReader extends MultiThreadedAction {
041  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
042
043  protected Set<HBaseReaderThread> readers = new HashSet<>();
044  private final double verifyPercent;
045  protected volatile boolean aborted;
046
047  protected MultiThreadedWriterBase writer = null;
048
049  /**
050   * The number of keys verified in a sequence. This will never be larger than the total number of
051   * keys in the range. The reader might also verify random keys when it catches up with the writer.
052   */
053  private final AtomicLong numUniqueKeysVerified = new AtomicLong();
054
055  /**
056   * Default maximum number of read errors to tolerate before shutting down all readers.
057   */
058  public static final int DEFAULT_MAX_ERRORS = 10;
059
060  /**
061   * Default "window" size between the last key written by the writer and the key that we attempt to
062   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
063   * read the highest key in the contiguous sequence of keys written by the writers.
064   */
065  public static final int DEFAULT_KEY_WINDOW = 0;
066
067  /**
068   * Default batch size for multigets
069   */
070  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)
071
072  protected AtomicLong numKeysVerified = new AtomicLong(0);
073  protected AtomicLong numReadErrors = new AtomicLong(0);
074  protected AtomicLong numReadFailures = new AtomicLong(0);
075  protected AtomicLong nullResult = new AtomicLong(0);
076  private int maxErrors = DEFAULT_MAX_ERRORS;
077  private int keyWindow = DEFAULT_KEY_WINDOW;
078  private int batchSize = DEFAULT_BATCH_SIZE;
079  private int regionReplicaId = -1; // particular region replica id to do reads against if set
080
081  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
082    double verifyPercent) throws IOException {
083    super(dataGen, conf, tableName, "R");
084    this.verifyPercent = verifyPercent;
085  }
086
087  public void linkToWriter(MultiThreadedWriterBase writer) {
088    this.writer = writer;
089    writer.setTrackWroteKeys(true);
090  }
091
092  public void setMaxErrors(int maxErrors) {
093    this.maxErrors = maxErrors;
094  }
095
096  public void setKeyWindow(int keyWindow) {
097    this.keyWindow = keyWindow;
098  }
099
100  public void setMultiGetBatchSize(int batchSize) {
101    this.batchSize = batchSize;
102  }
103
104  public void setRegionReplicaId(int regionReplicaId) {
105    this.regionReplicaId = regionReplicaId;
106  }
107
108  @Override
109  public void start(long startKey, long endKey, int numThreads) throws IOException {
110    super.start(startKey, endKey, numThreads);
111    if (verbose) {
112      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
113    }
114
115    addReaderThreads(numThreads);
116    startThreads(readers);
117  }
118
119  protected void addReaderThreads(int numThreads) throws IOException {
120    for (int i = 0; i < numThreads; ++i) {
121      HBaseReaderThread reader = createReaderThread(i);
122      readers.add(reader);
123    }
124  }
125
126  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
127    HBaseReaderThread reader = new HBaseReaderThread(readerId);
128    Threads.setLoggingUncaughtExceptionHandler(reader);
129    return reader;
130  }
131
132  public class HBaseReaderThread extends Thread {
133    protected final int readerId;
134    protected final Table table;
135
136    /** The "current" key being read. Increases from startKey to endKey. */
137    private long curKey;
138
139    /** Time when the thread started */
140    protected long startTimeMs;
141
142    /** If we are ahead of the writer and reading a random key. */
143    private boolean readingRandomKey;
144
145    private boolean printExceptionTrace = true;
146
147    /**
148     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
149     *                 be read by this thread
150     */
151    public HBaseReaderThread(int readerId) throws IOException {
152      this.readerId = readerId;
153      table = createTable();
154      setName(getClass().getSimpleName() + "_" + readerId);
155    }
156
157    protected Table createTable() throws IOException {
158      return connection.getTable(tableName);
159    }
160
161    @Override
162    public void run() {
163      try {
164        runReader();
165      } finally {
166        closeTable();
167        numThreadsWorking.decrementAndGet();
168      }
169    }
170
171    protected void closeTable() {
172      try {
173        if (table != null) {
174          table.close();
175        }
176      } catch (IOException e) {
177        LOG.error("Error closing table", e);
178      }
179    }
180
181    private void runReader() {
182      if (verbose) {
183        LOG.info("Started thread #" + readerId + " for reads...");
184      }
185
186      startTimeMs = EnvironmentEdgeManager.currentTime();
187      curKey = startKey;
188      long[] keysForThisReader = new long[batchSize];
189      while (curKey < endKey && !aborted) {
190        int readingRandomKeyStartIndex = -1;
191        int numKeys = 0;
192        // if multiGet, loop until we have the number of keys equal to the batch size
193        do {
194          long k = getNextKeyToRead();
195          if (k < startKey || k >= endKey) {
196            numReadErrors.incrementAndGet();
197            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
198              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
199          }
200          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
201            // Skip keys that this thread should not read, as well as the keys
202            // that we know the writer failed to write.
203            continue;
204          }
205          keysForThisReader[numKeys] = k;
206          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
207            // store the first index of a random read
208            readingRandomKeyStartIndex = numKeys;
209          }
210          numKeys++;
211        } while (numKeys < batchSize && curKey < endKey && !aborted);
212
213        if (numKeys > 0) { // meaning there is some key to read
214          readKey(keysForThisReader);
215          // We have verified some unique key(s).
216          numUniqueKeysVerified
217            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
218        }
219      }
220    }
221
222    /**
223     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
224     * to read, subject to the "key window" constraint.
225     */
226    private long maxKeyWeCanRead() {
227      long insertedUpToKey = writer.wroteUpToKey();
228      if (insertedUpToKey >= endKey - 1) {
229        // The writer has finished writing our range, so we can read any
230        // key in the range.
231        return endKey - 1;
232      }
233      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
234    }
235
236    protected long getNextKeyToRead() {
237      readingRandomKey = false;
238      if (writer == null || curKey <= maxKeyWeCanRead()) {
239        return curKey++;
240      }
241
242      // We caught up with the writer. See if we can read any keys at all.
243      long maxKeyToRead;
244      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
245        // The writer has not written sufficient keys for us to be able to read
246        // anything at all. Sleep a bit. This should only happen in the
247        // beginning of a load test run.
248        Threads.sleepWithoutInterrupt(50);
249      }
250
251      if (curKey <= maxKeyToRead) {
252        // The writer wrote some keys, and we are now allowed to read our
253        // current key.
254        return curKey++;
255      }
256
257      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
258      // Don't increment the current key -- we still have to try reading it
259      // later. Set a flag to make sure that we don't count this key towards
260      // the set of unique keys we have verified.
261      readingRandomKey = true;
262      return startKey
263        + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1);
264    }
265
266    private Get[] readKey(long[] keysToRead) {
267      Random rand = ThreadLocalRandom.current();
268      Get[] gets = new Get[keysToRead.length];
269      int i = 0;
270      for (long keyToRead : keysToRead) {
271        try {
272          gets[i] = createGet(keyToRead);
273          if (keysToRead.length == 1) {
274            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
275          }
276          i++;
277        } catch (IOException e) {
278          numReadFailures.addAndGet(1);
279          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
280            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
281          if (printExceptionTrace) {
282            LOG.warn(e.toString(), e);
283            printExceptionTrace = false;
284          }
285        }
286      }
287      if (keysToRead.length > 1) {
288        try {
289          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
290        } catch (IOException e) {
291          numReadFailures.addAndGet(gets.length);
292          for (long keyToRead : keysToRead) {
293            LOG.debug(
294              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
295                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
296          }
297          if (printExceptionTrace) {
298            LOG.warn(e.toString(), e);
299            printExceptionTrace = false;
300          }
301        }
302      }
303      return gets;
304    }
305
306    protected Get createGet(long keyToRead) throws IOException {
307      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
308      String cfsString = "";
309      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
310      for (byte[] cf : columnFamilies) {
311        get.addFamily(cf);
312        if (verbose) {
313          if (cfsString.length() > 0) {
314            cfsString += ", ";
315          }
316          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
317        }
318      }
319      get = dataGenerator.beforeGet(keyToRead, get);
320      if (regionReplicaId > 0) {
321        get.setReplicaId(regionReplicaId);
322        get.setConsistency(Consistency.TIMELINE);
323      }
324      if (verbose) {
325        LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
326      }
327      return get;
328    }
329
330    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
331      // read the data
332      long start = System.nanoTime();
333      // Uses multi/batch gets
334      Result[] results = table.get(Arrays.asList(gets));
335      long end = System.nanoTime();
336      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
337    }
338
339    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
340      // read the data
341
342      long start = System.nanoTime();
343      // Uses simple get
344      Result result = table.get(get);
345      long end = System.nanoTime();
346      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
347    }
348
349    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
350      Result[] results, Table table, boolean isNullExpected) throws IOException {
351      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
352      numKeys.addAndGet(gets.length);
353      int i = 0;
354      for (Result result : results) {
355        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
356          isNullExpected);
357      }
358    }
359
360    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
361      Result result, Table table, boolean isNullExpected) throws IOException {
362      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
363        table, isNullExpected);
364    }
365
366    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
367      Table table, boolean isNullExpected) throws IOException {
368      if (!result.isEmpty()) {
369        if (verify) {
370          numKeysVerified.incrementAndGet();
371        }
372      } else {
373        HRegionLocation hloc;
374        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
375          hloc = locator.getRegionLocation(get.getRow());
376        }
377        String rowKey = Bytes.toString(get.getRow());
378        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
379        if (isNullExpected) {
380          nullResult.incrementAndGet();
381          LOG.debug("Null result obtained for the key =" + rowKey);
382          return;
383        }
384      }
385      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
386      long numErrorsAfterThis = 0;
387      if (isOk) {
388        long cols = 0;
389        // Count the columns for reporting purposes.
390        for (byte[] cf : result.getMap().keySet()) {
391          cols += result.getFamilyMap(cf).size();
392        }
393        numCols.addAndGet(cols);
394      } else {
395        if (writer != null) {
396          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
397        }
398        numErrorsAfterThis = numReadErrors.incrementAndGet();
399      }
400
401      if (numErrorsAfterThis > maxErrors) {
402        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
403        aborted = true;
404      }
405    }
406  }
407
408  public long getNumReadFailures() {
409    return numReadFailures.get();
410  }
411
412  public long getNumReadErrors() {
413    return numReadErrors.get();
414  }
415
416  public long getNumKeysVerified() {
417    return numKeysVerified.get();
418  }
419
420  public long getNumUniqueKeysVerified() {
421    return numUniqueKeysVerified.get();
422  }
423
424  public long getNullResultsCount() {
425    return nullResult.get();
426  }
427
428  @Override
429  protected String progressInfo() {
430    StringBuilder sb = new StringBuilder();
431    appendToStatus(sb, "verified", numKeysVerified.get());
432    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
433    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
434    appendToStatus(sb, "NULL RESULT", nullResult.get());
435    return sb.toString();
436  }
437}