001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import static org.junit.Assert.*;
022
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.util.Pair;
029import org.apache.hadoop.hbase.zookeeper.ZKConfig;
030import org.apache.zookeeper.KeeperException;
031import org.junit.Before;
032import org.junit.Test;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * White box testing for replication state interfaces. Implementations should extend this class, and
038 * initialize the interfaces properly.
039 */
040public abstract class TestReplicationStateBasic {
041
042  protected ReplicationQueues rq1;
043  protected ReplicationQueues rq2;
044  protected ReplicationQueues rq3;
045  protected ReplicationQueuesClient rqc;
046  protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
047  protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
048  protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
049  protected ReplicationPeers rp;
050  protected static final String ID_ONE = "1";
051  protected static final String ID_TWO = "2";
052  protected static String KEY_ONE;
053  protected static String KEY_TWO;
054
055  // For testing when we try to replicate to ourself
056  protected String OUR_ID = "3";
057  protected String OUR_KEY;
058
059  protected static int zkTimeoutCount;
060  protected static final int ZK_MAX_COUNT = 300;
061  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
064
065  @Before
066  public void setUp() {
067    zkTimeoutCount = 0;
068  }
069
070  @Test
071  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
072    rqc.init();
073    // Test methods with empty state
074    assertEquals(0, rqc.getListOfReplicators().size());
075    assertNull(rqc.getLogsInQueue(server1, "qId1"));
076    assertNull(rqc.getAllQueues(server1));
077
078    /*
079     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
080     * server2: zero queues
081     */
082    rq1.init(server1);
083    rq2.init(server2);
084    rq1.addLog("qId1", "trash");
085    rq1.removeLog("qId1", "trash");
086    rq1.addLog("qId2", "filename1");
087    rq1.addLog("qId3", "filename2");
088    rq1.addLog("qId3", "filename3");
089    rq2.addLog("trash", "trash");
090    rq2.removeQueue("trash");
091
092    List<String> reps = rqc.getListOfReplicators();
093    assertEquals(2, reps.size());
094    assertTrue(server1, reps.contains(server1));
095    assertTrue(server2, reps.contains(server2));
096
097    assertNull(rqc.getLogsInQueue("bogus", "bogus"));
098    assertNull(rqc.getLogsInQueue(server1, "bogus"));
099    assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
100    assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
101    assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
102
103    assertNull(rqc.getAllQueues("bogus"));
104    assertEquals(0, rqc.getAllQueues(server2).size());
105    List<String> list = rqc.getAllQueues(server1);
106    assertEquals(3, list.size());
107    assertTrue(list.contains("qId2"));
108    assertTrue(list.contains("qId3"));
109  }
110
111  @Test
112  public void testReplicationQueues() throws ReplicationException {
113    rq1.init(server1);
114    rq2.init(server2);
115    rq3.init(server3);
116    //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
117    rp.init();
118
119    // 3 replicators should exist
120    assertEquals(3, rq1.getListOfReplicators().size());
121    rq1.removeQueue("bogus");
122    rq1.removeLog("bogus", "bogus");
123    rq1.removeAllQueues();
124    assertEquals(0, rq1.getAllQueues().size());
125    assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
126    assertNull(rq1.getLogsInQueue("bogus"));
127    assertNull(rq1.getUnClaimedQueueIds(
128        ServerName.valueOf("bogus", 1234, -1L).toString()));
129
130    rq1.setLogPosition("bogus", "bogus", 5L);
131
132    populateQueues();
133
134    assertEquals(3, rq1.getListOfReplicators().size());
135    assertEquals(0, rq2.getLogsInQueue("qId1").size());
136    assertEquals(5, rq3.getLogsInQueue("qId5").size());
137    assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
138    rq3.setLogPosition("qId5", "filename4", 354L);
139    assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
140
141    assertEquals(5, rq3.getLogsInQueue("qId5").size());
142    assertEquals(0, rq2.getLogsInQueue("qId1").size());
143    assertEquals(0, rq1.getAllQueues().size());
144    assertEquals(1, rq2.getAllQueues().size());
145    assertEquals(5, rq3.getAllQueues().size());
146
147    assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
148    rq3.removeReplicatorIfQueueIsEmpty(server1);
149    assertEquals(2, rq3.getListOfReplicators().size());
150
151    List<String> queues = rq2.getUnClaimedQueueIds(server3);
152    assertEquals(5, queues.size());
153    for(String queue: queues) {
154      rq2.claimQueue(server3, queue);
155    }
156    rq2.removeReplicatorIfQueueIsEmpty(server3);
157    assertEquals(1, rq2.getListOfReplicators().size());
158
159    // Try to claim our own queues
160    assertNull(rq2.getUnClaimedQueueIds(server2));
161    rq2.removeReplicatorIfQueueIsEmpty(server2);
162
163    assertEquals(6, rq2.getAllQueues().size());
164
165    rq2.removeAllQueues();
166
167    assertEquals(0, rq2.getListOfReplicators().size());
168  }
169
170  @Test
171  public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
172    rp.init();
173
174    try {
175      rp.registerPeer(ID_ONE,
176        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
177      fail("Should throw an IllegalArgumentException because "
178            + "zookeeper.znode.parent is missing leading '/'.");
179    } catch (IllegalArgumentException e) {
180      // Expected.
181    }
182
183    try {
184      rp.registerPeer(ID_ONE,
185        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
186      fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
187    } catch (IllegalArgumentException e) {
188      // Expected.
189    }
190
191    try {
192      rp.registerPeer(ID_ONE,
193        new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
194      fail("Should throw an IllegalArgumentException because "
195          + "hbase.zookeeper.property.clientPort is missing.");
196    } catch (IllegalArgumentException e) {
197      // Expected.
198    }
199  }
200
201  @Test
202  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
203    rp.init();
204    rq1.init(server1);
205    rqc.init();
206
207    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
208    files1.add(new Pair<>(null, new Path("file_1")));
209    files1.add(new Pair<>(null, new Path("file_2")));
210    files1.add(new Pair<>(null, new Path("file_3")));
211    assertNull(rqc.getReplicableHFiles(ID_ONE));
212    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
213    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
214    rq1.addPeerToHFileRefs(ID_ONE);
215    rq1.addHFileRefs(ID_ONE, files1);
216    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
217    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
218    List<String> hfiles2 = new ArrayList<>(files1.size());
219    for (Pair<Path, Path> p : files1) {
220      hfiles2.add(p.getSecond().getName());
221    }
222    String removedString = hfiles2.remove(0);
223    rq1.removeHFileRefs(ID_ONE, hfiles2);
224    assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
225    hfiles2 = new ArrayList<>(1);
226    hfiles2.add(removedString);
227    rq1.removeHFileRefs(ID_ONE, hfiles2);
228    assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
229    rp.unregisterPeer(ID_ONE);
230  }
231
232  @Test
233  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
234    rq1.init(server1);
235    rqc.init();
236
237    rp.init();
238    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
239    rq1.addPeerToHFileRefs(ID_ONE);
240    rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
241    rq1.addPeerToHFileRefs(ID_TWO);
242
243    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
244    files1.add(new Pair<>(null, new Path("file_1")));
245    files1.add(new Pair<>(null, new Path("file_2")));
246    files1.add(new Pair<>(null, new Path("file_3")));
247    rq1.addHFileRefs(ID_ONE, files1);
248    rq1.addHFileRefs(ID_TWO, files1);
249    assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
250    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
251    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
252
253    rp.unregisterPeer(ID_ONE);
254    rq1.removePeerFromHFileRefs(ID_ONE);
255    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
256    assertNull(rqc.getReplicableHFiles(ID_ONE));
257    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
258
259    rp.unregisterPeer(ID_TWO);
260    rq1.removePeerFromHFileRefs(ID_TWO);
261    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
262    assertNull(rqc.getReplicableHFiles(ID_TWO));
263  }
264
265  @Test
266  public void testReplicationPeers() throws Exception {
267    rp.init();
268
269    // Test methods with non-existent peer ids
270    try {
271      rp.unregisterPeer("bogus");
272      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
273    } catch (IllegalArgumentException e) {
274    }
275    try {
276      rp.enablePeer("bogus");
277      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
278    } catch (IllegalArgumentException e) {
279    }
280    try {
281      rp.disablePeer("bogus");
282      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
283    } catch (IllegalArgumentException e) {
284    }
285    try {
286      rp.getStatusOfPeer("bogus");
287      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
288    } catch (IllegalArgumentException e) {
289    }
290    assertFalse(rp.peerConnected("bogus"));
291    rp.peerDisconnected("bogus");
292
293    assertNull(rp.getPeerConf("bogus"));
294    assertNumberOfPeers(0);
295
296    // Add some peers
297    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
298    assertNumberOfPeers(1);
299    rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
300    assertNumberOfPeers(2);
301
302    // Test methods with a peer that is added but not connected
303    try {
304      rp.getStatusOfPeer(ID_ONE);
305      fail("There are no connected peers, should have thrown an IllegalArgumentException");
306    } catch (IllegalArgumentException e) {
307    }
308    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
309    rp.unregisterPeer(ID_ONE);
310    rp.peerDisconnected(ID_ONE);
311    assertNumberOfPeers(1);
312
313    // Add one peer
314    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
315    rp.peerConnected(ID_ONE);
316    assertNumberOfPeers(2);
317    assertTrue(rp.getStatusOfPeer(ID_ONE));
318    rp.disablePeer(ID_ONE);
319    assertConnectedPeerStatus(false, ID_ONE);
320    rp.enablePeer(ID_ONE);
321    assertConnectedPeerStatus(true, ID_ONE);
322
323    // Disconnect peer
324    rp.peerDisconnected(ID_ONE);
325    assertNumberOfPeers(2);
326    try {
327      rp.getStatusOfPeer(ID_ONE);
328      fail("There are no connected peers, should have thrown an IllegalArgumentException");
329    } catch (IllegalArgumentException e) {
330    }
331  }
332
333  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
334    // we can first check if the value was changed in the store, if it wasn't then fail right away
335    if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
336      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
337    }
338    while (true) {
339      if (status == rp.getStatusOfPeer(peerId)) {
340        return;
341      }
342      if (zkTimeoutCount < ZK_MAX_COUNT) {
343        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
344            + ", sleeping and trying again.");
345        Thread.sleep(ZK_SLEEP_INTERVAL);
346      } else {
347        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
348      }
349    }
350  }
351
352  protected void assertNumberOfPeers(int total) {
353    assertEquals(total, rp.getAllPeerConfigs().size());
354    assertEquals(total, rp.getAllPeerIds().size());
355    assertEquals(total, rp.getAllPeerIds().size());
356  }
357
358  /*
359   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
360   * 3, 4, 5 log files respectively
361   */
362  protected void populateQueues() throws ReplicationException {
363    rq1.addLog("trash", "trash");
364    rq1.removeQueue("trash");
365
366    rq2.addLog("qId1", "trash");
367    rq2.removeLog("qId1", "trash");
368
369    for (int i = 1; i < 6; i++) {
370      for (int j = 0; j < i; j++) {
371        rq3.addLog("qId" + i, "filename" + j);
372      }
373      //Add peers for the corresponding queues so they are not orphans
374      rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
375    }
376  }
377}
378