001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.lang.reflect.Field;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.regionserver.ChunkCreator;
048import org.apache.hadoop.hbase.regionserver.HRegion;
049import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
050import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.WAL;
057import org.apache.hadoop.hbase.wal.WALEdit;
058import org.apache.hadoop.hbase.wal.WALKey;
059import org.apache.hadoop.hbase.wal.WALProvider;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065
066/**
067 * Provides FSHLog test cases.
068 */
069@Category({ RegionServerTests.class, MediumTests.class })
070public class TestFSHLog extends AbstractTestFSWAL {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class);
074
075  private static final long TEST_TIMEOUT_MS = 10000;
076
077  @Rule
078  public TestName name = new TestName();
079
080  @Override
081  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
082    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
083    String suffix) throws IOException {
084    FSHLog fshLog =
085      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
086    fshLog.init();
087    return fshLog;
088  }
089
090  @Override
091  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
092    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
093    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
094    throws IOException {
095    FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
096      prefix, suffix) {
097
098      @Override
099      protected void atHeadOfRingBufferEventHandlerAppend() {
100        action.run();
101        super.atHeadOfRingBufferEventHandlerAppend();
102      }
103    };
104    fshLog.init();
105    return fshLog;
106  }
107
108  @Test
109  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
110    SecurityException, IllegalArgumentException, IllegalAccessException {
111    final String name = this.name.getMethodName();
112    FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
113      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
114    log.init();
115    try {
116      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
117      ringBufferEventHandlerField.setAccessible(true);
118      FSHLog.RingBufferEventHandler ringBufferEventHandler =
119        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
120      Field syncRunnerIndexField =
121        FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
122      syncRunnerIndexField.setAccessible(true);
123      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
124      TableDescriptor htd =
125        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
126          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
127      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
128      for (byte[] fam : htd.getColumnFamilyNames()) {
129        scopes.put(fam, 0);
130      }
131      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
132      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
133      for (int i = 0; i < 10; i++) {
134        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
135      }
136    } finally {
137      log.close();
138    }
139  }
140
141  /**
142   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
143   */
144  @Test
145  public void testDeadlockWithSyncOverwrites() throws Exception {
146    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
147
148    class FailingWriter implements WALProvider.Writer {
149      @Override
150      public void sync(boolean forceSync) throws IOException {
151        throw new IOException("Injected failure..");
152      }
153
154      @Override
155      public void append(WAL.Entry entry) throws IOException {
156      }
157
158      @Override
159      public long getLength() {
160        return 0;
161      }
162
163      @Override
164      public long getSyncedLength() {
165        return 0;
166      }
167
168      @Override
169      public void close() throws IOException {
170      }
171    }
172
173    /*
174     * Custom FSHLog implementation with a conditional wait before attaining safe point.
175     */
176    class CustomFSHLog extends FSHLog {
177      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
178        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
179        String prefix, String suffix) throws IOException {
180        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
181      }
182
183      @Override
184      protected void beforeWaitOnSafePoint() {
185        try {
186          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
187        } catch (InterruptedException e) {
188          throw new RuntimeException(e);
189        }
190      }
191
192      public SyncFuture publishSyncOnRingBuffer() {
193        long sequence = getSequenceOnRingBuffer();
194        return publishSyncOnRingBuffer(sequence, false);
195      }
196    }
197
198    final String name = this.name.getMethodName();
199    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
200      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
201      log.setWriter(new FailingWriter());
202      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
203      ringBufferEventHandlerField.setAccessible(true);
204      FSHLog.RingBufferEventHandler ringBufferEventHandler =
205        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
206      // Force a safe point
207      FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
208      try {
209        SyncFuture future0 = log.publishSyncOnRingBuffer();
210        // Wait for the sync to be done.
211        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
212        // Publish another sync from the same thread, this should not overwrite the done sync.
213        SyncFuture future1 = log.publishSyncOnRingBuffer();
214        assertFalse(future1.isDone());
215        // Unblock the safe point trigger..
216        blockBeforeSafePoint.countDown();
217        // Wait for the safe point to be reached.
218        // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync
219        // pipeline.
220        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
221      } finally {
222        // Force release the safe point, for the clean up.
223        latch.releaseSafePoint();
224      }
225    }
226  }
227
228  /**
229   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
230   */
231  @Test
232  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
233    final String name = this.name.getMethodName();
234    final byte[] b = Bytes.toBytes("b");
235
236    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
237    final CountDownLatch holdAppend = new CountDownLatch(1);
238    final CountDownLatch flushFinished = new CountDownLatch(1);
239    final CountDownLatch putFinished = new CountDownLatch(1);
240
241    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
242      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
243      log.init();
244      log.registerWALActionsListener(new WALActionsListener() {
245        @Override
246        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
247          if (startHoldingForAppend.get()) {
248            try {
249              holdAppend.await();
250            } catch (InterruptedException e) {
251              LOG.error(e.toString(), e);
252            }
253          }
254        }
255      });
256
257      // open a new region which uses this WAL
258      TableDescriptor htd =
259        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
260          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
261      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
262      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
263        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
264      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
265      ExecutorService exec = Executors.newFixedThreadPool(2);
266
267      // do a regular write first because of memstore size calculation.
268      region.put(new Put(b).addColumn(b, b, b));
269
270      startHoldingForAppend.set(true);
271      exec.submit(new Runnable() {
272        @Override
273        public void run() {
274          try {
275            region.put(new Put(b).addColumn(b, b, b));
276            putFinished.countDown();
277          } catch (IOException e) {
278            LOG.error(e.toString(), e);
279          }
280        }
281      });
282
283      // give the put a chance to start
284      Threads.sleep(3000);
285
286      exec.submit(new Runnable() {
287        @Override
288        public void run() {
289          try {
290            HRegion.FlushResult flushResult = region.flush(true);
291            LOG.info("Flush result:" + flushResult.getResult());
292            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
293            flushFinished.countDown();
294          } catch (IOException e) {
295            LOG.error(e.toString(), e);
296          }
297        }
298      });
299
300      // give the flush a chance to start. Flush should have got the region lock, and
301      // should have been waiting on the mvcc complete after this.
302      Threads.sleep(3000);
303
304      // let the append to WAL go through now that the flush already started
305      holdAppend.countDown();
306      putFinished.await();
307      flushFinished.await();
308
309      // check whether flush went through
310      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());
311
312      // now check the region's unflushed seqIds.
313      long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
314      assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
315        seqId);
316
317      region.close();
318    }
319  }
320}