001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.KeyValue; 029import org.apache.hadoop.hbase.Waiter; 030import org.apache.hadoop.hbase.client.RegionInfo; 031import org.apache.hadoop.hbase.client.RegionInfoBuilder; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.replication.regionserver.Replication; 036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; 037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 043import org.apache.hadoop.hbase.wal.WAL; 044import org.apache.hadoop.hbase.wal.WALEdit; 045import org.apache.hadoop.hbase.wal.WALKeyImpl; 046import org.junit.Assert; 047import org.junit.Before; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052@Category({ ReplicationTests.class, LargeTests.class }) 053public class TestReplicationEmptyWALRecovery extends TestReplicationBase { 054 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 055 static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 056 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); 061 062 @Before 063 public void setUp() throws IOException, InterruptedException { 064 cleanUp(); 065 scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL); 066 replicateCount.set(0); 067 replicatedEntries.clear(); 068 } 069 070 /** 071 * Waits until there is only one log(the current writing one) in the replication queue 072 * @param numRs number of region servers 073 */ 074 private void waitForLogAdvance(int numRs) { 075 Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() { 076 @Override 077 public boolean evaluate() throws Exception { 078 for (int i = 0; i < numRs; i++) { 079 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 080 RegionInfo regionInfo = 081 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 082 WAL wal = hrs.getWAL(regionInfo); 083 Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); 084 Replication replicationService = 085 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 086 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 087 .getSources()) { 088 ReplicationSource source = (ReplicationSource) rsi; 089 // We are making sure that there is only one log queue and that is for the 090 // current WAL of region server 091 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 092 if ( 093 !currentFile.equals(source.getCurrentPath()) 094 || source.getQueues().keySet().size() != 1 095 || source.getQueues().get(logPrefix).size() != 1 096 ) { 097 return false; 098 } 099 } 100 } 101 return true; 102 } 103 }); 104 } 105 106 private void verifyNumberOfLogsInQueue(int numQueues, int numRs) { 107 Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() { 108 @Override 109 public boolean evaluate() { 110 for (int i = 0; i < numRs; i++) { 111 Replication replicationService = 112 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 113 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 114 .getSources()) { 115 ReplicationSource source = (ReplicationSource) rsi; 116 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 117 if (source.getQueues().get(logPrefix).size() != numQueues) { 118 return false; 119 } 120 } 121 } 122 return true; 123 } 124 }); 125 } 126 127 @Test 128 public void testEmptyWALRecovery() throws Exception { 129 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 130 // for each RS, create an empty wal with same walGroupId 131 final List<Path> emptyWalPaths = new ArrayList<>(); 132 long ts = EnvironmentEdgeManager.currentTime(); 133 for (int i = 0; i < numRs; i++) { 134 RegionInfo regionInfo = 135 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 136 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 137 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 138 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 139 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 140 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 141 emptyWalPaths.add(emptyWalPath); 142 } 143 144 injectEmptyWAL(numRs, emptyWalPaths); 145 146 // ReplicationSource should advance past the empty wal, or else the test will fail 147 waitForLogAdvance(numRs); 148 verifyNumberOfLogsInQueue(1, numRs); 149 // we're now writing to the new wal 150 // if everything works, the source should've stopped reading from the empty wal, and start 151 // replicating from the new wal 152 runSimplePutDeleteTest(); 153 rollWalsAndWaitForDeque(numRs); 154 } 155 156 /** 157 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 158 * see the empty and handle the EOF exception, we are able to ship the previous batch of entries 159 * without loosing it. This test also tests the number of batches shipped 160 * @throws Exception throws any exception 161 */ 162 @Test 163 public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception { 164 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 165 hbaseAdmin.disableReplicationPeer(PEER_ID2); 166 int numOfEntriesToReplicate = 20; 167 168 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 169 // for each RS, create an empty wal with same walGroupId 170 final List<Path> emptyWalPaths = new ArrayList<>(); 171 long ts = EnvironmentEdgeManager.currentTime(); 172 for (int i = 0; i < numRs; i++) { 173 RegionInfo regionInfo = 174 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 175 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 176 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 177 178 appendEntriesToWal(numOfEntriesToReplicate, wal); 179 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 180 Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); 181 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 182 emptyWalPaths.add(emptyWalPath); 183 } 184 185 injectEmptyWAL(numRs, emptyWalPaths); 186 // There should be three WALs in queue 187 // 1. non empty WAL 188 // 2. empty WAL 189 // 3. live WAL 190 verifyNumberOfLogsInQueue(3, numRs); 191 hbaseAdmin.enableReplicationPeer(PEER_ID2); 192 // ReplicationSource should advance past the empty wal, or else the test will fail 193 waitForLogAdvance(numRs); 194 195 // Now we should expect numOfEntriesToReplicate entries 196 // replicated from each region server. This makes sure we didn't loose data 197 // from any previous batch when we encounter EOF exception for empty file. 198 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, 199 replicatedEntries.size()); 200 201 // We expect just one batch of replication which will 202 // be from when we handle the EOF exception. 203 Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue()); 204 verifyNumberOfLogsInQueue(1, numRs); 205 // we're now writing to the new wal 206 // if everything works, the source should've stopped reading from the empty wal, and start 207 // replicating from the new wal 208 runSimplePutDeleteTest(); 209 rollWalsAndWaitForDeque(numRs); 210 } 211 212 /** 213 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 214 * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and 215 * replicate it properly without missing data. 216 * @throws Exception throws any exception 217 */ 218 @Test 219 public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception { 220 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 221 hbaseAdmin.disableReplicationPeer(PEER_ID2); 222 int numOfEntriesToReplicate = 20; 223 224 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 225 // for each RS, create an empty wal with same walGroupId 226 final List<Path> emptyWalPaths = new ArrayList<>(); 227 long ts = EnvironmentEdgeManager.currentTime(); 228 WAL wal = null; 229 for (int i = 0; i < numRs; i++) { 230 RegionInfo regionInfo = 231 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 232 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 233 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 234 appendEntriesToWal(numOfEntriesToReplicate, wal); 235 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 236 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 237 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 238 emptyWalPaths.add(emptyWalPath); 239 240 } 241 injectEmptyWAL(numRs, emptyWalPaths); 242 // roll the WAL now 243 for (int i = 0; i < numRs; i++) { 244 wal.rollWriter(); 245 } 246 hbaseAdmin.enableReplicationPeer(PEER_ID2); 247 // ReplicationSource should advance past the empty wal, or else the test will fail 248 waitForLogAdvance(numRs); 249 250 // Now we should expect numOfEntriesToReplicate entries 251 // replicated from each region server. This makes sure we didn't loose data 252 // from any previous batch when we encounter EOF exception for empty file. 253 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, 254 replicatedEntries.size()); 255 256 // We expect just one batch of replication to be shipped which will 257 // for non empty WAL 258 Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get()); 259 verifyNumberOfLogsInQueue(1, numRs); 260 // we're now writing to the new wal 261 // if everything works, the source should've stopped reading from the empty wal, and start 262 // replicating from the new wal 263 runSimplePutDeleteTest(); 264 rollWalsAndWaitForDeque(numRs); 265 } 266 267 /** 268 * This test make sure we replicate all the enties from the non empty WALs which are surrounding 269 * the empty WALs 270 * @throws Exception throws exception 271 */ 272 @Test 273 public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception { 274 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 275 hbaseAdmin.disableReplicationPeer(PEER_ID2); 276 int numOfEntriesToReplicate = 20; 277 278 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 279 // for each RS, create an empty wal with same walGroupId 280 final List<Path> emptyWalPaths = new ArrayList<>(); 281 long ts = EnvironmentEdgeManager.currentTime(); 282 WAL wal = null; 283 for (int i = 0; i < numRs; i++) { 284 RegionInfo regionInfo = 285 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 286 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 287 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 288 appendEntriesToWal(numOfEntriesToReplicate, wal); 289 wal.rollWriter(); 290 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 291 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 292 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 293 emptyWalPaths.add(emptyWalPath); 294 } 295 injectEmptyWAL(numRs, emptyWalPaths); 296 297 // roll the WAL again with some entries 298 for (int i = 0; i < numRs; i++) { 299 appendEntriesToWal(numOfEntriesToReplicate, wal); 300 wal.rollWriter(); 301 } 302 303 hbaseAdmin.enableReplicationPeer(PEER_ID2); 304 // ReplicationSource should advance past the empty wal, or else the test will fail 305 waitForLogAdvance(numRs); 306 307 // Now we should expect numOfEntriesToReplicate entries 308 // replicated from each region server. This makes sure we didn't loose data 309 // from any previous batch when we encounter EOF exception for empty file. 310 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2, 311 replicatedEntries.size()); 312 313 // We expect two batch of replication to be shipped which will 314 // for non empty WAL 315 Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get()); 316 verifyNumberOfLogsInQueue(1, numRs); 317 // we're now writing to the new wal 318 // if everything works, the source should've stopped reading from the empty wal, and start 319 // replicating from the new wal 320 runSimplePutDeleteTest(); 321 rollWalsAndWaitForDeque(numRs); 322 } 323 324 // inject our empty wal into the replication queue, and then roll the original wal, which 325 // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to 326 // determine if the file being replicated currently is still opened for write, so just inject a 327 // new wal to the replication queue does not mean the previous file is closed. 328 private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException { 329 for (int i = 0; i < numRs; i++) { 330 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 331 Replication replicationService = (Replication) hrs.getReplicationSourceService(); 332 replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); 333 replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); 334 RegionInfo regionInfo = 335 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 336 WAL wal = hrs.getWAL(regionInfo); 337 wal.rollWriter(true); 338 } 339 } 340 341 protected WALKeyImpl getWalKeyImpl() { 342 return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes); 343 } 344 345 // Roll the WAL and wait for it to get deque from the log queue 346 private void rollWalsAndWaitForDeque(int numRs) throws IOException { 347 RegionInfo regionInfo = 348 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 349 for (int i = 0; i < numRs; i++) { 350 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 351 wal.rollWriter(); 352 } 353 waitForLogAdvance(numRs); 354 } 355 356 private void appendEntriesToWal(int numEntries, WAL wal) throws IOException { 357 long txId = -1; 358 for (int i = 0; i < numEntries; i++) { 359 byte[] b = Bytes.toBytes(Integer.toString(i)); 360 KeyValue kv = new KeyValue(b, famName, b); 361 WALEdit edit = new WALEdit(); 362 edit.add(kv); 363 txId = wal.appendData(info, getWalKeyImpl(), edit); 364 } 365 wal.sync(txId); 366 } 367}