001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertFalse;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionInfoBuilder;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.util.FSTableDescriptors;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.apache.hadoop.hbase.wal.WALEdit;
045import org.apache.hadoop.hbase.wal.WALFactory;
046import org.apache.hadoop.hbase.wal.WALKeyImpl;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Test many concurrent appenders to an WAL while rolling the log.
055 */
056@Category({RegionServerTests.class, MediumTests.class})
057public class TestLogRollingNoCluster {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestLogRollingNoCluster.class);
062
063  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
064  private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
065  private static final int NUM_THREADS = 100; // Spin up this many threads
066  private static final int NUM_ENTRIES = 100; // How many entries to write
067
068  /** ProtobufLogWriter that simulates higher latencies in sync() call */
069  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
070    @Override
071    public void sync(boolean forceSync) throws IOException {
072      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
073      super.sync(forceSync);
074      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
075    }
076  }
077
078  /**
079   * Spin up a bunch of threads and have them all append to a WAL.  Roll the
080   * WAL frequently to try and trigger NPE.
081   * @throws IOException
082   * @throws InterruptedException
083   */
084  @Test
085  public void testContendedLogRolling() throws Exception {
086    TEST_UTIL.startMiniDFSCluster(3);
087    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
088
089    // The implementation needs to know the 'handler' count.
090    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
091    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
092    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
093    CommonFSUtils.setRootDir(conf, dir);
094    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
095    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
096    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
097    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
098    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
099    final WAL wal = wals.getWAL(null);
100
101    Appender [] appenders = null;
102
103    final int numThreads = NUM_THREADS;
104    appenders = new Appender[numThreads];
105    try {
106      for (int i = 0; i < numThreads; i++) {
107        // Have each appending thread write 'count' entries
108        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
109      }
110      for (int i = 0; i < numThreads; i++) {
111        appenders[i].start();
112      }
113      for (int i = 0; i < numThreads; i++) {
114        //ensure that all threads are joined before closing the wal
115        appenders[i].join();
116      }
117    } finally {
118      wals.close();
119    }
120    for (int i = 0; i < numThreads; i++) {
121      assertFalse("Error: " + appenders[i].getException(), appenders[i].isException());
122    }
123    TEST_UTIL.shutdownMiniDFSCluster();
124  }
125
126  /**
127   * Appender thread.  Appends to passed wal file.
128   */
129  static class Appender extends Thread {
130    private final Logger log;
131    private final WAL wal;
132    private final int count;
133    private Exception e = null;
134    private final TableDescriptor metaTableDescriptor;
135
136    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
137      super("" + index);
138      this.wal = wal;
139      this.count = count;
140      this.metaTableDescriptor = metaTableDescriptor;
141      this.log = LoggerFactory.getLogger("Appender:" + getName());
142    }
143
144    /**
145     * @return Call when the thread is done.
146     */
147    boolean isException() {
148      return !isAlive() && this.e != null;
149    }
150
151    Exception getException() {
152      return this.e;
153    }
154
155    @Override
156    public void run() {
157      this.log.info(getName() +" started");
158      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
159      try {
160        for (int i = 0; i < this.count; i++) {
161          long now = System.currentTimeMillis();
162          // Roll every ten edits
163          if (i % 10 == 0) {
164            this.wal.rollWriter();
165          }
166          WALEdit edit = new WALEdit();
167          byte[] bytes = Bytes.toBytes(i);
168          edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
169          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
170          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
171          for(byte[] fam: this.metaTableDescriptor.getColumnFamilyNames()) {
172            scopes.put(fam, 0);
173          }
174          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
175            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
176          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
177          wal.sync(txid);
178        }
179        String msg = getName() + " finished";
180        if (isException())
181          this.log.info(msg, getException());
182        else
183          this.log.info(msg);
184      } catch (Exception e) {
185        this.e = e;
186        log.info("Caught exception from Appender:" + getName(), e);
187      } finally {
188        // Call sync on our log.else threads just hang out.
189        try {
190          this.wal.sync();
191        } catch (IOException e) {
192          throw new RuntimeException(e);
193        }
194      }
195    }
196  }
197
198  //@org.junit.Rule
199  //public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
200  //  new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
201}