001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.test;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.Executors;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.commons.lang3.RandomUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.IntegrationTestIngest;
033import org.apache.hadoop.hbase.IntegrationTestingUtility;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ClusterConnection;
039import org.apache.hadoop.hbase.client.Consistency;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
044import org.apache.hadoop.hbase.testclassification.IntegrationTests;
045import org.apache.hadoop.hbase.util.LoadTestTool;
046import org.apache.hadoop.hbase.util.MultiThreadedReader;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
049import org.apache.hadoop.util.StringUtils;
050import org.apache.hadoop.util.ToolRunner;
051import org.junit.Assert;
052import org.junit.experimental.categories.Category;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057/**
058 * An IntegrationTest for doing reads with a timeout, to a read-only table with region
059 * replicas. ChaosMonkey is run which kills the region servers and master, but ensures
060 * that meta region server is not killed, and at most 2 region servers are dead at any point
061 * in time. The expected behavior is that all reads with stale mode true will return
 * before the timeout (5 sec by default). The test fails if any read request does not finish
063 * in time.
064 *
065 * <p> This test uses LoadTestTool to read and write the data from a single client but
066 * multiple threads. The data is written first, then we allow the region replicas to catch
067 * up. Then we start the reader threads doing get requests with stale mode true. Chaos Monkey is
068 * started after some delay (20 sec by default) after the reader threads are started so that
069 * there is enough time to fully cache meta.
070 *
071 * These parameters (and some other parameters from LoadTestTool) can be used to
072 * control behavior, given values are default:
073 * <pre>
074 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
075 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
076 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
077 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
078 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
079 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
080 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
081 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
082 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
083 * </pre>
084 * Use this test with "serverKilling" ChaosMonkey. Sample usage:
085 * <pre>
086 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
087 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
088 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
089 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
090 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
091 * </pre>
092 */
093@Category(IntegrationTests.class)
094public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {
095
  /** Logger shared by the test and its nested reader classes. */
  private static final Logger LOG = LoggerFactory.getLogger(
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  /** Simple class name of this test; used as the prefix of the test-specific config keys. */
  private static final String TEST_NAME
    = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  /** Default upper bound, in milliseconds, that a get may take before it counts as a failure. */
  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  /** Config key suffix (prefixed with the test name) for the get timeout in milliseconds. */
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  /**
   * Default delay, in milliseconds, before the ChaosMonkey is started.
   * NOTE: the name is misspelled ("DEFAUL"); it is protected, so it is kept as-is here.
   */
  protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  /** Config key suffix (prefixed with the test name) for the ChaosMonkey start delay. */
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  /** Region replication applied by {@code setConf} when none is configured. */
  protected static final int DEFAULT_REGION_REPLICATION = 3;
109
  /**
   * Intentionally a no-op. This test starts the ChaosMonkey itself from
   * {@code runIngestTest}, after a configurable delay, instead of at test start.
   */
  @Override
  protected void startMonkey() throws Exception {
    // we do not want to start the monkey at the start of the test.
  }
114
115  @Override
116  protected MonkeyFactory getDefaultMonkeyFactory() {
117    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
118  }
119
120  @Override
121  public void setConf(Configuration conf) {
122    super.setConf(conf);
123    // default replication for this test is 3
124    String clazz = this.getClass().getSimpleName();
125    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
126      Integer.toString(DEFAULT_REGION_REPLICATION));
127  }
128
129  protected void writeData(int colsPerKey, int recordSize, int writeThreads,
130      long startKey, long numKeys) throws IOException {
131    int ret = loadTool.run(getArgsForLoadTestTool("-write",
132      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
133    if (0 != ret) {
134      String errorMsg = "Load failed with error code " + ret;
135      LOG.error(errorMsg);
136      Assert.fail(errorMsg);
137    }
138  }
139
140  @Override
141  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
142      int recordSize, int writeThreads, int readThreads) throws Exception {
143    LOG.info("Cluster size:"+
144      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
145
146    long start = System.currentTimeMillis();
147    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
148    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
149    long startKey = 0;
150
151    long numKeys = getNumKeys(keysPerServerPerIter);
152
153
154    // write data once
155    LOG.info("Writing some data to the table");
156    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);
157
158    // flush the table
159    LOG.info("Flushing the table");
160    Admin admin = util.getAdmin();
161    admin.flush(getTablename());
162
163    // re-open the regions to make sure that the replicas are up to date
164    long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
165    if (refreshTime > 0 && refreshTime <= 10000) {
166      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
167      Threads.sleep(refreshTime*3);
168    } else {
169      LOG.info("Reopening the table");
170      admin.disableTable(getTablename());
171      admin.enableTable(getTablename());
172    }
173
174    // We should only start the ChaosMonkey after the readers are started and have cached
175    // all of the region locations. Because the meta is not replicated, the timebounded reads
176    // will timeout if meta server is killed.
177    // We will start the chaos monkey after 1 minute, and since the readers are reading random
178    // keys, it should be enough to cache every region entry.
179    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY)
180      , DEFAUL_CHAOS_MONKEY_DELAY);
181    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
182    LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " +
183        "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
184    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
185      @Override
186      public void run() {
187        try {
188          LOG.info("Starting ChaosMonkey");
189          monkey.start();
190          monkey.waitForStop();
191        } catch (Exception e) {
192          LOG.warn(StringUtils.stringifyException(e));
193        }
194
195      }
196    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
197
198    // set the intended run time for the reader. The reader will do read requests
199    // to random keys for this amount of time.
200    long remainingTime = runtime - (System.currentTimeMillis() - start);
201    if (remainingTime <= 0) {
202      LOG.error("The amount of time left for the test to perform random reads is "
203          + "non-positive. Increase the test execution time via "
204          + String.format(RUN_TIME_KEY,
205                IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
206          + " or reduce the amount of data written per server via "
207          + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName()
208          + "." + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
209      throw new IllegalArgumentException("No time remains to execute random reads");
210    }
211    LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min");
212    this.conf.setLong(
213      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName())
214      , remainingTime); // load tool shares the same conf
215
216    // now start the readers which will run for configured run time
217    try {
218      int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
219        , startKey, numKeys));
220      if (0 != ret) {
221        String errorMsg = "Verification failed with error code " + ret;
222        LOG.error(errorMsg);
223        Assert.fail(errorMsg);
224      }
225    } finally {
226      if (result != null) result.cancel(false);
227      monkey.stop("Stopping the test");
228      monkey.waitForStop();
229      executorService.shutdown();
230    }
231  }
232
233  @Override
234  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
235      long numKeys) {
236    List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
237      mode, modeSpecificArg, startKey, numKeys));
238    args.add("-reader");
239    args.add(TimeBoundedMultiThreadedReader.class.getName());
240    return args.toArray(new String[args.size()]);
241  }
242
243  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    /** Get timeout in nanoseconds; a read batch that takes longer counts as a failure. */
    protected long timeoutNano;
    /** Number of read batches that took longer than {@link #timeoutNano}. */
    protected AtomicLong timedOutReads = new AtomicLong();
    /** Run time in milliseconds, read from the configuration in the constructor. */
    protected long runTime;
    /** Thread that sleeps for {@link #runTime}; created and started in {@code start}. */
    protected Thread timeoutThread;
    /** Number of results that were reported as stale. */
    protected AtomicLong staleReads = new AtomicLong();
249
250    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
251        TableName tableName, double verifyPercent) throws IOException {
252      super(dataGen, conf, tableName, verifyPercent);
253      long timeoutMs = conf.getLong(
254        String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
255      timeoutNano = timeoutMs * 1000000;
256      LOG.info("Timeout for gets: " + timeoutMs);
257      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
258      this.runTime = conf.getLong(runTimeKey, -1);
259      if (this.runTime <= 0) {
260        throw new IllegalArgumentException("Please configure " + runTimeKey);
261      }
262    }
263
264    @Override
265    public void waitForFinish() {
266      try {
267        this.timeoutThread.join();
268      } catch (InterruptedException e) {
269        e.printStackTrace();
270      }
271      this.aborted = true;
272      super.waitForFinish();
273    }
274
275    @Override
276    protected String progressInfo() {
277      StringBuilder builder = new StringBuilder(super.progressInfo());
278      appendToStatus(builder, "stale_reads", staleReads.get());
279      appendToStatus(builder, "get_timeouts", timedOutReads.get());
280      return builder.toString();
281    }
282
283    @Override
284    public void start(long startKey, long endKey, int numThreads) throws IOException {
285      super.start(startKey, endKey, numThreads);
286      this.timeoutThread = new TimeoutThread(this.runTime);
287      this.timeoutThread.start();
288    }
289
290    @Override
291    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
292      return new TimeBoundedMultiThreadedReaderThread(readerId);
293    }
294
295    private class TimeoutThread extends Thread {
296      long timeout;
297      long reportInterval = 60000;
298      public TimeoutThread(long timeout) {
299        this.timeout = timeout;
300      }
301
302      @Override
303      public void run() {
304        while (true) {
305          long rem = Math.min(timeout, reportInterval);
306          if (rem <= 0) {
307            break;
308          }
309          LOG.info("Remaining execution time:" + timeout / 60000 + " min");
310          Threads.sleep(rem);
311          timeout -= rem;
312        }
313      }
314    }
315
316    public class TimeBoundedMultiThreadedReaderThread
317      extends MultiThreadedReader.HBaseReaderThread {
318
319      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
320        super(readerId);
321      }
322
323      @Override
324      protected Get createGet(long keyToRead) throws IOException {
325        Get get = super.createGet(keyToRead);
326        get.setConsistency(Consistency.TIMELINE);
327        return get;
328      }
329
330      @Override
331      protected long getNextKeyToRead() {
332        // always read a random key, assuming that the writer has finished writing all keys
333        long key = startKey + Math.abs(RandomUtils.nextLong())
334            % (endKey - startKey);
335        return key;
336      }
337
338      @Override
339      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
340          Result[] results, Table table, boolean isNullExpected)
341          throws IOException {
342        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
343        for (Result r : results) {
344          if (r.isStale()) staleReads.incrementAndGet();
345        }
346        // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC
347        // to complete, but if the request took longer than timeout, we treat that as error.
348        if (elapsedNano > timeoutNano) {
349          timedOutReads.incrementAndGet();
350          numReadFailures.addAndGet(1); // fail the test
351          for (Result r : results) {
352            LOG.error("FAILED FOR " + r);
353            RegionLocations rl = ((ClusterConnection)connection).
354                locateRegion(tableName, r.getRow(), true, true);
355            HRegionLocation locations[] = rl.getRegionLocations();
356            for (HRegionLocation h : locations) {
357              LOG.error("LOCATION " + h);
358            }
359          }
360        }
361      }
362    }
363  }
364
365  public static void main(String[] args) throws Exception {
366    Configuration conf = HBaseConfiguration.create();
367    IntegrationTestingUtility.setUseDistributedCluster(conf);
368    int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
369    System.exit(ret);
370  }
371}