001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.NavigableMap; 026import java.util.TreeMap; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.KeyValue; 030import org.apache.hadoop.hbase.Waiter; 031import org.apache.hadoop.hbase.client.RegionInfo; 032import org.apache.hadoop.hbase.client.RegionInfoBuilder; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 035import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 036import org.apache.hadoop.hbase.replication.regionserver.Replication; 037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; 038import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 044import org.apache.hadoop.hbase.wal.WAL; 045import org.apache.hadoop.hbase.wal.WALEdit; 046import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 047import org.apache.hadoop.hbase.wal.WALKeyImpl; 048import org.junit.jupiter.api.BeforeEach; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051 052@Tag(ReplicationTests.TAG) 053@Tag(LargeTests.TAG) 054public class TestReplicationEmptyWALRecovery extends TestReplicationBase { 055 056 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 057 static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 058 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 059 060 @BeforeEach 061 public void setUp() throws IOException, InterruptedException { 062 cleanUp(); 063 scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL); 064 replicateCount.set(0); 065 replicatedEntries.clear(); 066 } 067 068 /** 069 * Waits until there is only one log(the current writing one) in the replication queue 070 * @param numRs number of region servers 071 */ 072 private void waitForLogAdvance(int numRs) { 073 Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() { 074 @Override 075 public boolean evaluate() throws Exception { 076 for (int i = 0; i < numRs; i++) { 077 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 078 RegionInfo regionInfo = 079 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 080 WAL wal = hrs.getWAL(regionInfo); 081 Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); 082 Replication replicationService = 083 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 084 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 085 .getSources()) { 086 ReplicationSource source = (ReplicationSource) rsi; 087 // We are making sure that there is only one log queue and that is for the 088 // current WAL of region server 089 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 090 if ( 091 !currentFile.equals(source.getCurrentPath()) 092 || source.getQueues().keySet().size() != 1 093 || source.getQueues().get(logPrefix).size() != 1 094 ) { 095 return false; 096 } 097 } 098 } 099 return true; 100 } 101 }); 102 } 103 104 private void verifyNumberOfLogsInQueue(int numQueues, int numRs) { 105 Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() { 106 @Override 107 public boolean evaluate() { 108 for (int i = 0; i < numRs; i++) { 109 Replication replicationService = 110 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 111 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 112 .getSources()) { 113 ReplicationSource source = (ReplicationSource) rsi; 114 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 115 if (source.getQueues().get(logPrefix).size() != numQueues) { 116 return false; 117 } 118 } 119 } 120 return true; 121 } 122 }); 123 } 124 125 @Test 126 public void testEmptyWALRecovery() throws Exception { 127 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 128 // for each RS, create an empty wal with same walGroupId 129 final List<Path> emptyWalPaths = new ArrayList<>(); 130 long ts = EnvironmentEdgeManager.currentTime(); 131 for (int i = 0; i < numRs; i++) { 132 RegionInfo regionInfo = 133 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 134 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 135 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 136 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 137 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 138 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 139 emptyWalPaths.add(emptyWalPath); 140 } 141 142 injectEmptyWAL(numRs, emptyWalPaths); 143 144 // ReplicationSource should advance past the empty wal, or else the test will fail 145 waitForLogAdvance(numRs); 146 verifyNumberOfLogsInQueue(1, numRs); 147 // we're now writing to the new wal 148 // if everything works, the source should've stopped reading from the empty wal, and start 149 // replicating from the new wal 150 runSimplePutDeleteTest(); 151 rollWalsAndWaitForDeque(numRs); 152 } 153 154 /** 155 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 156 * see the empty and handle the EOF exception, we are able to ship the previous batch of entries 157 * without loosing it. This test also tests the number of batches shipped 158 * @throws Exception throws any exception 159 */ 160 @Test 161 public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception { 162 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 163 // make sure we only the current active wal file in queue 164 verifyNumberOfLogsInQueue(1, numRs); 165 166 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 167 hbaseAdmin.disableReplicationPeer(PEER_ID2); 168 169 int numOfEntriesToReplicate = 20; 170 // for each RS, create an empty wal with same walGroupId 171 final List<Path> emptyWalPaths = new ArrayList<>(); 172 long ts = EnvironmentEdgeManager.currentTime(); 173 for (int i = 0; i < numRs; i++) { 174 RegionInfo regionInfo = 175 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 176 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 177 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 178 179 appendEntriesToWal(numOfEntriesToReplicate, wal); 180 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 181 Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); 182 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 183 emptyWalPaths.add(emptyWalPath); 184 } 185 186 injectEmptyWAL(numRs, emptyWalPaths); 187 // There should be three WALs in queue 188 // 1. non empty WAL 189 // 2. empty WAL 190 // 3. live WAL 191 verifyNumberOfLogsInQueue(3, numRs); 192 hbaseAdmin.enableReplicationPeer(PEER_ID2); 193 // ReplicationSource should advance past the empty wal, or else the test will fail 194 waitForLogAdvance(numRs); 195 196 // Now we should expect numOfEntriesToReplicate entries 197 // replicated from each region server. This makes sure we didn't loose data 198 // from any previous batch when we encounter EOF exception for empty file. 199 assertEquals(numOfEntriesToReplicate * numRs, replicatedEntries.size(), 200 "Replicated entries are not correct"); 201 202 // We expect just one batch of replication which will 203 // be from when we handle the EOF exception. 204 assertEquals(1, replicateCount.intValue(), "Replicated batches are not correct"); 205 verifyNumberOfLogsInQueue(1, numRs); 206 // we're now writing to the new wal 207 // if everything works, the source should've stopped reading from the empty wal, and start 208 // replicating from the new wal 209 runSimplePutDeleteTest(); 210 rollWalsAndWaitForDeque(numRs); 211 } 212 213 /** 214 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 215 * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and 216 * replicate it properly without missing data. 217 * @throws Exception throws any exception 218 */ 219 @Test 220 public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception { 221 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 222 hbaseAdmin.disableReplicationPeer(PEER_ID2); 223 int numOfEntriesToReplicate = 20; 224 225 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 226 // for each RS, create an empty wal with same walGroupId 227 final List<Path> emptyWalPaths = new ArrayList<>(); 228 long ts = EnvironmentEdgeManager.currentTime(); 229 WAL wal = null; 230 for (int i = 0; i < numRs; i++) { 231 RegionInfo regionInfo = 232 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 233 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 234 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 235 appendEntriesToWal(numOfEntriesToReplicate, wal); 236 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 237 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 238 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 239 emptyWalPaths.add(emptyWalPath); 240 241 } 242 injectEmptyWAL(numRs, emptyWalPaths); 243 // roll the WAL now 244 for (int i = 0; i < numRs; i++) { 245 wal.rollWriter(); 246 } 247 hbaseAdmin.enableReplicationPeer(PEER_ID2); 248 // ReplicationSource should advance past the empty wal, or else the test will fail 249 waitForLogAdvance(numRs); 250 251 // Now we should expect numOfEntriesToReplicate entries 252 // replicated from each region server. This makes sure we didn't loose data 253 // from any previous batch when we encounter EOF exception for empty file. 254 assertEquals(numOfEntriesToReplicate * numRs, replicatedEntries.size(), 255 "Replicated entries are not correct"); 256 257 // We expect just one batch of replication to be shipped which will 258 // for non empty WAL 259 assertEquals(1, replicateCount.get(), "Replicated batches are not correct"); 260 verifyNumberOfLogsInQueue(1, numRs); 261 // we're now writing to the new wal 262 // if everything works, the source should've stopped reading from the empty wal, and start 263 // replicating from the new wal 264 runSimplePutDeleteTest(); 265 rollWalsAndWaitForDeque(numRs); 266 } 267 268 /** 269 * This test make sure we replicate all the enties from the non empty WALs which are surrounding 270 * the empty WALs 271 * @throws Exception throws exception 272 */ 273 @Test 274 public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception { 275 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 276 hbaseAdmin.disableReplicationPeer(PEER_ID2); 277 int numOfEntriesToReplicate = 20; 278 279 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 280 // for each RS, create an empty wal with same walGroupId 281 final List<Path> emptyWalPaths = new ArrayList<>(); 282 long ts = EnvironmentEdgeManager.currentTime(); 283 WAL wal = null; 284 for (int i = 0; i < numRs; i++) { 285 RegionInfo regionInfo = 286 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 287 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 288 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 289 appendEntriesToWal(numOfEntriesToReplicate, wal); 290 wal.rollWriter(); 291 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 292 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 293 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 294 emptyWalPaths.add(emptyWalPath); 295 } 296 injectEmptyWAL(numRs, emptyWalPaths); 297 298 // roll the WAL again with some entries 299 for (int i = 0; i < numRs; i++) { 300 appendEntriesToWal(numOfEntriesToReplicate, wal); 301 wal.rollWriter(); 302 } 303 304 hbaseAdmin.enableReplicationPeer(PEER_ID2); 305 // ReplicationSource should advance past the empty wal, or else the test will fail 306 waitForLogAdvance(numRs); 307 308 // Now we should expect numOfEntriesToReplicate entries 309 // replicated from each region server. This makes sure we didn't loose data 310 // from any previous batch when we encounter EOF exception for empty file. 311 assertEquals(numOfEntriesToReplicate * numRs * 2, replicatedEntries.size(), 312 "Replicated entries are not correct"); 313 314 // We expect two batch of replication to be shipped which will 315 // for non empty WAL 316 assertEquals(2, replicateCount.get(), "Replicated batches are not correct"); 317 verifyNumberOfLogsInQueue(1, numRs); 318 // we're now writing to the new wal 319 // if everything works, the source should've stopped reading from the empty wal, and start 320 // replicating from the new wal 321 runSimplePutDeleteTest(); 322 rollWalsAndWaitForDeque(numRs); 323 } 324 325 // inject our empty wal into the replication queue, and then roll the original wal, which 326 // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to 327 // determine if the file being replicated currently is still opened for write, so just inject a 328 // new wal to the replication queue does not mean the previous file is closed. 329 private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException { 330 for (int i = 0; i < numRs; i++) { 331 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 332 Replication replicationService = (Replication) hrs.getReplicationSourceService(); 333 replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); 334 RegionInfo regionInfo = 335 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 336 WAL wal = hrs.getWAL(regionInfo); 337 wal.rollWriter(true); 338 } 339 } 340 341 protected WALKeyImpl getWalKeyImpl() { 342 return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes); 343 } 344 345 // Roll the WAL and wait for it to get deque from the log queue 346 private void rollWalsAndWaitForDeque(int numRs) throws IOException { 347 RegionInfo regionInfo = 348 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 349 for (int i = 0; i < numRs; i++) { 350 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 351 wal.rollWriter(); 352 } 353 waitForLogAdvance(numRs); 354 } 355 356 private void appendEntriesToWal(int numEntries, WAL wal) throws IOException { 357 long txId = -1; 358 for (int i = 0; i < numEntries; i++) { 359 byte[] b = Bytes.toBytes(Integer.toString(i)); 360 KeyValue kv = new KeyValue(b, famName, b); 361 WALEdit edit = new WALEdit(); 362 WALEditInternalHelper.addExtendedCell(edit, kv); 363 txId = wal.appendData(info, getWalKeyImpl(), edit); 364 } 365 wal.sync(txId); 366 } 367}