/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * White box testing for replication state interfaces. Implementations should extend this class,
 * and initialize the interfaces properly.
 * <p>
 * This class does not assign {@link #rqs}, {@link #rp}, {@link #KEY_ONE}, {@link #KEY_TWO} or
 * {@link #OUR_KEY} itself; the tests below dereference them, so a subclass has to provide them
 * before any test runs.
 */
public abstract class TestReplicationStateBasic {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);

  /** Queue storage under test; not assigned in this class. */
  protected ReplicationQueueStorage rqs;
  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
  /** Replication peers under test; not assigned in this class. */
  protected ReplicationPeers rp;
  protected static final String ID_ONE = "1";
  protected static final String ID_TWO = "2";
  /** Cluster key used for peer {@link #ID_ONE}; not assigned in this class. */
  protected static String KEY_ONE;
  /** Cluster key used for peer {@link #ID_TWO}; not assigned in this class. */
  protected static String KEY_TWO;

  // For testing when we try to replicate to ourself
  protected String OUR_KEY;

  /**
   * Number of polling attempts consumed so far by
   * {@link #assertConnectedPeerStatus(boolean, String)}. It is never reset in this class.
   * NOTE(review): presumably subclasses reset it to 0 in their per-test setup - confirm.
   */
  protected static int zkTimeoutCount;
  /** Upper bound for {@link #zkTimeoutCount} before the status poll gives up. */
  protected static final int ZK_MAX_COUNT = 300;
  protected static final int ZK_SLEEP_INTERVAL = 100; // millis

  /**
   * Exercises {@code getListOfReplicators}, {@code getWALsInQueue} and {@code getAllQueues} first
   * against an empty store and then after a mix of WAL/queue additions and removals.
   */
  @Test
  public void testReplicationQueueStorage() throws ReplicationException {
    // Test methods with empty state
    assertEquals(0, rqs.getListOfReplicators().size());
    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
    assertTrue(rqs.getAllQueues(server1).isEmpty());

    /*
     * Set up data. Two replicators:
     * -- server1: three queues with 0, 1 and 2 log files each
     * -- server2: zero queues
     */
    rqs.addWAL(server1, "qId1", "trash");
    rqs.removeWAL(server1, "qId1", "trash");
    rqs.addWAL(server1, "qId2", "filename1");
    rqs.addWAL(server1, "qId3", "filename2");
    rqs.addWAL(server1, "qId3", "filename3");
    rqs.addWAL(server2, "trash", "trash");
    rqs.removeQueue(server2, "trash");

    List<ServerName> reps = rqs.getListOfReplicators();
    assertEquals(2, reps.size());
    assertTrue(server1.getServerName(), reps.contains(server1));
    assertTrue(server2.getServerName(), reps.contains(server2));

    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));

    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
    assertEquals(0, rqs.getAllQueues(server2).size());
    List<String> list = rqs.getAllQueues(server1);
    assertEquals(3, list.size());
    assertTrue(list.contains("qId2"));
    assertTrue(list.contains("qId3"));
  }

  /**
   * Removes every queue that {@link #rqs} currently reports for the given server.
   * @param serverName the replicator whose queues are removed
   * @throws ReplicationException if the queue storage fails
   */
  private void removeAllQueues(ServerName serverName) throws ReplicationException {
    for (String queue : rqs.getAllQueues(serverName)) {
      rqs.removeQueue(serverName, queue);
    }
  }

  /**
   * Exercises WAL positions, queue listing, queue claiming and replicator removal on the data set
   * built by {@link #populateQueues()}.
   */
  @Test
  public void testReplicationQueues() throws ReplicationException {
    // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
    rp.init();

    // Operations on non-existing queues/WALs are expected not to throw
    rqs.removeQueue(server1, "bogus");
    rqs.removeWAL(server1, "bogus", "bogus");
    removeAllQueues(server1);
    assertEquals(0, rqs.getAllQueues(server1).size());
    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());

    populateQueues();

    assertEquals(3, rqs.getListOfReplicators().size());
    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));

    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
    assertEquals(0, rqs.getAllQueues(server1).size());
    assertEquals(1, rqs.getAllQueues(server2).size());
    assertEquals(5, rqs.getAllQueues(server3).size());

    assertEquals(0, rqs.getAllQueues(server1).size());
    rqs.removeReplicatorIfQueueIsEmpty(server1);
    assertEquals(2, rqs.getListOfReplicators().size());

    // Move all five queues of server3 over to server2
    List<String> queues = rqs.getAllQueues(server3);
    assertEquals(5, queues.size());
    for (String queue : queues) {
      rqs.claimQueue(server3, queue, server2);
    }
    rqs.removeReplicatorIfQueueIsEmpty(server3);
    assertEquals(1, rqs.getListOfReplicators().size());

    // server2 now holds its own queue plus the five claimed ones
    assertEquals(6, rqs.getAllQueues(server2).size());
    removeAllQueues(server2);
    rqs.removeReplicatorIfQueueIsEmpty(server2);
    assertEquals(0, rqs.getListOfReplicators().size());
  }

  /**
   * Adds three hfile references for peer {@link #ID_ONE} and removes them in two steps, checking
   * the reported counts after each step.
   */
  @Test
  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
    rp.init();

    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
    files1.add(new Pair<>(null, new Path("file_1")));
    files1.add(new Pair<>(null, new Path("file_2")));
    files1.add(new Pair<>(null, new Path("file_3")));
    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
    rp.getPeerStorage().addPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
    rqs.addPeerToHFileRefs(ID_ONE);
    rqs.addHFileRefs(ID_ONE, files1);
    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
    List<String> hfiles2 = new ArrayList<>(files1.size());
    for (Pair<Path, Path> p : files1) {
      hfiles2.add(p.getSecond().getName());
    }
    // Remove all but the first file, then the first file on its own
    String removedString = hfiles2.remove(0);
    rqs.removeHFileRefs(ID_ONE, hfiles2);
    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
    hfiles2 = new ArrayList<>(1);
    hfiles2.add(removedString);
    rqs.removeHFileRefs(ID_ONE, hfiles2);
    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
    rp.getPeerStorage().removePeer(ID_ONE);
  }

  /**
   * Registers hfile references for two peers and checks that removing one peer from the hfile-refs
   * queue leaves the other peer's references intact.
   */
  @Test
  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
    rp.init();
    rp.getPeerStorage().addPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
    rqs.addPeerToHFileRefs(ID_ONE);
    rp.getPeerStorage().addPeer(ID_TWO,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
    rqs.addPeerToHFileRefs(ID_TWO);

    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
    files1.add(new Pair<>(null, new Path("file_1")));
    files1.add(new Pair<>(null, new Path("file_2")));
    files1.add(new Pair<>(null, new Path("file_3")));
    rqs.addHFileRefs(ID_ONE, files1);
    rqs.addHFileRefs(ID_TWO, files1);
    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());

    rp.getPeerStorage().removePeer(ID_ONE);
    rqs.removePeerFromHFileRefs(ID_ONE);
    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());

    rp.getPeerStorage().removePeer(ID_TWO);
    rqs.removePeerFromHFileRefs(ID_TWO);
    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
  }

  /**
   * Exercises adding, removing, enabling and disabling peers through the peer storage and
   * {@link #rp}, including the failure path for an unknown peer id.
   */
  @Test
  public void testReplicationPeers() throws Exception {
    rp.init();

    try {
      rp.getPeerStorage().setPeerState("bogus", true);
      fail("Should have thrown a ReplicationException when passed a bogus peerId");
    } catch (ReplicationException expected) {
      // expected: the peer does not exist
    }
    try {
      rp.getPeerStorage().setPeerState("bogus", false);
      fail("Should have thrown a ReplicationException when passed a bogus peerId");
    } catch (ReplicationException expected) {
      // expected: the peer does not exist
    }

    try {
      assertFalse(rp.addPeer("bogus"));
      fail("Should have thrown a ReplicationException when passed a bogus peerId");
    } catch (ReplicationException expected) {
      // expected: the peer does not exist
    }

    assertNumberOfPeers(0);

    // Add some peers
    rp.getPeerStorage().addPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
    assertNumberOfPeers(1);
    rp.getPeerStorage().addPeer(ID_TWO,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
    assertNumberOfPeers(2);

    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
      .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
    rp.getPeerStorage().removePeer(ID_ONE);
    rp.removePeer(ID_ONE);
    assertNumberOfPeers(1);

    // Add one peer
    rp.getPeerStorage().addPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
    rp.addPeer(ID_ONE);
    assertNumberOfPeers(2);
    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
    rp.getPeerStorage().setPeerState(ID_ONE, false);
    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
    // manually...
    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
    rp.refreshPeerState(peer.getId());
    assertEquals(PeerState.DISABLED, peer.getPeerState());
    assertConnectedPeerStatus(false, ID_ONE);
    rp.getPeerStorage().setPeerState(ID_ONE, true);
    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
    // manually...
    rp.refreshPeerState(peer.getId());
    assertEquals(PeerState.ENABLED, peer.getPeerState());
    assertConnectedPeerStatus(true, ID_ONE);

    // Disconnect peer. Only rp is asked to drop it; the count below is read from the peer
    // storage, which is expected to still list both peers.
    rp.removePeer(ID_ONE);
    assertNumberOfPeers(2);
  }

  /**
   * Builds a WAL file name of the form {@code <base>-<i>}, with {@code i} zero-padded to four
   * digits.
   * @param base file name prefix; passed as a format argument so a literal '%' in it is safe
   * @param i index appended to the prefix
   * @return the formatted file name
   */
  private String getFileName(String base, int i) {
    return String.format("%s-%04d", base, i);
  }

  /**
   * Checks that {@code setWALPosition} stores the WAL position together with the last pushed
   * sequence ids, and that a later call with a smaller sequence id does not lower the stored one.
   */
  @Test
  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
    String queue1 = "1";
    String region0 = "6b2c8f8555335cc9af74455b94516cbe",
      region1 = "6ecd2e9e010499f8ddef97ee8f70834f";
    for (int i = 0; i < 10; i++) {
      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
    }
    List<String> queueIds = rqs.getAllQueues(serverName1);
    assertEquals(1, queueIds.size());
    assertThat(queueIds, hasItems("1"));

    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
    assertEquals(10, wals1.size());
    for (int i = 0; i < 10; i++) {
      assertThat(wals1, hasItems(getFileName("file1", i)));
    }

    for (int i = 0; i < 10; i++) {
      assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
    }
    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));

    for (int i = 0; i < 10; i++) {
      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
    }

    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
    }
    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));

    // Try to decrease the last pushed id by setWALPosition method: region0 (899 < 900) must keep
    // its old value while region1 (1001 > 1000) advances.
    rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
      ImmutableMap.of(region0, 899L, region1, 1001L));
    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
    assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
  }

  /**
   * Asserts that the peer is enabled/disabled as expected, first in the peer storage (fails
   * immediately on mismatch) and then on the peer object held by {@link #rp}, polling every
   * {@link #ZK_SLEEP_INTERVAL} milliseconds until it matches.
   * <p>
   * Each sleep consumes one attempt from the shared {@link #zkTimeoutCount}; once it reaches
   * {@link #ZK_MAX_COUNT} the assertion fails instead of polling forever.
   * @param status the expected enabled state
   * @param peerId id of the peer to check
   * @throws Exception if the storage lookup fails or the sleep is interrupted
   */
  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
    // we can first check if the value was changed in the store, if it wasn't then fail right away
    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
    }
    while (true) {
      if (status == rp.getPeer(peerId).isPeerEnabled()) {
        return;
      }
      if (zkTimeoutCount < ZK_MAX_COUNT) {
        LOG.debug("ConnectedPeerStatus was {} but expected {}, sleeping and trying again.",
          !status, status);
        Thread.sleep(ZK_SLEEP_INTERVAL);
        // Count the attempt; without this the timeout branch below could never be reached.
        zkTimeoutCount++;
      } else {
        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
      }
    }
  }

  /**
   * Asserts that the peer storage lists exactly {@code total} peer ids.
   * @param total the expected number of peers
   * @throws ReplicationException if the peer storage fails
   */
  protected void assertNumberOfPeers(int total) throws ReplicationException {
    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
  }

  /*
   * three replicators: server1 has 0 queues, server2 has 1 queue with no logs, server3 has 5
   * queues with 1, 2, 3, 4, 5 log files respectively
   */
  protected void populateQueues() throws ReplicationException {
    rqs.addWAL(server1, "trash", "trash");
    rqs.removeQueue(server1, "trash");

    rqs.addWAL(server2, "qId1", "trash");
    rqs.removeWAL(server2, "qId1", "trash");

    for (int i = 1; i < 6; i++) {
      for (int j = 0; j < i; j++) {
        rqs.addWAL(server3, "qId" + i, "filename" + j);
      }
      // Add peers for the corresponding queues so they are not orphans
      rp.getPeerStorage().addPeer("qId" + i, ReplicationPeerConfig.newBuilder()
        .setClusterKey(MiniZooKeeperCluster.HOST + ":2818:/bogus" + i).build(), true);
    }
  }
}