001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.test;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.Executors;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.commons.lang3.RandomUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.IntegrationTestIngest;
033import org.apache.hadoop.hbase.IntegrationTestingUtility;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.Consistency;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
043import org.apache.hadoop.hbase.testclassification.IntegrationTests;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.LoadTestTool;
046import org.apache.hadoop.hbase.util.MultiThreadedReader;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
049import org.apache.hadoop.util.StringUtils;
050import org.apache.hadoop.util.ToolRunner;
051import org.junit.Assert;
052import org.junit.experimental.categories.Category;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058/**
059 * An IntegrationTest for doing reads with a timeout, to a read-only table with region
060 * replicas. ChaosMonkey is run which kills the region servers and master, but ensures
061 * that meta region server is not killed, and at most 2 region servers are dead at any point
062 * in time. The expected behavior is that all reads with stale mode true will return
 * before the timeout (5 sec by default). The test fails if the read requests do not finish
064 * in time.
065 *
066 * <p> This test uses LoadTestTool to read and write the data from a single client but
067 * multiple threads. The data is written first, then we allow the region replicas to catch
068 * up. Then we start the reader threads doing get requests with stale mode true. Chaos Monkey is
069 * started after some delay (20 sec by default) after the reader threads are started so that
070 * there is enough time to fully cache meta.
071 *
072 * These parameters (and some other parameters from LoadTestTool) can be used to
073 * control behavior, given values are default:
074 * <pre>
075 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
076 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
077 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
078 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
079 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
080 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
081 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
084 * </pre>
085 * Use this test with "serverKilling" ChaosMonkey. Sample usage:
086 * <pre>
087 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
088 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
089 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
090 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
091 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
092 * </pre>
093 */
094@Category(IntegrationTests.class)
095public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {
096
  /** Logger shared by this test and its nested reader classes. */
  private static final Logger LOG = LoggerFactory.getLogger(
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  /** Simple name of this class; used as the prefix of the test-specific configuration keys. */
  private static final String TEST_NAME
    = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  /** Default value, in milliseconds, used when {@link #GET_TIMEOUT_KEY} is not configured. */
  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  /** Configuration key suffix (prefixed with the test name) for the per-get time budget. */
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  /**
   * Default value, in milliseconds, used when {@link #CHAOS_MONKEY_DELAY_KEY} is not configured.
   * NOTE(review): the name is misspelled ("DEFAUL"); it is protected, so renaming it could break
   * subclasses.
   */
  protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  /** Configuration key suffix (prefixed with the test name) for the ChaosMonkey start delay. */
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  /** Region replication that {@code setConf} applies when none has been configured. */
  protected static final int DEFAULT_REGION_REPLICATION = 3;
110
111  @Override
112  protected void startMonkey() throws Exception {
113    // we do not want to start the monkey at the start of the test.
114  }
115
116  @Override
117  protected MonkeyFactory getDefaultMonkeyFactory() {
118    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
119  }
120
121  @Override
122  public void setConf(Configuration conf) {
123    super.setConf(conf);
124    // default replication for this test is 3
125    String clazz = this.getClass().getSimpleName();
126    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
127      Integer.toString(DEFAULT_REGION_REPLICATION));
128  }
129
130  protected void writeData(int colsPerKey, int recordSize, int writeThreads,
131      long startKey, long numKeys) throws IOException {
132    int ret = loadTool.run(getArgsForLoadTestTool("-write",
133      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
134    if (0 != ret) {
135      String errorMsg = "Load failed with error code " + ret;
136      LOG.error(errorMsg);
137      Assert.fail(errorMsg);
138    }
139  }
140
141  @Override
142  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
143      int recordSize, int writeThreads, int readThreads) throws Exception {
144    LOG.info("Cluster size:"+
145      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
146
147    long start = EnvironmentEdgeManager.currentTime();
148    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
149    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
150    long startKey = 0;
151
152    long numKeys = getNumKeys(keysPerServerPerIter);
153
154
155    // write data once
156    LOG.info("Writing some data to the table");
157    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);
158
159    // flush the table
160    LOG.info("Flushing the table");
161    Admin admin = util.getAdmin();
162    admin.flush(getTablename());
163
164    // re-open the regions to make sure that the replicas are up to date
165    long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
166    if (refreshTime > 0 && refreshTime <= 10000) {
167      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
168      Threads.sleep(refreshTime*3);
169    } else {
170      LOG.info("Reopening the table");
171      admin.disableTable(getTablename());
172      admin.enableTable(getTablename());
173    }
174
175    // We should only start the ChaosMonkey after the readers are started and have cached
176    // all of the region locations. Because the meta is not replicated, the timebounded reads
177    // will timeout if meta server is killed.
178    // We will start the chaos monkey after 1 minute, and since the readers are reading random
179    // keys, it should be enough to cache every region entry.
180    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY)
181      , DEFAUL_CHAOS_MONKEY_DELAY);
182    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
183    LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " +
184        "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
185    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
186      @Override
187      public void run() {
188        try {
189          LOG.info("Starting ChaosMonkey");
190          monkey.start();
191          monkey.waitForStop();
192        } catch (Exception e) {
193          LOG.warn(StringUtils.stringifyException(e));
194        }
195
196      }
197    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
198
199    // set the intended run time for the reader. The reader will do read requests
200    // to random keys for this amount of time.
201    long remainingTime = runtime - (EnvironmentEdgeManager.currentTime() - start);
202    if (remainingTime <= 0) {
203      LOG.error("The amount of time left for the test to perform random reads is "
204          + "non-positive. Increase the test execution time via "
205          + String.format(RUN_TIME_KEY,
206                IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
207          + " or reduce the amount of data written per server via "
208          + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName()
209          + "." + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
210      throw new IllegalArgumentException("No time remains to execute random reads");
211    }
212    LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min");
213    this.conf.setLong(
214      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName())
215      , remainingTime); // load tool shares the same conf
216
217    // now start the readers which will run for configured run time
218    try {
219      int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
220        , startKey, numKeys));
221      if (0 != ret) {
222        String errorMsg = "Verification failed with error code " + ret;
223        LOG.error(errorMsg);
224        Assert.fail(errorMsg);
225      }
226    } finally {
227      if (result != null) result.cancel(false);
228      monkey.stop("Stopping the test");
229      monkey.waitForStop();
230      executorService.shutdown();
231    }
232  }
233
234  @Override
235  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
236      long numKeys) {
237    List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
238      mode, modeSpecificArg, startKey, numKeys));
239    args.add("-reader");
240    args.add(TimeBoundedMultiThreadedReader.class.getName());
241    return args.toArray(new String[args.size()]);
242  }
243
244  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
245    protected long timeoutNano;
246    protected AtomicLong timedOutReads = new AtomicLong();
247    protected long runTime;
248    protected Thread timeoutThread;
249    protected AtomicLong staleReads = new AtomicLong();
250
251    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
252        TableName tableName, double verifyPercent) throws IOException {
253      super(dataGen, conf, tableName, verifyPercent);
254      long timeoutMs = conf.getLong(
255        String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
256      timeoutNano = timeoutMs * 1000000;
257      LOG.info("Timeout for gets: " + timeoutMs);
258      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
259      this.runTime = conf.getLong(runTimeKey, -1);
260      if (this.runTime <= 0) {
261        throw new IllegalArgumentException("Please configure " + runTimeKey);
262      }
263    }
264
265    @Override
266    public void waitForFinish() {
267      try {
268        this.timeoutThread.join();
269      } catch (InterruptedException e) {
270        e.printStackTrace();
271      }
272      this.aborted = true;
273      super.waitForFinish();
274    }
275
276    @Override
277    protected String progressInfo() {
278      StringBuilder builder = new StringBuilder(super.progressInfo());
279      appendToStatus(builder, "stale_reads", staleReads.get());
280      appendToStatus(builder, "get_timeouts", timedOutReads.get());
281      return builder.toString();
282    }
283
284    @Override
285    public void start(long startKey, long endKey, int numThreads) throws IOException {
286      super.start(startKey, endKey, numThreads);
287      this.timeoutThread = new TimeoutThread(this.runTime);
288      this.timeoutThread.start();
289    }
290
291    @Override
292    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
293      return new TimeBoundedMultiThreadedReaderThread(readerId);
294    }
295
296    private class TimeoutThread extends Thread {
297      long timeout;
298      long reportInterval = 60000;
299      public TimeoutThread(long timeout) {
300        this.timeout = timeout;
301      }
302
303      @Override
304      public void run() {
305        while (true) {
306          long rem = Math.min(timeout, reportInterval);
307          if (rem <= 0) {
308            break;
309          }
310          LOG.info("Remaining execution time:" + timeout / 60000 + " min");
311          Threads.sleep(rem);
312          timeout -= rem;
313        }
314      }
315    }
316
317    public class TimeBoundedMultiThreadedReaderThread
318      extends MultiThreadedReader.HBaseReaderThread {
319
320      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
321        super(readerId);
322      }
323
324      @Override
325      protected Get createGet(long keyToRead) throws IOException {
326        Get get = super.createGet(keyToRead);
327        get.setConsistency(Consistency.TIMELINE);
328        return get;
329      }
330
331      @Override
332      protected long getNextKeyToRead() {
333        // always read a random key, assuming that the writer has finished writing all keys
334        long key = startKey + Math.abs(RandomUtils.nextLong())
335            % (endKey - startKey);
336        return key;
337      }
338
339      @Override
340      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
341          Result[] results, Table table, boolean isNullExpected)
342          throws IOException {
343        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
344        for (Result r : results) {
345          if (r.isStale()) staleReads.incrementAndGet();
346        }
347        // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC
348        // to complete, but if the request took longer than timeout, we treat that as error.
349        if (elapsedNano > timeoutNano) {
350          timedOutReads.incrementAndGet();
351          numReadFailures.addAndGet(1); // fail the test
352          for (Result r : results) {
353            LOG.error("FAILED FOR " + r);
354            List<HRegionLocation> locs;
355            try (RegionLocator locator = connection.getRegionLocator(tableName)) {
356              locs = locator.getRegionLocations(r.getRow());
357            }
358            for (HRegionLocation h : locs) {
359              LOG.error("LOCATION " + h);
360            }
361          }
362        }
363      }
364    }
365  }
366
367  public static void main(String[] args) throws Exception {
368    Configuration conf = HBaseConfiguration.create();
369    IntegrationTestingUtility.setUseDistributedCluster(conf);
370    int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
371    System.exit(ret);
372  }
373}