/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ReplicationSource}: reading a WAL while it is being moved, the termination
 * timeout, and the hand-over of recovered replication queues across regionserver shutdowns.
 */
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationSource {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSource.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationSource.class);

  /** Upper bound, in milliseconds, for every cluster-state wait in this class. */
  private static final long WAIT_TIMEOUT_MS = 20000;

  private final static HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private final static HBaseTestingUtility TEST_UTIL_PEER =
      new HBaseTestingUtility();
  private static FileSystem FS;
  private static Path oldLogDir;
  private static Path logDir;
  private static Configuration conf = TEST_UTIL.getConfiguration();

  /**
   * Starts a one-node mini DFS cluster and removes any pre-existing WAL and archived-WAL
   * directories under the test root directory.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniDFSCluster(1);
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
    Path rootDir = TEST_UTIL.createRootDir();
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (FS.exists(oldLogDir)) {
      FS.delete(oldLogDir, true);
    }
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
    if (FS.exists(logDir)) {
      FS.delete(logDir, true);
    }
  }

  /**
   * Shuts down both mini HBase clusters and the mini DFS cluster. Every shutdown step is
   * attempted even when an earlier one throws.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      TEST_UTIL_PEER.shutdownMiniHBaseCluster();
    } finally {
      try {
        TEST_UTIL.shutdownMiniHBaseCluster();
      } finally {
        TEST_UTIL.shutdownMiniDFSCluster();
      }
    }
  }

  /**
   * Sanity check that we can move logs around while we are reading
   * from them. Should this test fail, ReplicationSource would have a hard
   * time reading logs that are being archived.
   */
  @Test
  public void testLogMoving() throws Exception {
    Path logPath = new Path(logDir, "log");
    if (!FS.exists(logDir)) {
      FS.mkdirs(logDir);
    }
    if (!FS.exists(oldLogDir)) {
      FS.mkdirs(oldLogDir);
    }

    // Write three entries so that reads happen both before and after the move.
    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
        TEST_UTIL.getConfiguration());
    try {
      for (int i = 0; i < 3; i++) {
        byte[] b = Bytes.toBytes(Integer.toString(i));
        KeyValue kv = new KeyValue(b, b, b);
        WALEdit edit = new WALEdit();
        edit.add(kv);
        WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
            HConstants.DEFAULT_CLUSTER_ID);
        writer.append(new WAL.Entry(key, edit));
        writer.sync();
      }
    } finally {
      writer.close();
    }

    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
    try {
      // First entry is read from the original location.
      WAL.Entry entry = reader.next();
      assertNotNull(entry);

      // Move the file while the reader is still open on it.
      Path oldLogPath = new Path(oldLogDir, "log");
      assertTrue("Failed to move " + logPath + " to " + oldLogPath,
          FS.rename(logPath, oldLogPath));

      // The remaining two entries must still be readable after the move.
      entry = reader.next();
      assertNotNull(entry);

      entry = reader.next();
      assertNotNull(entry);

      // Only three entries were written, so the fourth read hits the end of the log.
      entry = reader.next();
      assertNull(entry);
    } finally {
      reader.close();
    }
  }

  /**
   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
   */
  @Test
  public void testTerminateTimeout() throws Exception {
    ReplicationSource source = new ReplicationSource();
    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
      @Override
      protected void doStart() {
        notifyStarted();
      }

      @Override
      protected void doStop() {
        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
        // completes
      }
    };
    replicationEndpoint.start();
    ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
    Configuration testConf = HBaseConfiguration.create();
    testConf.setInt("replication.source.maxretriesmultiplier", 1);
    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
        replicationEndpoint, p -> OptionalLong.empty(), null);

    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // Run terminate() on another thread so this thread can bound how long it takes.
      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
      Waiter.waitFor(testConf, sleepForRetries * 2, (Predicate<Exception>) future::isDone);
    } finally {
      // Never leave the worker thread behind, even when the wait above fails.
      executor.shutdownNow();
    }
  }

  /**
   * Tests that recovered queues are preserved on a regionserver shutdown.
   * See HBASE-18192
   */
  @Test
  public void testServerShutdownRecoveredQueue() throws Exception {
    try {
      // Ensure single-threaded WAL
      conf.set("hbase.wal.provider", "defaultProvider");
      conf.setInt("replication.sleep.before.failover", 2000);
      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
      TEST_UTIL_PEER.startMiniCluster(1);

      HRegionServer serverA = cluster.getRegionServer(0);
      final ReplicationSourceManager managerA =
          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
      HRegionServer serverB = cluster.getRegionServer(1);
      final ReplicationSourceManager managerB =
          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
      final Admin admin = TEST_UTIL.getAdmin();

      final String peerId = "TestPeer";
      admin.addReplicationPeer(peerId,
          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
      // Wait for replication sources to come up
      waitUntil(() -> !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()));
      // Disabling peer makes sure there is at least one log to claim when the server dies
      // The recovered queue will also stay there until the peer is disabled even if the
      // WALs it contains have no data.
      admin.disableReplicationPeer(peerId);

      // Stopping serverA
      // Its queues should be claimed by the only other alive server i.e. serverB
      cluster.stopRegionServer(serverA.getServerName());
      waitUntil(() -> managerB.getOldSources().size() == 1);

      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
      serverC.waitForServerOnline();
      waitUntil(() -> serverC.getReplicationSourceService() != null);
      final ReplicationSourceManager managerC =
          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
      // Sanity check
      assertEquals(0, managerC.getOldSources().size());

      // Stopping serverB
      // Now serverC should have two recovered queues:
      // 1. The serverB's normal queue
      // 2. serverA's recovered queue on serverB
      cluster.stopRegionServer(serverB.getServerName());
      waitUntil(() -> managerC.getOldSources().size() == 2);
      admin.enableReplicationPeer(peerId);
      waitUntil(() -> managerC.getOldSources().size() == 0);
    } finally {
      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
    }
  }

  /**
   * Waits up to {@link #WAIT_TIMEOUT_MS} for the given condition, using the shared test
   * configuration.
   * @param condition predicate that is polled until it evaluates to {@code true}
   */
  private static void waitUntil(Waiter.Predicate<Exception> condition) throws Exception {
    Waiter.waitFor(conf, WAIT_TIMEOUT_MS, condition);
  }

  /**
   * Regionserver implementation that adds a delay on the graceful shutdown.
   */
  public static class ShutdownDelayRegionServer extends HRegionServer {
    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected void stopServiceThreads() {
      // Add a delay before service threads are shutdown.
      // This will keep the zookeeper connection alive for the duration of the delay.
      LOG.info("Adding a delay to the regionserver shutdown");
      boolean interrupted = false;
      try {
        Thread.sleep(2000);
      } catch (InterruptedException ex) {
        interrupted = true;
        LOG.error("Interrupted while sleeping", ex);
      }
      try {
        super.stopServiceThreads();
      } finally {
        // Restore the interrupt status only after the regular shutdown has run, so the
        // interruption is not lost but also does not disturb the shutdown sequence itself.
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

}