001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.TableDescriptors;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionInfoBuilder;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.util.FSTableDescriptors;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.hadoop.hbase.wal.FSHLogProvider;
045import org.apache.hadoop.hbase.wal.WAL;
046import org.apache.hadoop.hbase.wal.WALEdit;
047import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
048import org.apache.hadoop.hbase.wal.WALFactory;
049import org.apache.hadoop.hbase.wal.WALKeyImpl;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Test many concurrent appenders to an WAL while rolling the log.
057 */
058@Tag(RegionServerTests.TAG)
059@Tag(MediumTests.TAG)
060public class TestLogRollingNoCluster {
061
062  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
063  private final static byte[] EMPTY_1K_ARRAY = new byte[1024];
064  private static final int NUM_THREADS = 100; // Spin up this many threads
065  private static final int NUM_ENTRIES = 100; // How many entries to write
066
067  /** ProtobufLogWriter that simulates higher latencies in sync() call */
068  public static class HighLatencySyncWriter extends ProtobufLogWriter {
069    @Override
070    public void sync(boolean forceSync) throws IOException {
071      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
072      super.sync(forceSync);
073      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
074    }
075  }
076
077  /**
078   * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try
079   * and trigger NPE.
080   */
081  @Test
082  public void testContendedLogRolling() throws Exception {
083    TEST_UTIL.startMiniDFSCluster(3);
084    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
085
086    // The implementation needs to know the 'handler' count.
087    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
088    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
089    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
090    CommonFSUtils.setRootDir(conf, dir);
091    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
092    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
093    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
094    conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName());
095    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
096    final WAL wal = wals.getWAL(null);
097
098    Appender[] appenders = null;
099
100    final int numThreads = NUM_THREADS;
101    appenders = new Appender[numThreads];
102    try {
103      for (int i = 0; i < numThreads; i++) {
104        // Have each appending thread write 'count' entries
105        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
106      }
107      for (int i = 0; i < numThreads; i++) {
108        appenders[i].start();
109      }
110      for (int i = 0; i < numThreads; i++) {
111        // ensure that all threads are joined before closing the wal
112        appenders[i].join();
113      }
114    } finally {
115      wals.close();
116    }
117    for (int i = 0; i < numThreads; i++) {
118      assertFalse(appenders[i].isException(), "Error: " + appenders[i].getException());
119    }
120    TEST_UTIL.shutdownMiniDFSCluster();
121  }
122
123  /**
124   * Appender thread. Appends to passed wal file.
125   */
126  static class Appender extends Thread {
127    private final Logger log;
128    private final WAL wal;
129    private final int count;
130    private Exception e = null;
131    private final TableDescriptor metaTableDescriptor;
132
133    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
134      super("" + index);
135      this.wal = wal;
136      this.count = count;
137      this.metaTableDescriptor = metaTableDescriptor;
138      this.log = LoggerFactory.getLogger("Appender:" + getName());
139    }
140
141    /** Returns Call when the thread is done. */
142    boolean isException() {
143      return !isAlive() && this.e != null;
144    }
145
146    Exception getException() {
147      return this.e;
148    }
149
150    @Override
151    public void run() {
152      this.log.info(getName() + " started");
153      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
154      try {
155        TableDescriptors tds = new FSTableDescriptors(TEST_UTIL.getConfiguration());
156        FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
157        TableDescriptor htd = tds.get(TableName.META_TABLE_NAME);
158        for (int i = 0; i < this.count; i++) {
159          long now = EnvironmentEdgeManager.currentTime();
160          // Roll every ten edits
161          if (i % 10 == 0) {
162            this.wal.rollWriter();
163          }
164          WALEdit edit = new WALEdit();
165          byte[] bytes = Bytes.toBytes(i);
166          WALEditInternalHelper.addExtendedCell(edit,
167            new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
168          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
169          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
170          for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) {
171            scopes.put(fam, 0);
172          }
173          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
174            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
175          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
176          wal.sync(txid);
177        }
178        String msg = getName() + " finished";
179        if (isException()) this.log.info(msg, getException());
180        else this.log.info(msg);
181      } catch (Exception e) {
182        this.e = e;
183        log.info("Caught exception from Appender:" + getName(), e);
184      } finally {
185        // Call sync on our log.else threads just hang out.
186        try {
187          this.wal.sync();
188        } catch (IOException e) {
189          throw new RuntimeException(e);
190        }
191      }
192    }
193  }
194
195  // @org.junit.Rule
196  // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
197  // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
198}