001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.lang.reflect.Field;
024import java.util.List;
025import java.util.NavigableMap;
026import java.util.TreeMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.regionserver.ChunkCreator;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.CommonFSUtils;
051import org.apache.hadoop.hbase.util.Threads;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALKey;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059
060/**
061 * Provides FSHLog test cases.
062 */
063@Category({ RegionServerTests.class, MediumTests.class })
064public class TestFSHLog extends AbstractTestFSWAL {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class);
068
069  private static final long TEST_TIMEOUT_MS = 10000;
070
071  @Rule
072  public TestName name = new TestName();
073
074  @Override
075  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
076    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
077    String suffix) throws IOException {
078    FSHLog wal =
079      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
080    wal.init();
081    return wal;
082  }
083
084  @Override
085  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
086    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
087    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
088    throws IOException {
089    FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
090      prefix, suffix) {
091
092      @Override
093      protected void atHeadOfRingBufferEventHandlerAppend() {
094        action.run();
095        super.atHeadOfRingBufferEventHandlerAppend();
096      }
097    };
098    wal.init();
099    return wal;
100  }
101
102  @Test
103  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
104    SecurityException, IllegalArgumentException, IllegalAccessException {
105    final String name = this.name.getMethodName();
106    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), name));
107    FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
108      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
109    log.init();
110    try {
111      Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex");
112      syncRunnerIndexField.setAccessible(true);
113      syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1);
114      TableDescriptor htd =
115        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
116          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
117      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
118      for (byte[] fam : htd.getColumnFamilyNames()) {
119        scopes.put(fam, 0);
120      }
121      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
122      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
123      for (int i = 0; i < 10; i++) {
124        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
125      }
126    } finally {
127      log.close();
128    }
129  }
130
131  /**
132   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
133   */
134  @Test
135  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
136    final String name = this.name.getMethodName();
137    final byte[] b = Bytes.toBytes("b");
138
139    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
140    final CountDownLatch holdAppend = new CountDownLatch(1);
141    final CountDownLatch flushFinished = new CountDownLatch(1);
142    final CountDownLatch putFinished = new CountDownLatch(1);
143
144    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), name));
145    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
146      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
147      log.init();
148      log.registerWALActionsListener(new WALActionsListener() {
149        @Override
150        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
151          if (startHoldingForAppend.get()) {
152            try {
153              holdAppend.await();
154            } catch (InterruptedException e) {
155              LOG.error(e.toString(), e);
156            }
157          }
158        }
159      });
160
161      // open a new region which uses this WAL
162      TableDescriptor htd =
163        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
164          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
165      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
166      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
167        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
168      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
169      ExecutorService exec = Executors.newFixedThreadPool(2);
170
171      // do a regular write first because of memstore size calculation.
172      region.put(new Put(b).addColumn(b, b, b));
173
174      startHoldingForAppend.set(true);
175      exec.submit(new Runnable() {
176        @Override
177        public void run() {
178          try {
179            region.put(new Put(b).addColumn(b, b, b));
180            putFinished.countDown();
181          } catch (IOException e) {
182            LOG.error(e.toString(), e);
183          }
184        }
185      });
186
187      // give the put a chance to start
188      Threads.sleep(3000);
189
190      exec.submit(new Runnable() {
191        @Override
192        public void run() {
193          try {
194            HRegion.FlushResult flushResult = region.flush(true);
195            LOG.info("Flush result:" + flushResult.getResult());
196            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
197            flushFinished.countDown();
198          } catch (IOException e) {
199            LOG.error(e.toString(), e);
200          }
201        }
202      });
203
204      // give the flush a chance to start. Flush should have got the region lock, and
205      // should have been waiting on the mvcc complete after this.
206      Threads.sleep(3000);
207
208      // let the append to WAL go through now that the flush already started
209      holdAppend.countDown();
210      putFinished.await();
211      flushFinished.await();
212
213      // check whether flush went through
214      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());
215
216      // now check the region's unflushed seqIds.
217      long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes());
218      assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
219        seqId);
220
221      region.close();
222    }
223  }
224}