001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.hasItems;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.SortedSet;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseZKTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.testclassification.ReplicationTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.MD5Hash;
040import org.apache.hadoop.hbase.util.Pair;
041import org.apache.hadoop.hbase.zookeeper.ZKUtil;
042import org.apache.zookeeper.KeeperException;
043import org.junit.After;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
051
052@Category({ ReplicationTests.class, MediumTests.class })
053public class TestZKReplicationQueueStorage {
054
  // JUnit class-level rule, built for this test class via HBaseClassTestRule.forClass(...).
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);

  // Testing utility whose mini ZooKeeper cluster is started in setUp() and stopped in tearDown().
  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();

  // Storage instance under test; assigned in setUp() once the mini ZooKeeper cluster is running.
  private static ZKReplicationQueueStorage STORAGE;
062
  /**
   * Starts the mini ZooKeeper cluster and then builds the {@link ZKReplicationQueueStorage} under
   * test from the utility's ZooKeeper watcher and configuration.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniZKCluster();
    // Must come after the cluster start: the storage is built on the utility's ZooKeeper watcher.
    STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
  }
068
  /**
   * Shuts down the mini ZooKeeper cluster that {@link #setUp()} started.
   */
  @AfterClass
  public static void tearDown() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }
073
074  @After
075  public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
076    for (ServerName serverName : STORAGE.getListOfReplicators()) {
077      for (String queue : STORAGE.getAllQueues(serverName)) {
078        STORAGE.removeQueue(serverName, queue);
079      }
080      STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
081    }
082    for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
083      STORAGE.removePeerFromHFileRefs(peerId);
084    }
085  }
086
087  private ServerName getServerName(int i) {
088    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
089  }
090
091  @Test
092  public void testReplicator() throws ReplicationException {
093    assertTrue(STORAGE.getListOfReplicators().isEmpty());
094    String queueId = "1";
095    for (int i = 0; i < 10; i++) {
096      STORAGE.addWAL(getServerName(i), queueId, "file" + i);
097    }
098    List<ServerName> replicators = STORAGE.getListOfReplicators();
099    assertEquals(10, replicators.size());
100    for (int i = 0; i < 10; i++) {
101      assertThat(replicators, hasItems(getServerName(i)));
102    }
103    for (int i = 0; i < 5; i++) {
104      STORAGE.removeQueue(getServerName(i), queueId);
105    }
106    for (int i = 0; i < 10; i++) {
107      STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
108    }
109    replicators = STORAGE.getListOfReplicators();
110    assertEquals(5, replicators.size());
111    for (int i = 5; i < 10; i++) {
112      assertThat(replicators, hasItems(getServerName(i)));
113    }
114  }
115
116  private String getFileName(String base, int i) {
117    return String.format(base + "-%04d", i);
118  }
119
120  @Test
121  public void testAddRemoveLog() throws ReplicationException {
122    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
123    assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
124    String queue1 = "1";
125    String queue2 = "2";
126    for (int i = 0; i < 10; i++) {
127      STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
128      STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
129    }
130    List<String> queueIds = STORAGE.getAllQueues(serverName1);
131    assertEquals(2, queueIds.size());
132    assertThat(queueIds, hasItems("1", "2"));
133
134    List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
135    List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
136    assertEquals(10, wals1.size());
137    assertEquals(10, wals2.size());
138    for (int i = 0; i < 10; i++) {
139      assertThat(wals1, hasItems(getFileName("file1", i)));
140      assertThat(wals2, hasItems(getFileName("file2", i)));
141    }
142
143    for (int i = 0; i < 10; i++) {
144      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
145      assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
146      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
147        Collections.emptyMap());
148      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
149        Collections.emptyMap());
150    }
151
152    for (int i = 0; i < 10; i++) {
153      assertEquals((i + 1) * 100,
154        STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
155      assertEquals((i + 1) * 100 + 10,
156        STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
157    }
158
159    for (int i = 0; i < 10; i++) {
160      if (i % 2 == 0) {
161        STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
162      } else {
163        STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
164      }
165    }
166
167    queueIds = STORAGE.getAllQueues(serverName1);
168    assertEquals(2, queueIds.size());
169    assertThat(queueIds, hasItems("1", "2"));
170
171    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
172    Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
173
174    assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
175    assertEquals(5, peer1.getSecond().size());
176    int i = 1;
177    for (String wal : peer1.getSecond()) {
178      assertEquals(getFileName("file1", i), wal);
179      assertEquals((i + 1) * 100,
180        STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
181      i += 2;
182    }
183
184    queueIds = STORAGE.getAllQueues(serverName1);
185    assertEquals(1, queueIds.size());
186    assertThat(queueIds, hasItems("2"));
187    wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
188    assertEquals(5, wals2.size());
189    for (i = 0; i < 10; i += 2) {
190      assertThat(wals2, hasItems(getFileName("file2", i)));
191    }
192
193    queueIds = STORAGE.getAllQueues(serverName2);
194    assertEquals(1, queueIds.size());
195    assertThat(queueIds, hasItems(peer1.getFirst()));
196    wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
197    assertEquals(5, wals1.size());
198    for (i = 1; i < 10; i += 2) {
199      assertThat(wals1, hasItems(getFileName("file1", i)));
200    }
201
202    Set<String> allWals = STORAGE.getAllWALs();
203    assertEquals(10, allWals.size());
204    for (i = 0; i < 10; i++) {
205      assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
206    }
207  }
208
209  // For HBASE-12865, HBASE-26482
210  @Test
211  public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
212    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
213    STORAGE.addWAL(serverName1, "1", "file");
214    STORAGE.addWAL(serverName1, "2", "file");
215
216    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
217    // Avoid claimQueue update cversion for prepare server2 rsNode.
218    STORAGE.addWAL(serverName2, "1", "file");
219    STORAGE.addWAL(serverName2, "2", "file");
220
221    int v0 = STORAGE.getQueuesZNodeCversion();
222
223    STORAGE.claimQueue(serverName1, "1", serverName2);
224    int v1 = STORAGE.getQueuesZNodeCversion();
225    // cversion should be increased by claimQueue method.
226    assertTrue(v1 > v0);
227
228    STORAGE.claimQueue(serverName1, "2", serverName2);
229    int v2 = STORAGE.getQueuesZNodeCversion();
230    // cversion should be increased by claimQueue method.
231    assertTrue(v2 > v1);
232  }
233
234  private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
235    return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
236
237      private int called = 0;
238      private int getLastSeqIdOpIndex = 0;
239
240      @Override
241      protected int getQueuesZNodeCversion() throws KeeperException {
242        if (called < 4) {
243          called++;
244        }
245        return called;
246      }
247
248      @Override
249      protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
250        String peerId) throws KeeperException {
251        Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
252        if (getLastSeqIdOpIndex < 100) {
253          // Let the ZNode version increase.
254          String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
255          ZKUtil.createWithParents(zookeeper, path);
256          ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
257        }
258        getLastSeqIdOpIndex++;
259        return oldPair;
260      }
261    };
262  }
263
264  @Test
265  public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
266    ZKReplicationQueueStorage storage = createWithUnstableVersion();
267    storage.addWAL(getServerName(0), "1", "file");
268    // This should return eventually when cversion stabilizes
269    Set<String> allWals = storage.getAllWALs();
270    assertEquals(1, allWals.size());
271    assertThat(allWals, hasItems("file"));
272  }
273
274  // For HBASE-14621
275  @Test
276  public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
277    ZKReplicationQueueStorage storage = createWithUnstableVersion();
278    storage.addPeerToHFileRefs("1");
279    Path p = new Path("/test");
280    storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
281    // This should return eventually when cversion stabilizes
282    Set<String> allHFileRefs = storage.getAllHFileRefs();
283    assertEquals(1, allHFileRefs.size());
284    assertThat(allHFileRefs, hasItems("test"));
285  }
286
287  // For HBASE-20138
288  @Test
289  public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
290    ZKReplicationQueueStorage storage = createWithUnstableVersion();
291    ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
292    assertTrue(storage.getAllQueues(serverName1).isEmpty());
293    String queue1 = "1";
294    String fileName = getFileName("file1", 0);
295    String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
296    storage.addWAL(serverName1, queue1, fileName);
297
298    List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
299    assertEquals(1, wals1.size());
300
301    assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
302    // This should return eventually when data version stabilizes
303    storage.setWALPosition(serverName1, queue1, fileName, 100,
304      ImmutableMap.of(encodedRegionName, 120L));
305
306    assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
307    assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
308  }
309
310  @Test
311  public void testRegionsZNodeLayout() throws Exception {
312    String peerId = "1";
313    String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
314    String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
315    String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
316    assertEquals(expectedPath, path);
317  }
318
319  @Test
320  public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
321    String peerId = "1";
322    String peerIdToDelete = "2";
323    for (int i = 0; i < 100; i++) {
324      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
325      STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
326      STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
327    }
328    for (int i = 0; i < 100; i++) {
329      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
330      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
331      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
332    }
333    STORAGE.removeLastSequenceIds(peerIdToDelete);
334    for (int i = 0; i < 100; i++) {
335      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
336      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
337      assertEquals(HConstants.NO_SEQNUM,
338        STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
339    }
340  }
341}