/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test many concurrent appenders to a WAL while rolling the log.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestLogRollingNoCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRollingNoCluster.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static byte[] EMPTY_1K_ARRAY = new byte[1024];
  private static final int NUM_THREADS = 100; // Spin up this many threads
  private static final int NUM_ENTRIES = 100; // How many entries to write per thread

  /** ProtobufLogWriter that simulates higher latencies in sync() call */
  public static class HighLatencySyncWriter extends ProtobufLogWriter {
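    // Random sleeps on either side of the real sync stretch out each sync call,
    // widening the window in which a concurrent log roll can race with an
    // in-flight sync.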
    @Override
    public void sync(boolean forceSync) throws IOException {
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
      super.sync(forceSync);
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
    }
  }

  /**
   * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try
   * to trigger an NPE.
   */
  @Test
  public void testContendedLogRolling() throws Exception {
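    // "NoCluster" refers to HBase: no HBase cluster is started, but a real
    // mini-DFS backs the WAL so rolls and syncs go through actual filesystem streams.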
    TEST_UTIL.startMiniDFSCluster(3);
    Path dir = TEST_UTIL.getDataTestDirOnTestFS();

    // The implementation needs to know the 'handler' count.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
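    // Use the FSHLog-backed "filesystem" provider; this test targets the classic
    // filesystem WAL rather than the async WAL.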
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    CommonFSUtils.setRootDir(conf, dir);
    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
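    // Swap in the slow writer above so syncs overlap with rolls.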
    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
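    // Passing a null region returns the factory's default (non-meta) WAL.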
    final WAL wal = wals.getWAL(null);

    final int numThreads = NUM_THREADS;
    Appender[] appenders = new Appender[numThreads];
    try {
      for (int i = 0; i < numThreads; i++) {
        // Have each appending thread write 'count' entries
        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
      }
      for (int i = 0; i < numThreads; i++) {
        appenders[i].start();
      }
      for (int i = 0; i < numThreads; i++) {
        // ensure that all threads are joined before closing the wal
        appenders[i].join();
      }
    } finally {
      wals.close();
    }
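    // The WAL is closed; fail if any appender recorded an exception (e.g. an NPE
    // from an append racing a roll).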
    for (int i = 0; i < numThreads; i++) {
      assertFalse("Error: " + appenders[i].getException(), appenders[i].isException());
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Appender thread. Appends to the passed WAL.
   */
  static class Appender extends Thread {
    private final Logger log;
    private final WAL wal;
    private final int count;
    private Exception e = null;
    private final TableDescriptor metaTableDescriptor;

    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
      super("" + index);
      this.wal = wal;
      this.count = count;
      this.metaTableDescriptor = metaTableDescriptor;
      this.log = LoggerFactory.getLogger("Appender:" + getName());
    }

    /** Returns true if this thread threw an exception. Call only once the thread is done. */
    boolean isException() {
      return !isAlive() && this.e != null;
    }

    Exception getException() {
      return this.e;
    }

    @Override
    public void run() {
      this.log.info(getName() + " started");
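      // The WAL uses this MVCC instance, passed along in each WALKey, to assign
      // sequence ids to our appends.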
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      try {
        for (int i = 0; i < this.count; i++) {
          long now = EnvironmentEdgeManager.currentTime();
          // Roll every ten edits to keep rolls contending with concurrent appends
          if (i % 10 == 0) {
            this.wal.rollWriter();
          }
          WALEdit edit = new WALEdit();
          byte[] bytes = Bytes.toBytes(i);
          edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
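          // Scope 0 is REPLICATION_SCOPE_LOCAL: none of these edits are tagged for
          // replication.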
          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
          for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) {
            scopes.put(fam, 0);
          }
          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
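          // Block until the WAL is durable at least up to our transaction id.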
          wal.sync(txid);
        }
        String msg = getName() + " finished";
        if (this.e != null) {
          this.log.info(msg, this.e);
        } else {
          this.log.info(msg);
        }
      } catch (Exception e) {
        this.e = e;
        log.info("Caught exception from Appender:" + getName(), e);
      } finally {
        // Call sync on our log; otherwise other threads can hang on an outstanding
        // sync that never completes.
        try {
          this.wal.sync();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // @org.junit.Rule
  // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
  // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}