001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * http://www.apache.org/licenses/LICENSE-2.0
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.apache.hadoop.hbase.replication;
017
018import java.io.IOException;
019import java.util.ArrayList;
020import java.util.List;
021import java.util.NavigableMap;
022import java.util.TreeMap;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.Waiter;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.RegionInfoBuilder;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
033import org.apache.hadoop.hbase.replication.regionserver.Replication;
034import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
035import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.ReplicationTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
041import org.apache.hadoop.hbase.wal.WAL;
042import org.apache.hadoop.hbase.wal.WALEdit;
043import org.apache.hadoop.hbase.wal.WALKeyImpl;
044import org.junit.Assert;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050@Category({ ReplicationTests.class, LargeTests.class })
051public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
052  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
053  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
054  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
059
060  @Before
061  public void setUp() throws IOException, InterruptedException {
062    cleanUp();
063    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
064    replicateCount.set(0);
065    replicatedEntries.clear();
066  }
067
068  /**
069   * Waits until there is only one log(the current writing one) in the replication queue
070   *
071   * @param numRs number of region servers
072   */
073  private void waitForLogAdvance(int numRs) {
074    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
075      @Override
076      public boolean evaluate() throws Exception {
077        for (int i = 0; i < numRs; i++) {
078          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
079          RegionInfo regionInfo =
080            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
081          WAL wal = hrs.getWAL(regionInfo);
082          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
083          Replication replicationService =
084            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
085          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
086            .getSources()) {
087            ReplicationSource source = (ReplicationSource) rsi;
088            // We are making sure that there is only one log queue and that is for the
089            // current WAL of region server
090            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
091            if (!currentFile.equals(source.getCurrentPath())
092              || source.getQueues().keySet().size() != 1
093              || source.getQueues().get(logPrefix).size() != 1) {
094              return false;
095            }
096          }
097        }
098        return true;
099      }
100    });
101  }
102
103  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
104    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
105      @Override
106      public boolean evaluate() {
107        for (int i = 0; i < numRs; i++) {
108          Replication replicationService =
109            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
110          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
111            .getSources()) {
112            ReplicationSource source = (ReplicationSource) rsi;
113            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
114            if (source.getQueues().get(logPrefix).size() != numQueues) {
115              return false;
116            }
117          }
118        }
119        return true;
120      }
121    });
122  }
123
124  @Test
125  public void testEmptyWALRecovery() throws Exception {
126    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
127    // for each RS, create an empty wal with same walGroupId
128    final List<Path> emptyWalPaths = new ArrayList<>();
129    long ts = EnvironmentEdgeManager.currentTime();
130    for (int i = 0; i < numRs; i++) {
131      RegionInfo regionInfo =
132        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
133      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
134      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
135      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
136      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
137      UTIL1.getTestFileSystem().create(emptyWalPath).close();
138      emptyWalPaths.add(emptyWalPath);
139    }
140
141    injectEmptyWAL(numRs, emptyWalPaths);
142
143    // ReplicationSource should advance past the empty wal, or else the test will fail
144    waitForLogAdvance(numRs);
145    verifyNumberOfLogsInQueue(1, numRs);
146    // we're now writing to the new wal
147    // if everything works, the source should've stopped reading from the empty wal, and start
148    // replicating from the new wal
149    runSimplePutDeleteTest();
150    rollWalsAndWaitForDeque(numRs);
151  }
152
153  /**
154   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
155   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
156   * without loosing it. This test also tests the number of batches shipped
157   * @throws Exception throws any exception
158   */
159  @Test
160  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
161    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
162    hbaseAdmin.disableReplicationPeer(PEER_ID2);
163    int numOfEntriesToReplicate = 20;
164
165    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
166    // for each RS, create an empty wal with same walGroupId
167    final List<Path> emptyWalPaths = new ArrayList<>();
168    long ts = EnvironmentEdgeManager.currentTime();
169    for (int i = 0; i < numRs; i++) {
170      RegionInfo regionInfo =
171        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
172      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
173      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
174
175      appendEntriesToWal(numOfEntriesToReplicate, wal);
176      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
177      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
178      UTIL1.getTestFileSystem().create(emptyWalPath).close();
179      emptyWalPaths.add(emptyWalPath);
180    }
181
182    injectEmptyWAL(numRs, emptyWalPaths);
183    // There should be three WALs in queue
184    // 1. non empty WAL
185    // 2. empty WAL
186    // 3. live WAL
187    verifyNumberOfLogsInQueue(3, numRs);
188    hbaseAdmin.enableReplicationPeer(PEER_ID2);
189    // ReplicationSource should advance past the empty wal, or else the test will fail
190    waitForLogAdvance(numRs);
191
192    // Now we should expect numOfEntriesToReplicate entries
193    // replicated from each region server. This makes sure we didn't loose data
194    // from any previous batch when we encounter EOF exception for empty file.
195    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
196      replicatedEntries.size());
197
198    // We expect just one batch of replication which will
199    // be from when we handle the EOF exception.
200    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
201    verifyNumberOfLogsInQueue(1, numRs);
202    // we're now writing to the new wal
203    // if everything works, the source should've stopped reading from the empty wal, and start
204    // replicating from the new wal
205    runSimplePutDeleteTest();
206    rollWalsAndWaitForDeque(numRs);
207  }
208
209  /**
210   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
211   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
212   * replicate it properly without missing data.
213   * @throws Exception throws any exception
214   */
215  @Test
216  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
217    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
218    hbaseAdmin.disableReplicationPeer(PEER_ID2);
219    int numOfEntriesToReplicate = 20;
220
221    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
222    // for each RS, create an empty wal with same walGroupId
223    final List<Path> emptyWalPaths = new ArrayList<>();
224    long ts = EnvironmentEdgeManager.currentTime();
225    WAL wal = null;
226    for (int i = 0; i < numRs; i++) {
227      RegionInfo regionInfo =
228        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
229      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
230      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
231      appendEntriesToWal(numOfEntriesToReplicate, wal);
232      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
233      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
234      UTIL1.getTestFileSystem().create(emptyWalPath).close();
235      emptyWalPaths.add(emptyWalPath);
236
237    }
238    injectEmptyWAL(numRs, emptyWalPaths);
239    // roll the WAL now
240    for (int i = 0; i < numRs; i++) {
241      wal.rollWriter();
242    }
243    hbaseAdmin.enableReplicationPeer(PEER_ID2);
244    // ReplicationSource should advance past the empty wal, or else the test will fail
245    waitForLogAdvance(numRs);
246
247    // Now we should expect numOfEntriesToReplicate entries
248    // replicated from each region server. This makes sure we didn't loose data
249    // from any previous batch when we encounter EOF exception for empty file.
250    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
251      replicatedEntries.size());
252
253    // We expect just one batch of replication to be shipped which will
254    // for non empty WAL
255    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
256    verifyNumberOfLogsInQueue(1, numRs);
257    // we're now writing to the new wal
258    // if everything works, the source should've stopped reading from the empty wal, and start
259    // replicating from the new wal
260    runSimplePutDeleteTest();
261    rollWalsAndWaitForDeque(numRs);
262  }
263
264  /**
265   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
266   * the empty WALs
267   * @throws Exception throws exception
268   */
269  @Test
270  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
271    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
272    hbaseAdmin.disableReplicationPeer(PEER_ID2);
273    int numOfEntriesToReplicate = 20;
274
275    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
276    // for each RS, create an empty wal with same walGroupId
277    final List<Path> emptyWalPaths = new ArrayList<>();
278    long ts = EnvironmentEdgeManager.currentTime();
279    WAL wal = null;
280    for (int i = 0; i < numRs; i++) {
281      RegionInfo regionInfo =
282        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
283      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
284      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
285      appendEntriesToWal(numOfEntriesToReplicate, wal);
286      wal.rollWriter();
287      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
288      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
289      UTIL1.getTestFileSystem().create(emptyWalPath).close();
290      emptyWalPaths.add(emptyWalPath);
291    }
292    injectEmptyWAL(numRs, emptyWalPaths);
293
294    // roll the WAL again with some entries
295    for (int i = 0; i < numRs; i++) {
296      appendEntriesToWal(numOfEntriesToReplicate, wal);
297      wal.rollWriter();
298    }
299
300    hbaseAdmin.enableReplicationPeer(PEER_ID2);
301    // ReplicationSource should advance past the empty wal, or else the test will fail
302    waitForLogAdvance(numRs);
303
304    // Now we should expect numOfEntriesToReplicate entries
305    // replicated from each region server. This makes sure we didn't loose data
306    // from any previous batch when we encounter EOF exception for empty file.
307    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
308      replicatedEntries.size());
309
310    // We expect two batch of replication to be shipped which will
311    // for non empty WAL
312    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
313    verifyNumberOfLogsInQueue(1, numRs);
314    // we're now writing to the new wal
315    // if everything works, the source should've stopped reading from the empty wal, and start
316    // replicating from the new wal
317    runSimplePutDeleteTest();
318    rollWalsAndWaitForDeque(numRs);
319  }
320
321  // inject our empty wal into the replication queue, and then roll the original wal, which
322  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
323  // determine if the file being replicated currently is still opened for write, so just inject a
324  // new wal to the replication queue does not mean the previous file is closed.
325  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
326    for (int i = 0; i < numRs; i++) {
327      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
328      Replication replicationService = (Replication) hrs.getReplicationSourceService();
329      replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
330      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
331      RegionInfo regionInfo =
332        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
333      WAL wal = hrs.getWAL(regionInfo);
334      wal.rollWriter(true);
335    }
336  }
337
338  protected WALKeyImpl getWalKeyImpl() {
339    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
340  }
341
342  // Roll the WAL and wait for it to get deque from the log queue
343  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
344    RegionInfo regionInfo =
345      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
346    for (int i = 0; i < numRs; i++) {
347      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
348      wal.rollWriter();
349    }
350    waitForLogAdvance(numRs);
351  }
352
353  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
354    long txId = -1;
355    for (int i = 0; i < numEntries; i++) {
356      byte[] b = Bytes.toBytes(Integer.toString(i));
357      KeyValue kv = new KeyValue(b, famName, b);
358      WALEdit edit = new WALEdit();
359      edit.add(kv);
360      txId = wal.appendData(info, getWalKeyImpl(), edit);
361    }
362    wal.sync(txId);
363  }
364}