001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionInfoBuilder;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.apache.hadoop.hbase.wal.WALEdit;
045import org.apache.hadoop.hbase.wal.WALKeyImpl;
046import org.junit.Assert;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ ReplicationTests.class, LargeTests.class })
053public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
054  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
055  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
056  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
061
062  @Before
063  public void setUp() throws IOException, InterruptedException {
064    cleanUp();
065    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
066    replicateCount.set(0);
067    replicatedEntries.clear();
068  }
069
070  /**
071   * Waits until there is only one log(the current writing one) in the replication queue
072   * @param numRs number of region servers
073   */
074  private void waitForLogAdvance(int numRs) {
075    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
076      @Override
077      public boolean evaluate() throws Exception {
078        for (int i = 0; i < numRs; i++) {
079          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
080          RegionInfo regionInfo =
081            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
082          WAL wal = hrs.getWAL(regionInfo);
083          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
084          Replication replicationService =
085            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
086          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
087            .getSources()) {
088            ReplicationSource source = (ReplicationSource) rsi;
089            // We are making sure that there is only one log queue and that is for the
090            // current WAL of region server
091            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
092            if (
093              !currentFile.equals(source.getCurrentPath())
094                || source.getQueues().keySet().size() != 1
095                || source.getQueues().get(logPrefix).size() != 1
096            ) {
097              return false;
098            }
099          }
100        }
101        return true;
102      }
103    });
104  }
105
106  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
107    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
108      @Override
109      public boolean evaluate() {
110        for (int i = 0; i < numRs; i++) {
111          Replication replicationService =
112            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
113          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
114            .getSources()) {
115            ReplicationSource source = (ReplicationSource) rsi;
116            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
117            if (source.getQueues().get(logPrefix).size() != numQueues) {
118              return false;
119            }
120          }
121        }
122        return true;
123      }
124    });
125  }
126
127  @Test
128  public void testEmptyWALRecovery() throws Exception {
129    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
130    // for each RS, create an empty wal with same walGroupId
131    final List<Path> emptyWalPaths = new ArrayList<>();
132    long ts = EnvironmentEdgeManager.currentTime();
133    for (int i = 0; i < numRs; i++) {
134      RegionInfo regionInfo =
135        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
136      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
137      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
138      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
139      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
140      UTIL1.getTestFileSystem().create(emptyWalPath).close();
141      emptyWalPaths.add(emptyWalPath);
142    }
143
144    injectEmptyWAL(numRs, emptyWalPaths);
145
146    // ReplicationSource should advance past the empty wal, or else the test will fail
147    waitForLogAdvance(numRs);
148    verifyNumberOfLogsInQueue(1, numRs);
149    // we're now writing to the new wal
150    // if everything works, the source should've stopped reading from the empty wal, and start
151    // replicating from the new wal
152    runSimplePutDeleteTest();
153    rollWalsAndWaitForDeque(numRs);
154  }
155
156  /**
157   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
158   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
159   * without loosing it. This test also tests the number of batches shipped
160   * @throws Exception throws any exception
161   */
162  @Test
163  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
164    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
165    hbaseAdmin.disableReplicationPeer(PEER_ID2);
166    int numOfEntriesToReplicate = 20;
167
168    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
169    // for each RS, create an empty wal with same walGroupId
170    final List<Path> emptyWalPaths = new ArrayList<>();
171    long ts = EnvironmentEdgeManager.currentTime();
172    for (int i = 0; i < numRs; i++) {
173      RegionInfo regionInfo =
174        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
175      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
176      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
177
178      appendEntriesToWal(numOfEntriesToReplicate, wal);
179      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
180      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
181      UTIL1.getTestFileSystem().create(emptyWalPath).close();
182      emptyWalPaths.add(emptyWalPath);
183    }
184
185    injectEmptyWAL(numRs, emptyWalPaths);
186    // There should be three WALs in queue
187    // 1. non empty WAL
188    // 2. empty WAL
189    // 3. live WAL
190    verifyNumberOfLogsInQueue(3, numRs);
191    hbaseAdmin.enableReplicationPeer(PEER_ID2);
192    // ReplicationSource should advance past the empty wal, or else the test will fail
193    waitForLogAdvance(numRs);
194
195    // Now we should expect numOfEntriesToReplicate entries
196    // replicated from each region server. This makes sure we didn't loose data
197    // from any previous batch when we encounter EOF exception for empty file.
198    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
199      replicatedEntries.size());
200
201    // We expect just one batch of replication which will
202    // be from when we handle the EOF exception.
203    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
204    verifyNumberOfLogsInQueue(1, numRs);
205    // we're now writing to the new wal
206    // if everything works, the source should've stopped reading from the empty wal, and start
207    // replicating from the new wal
208    runSimplePutDeleteTest();
209    rollWalsAndWaitForDeque(numRs);
210  }
211
212  /**
213   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
214   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
215   * replicate it properly without missing data.
216   * @throws Exception throws any exception
217   */
218  @Test
219  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
220    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
221    hbaseAdmin.disableReplicationPeer(PEER_ID2);
222    int numOfEntriesToReplicate = 20;
223
224    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
225    // for each RS, create an empty wal with same walGroupId
226    final List<Path> emptyWalPaths = new ArrayList<>();
227    long ts = EnvironmentEdgeManager.currentTime();
228    WAL wal = null;
229    for (int i = 0; i < numRs; i++) {
230      RegionInfo regionInfo =
231        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
232      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
233      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
234      appendEntriesToWal(numOfEntriesToReplicate, wal);
235      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
236      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
237      UTIL1.getTestFileSystem().create(emptyWalPath).close();
238      emptyWalPaths.add(emptyWalPath);
239
240    }
241    injectEmptyWAL(numRs, emptyWalPaths);
242    // roll the WAL now
243    for (int i = 0; i < numRs; i++) {
244      wal.rollWriter();
245    }
246    hbaseAdmin.enableReplicationPeer(PEER_ID2);
247    // ReplicationSource should advance past the empty wal, or else the test will fail
248    waitForLogAdvance(numRs);
249
250    // Now we should expect numOfEntriesToReplicate entries
251    // replicated from each region server. This makes sure we didn't loose data
252    // from any previous batch when we encounter EOF exception for empty file.
253    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
254      replicatedEntries.size());
255
256    // We expect just one batch of replication to be shipped which will
257    // for non empty WAL
258    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
259    verifyNumberOfLogsInQueue(1, numRs);
260    // we're now writing to the new wal
261    // if everything works, the source should've stopped reading from the empty wal, and start
262    // replicating from the new wal
263    runSimplePutDeleteTest();
264    rollWalsAndWaitForDeque(numRs);
265  }
266
267  /**
268   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
269   * the empty WALs
270   * @throws Exception throws exception
271   */
272  @Test
273  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
274    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
275    hbaseAdmin.disableReplicationPeer(PEER_ID2);
276    int numOfEntriesToReplicate = 20;
277
278    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
279    // for each RS, create an empty wal with same walGroupId
280    final List<Path> emptyWalPaths = new ArrayList<>();
281    long ts = EnvironmentEdgeManager.currentTime();
282    WAL wal = null;
283    for (int i = 0; i < numRs; i++) {
284      RegionInfo regionInfo =
285        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
286      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
287      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
288      appendEntriesToWal(numOfEntriesToReplicate, wal);
289      wal.rollWriter();
290      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
291      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
292      UTIL1.getTestFileSystem().create(emptyWalPath).close();
293      emptyWalPaths.add(emptyWalPath);
294    }
295    injectEmptyWAL(numRs, emptyWalPaths);
296
297    // roll the WAL again with some entries
298    for (int i = 0; i < numRs; i++) {
299      appendEntriesToWal(numOfEntriesToReplicate, wal);
300      wal.rollWriter();
301    }
302
303    hbaseAdmin.enableReplicationPeer(PEER_ID2);
304    // ReplicationSource should advance past the empty wal, or else the test will fail
305    waitForLogAdvance(numRs);
306
307    // Now we should expect numOfEntriesToReplicate entries
308    // replicated from each region server. This makes sure we didn't loose data
309    // from any previous batch when we encounter EOF exception for empty file.
310    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
311      replicatedEntries.size());
312
313    // We expect two batch of replication to be shipped which will
314    // for non empty WAL
315    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
316    verifyNumberOfLogsInQueue(1, numRs);
317    // we're now writing to the new wal
318    // if everything works, the source should've stopped reading from the empty wal, and start
319    // replicating from the new wal
320    runSimplePutDeleteTest();
321    rollWalsAndWaitForDeque(numRs);
322  }
323
324  // inject our empty wal into the replication queue, and then roll the original wal, which
325  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
326  // determine if the file being replicated currently is still opened for write, so just inject a
327  // new wal to the replication queue does not mean the previous file is closed.
328  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
329    for (int i = 0; i < numRs; i++) {
330      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
331      Replication replicationService = (Replication) hrs.getReplicationSourceService();
332      replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
333      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
334      RegionInfo regionInfo =
335        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
336      WAL wal = hrs.getWAL(regionInfo);
337      wal.rollWriter(true);
338    }
339  }
340
341  protected WALKeyImpl getWalKeyImpl() {
342    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
343  }
344
345  // Roll the WAL and wait for it to get deque from the log queue
346  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
347    RegionInfo regionInfo =
348      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
349    for (int i = 0; i < numRs; i++) {
350      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
351      wal.rollWriter();
352    }
353    waitForLogAdvance(numRs);
354  }
355
356  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
357    long txId = -1;
358    for (int i = 0; i < numEntries; i++) {
359      byte[] b = Bytes.toBytes(Integer.toString(i));
360      KeyValue kv = new KeyValue(b, famName, b);
361      WALEdit edit = new WALEdit();
362      edit.add(kv);
363      txId = wal.appendData(info, getWalKeyImpl(), edit);
364    }
365    wal.sync(txId);
366  }
367}