/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test many concurrent appenders to a WAL while rolling the log.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestLogRollingNoCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLogRollingNoCluster.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
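  // 1KB of zeros used as the value of every appended cell, to give each edit some bulk.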
  private final static byte[] EMPTY_1K_ARRAY = new byte[1024];
  private static final int NUM_THREADS = 100; // Spin up this many threads
  private static final int NUM_ENTRIES = 100; // How many entries each appender writes

  /** ProtobufLogWriter that simulates higher latency in the sync() call. */
  public static class HighLatencySyncWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
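      // Sleep 0-9 ms before and after the real sync to stretch each sync out and
      // widen the window for racing a concurrent log roll.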
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
      super.sync(forceSync);
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
    }
  }

  /**
   * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL
   * frequently to try and trigger an NPE.
   */
  @Test
  public void testContendedLogRolling() throws Exception {
    TEST_UTIL.startMiniDFSCluster(3);
    Path dir = TEST_UTIL.getDataTestDirOnTestFS();

    // The WAL implementation sizes internal sync bookkeeping off the 'handler' count,
    // so it must cover all of the appender threads we spin up.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
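    // The 'filesystem' provider is the FSHLog-based one whose rolling path this test exercises.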
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    FSUtils.setRootDir(conf, dir);
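    // Plug in the high-latency writer above so every sync dawdles while rolls pile up.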
    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
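    // A null region yields the provider's single default WAL; all appenders share it.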
    final WAL wal = wals.getWAL(null);

    final int numThreads = NUM_THREADS;
    final Appender[] appenders = new Appender[numThreads];
    try {
      for (int i = 0; i < numThreads; i++) {
        // Have each appending thread write NUM_ENTRIES entries.
        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
      }
      for (int i = 0; i < numThreads; i++) {
        appenders[i].start();
      }
      for (int i = 0; i < numThreads; i++) {
        // Ensure that all threads are joined before closing the WAL.
        appenders[i].join();
      }
    } finally {
      wals.close();
    }
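    // All appenders have joined, so isException() now reflects each one's final state.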
    for (int i = 0; i < numThreads; i++) {
      assertFalse(appenders[i].isException());
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Appender thread. Appends to the passed WAL.
   */
  static class Appender extends Thread {
    private final Logger log;
    private final WAL wal;
    private final int count;
    private Exception e = null;

    Appender(final WAL wal, final int index, final int count) {
      super("" + index);
      this.wal = wal;
      this.count = count;
      this.log = LoggerFactory.getLogger("Appender:" + getName());
    }

    /**
     * Call after the thread has finished.
     * @return true if an exception was caught while appending
     */
    boolean isException() {
      return !isAlive() && this.e != null;
    }

    Exception getException() {
      return this.e;
    }

    @Override
    public void run() {
      this.log.info(getName() + " started");
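      // A private MVCC per appender; the WAL uses it to stamp sequence ids on this
      // thread's appends.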
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      try {
        // Loop-invariant WALKey ingredients: every edit targets the meta region and
        // table, with all column families at replication scope 0 (local only).
        final RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
        final TableDescriptor htd = TEST_UTIL.getMetaTableDescriptorBuilder().build();
        final NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (byte[] fam : htd.getColumnFamilyNames()) {
          scopes.put(fam, 0);
        }
        for (int i = 0; i < this.count; i++) {
          long now = System.currentTimeMillis();
          // Roll every ten edits.
          if (i % 10 == 0) {
            this.wal.rollWriter();
          }
          WALEdit edit = new WALEdit();
          byte[] bytes = Bytes.toBytes(i);
          edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
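          // Brief random pause between append and sync so appends, syncs, and rolls
          // from different threads interleave.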
          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
          wal.sync(txid);
        }
        this.log.info(getName() + " finished");
      } catch (Exception e) {
        this.e = e;
        log.info("Caught exception from Appender:" + getName(), e);
      } finally {
        // Call sync on our log, else other threads just hang out.
        try {
          this.wal.sync();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }
}