/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load test (see
 * LoadTestTool).
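 * <p>
 * A typical lifecycle, as a minimal sketch (MultiThreadedWriter is one concrete subclass; any
 * reader/writer subclass with a matching constructor works the same way, and {@code dataGen},
 * {@code conf}, and {@code tableName} are assumed to be set up by the caller):
 *
 * <pre>{@code
 * MultiThreadedAction action = new MultiThreadedWriter(dataGen, conf, tableName);
 * action.start(0, 100000, 20); // keys [0, 100000), 20 worker threads
 * action.waitForFinish(); // blocks until all threads finish, then closes the connection
 * }</pre>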
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  /** All reader and writer threads share this connection. */
  protected final ClusterConnection connection;

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed set of
   * column families, and a random number of columns per key within a configured range. The table
   * for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtility#createPreSplitLoadTestTable(Configuration, TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
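   * <p>
   * A minimal construction sketch (the sizes, column counts, and family name below are arbitrary
   * illustrations, not required values):
   *
   * <pre>{@code
   * // 256-1024 byte values, 1-10 columns per key, one column family "cf"
   * LoadTestDataGenerator dataGen =
   *   new DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("cf"));
   * }</pre>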
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that don't provide their own.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey
        + ThreadLocalRandom.current().nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

  /** Formats elapsed milliseconds as HH:MM:SS, e.g. 3723000 becomes "01:02:03". */
  private static String formatTime(long elapsedTime) {
    long seconds = elapsedTime / 1000;
    return String.format("%02d:%02d:%02d", seconds / 3600, (seconds % 3600) / 60, seconds % 60);
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          // Use floating-point division so fractional keys/s are not truncated.
          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + numKeys + ", cols="
            + StringUtils.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s=" + numKeys * 1000 / time + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + numKeysDelta * 1000 / REPORTING_INTERVAL_MS
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta, averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
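      // Each line has the form "reporter:counter:<group>,<reporterId>,<value>", e.g.
      // "reporter:counter:numKeys,R,12345".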
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** Returns the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from get or scan using the dataGenerator (that was presumably also used to
   * generate said result).
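   * <p>
   * A minimal usage sketch (assumes the table was loaded through the same data generator; here
   * {@code table} and {@code rowKey} are hypothetical placeholders):
   *
   * <pre>{@code
   * Result result = table.get(new Get(rowKey));
   * boolean ok = verifyResultAgainstDataGenerator(result, true, true);
   * }</pre>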
   * @param verifyValues               verify that the values in the result make sense for the
   *                                   row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to be reliable, writes must use multiPut,
   *                                   or verification has to happen after all writes complete;
   *                                   otherwise there can be races.
   * @return true if the values in the result make sense for the row/cf/column combination and the
   *         cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], bad family count: "
        + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error(
          "Error checking data for key [" + rowKeyStr + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and re-add placeholder values for verified deletions so that
        // the column-set check below still sees the expected column count.
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                  + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
            + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + cfStr
              + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (
          verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())
        ) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [" + rowKeyStr + "], bad columns for family ["
            + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, 0,
                  hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (
                      newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, hashCodeBytes.length, bytes,
                        newOffset, hashCodeBytes.length)
                    ) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode [" + hashCode
                  + "], verificationNeeded [" + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (
              verificationNeeded && !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)
            ) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr
                + "], column [" + column + "], mutation [" + mutation + "]; value of length "
                + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    RegionLocations rl = null;
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try {
      rl = connection.locateRegion(tableName, r.getRow(), true, true);
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
      return; // without locations there is nothing more to print
    }
    HRegionLocation[] locations = rl.getRegionLocations();
    for (HRegionLocation h : locations) {
      LOG.info("LOCATION " + h);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>. The serialized form is
  // "#"-separated "column:typeCode" pairs, where typeCode is the numeric value of a
  // MutationType (e.g. "0:2#5:3").
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) continue;
        Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}