001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.hasItems;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertThat;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.zookeeper.ZKConfig;
037import org.apache.zookeeper.KeeperException;
038import org.junit.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
043
044/**
045 * White box testing for replication state interfaces. Implementations should extend this class, and
046 * initialize the interfaces properly.
047 */
048public abstract class TestReplicationStateBasic {
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
051
052  protected ReplicationQueueStorage rqs;
053  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
054  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
055  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
056  protected ReplicationPeers rp;
057  protected static final String ID_ONE = "1";
058  protected static final String ID_TWO = "2";
059  protected static String KEY_ONE;
060  protected static String KEY_TWO;
061
062  // For testing when we try to replicate to ourself
063  protected String OUR_KEY;
064
065  protected static int zkTimeoutCount;
066  protected static final int ZK_MAX_COUNT = 300;
067  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
068
069  @Test
070  public void testReplicationQueueStorage() throws ReplicationException {
071    // Test methods with empty state
072    assertEquals(0, rqs.getListOfReplicators().size());
073    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
074    assertTrue(rqs.getAllQueues(server1).isEmpty());
075
076    /*
077     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
078     * server2: zero queues
079     */
080    rqs.addWAL(server1, "qId1", "trash");
081    rqs.removeWAL(server1, "qId1", "trash");
082    rqs.addWAL(server1,"qId2", "filename1");
083    rqs.addWAL(server1,"qId3", "filename2");
084    rqs.addWAL(server1,"qId3", "filename3");
085    rqs.addWAL(server2,"trash", "trash");
086    rqs.removeQueue(server2,"trash");
087
088    List<ServerName> reps = rqs.getListOfReplicators();
089    assertEquals(2, reps.size());
090    assertTrue(server1.getServerName(), reps.contains(server1));
091    assertTrue(server2.getServerName(), reps.contains(server2));
092
093    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
094    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
095    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
096    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
097    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
098
099    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
100    assertEquals(0, rqs.getAllQueues(server2).size());
101    List<String> list = rqs.getAllQueues(server1);
102    assertEquals(3, list.size());
103    assertTrue(list.contains("qId2"));
104    assertTrue(list.contains("qId3"));
105  }
106
107  private void removeAllQueues(ServerName serverName) throws ReplicationException {
108    for (String queue: rqs.getAllQueues(serverName)) {
109      rqs.removeQueue(serverName, queue);
110    }
111  }
112  @Test
113  public void testReplicationQueues() throws ReplicationException {
114    // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
115    rp.init();
116
117    rqs.removeQueue(server1, "bogus");
118    rqs.removeWAL(server1, "bogus", "bogus");
119    removeAllQueues(server1);
120    assertEquals(0, rqs.getAllQueues(server1).size());
121    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
122    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
123    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
124
125    populateQueues();
126
127    assertEquals(3, rqs.getListOfReplicators().size());
128    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
129    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
130    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
131    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
132    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
133
134    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
135    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
136    assertEquals(0, rqs.getAllQueues(server1).size());
137    assertEquals(1, rqs.getAllQueues(server2).size());
138    assertEquals(5, rqs.getAllQueues(server3).size());
139
140    assertEquals(0, rqs.getAllQueues(server1).size());
141    rqs.removeReplicatorIfQueueIsEmpty(server1);
142    assertEquals(2, rqs.getListOfReplicators().size());
143
144    List<String> queues = rqs.getAllQueues(server3);
145    assertEquals(5, queues.size());
146    for (String queue : queues) {
147      rqs.claimQueue(server3, queue, server2);
148    }
149    rqs.removeReplicatorIfQueueIsEmpty(server3);
150    assertEquals(1, rqs.getListOfReplicators().size());
151
152    assertEquals(6, rqs.getAllQueues(server2).size());
153    removeAllQueues(server2);
154    rqs.removeReplicatorIfQueueIsEmpty(server2);
155    assertEquals(0, rqs.getListOfReplicators().size());
156  }
157
158  @Test
159  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
160    rp.init();
161
162    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
163    files1.add(new Pair<>(null, new Path("file_1")));
164    files1.add(new Pair<>(null, new Path("file_2")));
165    files1.add(new Pair<>(null, new Path("file_3")));
166    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
167    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
168    rp.getPeerStorage().addPeer(ID_ONE,
169            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
170    rqs.addPeerToHFileRefs(ID_ONE);
171    rqs.addHFileRefs(ID_ONE, files1);
172    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
173    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
174    List<String> hfiles2 = new ArrayList<>(files1.size());
175    for (Pair<Path, Path> p : files1) {
176      hfiles2.add(p.getSecond().getName());
177    }
178    String removedString = hfiles2.remove(0);
179    rqs.removeHFileRefs(ID_ONE, hfiles2);
180    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
181    hfiles2 = new ArrayList<>(1);
182    hfiles2.add(removedString);
183    rqs.removeHFileRefs(ID_ONE, hfiles2);
184    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
185    rp.getPeerStorage().removePeer(ID_ONE);
186  }
187
188  @Test
189  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
190    rp.init();
191    rp.getPeerStorage().addPeer(ID_ONE,
192      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
193    rqs.addPeerToHFileRefs(ID_ONE);
194    rp.getPeerStorage().addPeer(ID_TWO,
195      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
196    rqs.addPeerToHFileRefs(ID_TWO);
197
198    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
199    files1.add(new Pair<>(null, new Path("file_1")));
200    files1.add(new Pair<>(null, new Path("file_2")));
201    files1.add(new Pair<>(null, new Path("file_3")));
202    rqs.addHFileRefs(ID_ONE, files1);
203    rqs.addHFileRefs(ID_TWO, files1);
204    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
205    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
206    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
207
208    rp.getPeerStorage().removePeer(ID_ONE);
209    rqs.removePeerFromHFileRefs(ID_ONE);
210    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
211    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
212    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
213
214    rp.getPeerStorage().removePeer(ID_TWO);
215    rqs.removePeerFromHFileRefs(ID_TWO);
216    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
217    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
218  }
219
220  @Test
221  public void testReplicationPeers() throws Exception {
222    rp.init();
223
224    try {
225      rp.getPeerStorage().setPeerState("bogus", true);
226      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
227    } catch (ReplicationException e) {
228    }
229    try {
230      rp.getPeerStorage().setPeerState("bogus", false);
231      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
232    } catch (ReplicationException e) {
233    }
234
235    try {
236      assertFalse(rp.addPeer("bogus"));
237      fail("Should have thrown an ReplicationException when passed a bogus peerId");
238    } catch (ReplicationException e) {
239    }
240
241    assertNumberOfPeers(0);
242
243    // Add some peers
244    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
245    assertNumberOfPeers(1);
246    rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
247    assertNumberOfPeers(2);
248
249    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
250        .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
251    rp.getPeerStorage().removePeer(ID_ONE);
252    rp.removePeer(ID_ONE);
253    assertNumberOfPeers(1);
254
255    // Add one peer
256    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
257    rp.addPeer(ID_ONE);
258    assertNumberOfPeers(2);
259    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
260    rp.getPeerStorage().setPeerState(ID_ONE, false);
261    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
262    // manually...
263    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
264    rp.refreshPeerState(peer.getId());
265    assertEquals(PeerState.DISABLED, peer.getPeerState());
266    assertConnectedPeerStatus(false, ID_ONE);
267    rp.getPeerStorage().setPeerState(ID_ONE, true);
268    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
269    // manually...
270    rp.refreshPeerState(peer.getId());
271    assertEquals(PeerState.ENABLED, peer.getPeerState());
272    assertConnectedPeerStatus(true, ID_ONE);
273
274    // Disconnect peer
275    rp.removePeer(ID_ONE);
276    assertNumberOfPeers(2);
277  }
278
279  private String getFileName(String base, int i) {
280    return String.format(base + "-%04d", i);
281  }
282
283  @Test
284  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
285    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
286    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
287    String queue1 = "1";
288    String region0 = "6b2c8f8555335cc9af74455b94516cbe",
289        region1 = "6ecd2e9e010499f8ddef97ee8f70834f";
290    for (int i = 0; i < 10; i++) {
291      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
292    }
293    List<String> queueIds = rqs.getAllQueues(serverName1);
294    assertEquals(1, queueIds.size());
295    assertThat(queueIds, hasItems("1"));
296
297    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
298    assertEquals(10, wals1.size());
299    for (int i = 0; i < 10; i++) {
300      assertThat(wals1, hasItems(getFileName("file1", i)));
301    }
302
303    for (int i = 0; i < 10; i++) {
304      assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
305    }
306    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
307    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
308
309    for (int i = 0; i < 10; i++) {
310      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
311        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
312    }
313
314    for (int i = 0; i < 10; i++) {
315      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
316    }
317    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
318    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
319
320    // Try to decrease the last pushed id by setWALPosition method.
321    rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
322      ImmutableMap.of(region0, 899L, region1, 1001L));
323    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
324    assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
325  }
326
327  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
328    // we can first check if the value was changed in the store, if it wasn't then fail right away
329    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
330      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
331    }
332    while (true) {
333      if (status == rp.getPeer(peerId).isPeerEnabled()) {
334        return;
335      }
336      if (zkTimeoutCount < ZK_MAX_COUNT) {
337        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
338            + ", sleeping and trying again.");
339        Thread.sleep(ZK_SLEEP_INTERVAL);
340      } else {
341        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
342      }
343    }
344  }
345
346  protected void assertNumberOfPeers(int total) throws ReplicationException {
347    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
348  }
349
350  /*
351   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
352   * 3, 4, 5 log files respectively
353   */
354  protected void populateQueues() throws ReplicationException {
355    rqs.addWAL(server1, "trash", "trash");
356    rqs.removeQueue(server1, "trash");
357
358    rqs.addWAL(server2, "qId1", "trash");
359    rqs.removeWAL(server2, "qId1", "trash");
360
361    for (int i = 1; i < 6; i++) {
362      for (int j = 0; j < i; j++) {
363        rqs.addWAL(server3, "qId" + i, "filename" + j);
364      }
365      // Add peers for the corresponding queues so they are not orphans
366      rp.getPeerStorage().addPeer("qId" + i,
367        ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
368        true);
369    }
370  }
371}