001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.hbase.client.Admin;
026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.ConnectionUtils;
030import org.apache.hadoop.hbase.client.Consistency;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.util.Tool;
041import org.apache.hadoop.util.ToolRunner;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
050
051/**
052 * A tool to evaluating the lag between primary replica and secondary replica.
053 * <p/>
054 * It simply adds a row to the primary replica, and then check how long before we can read it from
055 * the secondary replica.
056 */
057@InterfaceAudience.Private
058public class RegionReplicationLagEvaluation extends Configured implements Tool {
059
060  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationLagEvaluation.class);
061
062  public static final String TABLE_NAME = "TestLagTable";
063
064  public static final String FAMILY_NAME = "info";
065
066  public static final String QUALIFIER_NAME = "qual";
067
068  public static final int VALUE_LENGTH = 256;
069
070  public static final int ROW_LENGTH = 16;
071
072  private static final Options OPTIONS = new Options().addOption("t", "table", true, "Table name")
073    .addOption("rlen", "rlength", true, "The length of row key")
074    .addOption("vlen", "vlength", true, "The length of value")
075    .addRequiredOption("r", "rows", true, "Number of rows to test");
076
077  private FastLongHistogram histogram = new FastLongHistogram();
078
079  @RestrictedApi(explanation = "Should only be called in tests", link = "",
080      allowedOnPath = ".*/src/test/.*")
081  FastLongHistogram getHistogram() {
082    return histogram;
083  }
084
085  @Override
086  public int run(String[] args) throws Exception {
087    TableName tableName;
088    int rlen;
089    int vlen;
090    int rows;
091    try {
092      CommandLine cli = new DefaultParser().parse(OPTIONS, args);
093      tableName = TableName.valueOf(cli.getOptionValue("t", TABLE_NAME));
094      rlen = Integer.parseInt(cli.getOptionValue("rlen", String.valueOf(ROW_LENGTH)));
095      vlen = Integer.parseInt(cli.getOptionValue("vlen", String.valueOf(VALUE_LENGTH)));
096      rows = Integer.parseInt(cli.getOptionValue("r"));
097    } catch (Exception e) {
098      LOG.warn("Error parsing command line options", e);
099      HelpFormatter formatter = new HelpFormatter();
100      formatter.printHelp(getClass().getName(), OPTIONS);
101      return -1;
102    }
103    exec(tableName, rlen, vlen, rows);
104    return 0;
105  }
106
107  private void createTable(Admin admin, TableName tableName) throws IOException {
108    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
109      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME)).setRegionReplication(2)
110      .build();
111    admin.createTable(td);
112  }
113
114  private void checkLag(Table table, int rlen, int vlen, int rows) throws IOException {
115    byte[] family = Bytes.toBytes(FAMILY_NAME);
116    byte[] qualifier = Bytes.toBytes(QUALIFIER_NAME);
117    LOG.info("Test replication lag on table {} with {} rows", table.getName(), rows);
118    for (int i = 0; i < rows; i++) {
119      byte[] row = new byte[rlen];
120      Bytes.random(row);
121      byte[] value = new byte[vlen];
122      Bytes.random(value);
123      table.put(new Put(row).addColumn(family, qualifier, value));
124      // get from secondary replica
125      Get get = new Get(row).setConsistency(Consistency.TIMELINE).setReplicaId(1);
126      long startNs = System.nanoTime();
127      for (int retry = 0;; retry++) {
128        Result result = table.get(get);
129        byte[] gotValue = result.getValue(family, qualifier);
130        if (Arrays.equals(value, gotValue)) {
131          break;
132        }
133        long pauseTimeMs = Math.min(ConnectionUtils.getPauseTime(1, retry), 1000);
134        Threads.sleepWithoutInterrupt(pauseTimeMs);
135      }
136      long lagMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
137      histogram.add(lagMs, 1);
138    }
139    LOG.info("Test finished, min lag {} ms, max lag {} ms, mean lag {} ms", histogram.getMin(),
140      histogram.getMax(), histogram.getMean());
141    long[] q = histogram.getQuantiles(FastLongHistogram.DEFAULT_QUANTILES);
142    for (int i = 0; i < q.length; i++) {
143      LOG.info("{}% lag: {} ms", FastLongHistogram.DEFAULT_QUANTILES[i] * 100, q[i]);
144    }
145  }
146
147  private void exec(TableName tableName, int rlen, int vlen, int rows) throws IOException {
148    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
149      try (Admin admin = conn.getAdmin()) {
150        if (!admin.tableExists(tableName)) {
151          createTable(admin, tableName);
152        }
153      }
154      try (Table table = conn.getTable(tableName)) {
155        checkLag(table, rlen, vlen, rows);
156      }
157    }
158  }
159
160  public static void main(String[] args) throws Exception {
161    int res =
162      ToolRunner.run(HBaseConfiguration.create(), new RegionReplicationLagEvaluation(), args);
163    System.exit(res);
164  }
165}