001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseZKTestingUtil;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
039import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
040import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.testclassification.ReplicationTests;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.hbase.util.MD5Hash;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hbase.zookeeper.ZKUtil;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
049import org.apache.zookeeper.KeeperException;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059
060import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
061import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
062import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
063import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
064
065@Category({ ReplicationTests.class, MediumTests.class })
066public class TestZKReplicationQueueStorage {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
071
072  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
073
074  private ZKWatcher zk;
075
076  private ZKReplicationQueueStorageForMigration storage;
077
078  @Rule
079  public final TestName name = new TestName();
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    UTIL.startMiniZKCluster();
084  }
085
086  @AfterClass
087  public static void tearDownAfterClass() throws IOException {
088    UTIL.shutdownMiniZKCluster();
089  }
090
091  @Before
092  public void setUp() throws IOException {
093    Configuration conf = UTIL.getConfiguration();
094    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
095    zk = new ZKWatcher(conf, name.getMethodName(), null);
096    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
097  }
098
099  @After
100  public void tearDown() throws Exception {
101    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
102    Closeables.close(zk, true);
103  }
104
105  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
106    String peerId, ServerName deadServer) throws KeeperException {
107    ZKWatcher zk = storage.zookeeper;
108    for (int i = 0; i < nServers; i++) {
109      ServerName sn =
110        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
111      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
112      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
113      ZKUtil.createWithParents(zk, peerZNode);
114      for (int j = 0; j < i; j++) {
115        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
116        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
117      }
118      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
119      ZKUtil.createWithParents(zk, deadServerPeerZNode);
120      for (int j = 0; j < i; j++) {
121        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
122        if (j > 0) {
123          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
124        } else {
125          ZKUtil.createWithParents(zk, wal);
126        }
127      }
128      if (i % 2 == 0) {
129        // add a region_replica_replication znode
130        ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode,
131          ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER));
132      }
133    }
134    ZKUtil.createWithParents(zk,
135      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
136  }
137
138  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
139    String peerId) {
140    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
141      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
142  }
143
144  public static Map<String, Set<String>> mockLastPushedSeqIds(
145    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
146    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
147    ZKWatcher zk = storage.zookeeper;
148    Map<String, Set<String>> name2PeerIds = new HashMap<>();
149    byte[] bytes = new byte[32];
150    for (int i = 0; i < nRegions; i++) {
151      ThreadLocalRandom.current().nextBytes(bytes);
152      String encodeName = MD5Hash.getMD5AsHex(bytes);
153      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
154      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
155      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
156      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
157      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
158    }
159    int addedEmptyZNodes = 0;
160    for (int i = 0; i < 256; i++) {
161      String level1ZNode =
162        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
163      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
164        ZKUtil.createWithParents(zk, level1ZNode);
165        addedEmptyZNodes++;
166        if (addedEmptyZNodes <= emptyLevel2Count) {
167          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
168        }
169        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
170          break;
171        }
172      }
173    }
174    return name2PeerIds;
175  }
176
177  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
178    throws KeeperException {
179    ZKWatcher zk = storage.zookeeper;
180    for (int i = 0; i < nPeers; i++) {
181      String peerId = "peer_" + i;
182      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
183      for (int j = 0; j < i; j++) {
184        ZKUtil.createWithParents(zk,
185          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
186      }
187    }
188  }
189
190  @Test
191  public void testDeleteAllData() throws Exception {
192    assertFalse(storage.hasData());
193    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
194    assertTrue(storage.hasData());
195    storage.deleteAllData();
196    assertFalse(storage.hasData());
197  }
198
199  @Test
200  public void testEmptyIter() throws Exception {
201    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
202    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
203    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
204    assertNull(storage.listAllQueues().next());
205    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
206    assertNull(storage.listAllLastPushedSeqIds().next());
207    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
208    assertNull(storage.listAllHFileRefs().next());
209    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
210  }
211
212  @Test
213  public void testListAllQueues() throws Exception {
214    String peerId = "1";
215    ServerName deadServer =
216      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
217    int nServers = 10;
218    mockQueuesData(storage, nServers, peerId, deadServer);
219    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
220      storage.listAllQueues();
221    ServerName previousServerName = null;
222    for (int i = 0; i < nServers + 1; i++) {
223      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
224      assertNotNull(pair);
225      if (previousServerName != null) {
226        int index = previousServerName.equals(deadServer)
227          ? -1
228          : Integer
229            .parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname())));
230        if (index % 2 == 0) {
231          List<String> children = ZKUtil.listChildrenNoWatch(zk,
232            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()));
233          assertEquals(1, children.size());
234          assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER,
235            children.get(0));
236        } else {
237          assertEquals(-1, ZKUtil.checkExists(zk,
238            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
239        }
240      }
241      ServerName sn = pair.getFirst();
242      previousServerName = sn;
243      if (sn.equals(deadServer)) {
244        assertThat(pair.getSecond(), empty());
245      } else {
246        assertEquals(2, pair.getSecond().size());
247        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
248        ZkReplicationQueueData data0 = pair.getSecond().get(0);
249        assertEquals(peerId, data0.getQueueId().getPeerId());
250        assertEquals(sn, data0.getQueueId().getServerName());
251        assertEquals(n, data0.getWalOffsets().size());
252        for (int j = 0; j < n; j++) {
253          assertEquals(j,
254            data0.getWalOffsets().get(
255              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
256              .intValue());
257        }
258        ZkReplicationQueueData data1 = pair.getSecond().get(1);
259        assertEquals(peerId, data1.getQueueId().getPeerId());
260        assertEquals(sn, data1.getQueueId().getServerName());
261        assertEquals(n, data1.getWalOffsets().size());
262        for (int j = 0; j < n; j++) {
263          assertEquals(j,
264            data1.getWalOffsets().get(
265              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
266              .intValue());
267        }
268        // the order of the returned result is undetermined
269        if (data0.getQueueId().getSourceServerName().isPresent()) {
270          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
271          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
272        } else {
273          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
274        }
275      }
276    }
277    assertNull(iter.next());
278    assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, storage.getQueuesZNode()).size());
279  }
280
281  @Test
282  public void testListAllLastPushedSeqIds() throws Exception {
283    String peerId1 = "1";
284    String peerId2 = "2";
285    Map<String, Set<String>> name2PeerIds =
286      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
287    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
288    int emptyListCount = 0;
289    for (;;) {
290      List<ZkLastPushedSeqId> list = iter.next();
291      if (list == null) {
292        break;
293      }
294      if (list.isEmpty()) {
295        emptyListCount++;
296        continue;
297      }
298      for (ZkLastPushedSeqId seqId : list) {
299        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
300        if (seqId.getPeerId().equals(peerId1)) {
301          assertEquals(1, seqId.getLastPushedSeqId());
302        } else {
303          assertEquals(2, seqId.getLastPushedSeqId());
304        }
305      }
306    }
307    assertEquals(10, emptyListCount);
308    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
309      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
310    });
311    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
312  }
313
314  @Test
315  public void testListAllHFileRefs() throws Exception {
316    int nPeers = 10;
317    mockHFileRefs(storage, nPeers);
318    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
319    String previousPeerId = null;
320    for (int i = 0; i < nPeers; i++) {
321      Pair<String, List<String>> pair = iter.next();
322      if (previousPeerId != null) {
323        assertEquals(-1, ZKUtil.checkExists(zk,
324          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
325      }
326      String peerId = pair.getFirst();
327      previousPeerId = peerId;
328      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
329      assertEquals(index, pair.getSecond().size());
330    }
331    assertNull(iter.next());
332    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
333  }
334}