001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.lang.reflect.Field;
024import java.util.List;
025import java.util.NavigableMap;
026import java.util.TreeMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.regionserver.ChunkCreator;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.FSUtils;
051import org.apache.hadoop.hbase.util.Threads;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALKey;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059
060/**
061 * Provides FSHLog test cases.
062 */
063@Category({ RegionServerTests.class, MediumTests.class })
064public class TestFSHLog extends AbstractTestFSWAL {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestFSHLog.class);
069
070  @Rule
071  public TestName name = new TestName();
072
073  @Override
074  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
075      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
076      String prefix, String suffix) throws IOException {
077    return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
078        suffix);
079  }
080
081  @Override
082  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
083      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
084      boolean failIfWALExists, String prefix, String suffix, final Runnable action)
085      throws IOException {
086    return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
087        suffix) {
088
089      @Override
090      protected void atHeadOfRingBufferEventHandlerAppend() {
091        action.run();
092        super.atHeadOfRingBufferEventHandlerAppend();
093      }
094    };
095  }
096
097  @Test
098  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
099      SecurityException, IllegalArgumentException, IllegalAccessException {
100    final String name = this.name.getMethodName();
101    FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
102      CONF, null, true, null, null);
103    try {
104      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
105      ringBufferEventHandlerField.setAccessible(true);
106      FSHLog.RingBufferEventHandler ringBufferEventHandler =
107          (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
108      Field syncRunnerIndexField =
109          FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
110      syncRunnerIndexField.setAccessible(true);
111      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
112      TableDescriptor htd =
113          TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
114            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
115      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
116      for (byte[] fam : htd.getColumnFamilyNames()) {
117        scopes.put(fam, 0);
118      }
119      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
120      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
121      for (int i = 0; i < 10; i++) {
122        addEdits(log, hri, htd, 1, mvcc, scopes);
123      }
124    } finally {
125      log.close();
126    }
127  }
128
129  /**
130   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
131   */
132  @Test
133  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
134    final String name = this.name.getMethodName();
135    final byte[] b = Bytes.toBytes("b");
136
137    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
138    final CountDownLatch holdAppend = new CountDownLatch(1);
139    final CountDownLatch flushFinished = new CountDownLatch(1);
140    final CountDownLatch putFinished = new CountDownLatch(1);
141
142    try (FSHLog log =
143        new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
144            null, true, null, null)) {
145
146      log.registerWALActionsListener(new WALActionsListener() {
147        @Override
148        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
149            throws IOException {
150          if (startHoldingForAppend.get()) {
151            try {
152              holdAppend.await();
153            } catch (InterruptedException e) {
154              LOG.error(e.toString(), e);
155            }
156          }
157        }
158      });
159
160      // open a new region which uses this WAL
161      TableDescriptor htd =
162          TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
163            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
164      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
165      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
166      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
167      ExecutorService exec = Executors.newFixedThreadPool(2);
168
169      // do a regular write first because of memstore size calculation.
170      region.put(new Put(b).addColumn(b, b,b));
171
172      startHoldingForAppend.set(true);
173      exec.submit(new Runnable() {
174        @Override
175        public void run() {
176          try {
177            region.put(new Put(b).addColumn(b, b,b));
178            putFinished.countDown();
179          } catch (IOException e) {
180            LOG.error(e.toString(), e);
181          }
182        }
183      });
184
185      // give the put a chance to start
186      Threads.sleep(3000);
187
188      exec.submit(new Runnable() {
189        @Override
190        public void run() {
191          try {
192            HRegion.FlushResult flushResult = region.flush(true);
193            LOG.info("Flush result:" +  flushResult.getResult());
194            LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
195            flushFinished.countDown();
196          } catch (IOException e) {
197            LOG.error(e.toString(), e);
198          }
199        }
200      });
201
202      // give the flush a chance to start. Flush should have got the region lock, and
203      // should have been waiting on the mvcc complete after this.
204      Threads.sleep(3000);
205
206      // let the append to WAL go through now that the flush already started
207      holdAppend.countDown();
208      putFinished.await();
209      flushFinished.await();
210
211      // check whether flush went through
212      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
213
214      // now check the region's unflushed seqIds.
215      long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
216      assertEquals("Found seqId for the region which is already flushed",
217          HConstants.NO_SEQNUM, seqId);
218
219      region.close();
220    }
221  }
222}