001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotNull;
025import static org.junit.jupiter.api.Assertions.assertNull;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseZKTestingUtil;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
038import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
039import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.testclassification.ReplicationTests;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.hbase.util.MD5Hash;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.hbase.zookeeper.ZKUtil;
046import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
047import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
048import org.apache.zookeeper.KeeperException;
049import org.junit.jupiter.api.AfterAll;
050import org.junit.jupiter.api.AfterEach;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.junit.jupiter.api.TestInfo;
056
057import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
058import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
059import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
060import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
061
062@Tag(ReplicationTests.TAG)
063@Tag(MediumTests.TAG)
064public class TestZKReplicationQueueStorage {
065
066  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
067
068  private ZKWatcher zk;
069
070  private ZKReplicationQueueStorageForMigration storage;
071
072  @BeforeAll
073  public static void setUpBeforeClass() throws Exception {
074    UTIL.startMiniZKCluster();
075  }
076
077  @AfterAll
078  public static void tearDownAfterClass() throws IOException {
079    UTIL.shutdownMiniZKCluster();
080  }
081
082  @BeforeEach
083  public void setUp(TestInfo testInfo) throws IOException {
084    String methodName = testInfo.getTestMethod().get().getName();
085    Configuration conf = UTIL.getConfiguration();
086    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, methodName);
087    zk = new ZKWatcher(conf, methodName, null);
088    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
089  }
090
091  @AfterEach
092  public void tearDown() throws Exception {
093    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
094    Closeables.close(zk, true);
095  }
096
097  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
098    String peerId, ServerName deadServer) throws KeeperException {
099    ZKWatcher zk = storage.zookeeper;
100    for (int i = 0; i < nServers; i++) {
101      ServerName sn =
102        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
103      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
104      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
105      ZKUtil.createWithParents(zk, peerZNode);
106      for (int j = 0; j < i; j++) {
107        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
108        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
109      }
110      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
111      ZKUtil.createWithParents(zk, deadServerPeerZNode);
112      for (int j = 0; j < i; j++) {
113        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
114        if (j > 0) {
115          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
116        } else {
117          ZKUtil.createWithParents(zk, wal);
118        }
119      }
120      if (i % 2 == 0) {
121        // add a region_replica_replication znode
122        ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode,
123          ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER));
124      }
125    }
126    ZKUtil.createWithParents(zk,
127      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
128  }
129
130  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
131    String peerId) {
132    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
133      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
134  }
135
136  public static Map<String, Set<String>> mockLastPushedSeqIds(
137    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
138    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
139    ZKWatcher zk = storage.zookeeper;
140    Map<String, Set<String>> name2PeerIds = new HashMap<>();
141    byte[] bytes = new byte[32];
142    for (int i = 0; i < nRegions; i++) {
143      ThreadLocalRandom.current().nextBytes(bytes);
144      String encodeName = MD5Hash.getMD5AsHex(bytes);
145      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
146      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
147      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
148      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
149      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
150    }
151    int addedEmptyZNodes = 0;
152    for (int i = 0; i < 256; i++) {
153      String level1ZNode =
154        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
155      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
156        ZKUtil.createWithParents(zk, level1ZNode);
157        addedEmptyZNodes++;
158        if (addedEmptyZNodes <= emptyLevel2Count) {
159          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
160        }
161        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
162          break;
163        }
164      }
165    }
166    return name2PeerIds;
167  }
168
169  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
170    throws KeeperException {
171    ZKWatcher zk = storage.zookeeper;
172    for (int i = 0; i < nPeers; i++) {
173      String peerId = "peer_" + i;
174      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
175      for (int j = 0; j < i; j++) {
176        ZKUtil.createWithParents(zk,
177          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
178      }
179    }
180  }
181
182  @Test
183  public void testDeleteAllData() throws Exception {
184    assertFalse(storage.hasData());
185    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
186    assertTrue(storage.hasData());
187    storage.deleteAllData();
188    assertFalse(storage.hasData());
189  }
190
191  @Test
192  public void testEmptyIter() throws Exception {
193    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
194    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
195    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
196    assertNull(storage.listAllQueues().next());
197    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
198    assertNull(storage.listAllLastPushedSeqIds().next());
199    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
200    assertNull(storage.listAllHFileRefs().next());
201    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
202  }
203
204  @Test
205  public void testListAllQueues() throws Exception {
206    String peerId = "1";
207    ServerName deadServer =
208      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
209    int nServers = 10;
210    mockQueuesData(storage, nServers, peerId, deadServer);
211    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
212      storage.listAllQueues();
213    ServerName previousServerName = null;
214    for (int i = 0; i < nServers + 1; i++) {
215      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
216      assertNotNull(pair);
217      if (previousServerName != null) {
218        int index = previousServerName.equals(deadServer)
219          ? -1
220          : Integer
221            .parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname())));
222        if (index % 2 == 0) {
223          List<String> children = ZKUtil.listChildrenNoWatch(zk,
224            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()));
225          assertEquals(1, children.size());
226          assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER,
227            children.get(0));
228        } else {
229          assertEquals(-1, ZKUtil.checkExists(zk,
230            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
231        }
232      }
233      ServerName sn = pair.getFirst();
234      previousServerName = sn;
235      if (sn.equals(deadServer)) {
236        assertThat(pair.getSecond(), empty());
237      } else {
238        assertEquals(2, pair.getSecond().size());
239        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
240        ZkReplicationQueueData data0 = pair.getSecond().get(0);
241        assertEquals(peerId, data0.getQueueId().getPeerId());
242        assertEquals(sn, data0.getQueueId().getServerName());
243        assertEquals(n, data0.getWalOffsets().size());
244        for (int j = 0; j < n; j++) {
245          assertEquals(j,
246            data0.getWalOffsets().get(
247              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
248              .intValue());
249        }
250        ZkReplicationQueueData data1 = pair.getSecond().get(1);
251        assertEquals(peerId, data1.getQueueId().getPeerId());
252        assertEquals(sn, data1.getQueueId().getServerName());
253        assertEquals(n, data1.getWalOffsets().size());
254        for (int j = 0; j < n; j++) {
255          assertEquals(j,
256            data1.getWalOffsets().get(
257              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
258              .intValue());
259        }
260        // the order of the returned result is undetermined
261        if (data0.getQueueId().getSourceServerName().isPresent()) {
262          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
263          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
264        } else {
265          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
266        }
267      }
268    }
269    assertNull(iter.next());
270    assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, storage.getQueuesZNode()).size());
271  }
272
273  @Test
274  public void testListAllLastPushedSeqIds() throws Exception {
275    String peerId1 = "1";
276    String peerId2 = "2";
277    Map<String, Set<String>> name2PeerIds =
278      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
279    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
280    int emptyListCount = 0;
281    for (;;) {
282      List<ZkLastPushedSeqId> list = iter.next();
283      if (list == null) {
284        break;
285      }
286      if (list.isEmpty()) {
287        emptyListCount++;
288        continue;
289      }
290      for (ZkLastPushedSeqId seqId : list) {
291        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
292        if (seqId.getPeerId().equals(peerId1)) {
293          assertEquals(1, seqId.getLastPushedSeqId());
294        } else {
295          assertEquals(2, seqId.getLastPushedSeqId());
296        }
297      }
298    }
299    assertEquals(10, emptyListCount);
300    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
301      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
302    });
303    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
304  }
305
306  @Test
307  public void testListAllHFileRefs() throws Exception {
308    int nPeers = 10;
309    mockHFileRefs(storage, nPeers);
310    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
311    String previousPeerId = null;
312    for (int i = 0; i < nPeers; i++) {
313      Pair<String, List<String>> pair = iter.next();
314      if (previousPeerId != null) {
315        assertEquals(-1, ZKUtil.checkExists(zk,
316          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
317      }
318      String peerId = pair.getFirst();
319      previousPeerId = peerId;
320      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
321      assertEquals(index, pair.getSecond().size());
322    }
323    assertNull(iter.next());
324    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
325  }
326}