/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * An IntegrationTest for doing reads with a timeout against a read-only table with region
 * replicas. A ChaosMonkey is run which kills region servers and the master, but ensures that the
 * meta region server is not killed and that at most 2 region servers are dead at any point in
 * time. The expected behavior is that all reads with stale mode true return before the timeout
 * (5 sec by default). The test fails if a read request does not finish in time.
 * <p>
 * This test uses LoadTestTool to read and write the data from a single client with multiple
 * threads. The data is written first, then we allow the region replicas to catch up. Then we start
 * the reader threads doing get requests with stale mode true. ChaosMonkey is started after some
 * delay (20 sec by default) once the reader threads are running, so that there is enough time to
 * fully cache meta. These parameters (and some other parameters from LoadTestTool) can be used to
 * control behavior; the given values are the defaults:
 *
 * <pre>
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
 * </pre>
 *
 * Use this test with "serverKilling" ChaosMonkey. Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  private static final String TEST_NAME =
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  protected static final long DEFAULT_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  protected static final int DEFAULT_REGION_REPLICATION = 3;

  @Override
  protected void startMonkey() throws Exception {
    // Do not start the monkey at the start of the test; it is scheduled after a delay in
    // runIngestTest() below.
  }

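  /**
   * Defaults to the calm monkey; a destructive monkey (e.g. serverKilling) is expected to be
   * supplied on the command line via --monkey, as described in the class javadoc.
   */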
  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // default replication for this test is 3
    String clazz = this.getClass().getSimpleName();
    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
      Integer.toString(DEFAULT_REGION_REPLICATION));
  }

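  /**
   * Writes the test data once using LoadTestTool; fails the test if the load reports an error.
   */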
  protected void writeData(int colsPerKey, int recordSize, int writeThreads, long startKey,
    long numKeys) throws IOException {
    int ret = loadTool.run(getArgsForLoadTestTool("-write",
      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Load failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  @Override
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Cluster size: "
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);

    // write data once
    LOG.info("Writing some data to the table");
    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

    // flush the table
    LOG.info("Flushing the table");
    Admin admin = util.getAdmin();
    admin.flush(getTablename());

    // re-open the regions to make sure that the replicas are up to date
    long refreshTime =
      conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    if (refreshTime > 0 && refreshTime <= 10000) {
      LOG.info("Sleeping " + (refreshTime * 3) + "ms to ensure that the data is replicated");
      Threads.sleep(refreshTime * 3);
    } else {
      LOG.info("Reopening the table");
      admin.disableTable(getTablename());
      admin.enableTable(getTablename());
    }

    // We should only start the ChaosMonkey after the readers are started and have cached
    // all of the region locations. Because the meta table is not replicated, the time-bounded
    // reads will time out if the meta server is killed.
    // We start the chaos monkey after a delay (20 sec by default), and since the readers are
    // reading random keys, that should be enough time to cache every region entry.
    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
      DEFAULT_CHAOS_MONKEY_DELAY);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    LOG.info(String.format("ChaosMonkey delay is %d seconds. Will start %s ChaosMonkey after delay",
      chaosMonkeyDelay / 1000, monkeyToUse));
    ScheduledFuture<?> result = executorService.schedule(() -> {
      try {
        LOG.info("Starting ChaosMonkey");
        monkey.start();
        monkey.waitForStop();
      } catch (Exception e) {
        LOG.warn(StringUtils.stringifyException(e));
      }
    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);

    // set the intended run time for the reader. The reader will do read requests
    // to random keys for this amount of time.
    long remainingTime = runtime - (EnvironmentEdgeManager.currentTime() - start);
    if (remainingTime <= 0) {
      LOG.error("The amount of time left for the test to perform random reads is "
        + "non-positive. Increase the test execution time via "
        + String.format(RUN_TIME_KEY,
          IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
        + " or reduce the amount of data written per server via "
        + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName() + "."
        + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
      throw new IllegalArgumentException("No time remains to execute random reads");
    }
    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
      remainingTime); // load tool shares the same conf

    // now start the readers, which will run for the configured run time
    try {
      int ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }
    } finally {
      if (result != null) {
        result.cancel(false);
      }
      monkey.stop("Stopping the test");
      monkey.waitForStop();
      executorService.shutdown();
    }
  }

  @Override
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args =
      Lists.newArrayList(super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys));
    args.add("-reader");
    args.add(TimeBoundedMultiThreadedReader.class.getName());
    return args.toArray(new String[args.size()]);
  }

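  /**
   * A {@link MultiThreadedReader} that issues gets with TIMELINE consistency against random keys,
   * counts stale results served by secondary replicas, and treats any get that takes longer than
   * the configured timeout as a read failure.
   */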
  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    protected long timeoutNano;
    protected AtomicLong timedOutReads = new AtomicLong();
    protected long runTime;
    protected Thread timeoutThread;
    protected AtomicLong staleReads = new AtomicLong();

    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) throws IOException {
      super(dataGen, conf, tableName, verifyPercent);
      long timeoutMs =
        conf.getLong(String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
      timeoutNano = timeoutMs * 1000000;
      LOG.info("Timeout for gets: " + timeoutMs + " ms");
      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
      this.runTime = conf.getLong(runTimeKey, -1);
      if (this.runTime <= 0) {
        throw new IllegalArgumentException("Please configure " + runTimeKey);
      }
    }

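    /**
     * Blocks until the timeout thread has slept through the configured run time, then tells the
     * reader threads to stop before delegating to the superclass.
     */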
    @Override
    public void waitForFinish() {
      try {
        this.timeoutThread.join();
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
      this.aborted = true;
      super.waitForFinish();
    }

    @Override
    protected String progressInfo() {
      StringBuilder builder = new StringBuilder(super.progressInfo());
      appendToStatus(builder, "stale_reads", staleReads.get());
      appendToStatus(builder, "get_timeouts", timedOutReads.get());
      return builder.toString();
    }

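    /** Starts the reader threads, plus a timeout thread that bounds the total run time. */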
    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
      super.start(startKey, endKey, numThreads);
      this.timeoutThread = new TimeoutThread(this.runTime);
      this.timeoutThread.start();
    }

    @Override
    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
      return new TimeBoundedMultiThreadedReaderThread(readerId);
    }

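    /**
     * Sleeps in one-minute chunks until the configured run time elapses, logging the remaining
     * time once per chunk. waitForFinish() joins this thread to know when to stop the readers.
     */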
    private static class TimeoutThread extends Thread {
      long timeout;
      long reportInterval = 60000;

      public TimeoutThread(long timeout) {
        this.timeout = timeout;
      }

      @Override
      public void run() {
        while (true) {
          long rem = Math.min(timeout, reportInterval);
          if (rem <= 0) {
            break;
          }
          LOG.info("Remaining execution time: " + timeout / 60000 + " min");
          Threads.sleep(rem);
          timeout -= rem;
        }
      }
    }

    public class TimeBoundedMultiThreadedReaderThread
      extends MultiThreadedReader.HBaseReaderThread {

      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
        super(readerId);
      }

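      /**
       * Issues gets with TIMELINE consistency, so secondary replicas may serve (possibly stale)
       * results while the primary is unavailable.
       */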
      @Override
      protected Get createGet(long keyToRead) throws IOException {
        Get get = super.createGet(keyToRead);
        get.setConsistency(Consistency.TIMELINE);
        return get;
      }

      @Override
      protected long getNextKeyToRead() {
        // always read a random key, assuming that the writer has finished writing all keys
        return ThreadLocalRandom.current().nextLong(startKey, endKey);
      }

341
342      @Override
343      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
344        Result[] results, Table table, boolean isNullExpected) throws IOException {
345        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
346          isNullExpected);
347        for (Result r : results) {
348          if (r.isStale()) staleReads.incrementAndGet();
349        }
        // We do not actually time out and cancel the reads when the timeout elapses. We just wait
        // for the RPC to complete, but if the request took longer than the timeout, we treat that
        // as an error.
        if (elapsedNano > timeoutNano) {
          timedOutReads.incrementAndGet();
          numReadFailures.addAndGet(1); // fail the test
          for (Result r : results) {
            LOG.error("FAILED FOR " + r);
            RegionLocations rl =
              ((ClusterConnection) connection).locateRegion(tableName, r.getRow(), true, true);
            HRegionLocation[] locations = rl.getRegionLocations();
            for (HRegionLocation h : locations) {
              LOG.error("LOCATION " + h);
            }
          }
        }
      }
    }
  }

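  /** Command-line entry point; runs the test against a distributed cluster via ToolRunner. */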
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret =
      ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
    System.exit(ret);
  }
}