001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.Waiter;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.regionserver.HRegionServer;
028import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
029import org.apache.hadoop.hbase.replication.regionserver.Replication;
030import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
031import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
032import org.apache.hadoop.hbase.testclassification.LargeTests;
033import org.apache.hadoop.hbase.testclassification.ReplicationTests;
034import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
035import org.apache.hadoop.hbase.wal.WAL;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041@Category({ ReplicationTests.class, LargeTests.class })
042public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
047
048  @Before
049  public void setUp() throws IOException, InterruptedException {
050    cleanUp();
051  }
052
053  /**
054   * Waits until there is only one log(the current writing one) in the replication queue
055   * @param numRs number of regionservers
056   */
057  private void waitForLogAdvance(int numRs) throws Exception {
058    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
059      @Override
060      public boolean evaluate() throws Exception {
061        for (int i = 0; i < numRs; i++) {
062          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
063          RegionInfo regionInfo =
064              UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
065          WAL wal = hrs.getWAL(regionInfo);
066          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
067          Replication replicationService = (Replication) UTIL1.getHBaseCluster()
068              .getRegionServer(i).getReplicationSourceService();
069          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
070              .getSources()) {
071            ReplicationSource source = (ReplicationSource) rsi;
072            if (!currentFile.equals(source.getCurrentPath())) {
073              return false;
074            }
075          }
076        }
077        return true;
078      }
079    });
080  }
081
082  @Test
083  public void testEmptyWALRecovery() throws Exception {
084    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
085
086    // for each RS, create an empty wal with same walGroupId
087    final List<Path> emptyWalPaths = new ArrayList<>();
088    long ts = System.currentTimeMillis();
089    for (int i = 0; i < numRs; i++) {
090      RegionInfo regionInfo =
091          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
092      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
093      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
094      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
095      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
096      UTIL1.getTestFileSystem().create(emptyWalPath).close();
097      emptyWalPaths.add(emptyWalPath);
098    }
099
100    // inject our empty wal into the replication queue, and then roll the original wal, which
101    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
102    // determine if the file being replicated currently is still opened for write, so just inject a
103    // new wal to the replication queue does not mean the previous file is closed.
104    for (int i = 0; i < numRs; i++) {
105      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
106      Replication replicationService = (Replication) hrs.getReplicationSourceService();
107      replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
108      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
109      RegionInfo regionInfo =
110        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
111      WAL wal = hrs.getWAL(regionInfo);
112      wal.rollWriter(true);
113    }
114
115    // ReplicationSource should advance past the empty wal, or else the test will fail
116    waitForLogAdvance(numRs);
117
118    // we're now writing to the new wal
119    // if everything works, the source should've stopped reading from the empty wal, and start
120    // replicating from the new wal
121    runSimplePutDeleteTest();
122  }
123}