001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.NavigableMap;
026import java.util.TreeMap;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.KeyValue;
030import org.apache.hadoop.hbase.Waiter;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.RegionInfoBuilder;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
035import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
036import org.apache.hadoop.hbase.replication.regionserver.Replication;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
038import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
044import org.apache.hadoop.hbase.wal.WAL;
045import org.apache.hadoop.hbase.wal.WALEdit;
046import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
047import org.apache.hadoop.hbase.wal.WALKeyImpl;
048import org.junit.jupiter.api.BeforeEach;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051
052@Tag(ReplicationTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
055
056  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
057  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
058  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
059
060  @BeforeEach
061  public void setUp() throws IOException, InterruptedException {
062    cleanUp();
063    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
064    replicateCount.set(0);
065    replicatedEntries.clear();
066  }
067
068  /**
069   * Waits until there is only one log(the current writing one) in the replication queue
070   * @param numRs number of region servers
071   */
072  private void waitForLogAdvance(int numRs) {
073    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
074      @Override
075      public boolean evaluate() throws Exception {
076        for (int i = 0; i < numRs; i++) {
077          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
078          RegionInfo regionInfo =
079            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
080          WAL wal = hrs.getWAL(regionInfo);
081          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
082          Replication replicationService =
083            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
084          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
085            .getSources()) {
086            ReplicationSource source = (ReplicationSource) rsi;
087            // We are making sure that there is only one log queue and that is for the
088            // current WAL of region server
089            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
090            if (
091              !currentFile.equals(source.getCurrentPath())
092                || source.getQueues().keySet().size() != 1
093                || source.getQueues().get(logPrefix).size() != 1
094            ) {
095              return false;
096            }
097          }
098        }
099        return true;
100      }
101    });
102  }
103
104  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
105    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
106      @Override
107      public boolean evaluate() {
108        for (int i = 0; i < numRs; i++) {
109          Replication replicationService =
110            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
111          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
112            .getSources()) {
113            ReplicationSource source = (ReplicationSource) rsi;
114            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
115            if (source.getQueues().get(logPrefix).size() != numQueues) {
116              return false;
117            }
118          }
119        }
120        return true;
121      }
122    });
123  }
124
125  @Test
126  public void testEmptyWALRecovery() throws Exception {
127    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
128    // for each RS, create an empty wal with same walGroupId
129    final List<Path> emptyWalPaths = new ArrayList<>();
130    long ts = EnvironmentEdgeManager.currentTime();
131    for (int i = 0; i < numRs; i++) {
132      RegionInfo regionInfo =
133        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
134      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
135      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
136      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
137      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
138      UTIL1.getTestFileSystem().create(emptyWalPath).close();
139      emptyWalPaths.add(emptyWalPath);
140    }
141
142    injectEmptyWAL(numRs, emptyWalPaths);
143
144    // ReplicationSource should advance past the empty wal, or else the test will fail
145    waitForLogAdvance(numRs);
146    verifyNumberOfLogsInQueue(1, numRs);
147    // we're now writing to the new wal
148    // if everything works, the source should've stopped reading from the empty wal, and start
149    // replicating from the new wal
150    runSimplePutDeleteTest();
151    rollWalsAndWaitForDeque(numRs);
152  }
153
154  /**
155   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
156   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
157   * without loosing it. This test also tests the number of batches shipped
158   * @throws Exception throws any exception
159   */
160  @Test
161  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
162    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
163    // make sure we only the current active wal file in queue
164    verifyNumberOfLogsInQueue(1, numRs);
165
166    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
167    hbaseAdmin.disableReplicationPeer(PEER_ID2);
168
169    int numOfEntriesToReplicate = 20;
170    // for each RS, create an empty wal with same walGroupId
171    final List<Path> emptyWalPaths = new ArrayList<>();
172    long ts = EnvironmentEdgeManager.currentTime();
173    for (int i = 0; i < numRs; i++) {
174      RegionInfo regionInfo =
175        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
176      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
177      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
178
179      appendEntriesToWal(numOfEntriesToReplicate, wal);
180      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
181      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
182      UTIL1.getTestFileSystem().create(emptyWalPath).close();
183      emptyWalPaths.add(emptyWalPath);
184    }
185
186    injectEmptyWAL(numRs, emptyWalPaths);
187    // There should be three WALs in queue
188    // 1. non empty WAL
189    // 2. empty WAL
190    // 3. live WAL
191    verifyNumberOfLogsInQueue(3, numRs);
192    hbaseAdmin.enableReplicationPeer(PEER_ID2);
193    // ReplicationSource should advance past the empty wal, or else the test will fail
194    waitForLogAdvance(numRs);
195
196    // Now we should expect numOfEntriesToReplicate entries
197    // replicated from each region server. This makes sure we didn't loose data
198    // from any previous batch when we encounter EOF exception for empty file.
199    assertEquals(numOfEntriesToReplicate * numRs, replicatedEntries.size(),
200      "Replicated entries are not correct");
201
202    // We expect just one batch of replication which will
203    // be from when we handle the EOF exception.
204    assertEquals(1, replicateCount.intValue(), "Replicated batches are not correct");
205    verifyNumberOfLogsInQueue(1, numRs);
206    // we're now writing to the new wal
207    // if everything works, the source should've stopped reading from the empty wal, and start
208    // replicating from the new wal
209    runSimplePutDeleteTest();
210    rollWalsAndWaitForDeque(numRs);
211  }
212
213  /**
214   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
215   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
216   * replicate it properly without missing data.
217   * @throws Exception throws any exception
218   */
219  @Test
220  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
221    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
222    hbaseAdmin.disableReplicationPeer(PEER_ID2);
223    int numOfEntriesToReplicate = 20;
224
225    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
226    // for each RS, create an empty wal with same walGroupId
227    final List<Path> emptyWalPaths = new ArrayList<>();
228    long ts = EnvironmentEdgeManager.currentTime();
229    WAL wal = null;
230    for (int i = 0; i < numRs; i++) {
231      RegionInfo regionInfo =
232        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
233      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
234      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
235      appendEntriesToWal(numOfEntriesToReplicate, wal);
236      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
237      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
238      UTIL1.getTestFileSystem().create(emptyWalPath).close();
239      emptyWalPaths.add(emptyWalPath);
240
241    }
242    injectEmptyWAL(numRs, emptyWalPaths);
243    // roll the WAL now
244    for (int i = 0; i < numRs; i++) {
245      wal.rollWriter();
246    }
247    hbaseAdmin.enableReplicationPeer(PEER_ID2);
248    // ReplicationSource should advance past the empty wal, or else the test will fail
249    waitForLogAdvance(numRs);
250
251    // Now we should expect numOfEntriesToReplicate entries
252    // replicated from each region server. This makes sure we didn't loose data
253    // from any previous batch when we encounter EOF exception for empty file.
254    assertEquals(numOfEntriesToReplicate * numRs, replicatedEntries.size(),
255      "Replicated entries are not correct");
256
257    // We expect just one batch of replication to be shipped which will
258    // for non empty WAL
259    assertEquals(1, replicateCount.get(), "Replicated batches are not correct");
260    verifyNumberOfLogsInQueue(1, numRs);
261    // we're now writing to the new wal
262    // if everything works, the source should've stopped reading from the empty wal, and start
263    // replicating from the new wal
264    runSimplePutDeleteTest();
265    rollWalsAndWaitForDeque(numRs);
266  }
267
268  /**
269   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
270   * the empty WALs
271   * @throws Exception throws exception
272   */
273  @Test
274  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
275    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
276    hbaseAdmin.disableReplicationPeer(PEER_ID2);
277    int numOfEntriesToReplicate = 20;
278
279    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
280    // for each RS, create an empty wal with same walGroupId
281    final List<Path> emptyWalPaths = new ArrayList<>();
282    long ts = EnvironmentEdgeManager.currentTime();
283    WAL wal = null;
284    for (int i = 0; i < numRs; i++) {
285      RegionInfo regionInfo =
286        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
287      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
288      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
289      appendEntriesToWal(numOfEntriesToReplicate, wal);
290      wal.rollWriter();
291      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
292      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
293      UTIL1.getTestFileSystem().create(emptyWalPath).close();
294      emptyWalPaths.add(emptyWalPath);
295    }
296    injectEmptyWAL(numRs, emptyWalPaths);
297
298    // roll the WAL again with some entries
299    for (int i = 0; i < numRs; i++) {
300      appendEntriesToWal(numOfEntriesToReplicate, wal);
301      wal.rollWriter();
302    }
303
304    hbaseAdmin.enableReplicationPeer(PEER_ID2);
305    // ReplicationSource should advance past the empty wal, or else the test will fail
306    waitForLogAdvance(numRs);
307
308    // Now we should expect numOfEntriesToReplicate entries
309    // replicated from each region server. This makes sure we didn't loose data
310    // from any previous batch when we encounter EOF exception for empty file.
311    assertEquals(numOfEntriesToReplicate * numRs * 2, replicatedEntries.size(),
312      "Replicated entries are not correct");
313
314    // We expect two batch of replication to be shipped which will
315    // for non empty WAL
316    assertEquals(2, replicateCount.get(), "Replicated batches are not correct");
317    verifyNumberOfLogsInQueue(1, numRs);
318    // we're now writing to the new wal
319    // if everything works, the source should've stopped reading from the empty wal, and start
320    // replicating from the new wal
321    runSimplePutDeleteTest();
322    rollWalsAndWaitForDeque(numRs);
323  }
324
325  // inject our empty wal into the replication queue, and then roll the original wal, which
326  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
327  // determine if the file being replicated currently is still opened for write, so just inject a
328  // new wal to the replication queue does not mean the previous file is closed.
329  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
330    for (int i = 0; i < numRs; i++) {
331      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
332      Replication replicationService = (Replication) hrs.getReplicationSourceService();
333      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
334      RegionInfo regionInfo =
335        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
336      WAL wal = hrs.getWAL(regionInfo);
337      wal.rollWriter(true);
338    }
339  }
340
341  protected WALKeyImpl getWalKeyImpl() {
342    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
343  }
344
345  // Roll the WAL and wait for it to get deque from the log queue
346  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
347    RegionInfo regionInfo =
348      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
349    for (int i = 0; i < numRs; i++) {
350      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
351      wal.rollWriter();
352    }
353    waitForLogAdvance(numRs);
354  }
355
356  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
357    long txId = -1;
358    for (int i = 0; i < numEntries; i++) {
359      byte[] b = Bytes.toBytes(Integer.toString(i));
360      KeyValue kv = new KeyValue(b, famName, b);
361      WALEdit edit = new WALEdit();
362      WALEditInternalHelper.addExtendedCell(edit, kv);
363      txId = wal.appendData(info, getWalKeyImpl(), edit);
364    }
365    wal.sync(txId);
366  }
367}