/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

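/**
 * Tests for {@link ZKReplicationQueueStorageForMigration}: verifies that the legacy ZooKeeper
 * based replication queue data (queues, last pushed sequence ids and hfile references) can be
 * listed through the migration iterators and that the znodes are cleaned up once they have
 * been consumed.
 */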
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);

  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();

  private ZKWatcher zk;

  private ZKReplicationQueueStorageForMigration storage;

  @Rule
  public final TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
    zk = new ZKWatcher(conf, name.getMethodName(), null);
    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
  }

  @After
  public void tearDown() throws Exception {
    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
    Closeables.close(zk, true);
  }

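  /**
   * Populates the queues znode with {@code nServers} region servers. Server {@code i} gets a
   * normal queue for {@code peerId} with {@code i} WALs and a recovered queue claimed from
   * {@code deadServer} with {@code i} WALs, plus an empty queues znode is created for the dead
   * server itself.
   */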
  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
    String peerId, ServerName deadServer) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nServers; i++) {
      ServerName sn =
        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
      ZKUtil.createWithParents(zk, peerZNode);
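      // server i gets i WALs in its normal queue, each storing its index as the replication
      // position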
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
      }
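      // plus a recovered queue claimed from the dead server; its first WAL znode is created
      // without any position data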
      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
      ZKUtil.createWithParents(zk, deadServerPeerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
        if (j > 0) {
          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
        } else {
          ZKUtil.createWithParents(zk, wal);
        }
      }
    }
    ZKUtil.createWithParents(zk,
      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
  }

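  // last pushed sequence id znodes are laid out as
  // <regions>/<first 2 chars>/<next 2 chars>/<rest of the encoded region name>-<peerId>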
  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
    String peerId) {
    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
  }

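  /**
   * Creates {@code nRegions} fake regions, each with a last pushed sequence id of 1 for
   * {@code peerId1} and 2 for {@code peerId2}, and pads the layout with empty level one and
   * level two znodes. Returns a map from encoded region name to the peer ids created for it.
   */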
  public static Map<String, Set<String>> mockLastPushedSeqIds(
    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    Map<String, Set<String>> name2PeerIds = new HashMap<>();
    byte[] bytes = new byte[32];
    for (int i = 0; i < nRegions; i++) {
      ThreadLocalRandom.current().nextBytes(bytes);
      String encodeName = MD5Hash.getMD5AsHex(bytes);
      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
    }
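    // pad the layout with empty znodes: the first emptyLevel2Count newly created level one
    // znodes get an empty level two child, the remaining ones stay completely empty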
    int addedEmptyZNodes = 0;
    for (int i = 0; i < 256; i++) {
      String level1ZNode =
        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
        ZKUtil.createWithParents(zk, level1ZNode);
        addedEmptyZNodes++;
        if (addedEmptyZNodes <= emptyLevel2Count) {
          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
        }
        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
          break;
        }
      }
    }
    return name2PeerIds;
  }

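  /**
   * Creates {@code nPeers} hfile-refs queues, where peer {@code i} holds {@code i} hfile
   * references.
   */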
  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
    throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nPeers; i++) {
      String peerId = "peer_" + i;
      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
      for (int j = 0; j < i; j++) {
        ZKUtil.createWithParents(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
      }
    }
  }

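  // deleteAllData should remove the whole replication queue tree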
  @Test
  public void testDeleteAllData() throws Exception {
    assertFalse(storage.hasData());
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    assertTrue(storage.hasData());
    storage.deleteAllData();
    assertFalse(storage.hasData());
  }

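  // iterating over empty storage should return null immediately and delete the empty znodes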
  @Test
  public void testEmptyIter() throws Exception {
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
    assertNull(storage.listAllQueues().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
    assertNull(storage.listAllLastPushedSeqIds().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
    assertNull(storage.listAllHFileRefs().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }

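  // walks all mocked queues and verifies the queue ids, the wal offsets and that consumed
  // znodes are cleaned up along the way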
  @Test
  public void testListAllQueues() throws Exception {
    String peerId = "1";
    ServerName deadServer =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    int nServers = 10;
    mockQueuesData(storage, nServers, peerId, deadServer);
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      storage.listAllQueues();
    ServerName previousServerName = null;
    for (int i = 0; i < nServers + 1; i++) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      assertNotNull(pair);
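      // the znode of the previously returned server should already have been deleted by the
      // iterator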
      if (previousServerName != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
      }
      ServerName sn = pair.getFirst();
      previousServerName = sn;
      if (sn.equals(deadServer)) {
        assertThat(pair.getSecond(), empty());
      } else {
        assertEquals(2, pair.getSecond().size());
        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
        ZkReplicationQueueData data0 = pair.getSecond().get(0);
        assertEquals(peerId, data0.getQueueId().getPeerId());
        assertEquals(sn, data0.getQueueId().getServerName());
        assertEquals(n, data0.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j,
            data0.getWalOffsets().get(
              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        ZkReplicationQueueData data1 = pair.getSecond().get(1);
        assertEquals(peerId, data1.getQueueId().getPeerId());
        assertEquals(sn, data1.getQueueId().getServerName());
        assertEquals(n, data1.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j,
            data1.getWalOffsets().get(
              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        // the order of the returned queues is not deterministic, so locate the recovered one
        // by checking which of them carries a source server name
        if (data0.getQueueId().getSourceServerName().isPresent()) {
          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
        } else {
          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
        }
      }
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
  }

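  // every mocked last pushed sequence id should be returned exactly once, the empty znodes
  // should only surface as empty lists, and the regions znode should be gone at the end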
  @Test
  public void testListAllLastPushedSeqIds() throws Exception {
    String peerId1 = "1";
    String peerId2 = "2";
    Map<String, Set<String>> name2PeerIds =
      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
    int emptyListCount = 0;
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      if (list == null) {
        break;
      }
      if (list.isEmpty()) {
        emptyListCount++;
        continue;
      }
      for (ZkLastPushedSeqId seqId : list) {
        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
        if (seqId.getPeerId().equals(peerId1)) {
          assertEquals(1, seqId.getLastPushedSeqId());
        } else {
          assertEquals(2, seqId.getLastPushedSeqId());
        }
      }
    }
    assertEquals(10, emptyListCount);
    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
    });
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
  }

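  // walks all mocked hfile-refs queues and verifies the per peer contents and that consumed
  // znodes are cleaned up along the way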
  @Test
  public void testListAllHFileRefs() throws Exception {
    int nPeers = 10;
    mockHFileRefs(storage, nPeers);
    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
    String previousPeerId = null;
    for (int i = 0; i < nPeers; i++) {
      Pair<String, List<String>> pair = iter.next();
      if (previousPeerId != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
      }
      String peerId = pair.getFirst();
      previousPeerId = peerId;
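      // the mock gave peer_<i> exactly <i> hfile references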
      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
      assertEquals(index, pair.getSecond().size());
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }
}