/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
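 * <p>
 * A minimal usage sketch, assuming a concrete subclass such as MultiThreadedWriter (the table
 * name, key range, and thread count below are illustrative):
 *
 * <pre>{@code
 * LoadTestDataGenerator gen = new MultiThreadedAction.DefaultDataGenerator(Bytes.toBytes("cf"));
 * MultiThreadedAction writer = new MultiThreadedWriter(gen, conf, TableName.valueOf("loadtest"));
 * writer.start(0, 100000, 8); // write keys [0, 100000) with 8 threads
 * writer.waitForFinish(); // blocks until all worker threads finish, then closes the connection
 * }</pre>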
 */
@InterfaceAudience.Private
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a given range. The table for
   * it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.util.LoadTestUtil#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
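   * <p>
   * For example, a minimal construction sketch (value sizes, column counts, and the family name
   * are illustrative):
   *
   * <pre>{@code
   * LoadTestDataGenerator gen =
   *   new DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("test_cf"));
   * }</pre>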
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

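  /** Formats an elapsed time in milliseconds as HH:MM:SS. */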
  private static String formatTime(long elapsedTime) {
    long elapsedSeconds = elapsedTime / 1000;
    return String.format("%02d:%02d:%02d", elapsedSeconds / 3600, (elapsedSeconds % 3600) / 60,
      elapsedSeconds % 60);
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + numKeys + ", cols="
            + Strings.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + (numKeys * 1000.0 / time) + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + (numKeysDelta * 1000.0 / REPORTING_INTERVAL_MS)
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
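      // For example, a line such as "reporter:counter:numKeys,W,1234" is read by Hadoop Streaming
      // as an update of 1234 to the counter named after the reporter id ("W") in group "numKeys".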
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably also used
   * to generate said result).
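   * <p>
   * A minimal sketch of a verification pass (the Table handle and key value are illustrative):
   *
   * <pre>{@code
   * try (Table table = connection.getTable(tableName)) {
   *   Result r = table.get(new Get(dataGenerator.getDeterministicUniqueKey(42)));
   *   boolean ok = verifyResultAgainstDataGenerator(r, true, true);
   * }
   * }</pre>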
   * @param verifyValues               verify that the values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work reliably, multi-put should be
   *                                   used, or verification has to happen after all writes are
   *                                   complete; otherwise there can be races.
   * @return true if the values in the result make sense for the row/cf/column combination and the
   *         cf/column set in the result is complete; false otherwise
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

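      // MUTATE_INFO and INCREMENT are bookkeeping columns written alongside the data columns:
      // MUTATE_INFO records which columns were mutated and how, and INCREMENT holds the row's
      // running increment value. Both are removed from columnValues before the column set itself
      // is verified.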
      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and restore placeholder values for deleted columns so that the
        // column set check below still sees the expected columns.
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [{}], bad columns for family [{}]: {}", rowKeyStr,
            cfStr, colsStr.toString());
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
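                // An append tacks hashCodeBytes onto the end of the value; a retried append can
                // stack more than one copy, so strip every trailing copy before the underlying
                // value is verified below.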
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()), e);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>
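  // For example, the serialized bytes of "col1:3#col2:0" map column "col1" to the MutationType
  // whose protobuf number is 3, and "col2" to the type whose number is 0.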
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) continue;
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}