001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.lang.reflect.Field;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.regionserver.ChunkCreator;
048import org.apache.hadoop.hbase.regionserver.HRegion;
049import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
050import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.FSHLogProvider;
057import org.apache.hadoop.hbase.wal.WAL;
058import org.apache.hadoop.hbase.wal.WALEdit;
059import org.apache.hadoop.hbase.wal.WALKey;
060import org.apache.hadoop.hbase.wal.WALProvider;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.rules.TestName;
066
067/**
068 * Provides FSHLog test cases.
069 */
070@Category({ RegionServerTests.class, MediumTests.class })
071public class TestFSHLog extends AbstractTestFSWAL {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class);
075
076  private static final long TEST_TIMEOUT_MS = 10000;
077
078  @Rule
079  public TestName name = new TestName();
080
081  @Override
082  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
083    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
084    String suffix) throws IOException {
085    FSHLog fshLog =
086      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
087    fshLog.init();
088    return fshLog;
089  }
090
091  @Override
092  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
093    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
094    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
095    throws IOException {
096    FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
097      prefix, suffix) {
098
099      @Override
100      protected void atHeadOfRingBufferEventHandlerAppend() {
101        action.run();
102        super.atHeadOfRingBufferEventHandlerAppend();
103      }
104    };
105    fshLog.init();
106    return fshLog;
107  }
108
109  @Test
110  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
111    SecurityException, IllegalArgumentException, IllegalAccessException {
112    final String name = this.name.getMethodName();
113    FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
114      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
115    log.init();
116    try {
117      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
118      ringBufferEventHandlerField.setAccessible(true);
119      FSHLog.RingBufferEventHandler ringBufferEventHandler =
120        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
121      Field syncRunnerIndexField =
122        FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
123      syncRunnerIndexField.setAccessible(true);
124      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
125      TableDescriptor htd =
126        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
127          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
128      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
129      for (byte[] fam : htd.getColumnFamilyNames()) {
130        scopes.put(fam, 0);
131      }
132      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
133      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
134      for (int i = 0; i < 10; i++) {
135        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
136      }
137    } finally {
138      log.close();
139    }
140  }
141
142  /**
143   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
144   */
145  @Test
146  public void testDeadlockWithSyncOverwrites() throws Exception {
147    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
148
149    class FailingWriter implements WALProvider.Writer {
150      @Override
151      public void sync(boolean forceSync) throws IOException {
152        throw new IOException("Injected failure..");
153      }
154
155      @Override
156      public void append(WAL.Entry entry) throws IOException {
157      }
158
159      @Override
160      public long getLength() {
161        return 0;
162      }
163
164      @Override
165      public long getSyncedLength() {
166        return 0;
167      }
168
169      @Override
170      public void close() throws IOException {
171      }
172    }
173
174    /*
175     * Custom FSHLog implementation with a conditional wait before attaining safe point.
176     */
177    class CustomFSHLog extends FSHLog {
178      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
179        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
180        String prefix, String suffix) throws IOException {
181        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
182      }
183
184      @Override
185      protected void beforeWaitOnSafePoint() {
186        try {
187          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
188        } catch (InterruptedException e) {
189          throw new RuntimeException(e);
190        }
191      }
192
193      public SyncFuture publishSyncOnRingBuffer() {
194        long sequence = getSequenceOnRingBuffer();
195        return publishSyncOnRingBuffer(sequence, false);
196      }
197    }
198
199    final String name = this.name.getMethodName();
200    Configuration conf = new Configuration(CONF);
201    conf.set(FSHLogProvider.WRITER_IMPL, FailingWriter.class.getName());
202    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(conf), name,
203      HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
204      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
205      ringBufferEventHandlerField.setAccessible(true);
206      FSHLog.RingBufferEventHandler ringBufferEventHandler =
207        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
208      // Force a safe point
209      FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
210      try {
211        SyncFuture future0 = log.publishSyncOnRingBuffer();
212        // Wait for the sync to be done.
213        Waiter.waitFor(conf, TEST_TIMEOUT_MS, future0::isDone);
214        // Publish another sync from the same thread, this should not overwrite the done sync.
215        SyncFuture future1 = log.publishSyncOnRingBuffer();
216        assertFalse(future1.isDone());
217        // Unblock the safe point trigger..
218        blockBeforeSafePoint.countDown();
219        // Wait for the safe point to be reached.
220        // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync
221        // pipeline.
222        Waiter.waitFor(conf, TEST_TIMEOUT_MS, latch::isSafePointAttained);
223      } finally {
224        // Force release the safe point, for the clean up.
225        latch.releaseSafePoint();
226      }
227    }
228  }
229
230  /**
231   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
232   */
233  @Test
234  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
235    final String name = this.name.getMethodName();
236    final byte[] b = Bytes.toBytes("b");
237
238    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
239    final CountDownLatch holdAppend = new CountDownLatch(1);
240    final CountDownLatch flushFinished = new CountDownLatch(1);
241    final CountDownLatch putFinished = new CountDownLatch(1);
242
243    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
244      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
245      log.init();
246      log.registerWALActionsListener(new WALActionsListener() {
247        @Override
248        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
249          if (startHoldingForAppend.get()) {
250            try {
251              holdAppend.await();
252            } catch (InterruptedException e) {
253              LOG.error(e.toString(), e);
254            }
255          }
256        }
257      });
258
259      // open a new region which uses this WAL
260      TableDescriptor htd =
261        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
262          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
263      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
264      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
265        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
266      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
267      ExecutorService exec = Executors.newFixedThreadPool(2);
268
269      // do a regular write first because of memstore size calculation.
270      region.put(new Put(b).addColumn(b, b, b));
271
272      startHoldingForAppend.set(true);
273      exec.submit(new Runnable() {
274        @Override
275        public void run() {
276          try {
277            region.put(new Put(b).addColumn(b, b, b));
278            putFinished.countDown();
279          } catch (IOException e) {
280            LOG.error(e.toString(), e);
281          }
282        }
283      });
284
285      // give the put a chance to start
286      Threads.sleep(3000);
287
288      exec.submit(new Runnable() {
289        @Override
290        public void run() {
291          try {
292            HRegion.FlushResult flushResult = region.flush(true);
293            LOG.info("Flush result:" + flushResult.getResult());
294            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
295            flushFinished.countDown();
296          } catch (IOException e) {
297            LOG.error(e.toString(), e);
298          }
299        }
300      });
301
302      // give the flush a chance to start. Flush should have got the region lock, and
303      // should have been waiting on the mvcc complete after this.
304      Threads.sleep(3000);
305
306      // let the append to WAL go through now that the flush already started
307      holdAppend.countDown();
308      putFinished.await();
309      flushFinished.await();
310
311      // check whether flush went through
312      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());
313
314      // now check the region's unflushed seqIds.
315      long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
316      assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
317        seqId);
318
319      region.close();
320    }
321  }
322}