001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.util;
018
019import java.io.IOException;
020import java.util.Arrays;
021import java.util.HashSet;
022import java.util.Set;
023import java.util.concurrent.atomic.AtomicLong;
024
025import org.apache.commons.lang3.RandomUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HRegionLocation;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Get;
030
031import org.apache.hadoop.hbase.client.Consistency;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/** Creates multiple threads that read and verify previously written data */
039public class MultiThreadedReader extends MultiThreadedAction
040{
041  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
042
043  protected Set<HBaseReaderThread> readers = new HashSet<>();
044  private final double verifyPercent;
045  protected volatile boolean aborted;
046
047  protected MultiThreadedWriterBase writer = null;
048
049  /**
050   * The number of keys verified in a sequence. This will never be larger than
051   * the total number of keys in the range. The reader might also verify
052   * random keys when it catches up with the writer.
053   */
054  private final AtomicLong numUniqueKeysVerified = new AtomicLong();
055
056  /**
057   * Default maximum number of read errors to tolerate before shutting down all
058   * readers.
059   */
060  public static final int DEFAULT_MAX_ERRORS = 10;
061
062  /**
063   * Default "window" size between the last key written by the writer and the
064   * key that we attempt to read. The lower this number, the stricter our
065   * testing is. If this is zero, we always attempt to read the highest key
066   * in the contiguous sequence of keys written by the writers.
067   */
068  public static final int DEFAULT_KEY_WINDOW = 0;
069
070  /**
071   * Default batch size for multigets
072   */
073  public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET)
074
075  protected AtomicLong numKeysVerified = new AtomicLong(0);
076  protected AtomicLong numReadErrors = new AtomicLong(0);
077  protected AtomicLong numReadFailures = new AtomicLong(0);
078  protected AtomicLong nullResult = new AtomicLong(0);
079  private int maxErrors = DEFAULT_MAX_ERRORS;
080  private int keyWindow = DEFAULT_KEY_WINDOW;
081  private int batchSize = DEFAULT_BATCH_SIZE;
082  private int regionReplicaId = -1; // particular region replica id to do reads against if set
083
084  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
085      TableName tableName, double verifyPercent) throws IOException {
086    super(dataGen, conf, tableName, "R");
087    this.verifyPercent = verifyPercent;
088  }
089
090  public void linkToWriter(MultiThreadedWriterBase writer) {
091    this.writer = writer;
092    writer.setTrackWroteKeys(true);
093  }
094
095  public void setMaxErrors(int maxErrors) {
096    this.maxErrors = maxErrors;
097  }
098
099  public void setKeyWindow(int keyWindow) {
100    this.keyWindow = keyWindow;
101  }
102
103  public void setMultiGetBatchSize(int batchSize) {
104    this.batchSize = batchSize;
105  }
106
107  public void setRegionReplicaId(int regionReplicaId) {
108    this.regionReplicaId = regionReplicaId;
109  }
110
111  @Override
112  public void start(long startKey, long endKey, int numThreads) throws IOException {
113    super.start(startKey, endKey, numThreads);
114    if (verbose) {
115      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
116    }
117
118    addReaderThreads(numThreads);
119    startThreads(readers);
120  }
121
122  protected void addReaderThreads(int numThreads) throws IOException {
123    for (int i = 0; i < numThreads; ++i) {
124      HBaseReaderThread reader = createReaderThread(i);
125      readers.add(reader);
126    }
127  }
128
129  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
130    HBaseReaderThread reader = new HBaseReaderThread(readerId);
131    Threads.setLoggingUncaughtExceptionHandler(reader);
132    return reader;
133  }
134
135  public class HBaseReaderThread extends Thread {
136    protected final int readerId;
137    protected final Table table;
138
139    /** The "current" key being read. Increases from startKey to endKey. */
140    private long curKey;
141
142    /** Time when the thread started */
143    protected long startTimeMs;
144
145    /** If we are ahead of the writer and reading a random key. */
146    private boolean readingRandomKey;
147
148    private boolean printExceptionTrace = true;
149
150    /**
151     * @param readerId only the keys with this remainder from division by
152     *          {@link #numThreads} will be read by this thread
153     */
154    public HBaseReaderThread(int readerId) throws IOException {
155      this.readerId = readerId;
156      table = createTable();
157      setName(getClass().getSimpleName() + "_" + readerId);
158    }
159
160    protected Table createTable() throws IOException {
161      return connection.getTable(tableName);
162    }
163
164    @Override
165    public void run() {
166      try {
167        runReader();
168      } finally {
169        closeTable();
170        numThreadsWorking.decrementAndGet();
171      }
172    }
173
174    protected void closeTable() {
175      try {
176        if (table != null) {
177          table.close();
178        }
179      } catch (IOException e) {
180        LOG.error("Error closing table", e);
181      }
182    }
183
184    private void runReader() {
185      if (verbose) {
186        LOG.info("Started thread #" + readerId + " for reads...");
187      }
188
189      startTimeMs = System.currentTimeMillis();
190      curKey = startKey;
191      long [] keysForThisReader = new long[batchSize];
192      while (curKey < endKey && !aborted) {
193        int readingRandomKeyStartIndex = -1;
194        int numKeys = 0;
195        // if multiGet, loop until we have the number of keys equal to the batch size
196        do {
197          long k = getNextKeyToRead();
198          if (k < startKey || k >= endKey) {
199            numReadErrors.incrementAndGet();
200            throw new AssertionError("Load tester logic error: proposed key " +
201                "to read " + k + " is out of range (startKey=" + startKey +
202                ", endKey=" + endKey + ")");
203          }
204          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
205            // Skip keys that this thread should not read, as well as the keys
206            // that we know the writer failed to write.
207            continue;
208          }
209          keysForThisReader[numKeys] = k;
210          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
211            //store the first index of a random read
212            readingRandomKeyStartIndex = numKeys;
213          }
214          numKeys++;
215        } while (numKeys < batchSize && curKey < endKey && !aborted);
216
217        if (numKeys > 0) { //meaning there is some key to read
218          readKey(keysForThisReader);
219          // We have verified some unique key(s).
220          numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
221              numKeys : readingRandomKeyStartIndex);
222        }
223      }
224    }
225
226    /**
227     * Should only be used for the concurrent writer/reader workload. The
228     * maximum key we are allowed to read, subject to the "key window"
229     * constraint.
230     */
231    private long maxKeyWeCanRead() {
232      long insertedUpToKey = writer.wroteUpToKey();
233      if (insertedUpToKey >= endKey - 1) {
234        // The writer has finished writing our range, so we can read any
235        // key in the range.
236        return endKey - 1;
237      }
238      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
239    }
240
241    protected long getNextKeyToRead() {
242      readingRandomKey = false;
243      if (writer == null || curKey <= maxKeyWeCanRead()) {
244        return curKey++;
245      }
246
247      // We caught up with the writer. See if we can read any keys at all.
248      long maxKeyToRead;
249      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
250        // The writer has not written sufficient keys for us to be able to read
251        // anything at all. Sleep a bit. This should only happen in the
252        // beginning of a load test run.
253        Threads.sleepWithoutInterrupt(50);
254      }
255
256      if (curKey <= maxKeyToRead) {
257        // The writer wrote some keys, and we are now allowed to read our
258        // current key.
259        return curKey++;
260      }
261
262      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
263      // Don't increment the current key -- we still have to try reading it
264      // later. Set a flag to make sure that we don't count this key towards
265      // the set of unique keys we have verified.
266      readingRandomKey = true;
267      return startKey + Math.abs(RandomUtils.nextLong())
268          % (maxKeyToRead - startKey + 1);
269    }
270
271    private Get[] readKey(long[] keysToRead) {
272      Get [] gets = new Get[keysToRead.length];
273      int i = 0;
274      for (long keyToRead : keysToRead) {
275        try {
276          gets[i] = createGet(keyToRead);
277          if (keysToRead.length == 1) {
278            queryKey(gets[i], RandomUtils.nextInt(0, 100) < verifyPercent, keyToRead);
279          }
280          i++;
281        } catch (IOException e) {
282          numReadFailures.addAndGet(1);
283          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
284              + ", time from start: "
285              + (System.currentTimeMillis() - startTimeMs) + " ms");
286          if (printExceptionTrace) {
287            LOG.warn(e.toString(), e);
288            printExceptionTrace = false;
289          }
290        }
291      }
292      if (keysToRead.length > 1) {
293        try {
294          queryKey(gets, RandomUtils.nextInt(0, 100) < verifyPercent, keysToRead);
295        } catch (IOException e) {
296          numReadFailures.addAndGet(gets.length);
297          for (long keyToRead : keysToRead) {
298            LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
299                + ", time from start: "
300                + (System.currentTimeMillis() - startTimeMs) + " ms");
301          }
302          if (printExceptionTrace) {
303            LOG.warn(e.toString(), e);
304            printExceptionTrace = false;
305          }
306        }
307      }
308      return gets;
309    }
310
311    protected Get createGet(long keyToRead) throws IOException {
312      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
313      String cfsString = "";
314      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
315      for (byte[] cf : columnFamilies) {
316        get.addFamily(cf);
317        if (verbose) {
318          if (cfsString.length() > 0) {
319            cfsString += ", ";
320          }
321          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
322        }
323      }
324      get = dataGenerator.beforeGet(keyToRead, get);
325      if (regionReplicaId > 0) {
326        get.setReplicaId(regionReplicaId);
327        get.setConsistency(Consistency.TIMELINE);
328      }
329      if (verbose) {
330        LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
331      }
332      return get;
333    }
334
335    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
336      // read the data
337      long start = System.nanoTime();
338      // Uses multi/batch gets
339      Result[] results = table.get(Arrays.asList(gets));
340      long end = System.nanoTime();
341      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
342    }
343
344    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
345      // read the data
346
347      long start = System.nanoTime();
348      // Uses simple get
349      Result result = table.get(get);
350      long end = System.nanoTime();
351      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
352    }
353
354    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
355        Result[] results, Table table, boolean isNullExpected)
356        throws IOException {
357      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
358      numKeys.addAndGet(gets.length);
359      int i = 0;
360      for (Result result : results) {
361        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
362            isNullExpected);
363      }
364    }
365
366    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
367        Result result, Table table, boolean isNullExpected)
368        throws IOException {
369      verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
370          new Result[]{result}, table, isNullExpected);
371    }
372
373    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
374        Result result, Table table, boolean isNullExpected) throws IOException {
375      if (!result.isEmpty()) {
376        if (verify) {
377          numKeysVerified.incrementAndGet();
378        }
379      } else {
380        HRegionLocation hloc = connection.getRegionLocation(tableName,
381          get.getRow(), false);
382        String rowKey = Bytes.toString(get.getRow());
383        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
384        if(isNullExpected) {
385          nullResult.incrementAndGet();
386          LOG.debug("Null result obtained for the key ="+rowKey);
387          return;
388        }
389      }
390      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
391      long numErrorsAfterThis = 0;
392      if (isOk) {
393        long cols = 0;
394        // Count the columns for reporting purposes.
395        for (byte[] cf : result.getMap().keySet()) {
396          cols += result.getFamilyMap(cf).size();
397        }
398        numCols.addAndGet(cols);
399      } else {
400        if (writer != null) {
401          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
402        }
403        numErrorsAfterThis = numReadErrors.incrementAndGet();
404      }
405
406      if (numErrorsAfterThis > maxErrors) {
407        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
408        aborted = true;
409      }
410    }
411  }
412
413  public long getNumReadFailures() {
414    return numReadFailures.get();
415  }
416
417  public long getNumReadErrors() {
418    return numReadErrors.get();
419  }
420
421  public long getNumKeysVerified() {
422    return numKeysVerified.get();
423  }
424
425  public long getNumUniqueKeysVerified() {
426    return numUniqueKeysVerified.get();
427  }
428
429  public long getNullResultsCount() {
430    return nullResult.get();
431  }
432
433  @Override
434  protected String progressInfo() {
435    StringBuilder sb = new StringBuilder();
436    appendToStatus(sb, "verified", numKeysVerified.get());
437    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
438    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
439    appendToStatus(sb, "NULL RESULT", nullResult.get());
440    return sb.toString();
441  }
442}