/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
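 * <p>
 * A minimal usage sketch (illustrative only; assumes a concrete subclass, e.g. a writer analogous
 * to MultiThreadedWriter, wired up with a LoadTestDataGenerator):
 *
 * <pre>{@code
 * MultiThreadedAction action = ...; // some concrete reader or writer subclass
 * action.start(0, 100000, 8); // keys [0, 100000), 8 worker threads
 * action.waitForFinish(); // blocks until all workers finish, then closes the shared connection
 * }</pre>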
 */
@InterfaceAudience.Private
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a configured range. The table
   * for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.util.LoadTestUtil#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
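   * <p>
   * For illustration, a generator using the built-in defaults (256-1024 byte values, 1-10 columns
   * per key) over a single hypothetical family could be created as:
   *
   * <pre>{@code
   * byte[][] families = { Bytes.toBytes("cf") };
   * LoadTestDataGenerator dataGen = new DefaultDataGenerator(families);
   * }</pre>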
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that do not provide their own.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
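      // Pick the number of columns uniformly at random in [minColumnsPerKey, maxColumnsPerKey].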
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

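    // The default generator only checks that the number of columns is within the expected range;
    // individual column names are not re-verified here.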
    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

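  /**
   * Creates the action and opens the {@link Connection} that all reader/writer threads share; it
   * is released by {@link #close()}, which {@link #waitForFinish()} calls once all workers stop.
   */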
  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

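  /**
   * Records the key range and thread count and starts the background progress reporter. Concrete
   * subclasses are expected to start their worker threads via {@link #startThreads(Collection)},
   * which drives {@link #numThreadsWorking}.
   */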
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

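  /** Formats an elapsed time in milliseconds as HH:MM:SS. */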
  private static String formatTime(long elapsedTime) {
    String format = "%02d";
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long totalKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = totalKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (totalKeys * 1000.0 / time) : 0;

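          // "Overall" figures cover the whole run so far; "Current" figures cover only the last
          // reporting interval.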
          LOG.info(threadsLeft + "Keys=" + totalKeys + ", cols="
            + StringUtils.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((totalKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + averageKeysPerSecond + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) totalKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + (numKeysDelta * 1000.0 / REPORTING_INTERVAL_MS)
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = totalKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

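  /** Appends {@code ", desc=v"} to the status builder, skipping zero values. */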
  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably also used
   * to generate said result).
   * @param verifyValues               verify that values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work, multiPut should be used, or
   *                                   verification has to happen after writes; otherwise there can
   *                                   be races.
   * @return true if the values in the result make sense for the row/cf/column combination and the
   *         cf/column set in the result is complete; false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and put placeholder values back for legitimately deleted
        // columns so that the column-set check below still passes.
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [{}], bad columns for family [{}]: {}", rowKeyStr,
            cfStr, colsStr.toString());
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()), e);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>.
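  // The serialized form is a '#'-separated list of "<column>:<type number>" pairs, where the
  // number is the MutationType protobuf value parsed by MutationType.valueOf below.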
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info %s", mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}