001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.hasItems;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
036import org.apache.hadoop.hbase.zookeeper.ZKConfig;
037import org.apache.zookeeper.KeeperException;
038import org.junit.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
043
044/**
045 * White box testing for replication state interfaces. Implementations should extend this class, and
046 * initialize the interfaces properly.
047 */
048public abstract class TestReplicationStateBasic {
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
051
052  protected ReplicationQueueStorage rqs;
053  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
054  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
055  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
056  protected ReplicationPeers rp;
057  protected static final String ID_ONE = "1";
058  protected static final String ID_TWO = "2";
059  protected static String KEY_ONE;
060  protected static String KEY_TWO;
061
062  // For testing when we try to replicate to ourself
063  protected String OUR_KEY;
064
065  protected static int zkTimeoutCount;
066  protected static final int ZK_MAX_COUNT = 300;
067  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
068
069  @Test
070  public void testReplicationQueueStorage() throws ReplicationException {
071    // Test methods with empty state
072    assertEquals(0, rqs.getListOfReplicators().size());
073    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
074    assertTrue(rqs.getAllQueues(server1).isEmpty());
075
076    /*
077     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
078     * server2: zero queues
079     */
080    rqs.addWAL(server1, "qId1", "trash");
081    rqs.removeWAL(server1, "qId1", "trash");
082    rqs.addWAL(server1, "qId2", "filename1");
083    rqs.addWAL(server1, "qId3", "filename2");
084    rqs.addWAL(server1, "qId3", "filename3");
085    rqs.addWAL(server2, "trash", "trash");
086    rqs.removeQueue(server2, "trash");
087
088    List<ServerName> reps = rqs.getListOfReplicators();
089    assertEquals(2, reps.size());
090    assertTrue(server1.getServerName(), reps.contains(server1));
091    assertTrue(server2.getServerName(), reps.contains(server2));
092
093    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
094    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
095    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
096    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
097    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
098
099    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
100    assertEquals(0, rqs.getAllQueues(server2).size());
101    List<String> list = rqs.getAllQueues(server1);
102    assertEquals(3, list.size());
103    assertTrue(list.contains("qId2"));
104    assertTrue(list.contains("qId3"));
105  }
106
107  private void removeAllQueues(ServerName serverName) throws ReplicationException {
108    for (String queue : rqs.getAllQueues(serverName)) {
109      rqs.removeQueue(serverName, queue);
110    }
111  }
112
113  @Test
114  public void testReplicationQueues() throws ReplicationException {
115    // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
116    rp.init();
117
118    rqs.removeQueue(server1, "bogus");
119    rqs.removeWAL(server1, "bogus", "bogus");
120    removeAllQueues(server1);
121    assertEquals(0, rqs.getAllQueues(server1).size());
122    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
123    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
124    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
125
126    populateQueues();
127
128    assertEquals(3, rqs.getListOfReplicators().size());
129    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
130    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
131    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
132    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
133    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
134
135    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
136    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
137    assertEquals(0, rqs.getAllQueues(server1).size());
138    assertEquals(1, rqs.getAllQueues(server2).size());
139    assertEquals(5, rqs.getAllQueues(server3).size());
140
141    assertEquals(0, rqs.getAllQueues(server1).size());
142    rqs.removeReplicatorIfQueueIsEmpty(server1);
143    assertEquals(2, rqs.getListOfReplicators().size());
144
145    List<String> queues = rqs.getAllQueues(server3);
146    assertEquals(5, queues.size());
147    for (String queue : queues) {
148      rqs.claimQueue(server3, queue, server2);
149    }
150    rqs.removeReplicatorIfQueueIsEmpty(server3);
151    assertEquals(1, rqs.getListOfReplicators().size());
152
153    assertEquals(6, rqs.getAllQueues(server2).size());
154    removeAllQueues(server2);
155    rqs.removeReplicatorIfQueueIsEmpty(server2);
156    assertEquals(0, rqs.getListOfReplicators().size());
157  }
158
159  @Test
160  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
161    rp.init();
162
163    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
164    files1.add(new Pair<>(null, new Path("file_1")));
165    files1.add(new Pair<>(null, new Path("file_2")));
166    files1.add(new Pair<>(null, new Path("file_3")));
167    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
168    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
169    rp.getPeerStorage().addPeer(ID_ONE,
170      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
171    rqs.addPeerToHFileRefs(ID_ONE);
172    rqs.addHFileRefs(ID_ONE, files1);
173    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
174    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
175    List<String> hfiles2 = new ArrayList<>(files1.size());
176    for (Pair<Path, Path> p : files1) {
177      hfiles2.add(p.getSecond().getName());
178    }
179    String removedString = hfiles2.remove(0);
180    rqs.removeHFileRefs(ID_ONE, hfiles2);
181    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
182    hfiles2 = new ArrayList<>(1);
183    hfiles2.add(removedString);
184    rqs.removeHFileRefs(ID_ONE, hfiles2);
185    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
186    rp.getPeerStorage().removePeer(ID_ONE);
187  }
188
189  @Test
190  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
191    rp.init();
192    rp.getPeerStorage().addPeer(ID_ONE,
193      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
194    rqs.addPeerToHFileRefs(ID_ONE);
195    rp.getPeerStorage().addPeer(ID_TWO,
196      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
197    rqs.addPeerToHFileRefs(ID_TWO);
198
199    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
200    files1.add(new Pair<>(null, new Path("file_1")));
201    files1.add(new Pair<>(null, new Path("file_2")));
202    files1.add(new Pair<>(null, new Path("file_3")));
203    rqs.addHFileRefs(ID_ONE, files1);
204    rqs.addHFileRefs(ID_TWO, files1);
205    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
206    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
207    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
208
209    rp.getPeerStorage().removePeer(ID_ONE);
210    rqs.removePeerFromHFileRefs(ID_ONE);
211    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
212    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
213    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
214
215    rp.getPeerStorage().removePeer(ID_TWO);
216    rqs.removePeerFromHFileRefs(ID_TWO);
217    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
218    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
219  }
220
221  @Test
222  public void testReplicationPeers() throws Exception {
223    rp.init();
224
225    try {
226      rp.getPeerStorage().setPeerState("bogus", true);
227      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
228    } catch (ReplicationException e) {
229    }
230    try {
231      rp.getPeerStorage().setPeerState("bogus", false);
232      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
233    } catch (ReplicationException e) {
234    }
235
236    try {
237      assertFalse(rp.addPeer("bogus"));
238      fail("Should have thrown an ReplicationException when passed a bogus peerId");
239    } catch (ReplicationException e) {
240    }
241
242    assertNumberOfPeers(0);
243
244    // Add some peers
245    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
246    assertNumberOfPeers(1);
247    rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
248    assertNumberOfPeers(2);
249
250    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
251      .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
252    rp.getPeerStorage().removePeer(ID_ONE);
253    rp.removePeer(ID_ONE);
254    assertNumberOfPeers(1);
255
256    // Add one peer
257    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
258    rp.addPeer(ID_ONE);
259    assertNumberOfPeers(2);
260    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
261    rp.getPeerStorage().setPeerState(ID_ONE, false);
262    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
263    // manually...
264    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
265    rp.refreshPeerState(peer.getId());
266    assertEquals(PeerState.DISABLED, peer.getPeerState());
267    assertConnectedPeerStatus(false, ID_ONE);
268    rp.getPeerStorage().setPeerState(ID_ONE, true);
269    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
270    // manually...
271    rp.refreshPeerState(peer.getId());
272    assertEquals(PeerState.ENABLED, peer.getPeerState());
273    assertConnectedPeerStatus(true, ID_ONE);
274
275    // Disconnect peer
276    rp.removePeer(ID_ONE);
277    assertNumberOfPeers(2);
278  }
279
280  private String getFileName(String base, int i) {
281    return String.format(base + "-%04d", i);
282  }
283
284  @Test
285  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
286    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
287    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
288    String queue1 = "1";
289    String region0 = "6b2c8f8555335cc9af74455b94516cbe",
290        region1 = "6ecd2e9e010499f8ddef97ee8f70834f";
291    for (int i = 0; i < 10; i++) {
292      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
293    }
294    List<String> queueIds = rqs.getAllQueues(serverName1);
295    assertEquals(1, queueIds.size());
296    assertThat(queueIds, hasItems("1"));
297
298    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
299    assertEquals(10, wals1.size());
300    for (int i = 0; i < 10; i++) {
301      assertThat(wals1, hasItems(getFileName("file1", i)));
302    }
303
304    for (int i = 0; i < 10; i++) {
305      assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
306    }
307    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
308    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
309
310    for (int i = 0; i < 10; i++) {
311      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
312        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
313    }
314
315    for (int i = 0; i < 10; i++) {
316      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
317    }
318    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
319    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
320
321    // Try to decrease the last pushed id by setWALPosition method.
322    rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
323      ImmutableMap.of(region0, 899L, region1, 1001L));
324    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
325    assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
326  }
327
328  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
329    // we can first check if the value was changed in the store, if it wasn't then fail right away
330    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
331      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
332    }
333    while (true) {
334      if (status == rp.getPeer(peerId).isPeerEnabled()) {
335        return;
336      }
337      if (zkTimeoutCount < ZK_MAX_COUNT) {
338        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
339          + ", sleeping and trying again.");
340        Thread.sleep(ZK_SLEEP_INTERVAL);
341      } else {
342        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
343      }
344    }
345  }
346
347  protected void assertNumberOfPeers(int total) throws ReplicationException {
348    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
349  }
350
351  /*
352   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
353   * 3, 4, 5 log files respectively
354   */
355  protected void populateQueues() throws ReplicationException {
356    rqs.addWAL(server1, "trash", "trash");
357    rqs.removeQueue(server1, "trash");
358
359    rqs.addWAL(server2, "qId1", "trash");
360    rqs.removeWAL(server2, "qId1", "trash");
361
362    for (int i = 1; i < 6; i++) {
363      for (int j = 0; j < i; j++) {
364        rqs.addWAL(server3, "qId" + i, "filename" + j);
365      }
366      // Add peers for the corresponding queues so they are not orphans
367      rp.getPeerStorage().addPeer("qId" + i, ReplicationPeerConfig.newBuilder()
368        .setClusterKey(MiniZooKeeperCluster.HOST + ":2818:/bogus" + i).build(), true);
369    }
370  }
371}