/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
/**
 * Common base class for the reader and writer parts of the multi-threaded HBase load
 * test (see LoadTestTool).
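 * <p>
 * A minimal lifecycle sketch (hypothetical; {@code MultiThreadedWriter} is one concrete
 * subclass, and constructor arguments may differ between versions):
 * <pre>
 * // CF_BYTES (a byte[] family name), conf and tableName are assumed to be defined
 * LoadTestDataGenerator gen = new MultiThreadedAction.DefaultDataGenerator(CF_BYTES);
 * MultiThreadedAction writer = new MultiThreadedWriter(gen, conf, tableName);
 * writer.start(0, 100000, 8); // keys [0, 100000), 8 worker threads
 * writer.waitForFinish();     // blocks until all threads exit, then closes the connection
 * </pre>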
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader/writer threads share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;
  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed
   * set of column families, and a random number of columns per key within a range. The table
   * for it can be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtil#createPreSplitLoadTestTable(Configuration,
   * TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm,
   * org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
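   * <p>
   * Construction example (the column family name is illustrative):
   * <pre>
   * LoadTestDataGenerator gen =
   *   new DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("test_cf"));
   * </pre>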
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private final byte[][] columnFamilies;
    private final int minColumnsPerKey;
    private final int maxColumnsPerKey;
    private final Random random = new Random();

    public DefaultDataGenerator(int minValueSize, int maxValueSize,
        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that don't provide their own.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

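    // Qualifiers are simply "0".."numColumns-1", so a verifier only needs the
    // min/max column-count bounds to check that a row's column set is complete.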
    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private final String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

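  /** How often the progress reporter logs statistics, in milliseconds. */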
  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
      String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
        "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }

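  /** Formats an elapsed time in milliseconds as HH:MM:SS. */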
  private static String formatTime(long elapsedTime) {
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format("%02d", elapsedTime % 60);
    String minutes = String.format("%02d", (elapsedTime % 3600) / 60);
    String hours = String.format("%02d", elapsedTime / 3600);
    return hours + ":" + minutes + ":" + seconds;
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private final String reporterId;

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft =
            "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          // Use floating-point division so the rate is not truncated to a whole number.
          double averageKeysPerSecond =
              (time > 0) ? (numKeys * 1000.0 / time) : 0;

          LOG.info(threadsLeft
              + "Keys="
              + numKeys
              + ", cols="
              + StringUtils.humanReadableInt(numCols.get())
              + ", time="
              + formatTime(time)
              + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s="
                  + numKeys * 1000 / time + ", latency="
                  + String.format("%.2f", (double) totalOpTime / (double) numKeys)
                  + " ms]") : "")
              + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
                  + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
                  + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta)
                  + " ms]") : "")
              + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
                averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta,
        double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs: "reporter:counter:<group>,<counter>,<amount>".
      System.err.println("reporter:counter:numKeys," + reporterId + ","
          + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + ","
          + numCols.get());
      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
          + "," + (long) (avgKeysPerSecondDelta));
    }
  }

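  /** Closes the shared cluster connection, logging (but not propagating) any failure. */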
  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection", ex);
      }
    }
  }

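  /**
   * Blocks until all worker threads have exited (i.e. {@link #numThreadsWorking} has dropped
   * to zero), then closes the shared connection.
   */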
  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

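  /**
   * Registers the given worker threads in {@link #numThreadsWorking} and starts them; each
   * thread is expected to decrement the counter on exit so waitForFinish() can return.
   */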
  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc,
      long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc,
      String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
   * Does not verify cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }

  /**
   * Verifies the result from a get or scan using the dataGenerator (which was presumably
   * also used to generate said result).
   * @param verifyValues verify that the values in the result make sense for the row/cf/column
   *                     combination
   * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete.
   *                                   Note that for this to work, multi-put must be used, or
   *                                   verification has to happen after all writes; otherwise
   *                                   there can be races.
   * @return true if the values in the result make sense for the row/cf/column combination and
   *         the cf/column set in the result is complete, false otherwise.
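   * <p>
   * A typical call (sketch; assumes a Table obtained from the shared connection):
   * <pre>
   * Result r = table.get(new Get(rowKeyBytes));
   * boolean ok = verifyResultAgainstDataGenerator(r, true, true);
   * </pre>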
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
      boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }


    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr
        + "], bad family count: " + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error("Error checking data for key [" + rowKeyStr
          + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

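      // MUTATE_INFO is a bookkeeping column maintained by the load-test updater; it records
      // which columns of this row/cf were mutated and with which MutationType.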
      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns: a column whose (cf hash + column hash) is even is expected
        // to have been deleted. Re-insert a placeholder value for each such column so that the
        // column-count verification below still sees the complete column set.
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                  + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify the INCREMENT bookkeeping column: its value starts at hash(row), and the
        // updater increments it by cfHash each time, so any delta must be a multiple of cfHash.
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
          StringBuilder colsStr = new StringBuilder();
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr.append(", ");
            }
            colsStr.append("[").append(Bytes.toString(col)).append("]");
          }
          LOG.error("Error checking data for key [" + rowKeyStr
            + "], bad columns for family [" + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
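              // For APPEND, the updater appends hashCodeBytes to the original value one or
              // more times; check that the suffix matches, count the appended copies, then
              // strip them so the underlying value can still be verified below.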
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                  0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                        hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut replaced the value with hashCodeBytes
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode ["
                  + hashCode + "], verificationNeeded ["
                  + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (verificationNeeded &&
                !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [" + column + "], mutation [" + mutation
                + "]; value of length " + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()), e);
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }

  // Parse mutate info into a map of <column name> => <update action>.
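  // The serialized form (as parsed below) is "<column>:<type>#<column>:<type>#...",
  // where <type> is the numeric protobuf value of a MutationType.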
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) {
          continue;
        }
        Preconditions.checkArgument(mutation.contains(":"),
          "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(
          Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}