001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
057
058/**
059 * Provides FSHLog test cases.
060 */
061@Tag(RegionServerTests.TAG)
062@Tag(MediumTests.TAG)
063public class TestFSHLog extends AbstractTestFSWAL {
064
065  private String name;
066
067  @BeforeEach
068  public void initTestName(TestInfo testInfo) {
069    name = testInfo.getTestMethod().get().getName();
070  }
071
072  @Override
073  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
074    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
075    String suffix) throws IOException {
076    FSHLog wal =
077      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
078    wal.init();
079    return wal;
080  }
081
082  @Override
083  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
084    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
085    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
086    throws IOException {
087    FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
088      prefix, suffix) {
089
090      @Override
091      protected void atHeadOfRingBufferEventHandlerAppend() {
092        action.run();
093        super.atHeadOfRingBufferEventHandlerAppend();
094      }
095    };
096    wal.init();
097    return wal;
098  }
099
100  @Test
101  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
102    SecurityException, IllegalArgumentException, IllegalAccessException {
103    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), this.name));
104    FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), this.name,
105      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
106    log.init();
107    try {
108      Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex");
109      syncRunnerIndexField.setAccessible(true);
110      syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1);
111      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name))
112        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
113      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
114      for (byte[] fam : htd.getColumnFamilyNames()) {
115        scopes.put(fam, 0);
116      }
117      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
118      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
119      for (int i = 0; i < 10; i++) {
120        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
121      }
122    } finally {
123      log.close();
124    }
125  }
126
127  /**
128   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
129   */
130  @Test
131  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
132    final byte[] b = Bytes.toBytes("b");
133
134    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
135    final CountDownLatch holdAppend = new CountDownLatch(1);
136    final CountDownLatch flushFinished = new CountDownLatch(1);
137    final CountDownLatch putFinished = new CountDownLatch(1);
138
139    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), this.name));
140    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), this.name,
141      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
142      log.init();
143      log.registerWALActionsListener(new WALActionsListener() {
144        @Override
145        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
146          if (startHoldingForAppend.get()) {
147            try {
148              holdAppend.await();
149            } catch (InterruptedException e) {
150              LOG.error(e.toString(), e);
151            }
152          }
153        }
154      });
155
156      // open a new region which uses this WAL
157      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name))
158        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
159      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
160      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
161        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
162      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
163      ExecutorService exec = Executors.newFixedThreadPool(2);
164
165      // do a regular write first because of memstore size calculation.
166      region.put(new Put(b).addColumn(b, b, b));
167
168      startHoldingForAppend.set(true);
169      exec.submit(new Runnable() {
170        @Override
171        public void run() {
172          try {
173            region.put(new Put(b).addColumn(b, b, b));
174            putFinished.countDown();
175          } catch (IOException e) {
176            LOG.error(e.toString(), e);
177          }
178        }
179      });
180
181      // give the put a chance to start
182      Threads.sleep(3000);
183
184      exec.submit(new Runnable() {
185        @Override
186        public void run() {
187          try {
188            HRegion.FlushResult flushResult = region.flush(true);
189            LOG.info("Flush result:" + flushResult.getResult());
190            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
191            flushFinished.countDown();
192          } catch (IOException e) {
193            LOG.error(e.toString(), e);
194          }
195        }
196      });
197
198      // give the flush a chance to start. Flush should have got the region lock, and
199      // should have been waiting on the mvcc complete after this.
200      Threads.sleep(3000);
201
202      // let the append to WAL go through now that the flush already started
203      holdAppend.countDown();
204      putFinished.await();
205      flushFinished.await();
206
207      // check whether flush went through
208      assertEquals(1, region.getStoreFileList(new byte[][] { b }).size(), "Region did not flush?");
209
210      // now check the region's unflushed seqIds.
211      long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes());
212      assertEquals(HConstants.NO_SEQNUM, seqId,
213        "Found seqId for the region which is already flushed");
214
215      region.close();
216    }
217  }
218}