/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * Common base class for the reader and writer parts of the multi-threaded HBase
 * load test (see LoadTestTool).
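 *
 * <p>A minimal usage sketch, assuming a concrete subclass such as the
 * MultiThreadedWriter used by LoadTestTool (the table name and key range are
 * illustrative):
 * <pre>{@code
 * LoadTestDataGenerator gen = new MultiThreadedAction.DefaultDataGenerator(Bytes.toBytes("cf"));
 * MultiThreadedWriter writer = new MultiThreadedWriter(gen, conf, TableName.valueOf("usertable"));
 * writer.start(0, 100000, 8); // keys [0, 100000), 8 worker threads
 * writer.waitForFinish();     // blocks until all threads exit, then closes the connection
 * }</pre>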
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final ClusterConnection connection; // all reader/writer threads share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed
   * set of column families, and a random number of columns per key within a given range. The
   * table for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtility#createPreSplitLoadTestTable(Configuration, TableName, byte[],
   * org.apache.hadoop.hbase.io.compress.Compression.Algorithm, org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
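   *
   * <p>For example, the no-args-beyond-families constructor below is equivalent to asking
   * for values of 256-1024 bytes and 1-10 columns per key (a sketch; the family name is
   * illustrative):
   * <pre>{@code
   * new MultiThreadedAction.DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("cf"));
   * }</pre>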
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private final byte[][] columnFamilies;
    private final int minColumnsPerKey;
    private final int maxColumnsPerKey;
    private final Random random = new Random();

    public DefaultDataGenerator(int minValueSize, int maxValueSize,
        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

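  /** How often the progress reporter thread logs and emits counters, in milliseconds. */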
  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf,
                             TableName tableName,
                             String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
  }

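  /**
   * Records the key range and thread count and starts the progress reporter. Note that this
   * base implementation does not start any worker threads itself; subclasses create them and
   * register them via {@link #startThreads(Collection)}.
   */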
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
        "MultiThreadedAction-ProgressReporter-" + System.currentTimeMillis())).start();
  }

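  /** Formats an elapsed time in milliseconds as HH:MM:SS, e.g. 3725000 ms -> "01:02:05". */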
  private static String formatTime(long elapsedTime) {
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format("%02d", elapsedTime % 60);
    String minutes = String.format("%02d", (elapsedTime % 3600) / 60);
    String hours = String.format("%02d", elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = System.currentTimeMillis();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft =
            "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = System.currentTimeMillis() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          // Use floating-point division so slow runs don't truncate the rate to zero.
          double averageKeysPerSecond =
              (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft
              + "Keys="
              + numKeys
              + ", cols="
              + StringUtils.humanReadableInt(numCols.get())
              + ", time="
              + formatTime(time)
              + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
                  + numKeys * 1000 / time + ", latency="
                  + String.format("%.2f", (double) totalOpTime / (double) numKeys)
                  + " ms]") : "")
              + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
                  + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
                  + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta)
                  + " ms]") : "")
              + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
                averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta,
        double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
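      // Example line: "reporter:counter:numKeys,W,12345".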
      System.err.println("reporter:counter:numKeys," + reporterId + ","
          + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + ","
          + numCols.get());
      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
          + "," + (long) avgKeysPerSecondDelta);
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();


  protected static void appendToStatus(StringBuilder sb, String desc,
      long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc,
      String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
   * Does not verify cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (that was presumably
   * also used to generate said result).
   * @param verifyValues verify that the values in the result make sense for the row/cf/column
   *                     combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that this requires writes to use multi-put, or
   *                                   verification to happen after all writes complete;
   *                                   otherwise there can be races.
   * @return true if the values of the row result make sense for the row/cf/column combination
   *         and the cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
      boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr
        + "], bad family count: " + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from the get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error("Error checking data for key [" + rowKeyStr
          + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                  + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
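        // The increment column starts at Arrays.hashCode(row), and every applied increment
        // adds cfHash; the delta from the original value must therefore be a multiple of cfHash.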
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have the correct columns.
        if (verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [" + rowKeyStr
            + "], bad columns for family [" + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
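              // An APPEND tacks hashCodeBytes onto the generated value, possibly several
              // times; strip every trailing copy before verifying the base value below.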
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                  0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                        hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode ["
                  + hashCode + "], verificationNeeded ["
                  + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (verificationNeeded &&
                !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [" + column + "], mutation [" + mutation
                + "]; value of length " + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    RegionLocations rl = null;
    try {
      rl = connection.locateRegion(tableName, r.getRow(), true, true);
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
    }
    if (rl == null) {
      return; // locateRegion failed; nothing to print.
    }
    for (HRegionLocation h : rl.getRegionLocations()) {
      LOG.info("LOCATION " + h);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>
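  // The serialized form is "#"-delimited "column:typeNumber" pairs, e.g. "0:3#1:0",
  // where the number is a ClientProtos.MutationProto.MutationType protobuf enum number.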
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"),
          "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(
          Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}