001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.hasItems; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertThat; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.List; 030 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.zookeeper.ZKConfig; 037import org.apache.zookeeper.KeeperException; 038import org.junit.Test; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 043 044/** 045 * White box testing for replication state interfaces. Implementations should extend this class, and 046 * initialize the interfaces properly. 047 */ 048public abstract class TestReplicationStateBasic { 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); 051 052 protected ReplicationQueueStorage rqs; 053 protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); 054 protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); 055 protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); 056 protected ReplicationPeers rp; 057 protected static final String ID_ONE = "1"; 058 protected static final String ID_TWO = "2"; 059 protected static String KEY_ONE; 060 protected static String KEY_TWO; 061 062 // For testing when we try to replicate to ourself 063 protected String OUR_KEY; 064 065 protected static int zkTimeoutCount; 066 protected static final int ZK_MAX_COUNT = 300; 067 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 068 069 @Test 070 public void testReplicationQueueStorage() throws ReplicationException { 071 // Test methods with empty state 072 assertEquals(0, rqs.getListOfReplicators().size()); 073 assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); 074 assertTrue(rqs.getAllQueues(server1).isEmpty()); 075 076 /* 077 * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- 078 * server2: zero queues 079 */ 080 rqs.addWAL(server1, "qId1", "trash"); 081 rqs.removeWAL(server1, "qId1", "trash"); 082 rqs.addWAL(server1,"qId2", "filename1"); 083 rqs.addWAL(server1,"qId3", "filename2"); 084 rqs.addWAL(server1,"qId3", "filename3"); 085 rqs.addWAL(server2,"trash", "trash"); 086 rqs.removeQueue(server2,"trash"); 087 088 List<ServerName> reps = rqs.getListOfReplicators(); 089 assertEquals(2, reps.size()); 090 assertTrue(server1.getServerName(), reps.contains(server1)); 091 assertTrue(server2.getServerName(), reps.contains(server2)); 092 093 assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); 094 assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); 095 assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); 096 assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); 097 assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); 098 099 assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); 100 assertEquals(0, rqs.getAllQueues(server2).size()); 101 List<String> list = rqs.getAllQueues(server1); 102 assertEquals(3, list.size()); 103 assertTrue(list.contains("qId2")); 104 assertTrue(list.contains("qId3")); 105 } 106 107 private void removeAllQueues(ServerName serverName) throws ReplicationException { 108 for (String queue: rqs.getAllQueues(serverName)) { 109 rqs.removeQueue(serverName, queue); 110 } 111 } 112 @Test 113 public void testReplicationQueues() throws ReplicationException { 114 // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) 115 rp.init(); 116 117 rqs.removeQueue(server1, "bogus"); 118 rqs.removeWAL(server1, "bogus", "bogus"); 119 removeAllQueues(server1); 120 assertEquals(0, rqs.getAllQueues(server1).size()); 121 assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); 122 assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); 123 assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); 124 125 populateQueues(); 126 127 assertEquals(3, rqs.getListOfReplicators().size()); 128 assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); 129 assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); 130 assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); 131 rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap()); 132 assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); 133 134 assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); 135 assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); 136 assertEquals(0, rqs.getAllQueues(server1).size()); 137 assertEquals(1, rqs.getAllQueues(server2).size()); 138 assertEquals(5, rqs.getAllQueues(server3).size()); 139 140 assertEquals(0, rqs.getAllQueues(server1).size()); 141 rqs.removeReplicatorIfQueueIsEmpty(server1); 142 assertEquals(2, rqs.getListOfReplicators().size()); 143 144 List<String> queues = rqs.getAllQueues(server3); 145 assertEquals(5, queues.size()); 146 for (String queue : queues) { 147 rqs.claimQueue(server3, queue, server2); 148 } 149 rqs.removeReplicatorIfQueueIsEmpty(server3); 150 assertEquals(1, rqs.getListOfReplicators().size()); 151 152 assertEquals(6, rqs.getAllQueues(server2).size()); 153 removeAllQueues(server2); 154 rqs.removeReplicatorIfQueueIsEmpty(server2); 155 assertEquals(0, rqs.getListOfReplicators().size()); 156 } 157 158 @Test 159 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { 160 rp.init(); 161 162 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 163 files1.add(new Pair<>(null, new Path("file_1"))); 164 files1.add(new Pair<>(null, new Path("file_2"))); 165 files1.add(new Pair<>(null, new Path("file_3"))); 166 assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); 167 assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); 168 rp.getPeerStorage().addPeer(ID_ONE, 169 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); 170 rqs.addPeerToHFileRefs(ID_ONE); 171 rqs.addHFileRefs(ID_ONE, files1); 172 assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); 173 assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); 174 List<String> hfiles2 = new ArrayList<>(files1.size()); 175 for (Pair<Path, Path> p : files1) { 176 hfiles2.add(p.getSecond().getName()); 177 } 178 String removedString = hfiles2.remove(0); 179 rqs.removeHFileRefs(ID_ONE, hfiles2); 180 assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); 181 hfiles2 = new ArrayList<>(1); 182 hfiles2.add(removedString); 183 rqs.removeHFileRefs(ID_ONE, hfiles2); 184 assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); 185 rp.getPeerStorage().removePeer(ID_ONE); 186 } 187 188 @Test 189 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { 190 rp.init(); 191 rp.getPeerStorage().addPeer(ID_ONE, 192 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); 193 rqs.addPeerToHFileRefs(ID_ONE); 194 rp.getPeerStorage().addPeer(ID_TWO, 195 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); 196 rqs.addPeerToHFileRefs(ID_TWO); 197 198 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 199 files1.add(new Pair<>(null, new Path("file_1"))); 200 files1.add(new Pair<>(null, new Path("file_2"))); 201 files1.add(new Pair<>(null, new Path("file_3"))); 202 rqs.addHFileRefs(ID_ONE, files1); 203 rqs.addHFileRefs(ID_TWO, files1); 204 assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); 205 assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); 206 assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); 207 208 rp.getPeerStorage().removePeer(ID_ONE); 209 rqs.removePeerFromHFileRefs(ID_ONE); 210 assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); 211 assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); 212 assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); 213 214 rp.getPeerStorage().removePeer(ID_TWO); 215 rqs.removePeerFromHFileRefs(ID_TWO); 216 assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); 217 assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); 218 } 219 220 @Test 221 public void testReplicationPeers() throws Exception { 222 rp.init(); 223 224 try { 225 rp.getPeerStorage().setPeerState("bogus", true); 226 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 227 } catch (ReplicationException e) { 228 } 229 try { 230 rp.getPeerStorage().setPeerState("bogus", false); 231 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 232 } catch (ReplicationException e) { 233 } 234 235 try { 236 assertFalse(rp.addPeer("bogus")); 237 fail("Should have thrown an ReplicationException when passed a bogus peerId"); 238 } catch (ReplicationException e) { 239 } 240 241 assertNumberOfPeers(0); 242 243 // Add some peers 244 rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); 245 assertNumberOfPeers(1); 246 rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); 247 assertNumberOfPeers(2); 248 249 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils 250 .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); 251 rp.getPeerStorage().removePeer(ID_ONE); 252 rp.removePeer(ID_ONE); 253 assertNumberOfPeers(1); 254 255 // Add one peer 256 rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); 257 rp.addPeer(ID_ONE); 258 assertNumberOfPeers(2); 259 assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); 260 rp.getPeerStorage().setPeerState(ID_ONE, false); 261 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 262 // manually... 263 ReplicationPeerImpl peer = rp.getPeer(ID_ONE); 264 rp.refreshPeerState(peer.getId()); 265 assertEquals(PeerState.DISABLED, peer.getPeerState()); 266 assertConnectedPeerStatus(false, ID_ONE); 267 rp.getPeerStorage().setPeerState(ID_ONE, true); 268 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 269 // manually... 270 rp.refreshPeerState(peer.getId()); 271 assertEquals(PeerState.ENABLED, peer.getPeerState()); 272 assertConnectedPeerStatus(true, ID_ONE); 273 274 // Disconnect peer 275 rp.removePeer(ID_ONE); 276 assertNumberOfPeers(2); 277 } 278 279 private String getFileName(String base, int i) { 280 return String.format(base + "-%04d", i); 281 } 282 283 @Test 284 public void testPersistLogPositionAndSeqIdAtomically() throws Exception { 285 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 286 assertTrue(rqs.getAllQueues(serverName1).isEmpty()); 287 String queue1 = "1"; 288 String region0 = "6b2c8f8555335cc9af74455b94516cbe", 289 region1 = "6ecd2e9e010499f8ddef97ee8f70834f"; 290 for (int i = 0; i < 10; i++) { 291 rqs.addWAL(serverName1, queue1, getFileName("file1", i)); 292 } 293 List<String> queueIds = rqs.getAllQueues(serverName1); 294 assertEquals(1, queueIds.size()); 295 assertThat(queueIds, hasItems("1")); 296 297 List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); 298 assertEquals(10, wals1.size()); 299 for (int i = 0; i < 10; i++) { 300 assertThat(wals1, hasItems(getFileName("file1", i))); 301 } 302 303 for (int i = 0; i < 10; i++) { 304 assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); 305 } 306 assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); 307 assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); 308 309 for (int i = 0; i < 10; i++) { 310 rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, 311 ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); 312 } 313 314 for (int i = 0; i < 10; i++) { 315 assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); 316 } 317 assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); 318 assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); 319 320 // Try to decrease the last pushed id by setWALPosition method. 321 rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100, 322 ImmutableMap.of(region0, 899L, region1, 1001L)); 323 assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); 324 assertEquals(1001L, rqs.getLastSequenceId(region1, queue1)); 325 } 326 327 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 328 // we can first check if the value was changed in the store, if it wasn't then fail right away 329 if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { 330 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 331 } 332 while (true) { 333 if (status == rp.getPeer(peerId).isPeerEnabled()) { 334 return; 335 } 336 if (zkTimeoutCount < ZK_MAX_COUNT) { 337 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 338 + ", sleeping and trying again."); 339 Thread.sleep(ZK_SLEEP_INTERVAL); 340 } else { 341 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 342 } 343 } 344 } 345 346 protected void assertNumberOfPeers(int total) throws ReplicationException { 347 assertEquals(total, rp.getPeerStorage().listPeerIds().size()); 348 } 349 350 /* 351 * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, 352 * 3, 4, 5 log files respectively 353 */ 354 protected void populateQueues() throws ReplicationException { 355 rqs.addWAL(server1, "trash", "trash"); 356 rqs.removeQueue(server1, "trash"); 357 358 rqs.addWAL(server2, "qId1", "trash"); 359 rqs.removeWAL(server2, "qId1", "trash"); 360 361 for (int i = 1; i < 6; i++) { 362 for (int j = 0; j < i; j++) { 363 rqs.addWAL(server3, "qId" + i, "filename" + j); 364 } 365 // Add peers for the corresponding queues so they are not orphans 366 rp.getPeerStorage().addPeer("qId" + i, 367 ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), 368 true); 369 } 370 } 371}