001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.Executors;
023import java.util.concurrent.ScheduledExecutorService;
024import java.util.concurrent.ScheduledFuture;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.IntegrationTestIngest;
032import org.apache.hadoop.hbase.IntegrationTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.Consistency;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.RegionLocator;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
042import org.apache.hadoop.hbase.testclassification.IntegrationTests;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.hbase.util.LoadTestTool;
045import org.apache.hadoop.hbase.util.MultiThreadedReader;
046import org.apache.hadoop.hbase.util.Threads;
047import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
048import org.apache.hadoop.util.StringUtils;
049import org.apache.hadoop.util.ToolRunner;
050import org.junit.Assert;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057/**
058 * An IntegrationTest for doing reads with a timeout, to a read-only table with region replicas.
059 * ChaosMonkey is run which kills the region servers and master, but ensures that meta region server
060 * is not killed, and at most 2 region servers are dead at any point in time. The expected behavior
061 * is that all reads with stale mode true will return before the timeout (5 sec by default). The
062 * test fails if the read requests does not finish in time.
063 * <p>
064 * This test uses LoadTestTool to read and write the data from a single client but multiple threads.
065 * The data is written first, then we allow the region replicas to catch up. Then we start the
066 * reader threads doing get requests with stale mode true. Chaos Monkey is started after some delay
067 * (20 sec by default) after the reader threads are started so that there is enough time to fully
068 * cache meta. These parameters (and some other parameters from LoadTestTool) can be used to control
069 * behavior, given values are default:
070 *
071 * <pre>
072 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
073 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
074 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
075 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
076 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
077 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
078 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
079 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
080 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
081 * </pre>
082 *
083 * Use this test with "serverKilling" ChaosMonkey. Sample usage:
084 *
085 * <pre>
086 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
087 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
088 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
089 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
090 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
091 * </pre>
092 */
093@Category(IntegrationTests.class)
094public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {
095
096  private static final Logger LOG =
097    LoggerFactory.getLogger(IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);
098
099  private static final String TEST_NAME =
100    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();
101
102  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
103  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";
104
105  protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
106  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";
107
108  protected static final int DEFAULT_REGION_REPLICATION = 3;
109
110  @Override
111  protected void startMonkey() throws Exception {
112    // we do not want to start the monkey at the start of the test.
113  }
114
115  @Override
116  protected MonkeyFactory getDefaultMonkeyFactory() {
117    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
118  }
119
120  @Override
121  public void setConf(Configuration conf) {
122    super.setConf(conf);
123    // default replication for this test is 3
124    String clazz = this.getClass().getSimpleName();
125    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
126      Integer.toString(DEFAULT_REGION_REPLICATION));
127  }
128
129  protected void writeData(int colsPerKey, int recordSize, int writeThreads, long startKey,
130    long numKeys) throws IOException {
131    int ret = loadTool.run(getArgsForLoadTestTool("-write",
132      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
133    if (0 != ret) {
134      String errorMsg = "Load failed with error code " + ret;
135      LOG.error(errorMsg);
136      Assert.fail(errorMsg);
137    }
138  }
139
140  @Override
141  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
142    int recordSize, int writeThreads, int readThreads) throws Exception {
143    LOG.info("Cluster size:"
144      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
145
146    long start = EnvironmentEdgeManager.currentTime();
147    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
148    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
149    long startKey = 0;
150
151    long numKeys = getNumKeys(keysPerServerPerIter);
152
153    // write data once
154    LOG.info("Writing some data to the table");
155    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);
156
157    // flush the table
158    LOG.info("Flushing the table");
159    Admin admin = util.getAdmin();
160    admin.flush(getTablename());
161
162    // re-open the regions to make sure that the replicas are up to date
163    long refreshTime =
164      conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
165    if (refreshTime > 0 && refreshTime <= 10000) {
166      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
167      Threads.sleep(refreshTime * 3);
168    } else {
169      LOG.info("Reopening the table");
170      admin.disableTable(getTablename());
171      admin.enableTable(getTablename());
172    }
173
174    // We should only start the ChaosMonkey after the readers are started and have cached
175    // all of the region locations. Because the meta is not replicated, the timebounded reads
176    // will timeout if meta server is killed.
177    // We will start the chaos monkey after 1 minute, and since the readers are reading random
178    // keys, it should be enough to cache every region entry.
179    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
180      DEFAUL_CHAOS_MONKEY_DELAY);
181    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
182    LOG.info(
183      String.format("ChaosMonkey delay is : %d seconds. Will start %s " + "ChaosMonkey after delay",
184        chaosMonkeyDelay / 1000, monkeyToUse));
185    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
186      @Override
187      public void run() {
188        try {
189          LOG.info("Starting ChaosMonkey");
190          monkey.start();
191          monkey.waitForStop();
192        } catch (Exception e) {
193          LOG.warn(StringUtils.stringifyException(e));
194        }
195
196      }
197    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
198
199    // set the intended run time for the reader. The reader will do read requests
200    // to random keys for this amount of time.
201    long remainingTime = runtime - (EnvironmentEdgeManager.currentTime() - start);
202    if (remainingTime <= 0) {
203      LOG.error("The amount of time left for the test to perform random reads is "
204        + "non-positive. Increase the test execution time via "
205        + String.format(RUN_TIME_KEY,
206          IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
207        + " or reduce the amount of data written per server via "
208        + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName() + "."
209        + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
210      throw new IllegalArgumentException("No time remains to execute random reads");
211    }
212    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
213    this.conf.setLong(
214      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
215      remainingTime); // load tool shares the same conf
216
217    // now start the readers which will run for configured run time
218    try {
219      int ret = loadTool.run(
220        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
221      if (0 != ret) {
222        String errorMsg = "Verification failed with error code " + ret;
223        LOG.error(errorMsg);
224        Assert.fail(errorMsg);
225      }
226    } finally {
227      if (result != null) result.cancel(false);
228      monkey.stop("Stopping the test");
229      monkey.waitForStop();
230      executorService.shutdown();
231    }
232  }
233
234  @Override
235  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
236    long numKeys) {
237    List<String> args =
238      Lists.newArrayList(super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys));
239    args.add("-reader");
240    args.add(TimeBoundedMultiThreadedReader.class.getName());
241    return args.toArray(new String[args.size()]);
242  }
243
244  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
245    protected long timeoutNano;
246    protected AtomicLong timedOutReads = new AtomicLong();
247    protected long runTime;
248    protected Thread timeoutThread;
249    protected AtomicLong staleReads = new AtomicLong();
250
251    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
252      TableName tableName, double verifyPercent) throws IOException {
253      super(dataGen, conf, tableName, verifyPercent);
254      long timeoutMs =
255        conf.getLong(String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
256      timeoutNano = timeoutMs * 1000000;
257      LOG.info("Timeout for gets: " + timeoutMs);
258      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
259      this.runTime = conf.getLong(runTimeKey, -1);
260      if (this.runTime <= 0) {
261        throw new IllegalArgumentException("Please configure " + runTimeKey);
262      }
263    }
264
265    @Override
266    public void waitForFinish() {
267      try {
268        this.timeoutThread.join();
269      } catch (InterruptedException e) {
270        // Restore interrupt status
271        Thread.currentThread().interrupt();
272      }
273      this.aborted = true;
274      super.waitForFinish();
275    }
276
277    @Override
278    protected String progressInfo() {
279      StringBuilder builder = new StringBuilder(super.progressInfo());
280      appendToStatus(builder, "stale_reads", staleReads.get());
281      appendToStatus(builder, "get_timeouts", timedOutReads.get());
282      return builder.toString();
283    }
284
285    @Override
286    public void start(long startKey, long endKey, int numThreads) throws IOException {
287      super.start(startKey, endKey, numThreads);
288      this.timeoutThread = new TimeoutThread(this.runTime);
289      this.timeoutThread.start();
290    }
291
292    @Override
293    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
294      return new TimeBoundedMultiThreadedReaderThread(readerId);
295    }
296
297    private static class TimeoutThread extends Thread {
298      long timeout;
299      long reportInterval = 60000;
300
301      public TimeoutThread(long timeout) {
302        this.timeout = timeout;
303      }
304
305      @Override
306      public void run() {
307        while (true) {
308          long rem = Math.min(timeout, reportInterval);
309          if (rem <= 0) {
310            break;
311          }
312          LOG.info("Remaining execution time:" + timeout / 60000 + " min");
313          Threads.sleep(rem);
314          timeout -= rem;
315        }
316      }
317    }
318
319    public class TimeBoundedMultiThreadedReaderThread
320      extends MultiThreadedReader.HBaseReaderThread {
321
322      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
323        super(readerId);
324      }
325
326      @Override
327      protected Get createGet(long keyToRead) throws IOException {
328        Get get = super.createGet(keyToRead);
329        get.setConsistency(Consistency.TIMELINE);
330        return get;
331      }
332
333      @Override
334      protected long getNextKeyToRead() {
335        // always read a random key, assuming that the writer has finished writing all keys
336        long key =
337          startKey + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % (endKey - startKey);
338        return key;
339      }
340
341      @Override
342      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
343        Result[] results, Table table, boolean isNullExpected) throws IOException {
344        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
345          isNullExpected);
346        for (Result r : results) {
347          if (r.isStale()) staleReads.incrementAndGet();
348        }
349        // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC
350        // to complete, but if the request took longer than timeout, we treat that as error.
351        if (elapsedNano > timeoutNano) {
352          timedOutReads.incrementAndGet();
353          numReadFailures.addAndGet(1); // fail the test
354          for (Result r : results) {
355            LOG.error("FAILED FOR " + r);
356            List<HRegionLocation> locs;
357            try (RegionLocator locator = connection.getRegionLocator(tableName)) {
358              locs = locator.getRegionLocations(r.getRow());
359            }
360            for (HRegionLocation h : locs) {
361              LOG.error("LOCATION " + h);
362            }
363          }
364        }
365      }
366    }
367  }
368
369  public static void main(String[] args) throws Exception {
370    Configuration conf = HBaseConfiguration.create();
371    IntegrationTestingUtility.setUseDistributedCluster(conf);
372    int ret =
373      ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
374    System.exit(ret);
375  }
376}