001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import static org.junit.Assert.*; 022 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.util.Pair; 029import org.apache.hadoop.hbase.zookeeper.ZKConfig; 030import org.apache.zookeeper.KeeperException; 031import org.junit.Before; 032import org.junit.Test; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * White box testing for replication state interfaces. Implementations should extend this class, and 038 * initialize the interfaces properly. 039 */ 040public abstract class TestReplicationStateBasic { 041 042 protected ReplicationQueues rq1; 043 protected ReplicationQueues rq2; 044 protected ReplicationQueues rq3; 045 protected ReplicationQueuesClient rqc; 046 protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); 047 protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString(); 048 protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString(); 049 protected ReplicationPeers rp; 050 protected static final String ID_ONE = "1"; 051 protected static final String ID_TWO = "2"; 052 protected static String KEY_ONE; 053 protected static String KEY_TWO; 054 055 // For testing when we try to replicate to ourself 056 protected String OUR_ID = "3"; 057 protected String OUR_KEY; 058 059 protected static int zkTimeoutCount; 060 protected static final int ZK_MAX_COUNT = 300; 061 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); 064 065 @Before 066 public void setUp() { 067 zkTimeoutCount = 0; 068 } 069 070 @Test 071 public void testReplicationQueuesClient() throws ReplicationException, KeeperException { 072 rqc.init(); 073 // Test methods with empty state 074 assertEquals(0, rqc.getListOfReplicators().size()); 075 assertNull(rqc.getLogsInQueue(server1, "qId1")); 076 assertNull(rqc.getAllQueues(server1)); 077 078 /* 079 * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- 080 * server2: zero queues 081 */ 082 rq1.init(server1); 083 rq2.init(server2); 084 rq1.addLog("qId1", "trash"); 085 rq1.removeLog("qId1", "trash"); 086 rq1.addLog("qId2", "filename1"); 087 rq1.addLog("qId3", "filename2"); 088 rq1.addLog("qId3", "filename3"); 089 rq2.addLog("trash", "trash"); 090 rq2.removeQueue("trash"); 091 092 List<String> reps = rqc.getListOfReplicators(); 093 assertEquals(2, reps.size()); 094 assertTrue(server1, reps.contains(server1)); 095 assertTrue(server2, reps.contains(server2)); 096 097 assertNull(rqc.getLogsInQueue("bogus", "bogus")); 098 assertNull(rqc.getLogsInQueue(server1, "bogus")); 099 assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size()); 100 assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size()); 101 assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0)); 102 103 assertNull(rqc.getAllQueues("bogus")); 104 assertEquals(0, rqc.getAllQueues(server2).size()); 105 List<String> list = rqc.getAllQueues(server1); 106 assertEquals(3, list.size()); 107 assertTrue(list.contains("qId2")); 108 assertTrue(list.contains("qId3")); 109 } 110 111 @Test 112 public void testReplicationQueues() throws ReplicationException { 113 rq1.init(server1); 114 rq2.init(server2); 115 rq3.init(server3); 116 //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) 117 rp.init(); 118 119 // 3 replicators should exist 120 assertEquals(3, rq1.getListOfReplicators().size()); 121 rq1.removeQueue("bogus"); 122 rq1.removeLog("bogus", "bogus"); 123 rq1.removeAllQueues(); 124 assertEquals(0, rq1.getAllQueues().size()); 125 assertEquals(0, rq1.getLogPosition("bogus", "bogus")); 126 assertNull(rq1.getLogsInQueue("bogus")); 127 assertNull(rq1.getUnClaimedQueueIds( 128 ServerName.valueOf("bogus", 1234, -1L).toString())); 129 130 rq1.setLogPosition("bogus", "bogus", 5L); 131 132 populateQueues(); 133 134 assertEquals(3, rq1.getListOfReplicators().size()); 135 assertEquals(0, rq2.getLogsInQueue("qId1").size()); 136 assertEquals(5, rq3.getLogsInQueue("qId5").size()); 137 assertEquals(0, rq3.getLogPosition("qId1", "filename0")); 138 rq3.setLogPosition("qId5", "filename4", 354L); 139 assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); 140 141 assertEquals(5, rq3.getLogsInQueue("qId5").size()); 142 assertEquals(0, rq2.getLogsInQueue("qId1").size()); 143 assertEquals(0, rq1.getAllQueues().size()); 144 assertEquals(1, rq2.getAllQueues().size()); 145 assertEquals(5, rq3.getAllQueues().size()); 146 147 assertEquals(0, rq3.getUnClaimedQueueIds(server1).size()); 148 rq3.removeReplicatorIfQueueIsEmpty(server1); 149 assertEquals(2, rq3.getListOfReplicators().size()); 150 151 List<String> queues = rq2.getUnClaimedQueueIds(server3); 152 assertEquals(5, queues.size()); 153 for(String queue: queues) { 154 rq2.claimQueue(server3, queue); 155 } 156 rq2.removeReplicatorIfQueueIsEmpty(server3); 157 assertEquals(1, rq2.getListOfReplicators().size()); 158 159 // Try to claim our own queues 160 assertNull(rq2.getUnClaimedQueueIds(server2)); 161 rq2.removeReplicatorIfQueueIsEmpty(server2); 162 163 assertEquals(6, rq2.getAllQueues().size()); 164 165 rq2.removeAllQueues(); 166 167 assertEquals(0, rq2.getListOfReplicators().size()); 168 } 169 170 @Test 171 public void testInvalidClusterKeys() throws ReplicationException, KeeperException { 172 rp.init(); 173 174 try { 175 rp.registerPeer(ID_ONE, 176 new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); 177 fail("Should throw an IllegalArgumentException because " 178 + "zookeeper.znode.parent is missing leading '/'."); 179 } catch (IllegalArgumentException e) { 180 // Expected. 181 } 182 183 try { 184 rp.registerPeer(ID_ONE, 185 new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); 186 fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); 187 } catch (IllegalArgumentException e) { 188 // Expected. 189 } 190 191 try { 192 rp.registerPeer(ID_ONE, 193 new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); 194 fail("Should throw an IllegalArgumentException because " 195 + "hbase.zookeeper.property.clientPort is missing."); 196 } catch (IllegalArgumentException e) { 197 // Expected. 198 } 199 } 200 201 @Test 202 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { 203 rp.init(); 204 rq1.init(server1); 205 rqc.init(); 206 207 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 208 files1.add(new Pair<>(null, new Path("file_1"))); 209 files1.add(new Pair<>(null, new Path("file_2"))); 210 files1.add(new Pair<>(null, new Path("file_3"))); 211 assertNull(rqc.getReplicableHFiles(ID_ONE)); 212 assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); 213 rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); 214 rq1.addPeerToHFileRefs(ID_ONE); 215 rq1.addHFileRefs(ID_ONE, files1); 216 assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); 217 assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); 218 List<String> hfiles2 = new ArrayList<>(files1.size()); 219 for (Pair<Path, Path> p : files1) { 220 hfiles2.add(p.getSecond().getName()); 221 } 222 String removedString = hfiles2.remove(0); 223 rq1.removeHFileRefs(ID_ONE, hfiles2); 224 assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); 225 hfiles2 = new ArrayList<>(1); 226 hfiles2.add(removedString); 227 rq1.removeHFileRefs(ID_ONE, hfiles2); 228 assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); 229 rp.unregisterPeer(ID_ONE); 230 } 231 232 @Test 233 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { 234 rq1.init(server1); 235 rqc.init(); 236 237 rp.init(); 238 rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); 239 rq1.addPeerToHFileRefs(ID_ONE); 240 rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); 241 rq1.addPeerToHFileRefs(ID_TWO); 242 243 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 244 files1.add(new Pair<>(null, new Path("file_1"))); 245 files1.add(new Pair<>(null, new Path("file_2"))); 246 files1.add(new Pair<>(null, new Path("file_3"))); 247 rq1.addHFileRefs(ID_ONE, files1); 248 rq1.addHFileRefs(ID_TWO, files1); 249 assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); 250 assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); 251 assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); 252 253 rp.unregisterPeer(ID_ONE); 254 rq1.removePeerFromHFileRefs(ID_ONE); 255 assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); 256 assertNull(rqc.getReplicableHFiles(ID_ONE)); 257 assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); 258 259 rp.unregisterPeer(ID_TWO); 260 rq1.removePeerFromHFileRefs(ID_TWO); 261 assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); 262 assertNull(rqc.getReplicableHFiles(ID_TWO)); 263 } 264 265 @Test 266 public void testReplicationPeers() throws Exception { 267 rp.init(); 268 269 // Test methods with non-existent peer ids 270 try { 271 rp.unregisterPeer("bogus"); 272 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 273 } catch (IllegalArgumentException e) { 274 } 275 try { 276 rp.enablePeer("bogus"); 277 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 278 } catch (IllegalArgumentException e) { 279 } 280 try { 281 rp.disablePeer("bogus"); 282 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 283 } catch (IllegalArgumentException e) { 284 } 285 try { 286 rp.getStatusOfPeer("bogus"); 287 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 288 } catch (IllegalArgumentException e) { 289 } 290 assertFalse(rp.peerConnected("bogus")); 291 rp.peerDisconnected("bogus"); 292 293 assertNull(rp.getPeerConf("bogus")); 294 assertNumberOfPeers(0); 295 296 // Add some peers 297 rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); 298 assertNumberOfPeers(1); 299 rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); 300 assertNumberOfPeers(2); 301 302 // Test methods with a peer that is added but not connected 303 try { 304 rp.getStatusOfPeer(ID_ONE); 305 fail("There are no connected peers, should have thrown an IllegalArgumentException"); 306 } catch (IllegalArgumentException e) { 307 } 308 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); 309 rp.unregisterPeer(ID_ONE); 310 rp.peerDisconnected(ID_ONE); 311 assertNumberOfPeers(1); 312 313 // Add one peer 314 rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); 315 rp.peerConnected(ID_ONE); 316 assertNumberOfPeers(2); 317 assertTrue(rp.getStatusOfPeer(ID_ONE)); 318 rp.disablePeer(ID_ONE); 319 assertConnectedPeerStatus(false, ID_ONE); 320 rp.enablePeer(ID_ONE); 321 assertConnectedPeerStatus(true, ID_ONE); 322 323 // Disconnect peer 324 rp.peerDisconnected(ID_ONE); 325 assertNumberOfPeers(2); 326 try { 327 rp.getStatusOfPeer(ID_ONE); 328 fail("There are no connected peers, should have thrown an IllegalArgumentException"); 329 } catch (IllegalArgumentException e) { 330 } 331 } 332 333 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 334 // we can first check if the value was changed in the store, if it wasn't then fail right away 335 if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { 336 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 337 } 338 while (true) { 339 if (status == rp.getStatusOfPeer(peerId)) { 340 return; 341 } 342 if (zkTimeoutCount < ZK_MAX_COUNT) { 343 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 344 + ", sleeping and trying again."); 345 Thread.sleep(ZK_SLEEP_INTERVAL); 346 } else { 347 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 348 } 349 } 350 } 351 352 protected void assertNumberOfPeers(int total) { 353 assertEquals(total, rp.getAllPeerConfigs().size()); 354 assertEquals(total, rp.getAllPeerIds().size()); 355 assertEquals(total, rp.getAllPeerIds().size()); 356 } 357 358 /* 359 * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, 360 * 3, 4, 5 log files respectively 361 */ 362 protected void populateQueues() throws ReplicationException { 363 rq1.addLog("trash", "trash"); 364 rq1.removeQueue("trash"); 365 366 rq2.addLog("qId1", "trash"); 367 rq2.removeLog("qId1", "trash"); 368 369 for (int i = 1; i < 6; i++) { 370 for (int j = 0; j < i; j++) { 371 rq3.addLog("qId" + i, "filename" + j); 372 } 373 //Add peers for the corresponding queues so they are not orphans 374 rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); 375 } 376 } 377} 378