001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.hasItems;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
036import org.apache.hadoop.hbase.zookeeper.ZKConfig;
037import org.apache.zookeeper.KeeperException;
038import org.junit.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
043
044/**
045 * White box testing for replication state interfaces. Implementations should extend this class, and
046 * initialize the interfaces properly.
047 */
048public abstract class TestReplicationStateBasic {
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
051
052  protected ReplicationQueueStorage rqs;
053  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
054  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
055  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
056  protected ReplicationPeers rp;
057  protected static final String ID_ONE = "1";
058  protected static final String ID_TWO = "2";
059  protected static String KEY_ONE;
060  protected static String KEY_TWO;
061
062  // For testing when we try to replicate to ourself
063  protected String OUR_KEY;
064
065  protected static int zkTimeoutCount;
066  protected static final int ZK_MAX_COUNT = 300;
067  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
068
069  @Test
070  public void testReplicationQueueStorage() throws ReplicationException {
071    // Test methods with empty state
072    assertEquals(0, rqs.getListOfReplicators().size());
073    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
074    assertTrue(rqs.getAllQueues(server1).isEmpty());
075
076    /*
077     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
078     * server2: zero queues
079     */
080    rqs.addWAL(server1, "qId1", "trash");
081    rqs.removeWAL(server1, "qId1", "trash");
082    rqs.addWAL(server1, "qId2", "filename1");
083    rqs.addWAL(server1, "qId3", "filename2");
084    rqs.addWAL(server1, "qId3", "filename3");
085    rqs.addWAL(server2, "trash", "trash");
086    rqs.removeQueue(server2, "trash");
087
088    List<ServerName> reps = rqs.getListOfReplicators();
089    assertEquals(2, reps.size());
090    assertTrue(server1.getServerName(), reps.contains(server1));
091    assertTrue(server2.getServerName(), reps.contains(server2));
092
093    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
094    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
095    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
096    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
097    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
098
099    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
100    assertEquals(0, rqs.getAllQueues(server2).size());
101    List<String> list = rqs.getAllQueues(server1);
102    assertEquals(3, list.size());
103    assertTrue(list.contains("qId2"));
104    assertTrue(list.contains("qId3"));
105  }
106
107  private void removeAllQueues(ServerName serverName) throws ReplicationException {
108    for (String queue : rqs.getAllQueues(serverName)) {
109      rqs.removeQueue(serverName, queue);
110    }
111  }
112
113  @Test
114  public void testReplicationQueues() throws ReplicationException {
115    // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
116    rp.init();
117
118    rqs.removeQueue(server1, "bogus");
119    rqs.removeWAL(server1, "bogus", "bogus");
120    removeAllQueues(server1);
121    assertEquals(0, rqs.getAllQueues(server1).size());
122    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
123    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
124    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
125
126    populateQueues();
127
128    assertEquals(3, rqs.getListOfReplicators().size());
129    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
130    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
131    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
132    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
133    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
134
135    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
136    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
137    assertEquals(0, rqs.getAllQueues(server1).size());
138    assertEquals(1, rqs.getAllQueues(server2).size());
139    assertEquals(5, rqs.getAllQueues(server3).size());
140
141    assertEquals(0, rqs.getAllQueues(server1).size());
142    rqs.removeReplicatorIfQueueIsEmpty(server1);
143    assertEquals(2, rqs.getListOfReplicators().size());
144
145    List<String> queues = rqs.getAllQueues(server3);
146    assertEquals(5, queues.size());
147    for (String queue : queues) {
148      rqs.claimQueue(server3, queue, server2);
149    }
150    rqs.removeReplicatorIfQueueIsEmpty(server3);
151    assertEquals(1, rqs.getListOfReplicators().size());
152
153    assertEquals(6, rqs.getAllQueues(server2).size());
154    removeAllQueues(server2);
155    rqs.removeReplicatorIfQueueIsEmpty(server2);
156    assertEquals(0, rqs.getListOfReplicators().size());
157  }
158
159  @Test
160  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
161    rp.init();
162
163    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
164    files1.add(new Pair<>(null, new Path("file_1")));
165    files1.add(new Pair<>(null, new Path("file_2")));
166    files1.add(new Pair<>(null, new Path("file_3")));
167    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
168    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
169    rp.getPeerStorage().addPeer(ID_ONE,
170      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
171      SyncReplicationState.NONE);
172    rqs.addPeerToHFileRefs(ID_ONE);
173    rqs.addHFileRefs(ID_ONE, files1);
174    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
175    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
176    List<String> hfiles2 = new ArrayList<>(files1.size());
177    for (Pair<Path, Path> p : files1) {
178      hfiles2.add(p.getSecond().getName());
179    }
180    String removedString = hfiles2.remove(0);
181    rqs.removeHFileRefs(ID_ONE, hfiles2);
182    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
183    hfiles2 = new ArrayList<>(1);
184    hfiles2.add(removedString);
185    rqs.removeHFileRefs(ID_ONE, hfiles2);
186    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
187    rp.getPeerStorage().removePeer(ID_ONE);
188  }
189
190  @Test
191  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
192    rp.init();
193    rp.getPeerStorage().addPeer(ID_ONE,
194      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
195      SyncReplicationState.NONE);
196    rqs.addPeerToHFileRefs(ID_ONE);
197    rp.getPeerStorage().addPeer(ID_TWO,
198      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
199      SyncReplicationState.NONE);
200    rqs.addPeerToHFileRefs(ID_TWO);
201
202    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
203    files1.add(new Pair<>(null, new Path("file_1")));
204    files1.add(new Pair<>(null, new Path("file_2")));
205    files1.add(new Pair<>(null, new Path("file_3")));
206    rqs.addHFileRefs(ID_ONE, files1);
207    rqs.addHFileRefs(ID_TWO, files1);
208    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
209    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
210    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
211
212    rp.getPeerStorage().removePeer(ID_ONE);
213    rqs.removePeerFromHFileRefs(ID_ONE);
214    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
215    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
216    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
217
218    rp.getPeerStorage().removePeer(ID_TWO);
219    rqs.removePeerFromHFileRefs(ID_TWO);
220    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
221    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
222  }
223
224  @Test
225  public void testReplicationPeers() throws Exception {
226    rp.init();
227
228    try {
229      rp.getPeerStorage().setPeerState("bogus", true);
230      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
231    } catch (ReplicationException e) {
232    }
233    try {
234      rp.getPeerStorage().setPeerState("bogus", false);
235      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
236    } catch (ReplicationException e) {
237    }
238
239    try {
240      assertFalse(rp.addPeer("bogus"));
241      fail("Should have thrown an ReplicationException when passed a bogus peerId");
242    } catch (ReplicationException e) {
243    }
244
245    assertNumberOfPeers(0);
246
247    // Add some peers
248    rp.getPeerStorage().addPeer(ID_ONE,
249      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
250      SyncReplicationState.NONE);
251    assertNumberOfPeers(1);
252    rp.getPeerStorage().addPeer(ID_TWO,
253      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
254      SyncReplicationState.NONE);
255    assertNumberOfPeers(2);
256
257    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
258      .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
259    rp.getPeerStorage().removePeer(ID_ONE);
260    rp.removePeer(ID_ONE);
261    assertNumberOfPeers(1);
262
263    // Add one peer
264    rp.getPeerStorage().addPeer(ID_ONE,
265      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
266      SyncReplicationState.NONE);
267    rp.addPeer(ID_ONE);
268    assertNumberOfPeers(2);
269    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
270    rp.getPeerStorage().setPeerState(ID_ONE, false);
271    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
272    // manually...
273    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
274    rp.refreshPeerState(peer.getId());
275    assertEquals(PeerState.DISABLED, peer.getPeerState());
276    assertConnectedPeerStatus(false, ID_ONE);
277    rp.getPeerStorage().setPeerState(ID_ONE, true);
278    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
279    // manually...
280    rp.refreshPeerState(peer.getId());
281    assertEquals(PeerState.ENABLED, peer.getPeerState());
282    assertConnectedPeerStatus(true, ID_ONE);
283
284    // Disconnect peer
285    rp.removePeer(ID_ONE);
286    assertNumberOfPeers(2);
287  }
288
289  private String getFileName(String base, int i) {
290    return String.format(base + "-%04d", i);
291  }
292
293  @Test
294  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
295    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
296    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
297    String queue1 = "1";
298    String region0 = "6b2c8f8555335cc9af74455b94516cbe",
299        region1 = "6ecd2e9e010499f8ddef97ee8f70834f";
300    for (int i = 0; i < 10; i++) {
301      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
302    }
303    List<String> queueIds = rqs.getAllQueues(serverName1);
304    assertEquals(1, queueIds.size());
305    assertThat(queueIds, hasItems("1"));
306
307    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
308    assertEquals(10, wals1.size());
309    for (int i = 0; i < 10; i++) {
310      assertThat(wals1, hasItems(getFileName("file1", i)));
311    }
312
313    for (int i = 0; i < 10; i++) {
314      assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
315    }
316    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
317    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
318
319    for (int i = 0; i < 10; i++) {
320      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
321        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
322    }
323
324    for (int i = 0; i < 10; i++) {
325      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
326    }
327    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
328    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
329
330    // Try to decrease the last pushed id by setWALPosition method.
331    rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
332      ImmutableMap.of(region0, 899L, region1, 1001L));
333    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
334    assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
335  }
336
337  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
338    // we can first check if the value was changed in the store, if it wasn't then fail right away
339    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
340      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
341    }
342    while (true) {
343      if (status == rp.getPeer(peerId).isPeerEnabled()) {
344        return;
345      }
346      if (zkTimeoutCount < ZK_MAX_COUNT) {
347        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
348          + ", sleeping and trying again.");
349        Thread.sleep(ZK_SLEEP_INTERVAL);
350      } else {
351        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
352      }
353    }
354  }
355
356  protected void assertNumberOfPeers(int total) throws ReplicationException {
357    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
358  }
359
360  /*
361   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
362   * 3, 4, 5 log files respectively
363   */
364  protected void populateQueues() throws ReplicationException {
365    rqs.addWAL(server1, "trash", "trash");
366    rqs.removeQueue(server1, "trash");
367
368    rqs.addWAL(server2, "qId1", "trash");
369    rqs.removeWAL(server2, "qId1", "trash");
370
371    for (int i = 1; i < 6; i++) {
372      for (int j = 0; j < i; j++) {
373        rqs.addWAL(server3, "qId" + i, "filename" + j);
374      }
375      // Add peers for the corresponding queues so they are not orphans
376      rp.getPeerStorage().addPeer("qId" + i,
377        ReplicationPeerConfig.newBuilder()
378          .setClusterKey(MiniZooKeeperCluster.HOST + ":2818:/bogus" + i).build(),
379        true, SyncReplicationState.NONE);
380    }
381  }
382}