001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.hasItems; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.List; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 036import org.apache.hadoop.hbase.zookeeper.ZKConfig; 037import org.apache.zookeeper.KeeperException; 038import org.junit.Test; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 043 044/** 045 * White box testing for replication state interfaces. Implementations should extend this class, and 046 * initialize the interfaces properly. 047 */ 048public abstract class TestReplicationStateBasic { 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); 051 052 protected ReplicationQueueStorage rqs; 053 protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); 054 protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); 055 protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); 056 protected ReplicationPeers rp; 057 protected static final String ID_ONE = "1"; 058 protected static final String ID_TWO = "2"; 059 protected static String KEY_ONE; 060 protected static String KEY_TWO; 061 062 // For testing when we try to replicate to ourself 063 protected String OUR_KEY; 064 065 protected static int zkTimeoutCount; 066 protected static final int ZK_MAX_COUNT = 300; 067 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 068 069 @Test 070 public void testReplicationQueueStorage() throws ReplicationException { 071 // Test methods with empty state 072 assertEquals(0, rqs.getListOfReplicators().size()); 073 assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); 074 assertTrue(rqs.getAllQueues(server1).isEmpty()); 075 076 /* 077 * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- 078 * server2: zero queues 079 */ 080 rqs.addWAL(server1, "qId1", "trash"); 081 rqs.removeWAL(server1, "qId1", "trash"); 082 rqs.addWAL(server1, "qId2", "filename1"); 083 rqs.addWAL(server1, "qId3", "filename2"); 084 rqs.addWAL(server1, "qId3", "filename3"); 085 rqs.addWAL(server2, "trash", "trash"); 086 rqs.removeQueue(server2, "trash"); 087 088 List<ServerName> reps = rqs.getListOfReplicators(); 089 assertEquals(2, reps.size()); 090 assertTrue(server1.getServerName(), reps.contains(server1)); 091 assertTrue(server2.getServerName(), reps.contains(server2)); 092 093 assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); 094 assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); 095 assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); 096 assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); 097 assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); 098 099 assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); 100 assertEquals(0, rqs.getAllQueues(server2).size()); 101 List<String> list = rqs.getAllQueues(server1); 102 assertEquals(3, list.size()); 103 assertTrue(list.contains("qId2")); 104 assertTrue(list.contains("qId3")); 105 } 106 107 private void removeAllQueues(ServerName serverName) throws ReplicationException { 108 for (String queue : rqs.getAllQueues(serverName)) { 109 rqs.removeQueue(serverName, queue); 110 } 111 } 112 113 @Test 114 public void testReplicationQueues() throws ReplicationException { 115 // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) 116 rp.init(); 117 118 rqs.removeQueue(server1, "bogus"); 119 rqs.removeWAL(server1, "bogus", "bogus"); 120 removeAllQueues(server1); 121 assertEquals(0, rqs.getAllQueues(server1).size()); 122 assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); 123 assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); 124 assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); 125 126 populateQueues(); 127 128 assertEquals(3, rqs.getListOfReplicators().size()); 129 assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); 130 assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); 131 assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); 132 rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap()); 133 assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); 134 135 assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); 136 assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); 137 assertEquals(0, rqs.getAllQueues(server1).size()); 138 assertEquals(1, rqs.getAllQueues(server2).size()); 139 assertEquals(5, rqs.getAllQueues(server3).size()); 140 141 assertEquals(0, rqs.getAllQueues(server1).size()); 142 rqs.removeReplicatorIfQueueIsEmpty(server1); 143 assertEquals(2, rqs.getListOfReplicators().size()); 144 145 List<String> queues = rqs.getAllQueues(server3); 146 assertEquals(5, queues.size()); 147 for (String queue : queues) { 148 rqs.claimQueue(server3, queue, server2); 149 } 150 rqs.removeReplicatorIfQueueIsEmpty(server3); 151 assertEquals(1, rqs.getListOfReplicators().size()); 152 153 assertEquals(6, rqs.getAllQueues(server2).size()); 154 removeAllQueues(server2); 155 rqs.removeReplicatorIfQueueIsEmpty(server2); 156 assertEquals(0, rqs.getListOfReplicators().size()); 157 } 158 159 @Test 160 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { 161 rp.init(); 162 163 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 164 files1.add(new Pair<>(null, new Path("file_1"))); 165 files1.add(new Pair<>(null, new Path("file_2"))); 166 files1.add(new Pair<>(null, new Path("file_3"))); 167 assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); 168 assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); 169 rp.getPeerStorage().addPeer(ID_ONE, 170 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 171 SyncReplicationState.NONE); 172 rqs.addPeerToHFileRefs(ID_ONE); 173 rqs.addHFileRefs(ID_ONE, files1); 174 assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); 175 assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); 176 List<String> hfiles2 = new ArrayList<>(files1.size()); 177 for (Pair<Path, Path> p : files1) { 178 hfiles2.add(p.getSecond().getName()); 179 } 180 String removedString = hfiles2.remove(0); 181 rqs.removeHFileRefs(ID_ONE, hfiles2); 182 assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); 183 hfiles2 = new ArrayList<>(1); 184 hfiles2.add(removedString); 185 rqs.removeHFileRefs(ID_ONE, hfiles2); 186 assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); 187 rp.getPeerStorage().removePeer(ID_ONE); 188 } 189 190 @Test 191 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { 192 rp.init(); 193 rp.getPeerStorage().addPeer(ID_ONE, 194 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 195 SyncReplicationState.NONE); 196 rqs.addPeerToHFileRefs(ID_ONE); 197 rp.getPeerStorage().addPeer(ID_TWO, 198 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, 199 SyncReplicationState.NONE); 200 rqs.addPeerToHFileRefs(ID_TWO); 201 202 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 203 files1.add(new Pair<>(null, new Path("file_1"))); 204 files1.add(new Pair<>(null, new Path("file_2"))); 205 files1.add(new Pair<>(null, new Path("file_3"))); 206 rqs.addHFileRefs(ID_ONE, files1); 207 rqs.addHFileRefs(ID_TWO, files1); 208 assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); 209 assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); 210 assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); 211 212 rp.getPeerStorage().removePeer(ID_ONE); 213 rqs.removePeerFromHFileRefs(ID_ONE); 214 assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); 215 assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); 216 assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); 217 218 rp.getPeerStorage().removePeer(ID_TWO); 219 rqs.removePeerFromHFileRefs(ID_TWO); 220 assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); 221 assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); 222 } 223 224 @Test 225 public void testReplicationPeers() throws Exception { 226 rp.init(); 227 228 try { 229 rp.getPeerStorage().setPeerState("bogus", true); 230 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 231 } catch (ReplicationException e) { 232 } 233 try { 234 rp.getPeerStorage().setPeerState("bogus", false); 235 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 236 } catch (ReplicationException e) { 237 } 238 239 try { 240 assertFalse(rp.addPeer("bogus")); 241 fail("Should have thrown an ReplicationException when passed a bogus peerId"); 242 } catch (ReplicationException e) { 243 } 244 245 assertNumberOfPeers(0); 246 247 // Add some peers 248 rp.getPeerStorage().addPeer(ID_ONE, 249 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 250 SyncReplicationState.NONE); 251 assertNumberOfPeers(1); 252 rp.getPeerStorage().addPeer(ID_TWO, 253 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, 254 SyncReplicationState.NONE); 255 assertNumberOfPeers(2); 256 257 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils 258 .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); 259 rp.getPeerStorage().removePeer(ID_ONE); 260 rp.removePeer(ID_ONE); 261 assertNumberOfPeers(1); 262 263 // Add one peer 264 rp.getPeerStorage().addPeer(ID_ONE, 265 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 266 SyncReplicationState.NONE); 267 rp.addPeer(ID_ONE); 268 assertNumberOfPeers(2); 269 assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); 270 rp.getPeerStorage().setPeerState(ID_ONE, false); 271 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 272 // manually... 273 ReplicationPeerImpl peer = rp.getPeer(ID_ONE); 274 rp.refreshPeerState(peer.getId()); 275 assertEquals(PeerState.DISABLED, peer.getPeerState()); 276 assertConnectedPeerStatus(false, ID_ONE); 277 rp.getPeerStorage().setPeerState(ID_ONE, true); 278 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 279 // manually... 280 rp.refreshPeerState(peer.getId()); 281 assertEquals(PeerState.ENABLED, peer.getPeerState()); 282 assertConnectedPeerStatus(true, ID_ONE); 283 284 // Disconnect peer 285 rp.removePeer(ID_ONE); 286 assertNumberOfPeers(2); 287 } 288 289 private String getFileName(String base, int i) { 290 return String.format(base + "-%04d", i); 291 } 292 293 @Test 294 public void testPersistLogPositionAndSeqIdAtomically() throws Exception { 295 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 296 assertTrue(rqs.getAllQueues(serverName1).isEmpty()); 297 String queue1 = "1"; 298 String region0 = "6b2c8f8555335cc9af74455b94516cbe", 299 region1 = "6ecd2e9e010499f8ddef97ee8f70834f"; 300 for (int i = 0; i < 10; i++) { 301 rqs.addWAL(serverName1, queue1, getFileName("file1", i)); 302 } 303 List<String> queueIds = rqs.getAllQueues(serverName1); 304 assertEquals(1, queueIds.size()); 305 assertThat(queueIds, hasItems("1")); 306 307 List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); 308 assertEquals(10, wals1.size()); 309 for (int i = 0; i < 10; i++) { 310 assertThat(wals1, hasItems(getFileName("file1", i))); 311 } 312 313 for (int i = 0; i < 10; i++) { 314 assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); 315 } 316 assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); 317 assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); 318 319 for (int i = 0; i < 10; i++) { 320 rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, 321 ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); 322 } 323 324 for (int i = 0; i < 10; i++) { 325 assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); 326 } 327 assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); 328 assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); 329 330 // Try to decrease the last pushed id by setWALPosition method. 331 rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100, 332 ImmutableMap.of(region0, 899L, region1, 1001L)); 333 assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); 334 assertEquals(1001L, rqs.getLastSequenceId(region1, queue1)); 335 } 336 337 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 338 // we can first check if the value was changed in the store, if it wasn't then fail right away 339 if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { 340 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 341 } 342 while (true) { 343 if (status == rp.getPeer(peerId).isPeerEnabled()) { 344 return; 345 } 346 if (zkTimeoutCount < ZK_MAX_COUNT) { 347 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 348 + ", sleeping and trying again."); 349 Thread.sleep(ZK_SLEEP_INTERVAL); 350 } else { 351 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 352 } 353 } 354 } 355 356 protected void assertNumberOfPeers(int total) throws ReplicationException { 357 assertEquals(total, rp.getPeerStorage().listPeerIds().size()); 358 } 359 360 /* 361 * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, 362 * 3, 4, 5 log files respectively 363 */ 364 protected void populateQueues() throws ReplicationException { 365 rqs.addWAL(server1, "trash", "trash"); 366 rqs.removeQueue(server1, "trash"); 367 368 rqs.addWAL(server2, "qId1", "trash"); 369 rqs.removeWAL(server2, "qId1", "trash"); 370 371 for (int i = 1; i < 6; i++) { 372 for (int j = 0; j < i; j++) { 373 rqs.addWAL(server3, "qId" + i, "filename" + j); 374 } 375 // Add peers for the corresponding queues so they are not orphans 376 rp.getPeerStorage().addPeer("qId" + i, 377 ReplicationPeerConfig.newBuilder() 378 .setClusterKey(MiniZooKeeperCluster.HOST + ":2818:/bogus" + i).build(), 379 true, SyncReplicationState.NONE); 380 } 381 } 382}