001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.hasItems;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.SortedSet;
031
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseZKTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.MD5Hash;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.zookeeper.ZKUtil;
043import org.apache.zookeeper.KeeperException;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
052
053@Category({ ReplicationTests.class, MediumTests.class })
054public class TestZKReplicationQueueStorage {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
059
060  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
061
062  private static ZKReplicationQueueStorage STORAGE;
063
064  @BeforeClass
065  public static void setUp() throws Exception {
066    UTIL.startMiniZKCluster();
067    STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
068  }
069
070  @AfterClass
071  public static void tearDown() throws IOException {
072    UTIL.shutdownMiniZKCluster();
073  }
074
075  @After
076  public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
077    for (ServerName serverName : STORAGE.getListOfReplicators()) {
078      for (String queue : STORAGE.getAllQueues(serverName)) {
079        STORAGE.removeQueue(serverName, queue);
080      }
081      STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
082    }
083    for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
084      STORAGE.removePeerFromHFileRefs(peerId);
085    }
086  }
087
088  private ServerName getServerName(int i) {
089    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
090  }
091
092  @Test
093  public void testReplicator() throws ReplicationException {
094    assertTrue(STORAGE.getListOfReplicators().isEmpty());
095    String queueId = "1";
096    for (int i = 0; i < 10; i++) {
097      STORAGE.addWAL(getServerName(i), queueId, "file" + i);
098    }
099    List<ServerName> replicators = STORAGE.getListOfReplicators();
100    assertEquals(10, replicators.size());
101    for (int i = 0; i < 10; i++) {
102      assertThat(replicators, hasItems(getServerName(i)));
103    }
104    for (int i = 0; i < 5; i++) {
105      STORAGE.removeQueue(getServerName(i), queueId);
106    }
107    for (int i = 0; i < 10; i++) {
108      STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
109    }
110    replicators = STORAGE.getListOfReplicators();
111    assertEquals(5, replicators.size());
112    for (int i = 5; i < 10; i++) {
113      assertThat(replicators, hasItems(getServerName(i)));
114    }
115  }
116
117  private String getFileName(String base, int i) {
118    return String.format(base + "-%04d", i);
119  }
120
121  @Test
122  public void testAddRemoveLog() throws ReplicationException {
123    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
124    assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
125    String queue1 = "1";
126    String queue2 = "2";
127    for (int i = 0; i < 10; i++) {
128      STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
129      STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
130    }
131    List<String> queueIds = STORAGE.getAllQueues(serverName1);
132    assertEquals(2, queueIds.size());
133    assertThat(queueIds, hasItems("1", "2"));
134
135    List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
136    List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
137    assertEquals(10, wals1.size());
138    assertEquals(10, wals2.size());
139    for (int i = 0; i < 10; i++) {
140      assertThat(wals1, hasItems(getFileName("file1", i)));
141      assertThat(wals2, hasItems(getFileName("file2", i)));
142    }
143
144    for (int i = 0; i < 10; i++) {
145      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
146      assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
147      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
148        Collections.emptyMap());
149      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
150        Collections.emptyMap());
151    }
152
153    for (int i = 0; i < 10; i++) {
154      assertEquals((i + 1) * 100,
155        STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
156      assertEquals((i + 1) * 100 + 10,
157        STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
158    }
159
160    for (int i = 0; i < 10; i++) {
161      if (i % 2 == 0) {
162        STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
163      } else {
164        STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
165      }
166    }
167
168    queueIds = STORAGE.getAllQueues(serverName1);
169    assertEquals(2, queueIds.size());
170    assertThat(queueIds, hasItems("1", "2"));
171
172    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
173    Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
174
175    assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
176    assertEquals(5, peer1.getSecond().size());
177    int i = 1;
178    for (String wal : peer1.getSecond()) {
179      assertEquals(getFileName("file1", i), wal);
180      assertEquals((i + 1) * 100,
181        STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
182      i += 2;
183    }
184
185    queueIds = STORAGE.getAllQueues(serverName1);
186    assertEquals(1, queueIds.size());
187    assertThat(queueIds, hasItems("2"));
188    wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
189    assertEquals(5, wals2.size());
190    for (i = 0; i < 10; i += 2) {
191      assertThat(wals2, hasItems(getFileName("file2", i)));
192    }
193
194    queueIds = STORAGE.getAllQueues(serverName2);
195    assertEquals(1, queueIds.size());
196    assertThat(queueIds, hasItems(peer1.getFirst()));
197    wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
198    assertEquals(5, wals1.size());
199    for (i = 1; i < 10; i += 2) {
200      assertThat(wals1, hasItems(getFileName("file1", i)));
201    }
202
203    Set<String> allWals = STORAGE.getAllWALs();
204    assertEquals(10, allWals.size());
205    for (i = 0; i < 10; i++) {
206      assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
207    }
208  }
209
210  // For HBASE-12865
211  @Test
212  public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
213    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
214    STORAGE.addWAL(serverName1, "1", "file");
215
216    int v0 = STORAGE.getQueuesZNodeCversion();
217    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
218    STORAGE.claimQueue(serverName1, "1", serverName2);
219    int v1 = STORAGE.getQueuesZNodeCversion();
220    // cversion should increase by 1 since a child node is deleted
221    assertEquals(1, v1 - v0);
222  }
223
224  private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
225    return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
226
227      private int called = 0;
228      private int getLastSeqIdOpIndex = 0;
229
230      @Override
231      protected int getQueuesZNodeCversion() throws KeeperException {
232        if (called < 4) {
233          called++;
234        }
235        return called;
236      }
237
238      @Override
239      protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
240          String peerId) throws KeeperException {
241        Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
242        if (getLastSeqIdOpIndex < 100) {
243          // Let the ZNode version increase.
244          String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
245          ZKUtil.createWithParents(zookeeper, path);
246          ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
247        }
248        getLastSeqIdOpIndex++;
249        return oldPair;
250      }
251    };
252  }
253
254  @Test
255  public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
256    ZKReplicationQueueStorage storage = createWithUnstableVersion();
257    storage.addWAL(getServerName(0), "1", "file");
258    // This should return eventually when cversion stabilizes
259    Set<String> allWals = storage.getAllWALs();
260    assertEquals(1, allWals.size());
261    assertThat(allWals, hasItems("file"));
262  }
263
264  // For HBASE-14621
265  @Test
266  public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
267    ZKReplicationQueueStorage storage = createWithUnstableVersion();
268    storage.addPeerToHFileRefs("1");
269    Path p = new Path("/test");
270    storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
271    // This should return eventually when cversion stabilizes
272    Set<String> allHFileRefs = storage.getAllHFileRefs();
273    assertEquals(1, allHFileRefs.size());
274    assertThat(allHFileRefs, hasItems("test"));
275  }
276
277  // For HBASE-20138
278  @Test
279  public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
280    ZKReplicationQueueStorage storage = createWithUnstableVersion();
281    ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
282    assertTrue(storage.getAllQueues(serverName1).isEmpty());
283    String queue1 = "1";
284    String fileName = getFileName("file1", 0);
285    String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
286    storage.addWAL(serverName1, queue1, fileName);
287
288    List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
289    assertEquals(1, wals1.size());
290
291    assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
292    // This should return eventually when data version stabilizes
293    storage.setWALPosition(serverName1, queue1, fileName, 100,
294      ImmutableMap.of(encodedRegionName, 120L));
295
296    assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
297    assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
298  }
299
300  @Test
301  public void testRegionsZNodeLayout() throws Exception {
302    String peerId = "1";
303    String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
304    String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
305    String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
306    assertEquals(expectedPath, path);
307  }
308
309  @Test
310  public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
311    String peerId = "1";
312    String peerIdToDelete = "2";
313    for (int i = 0; i < 100; i++) {
314      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
315      STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
316      STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
317    }
318    for (int i = 0; i < 100; i++) {
319      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
320      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
321      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
322    }
323    STORAGE.removeLastSequenceIds(peerIdToDelete);
324    for (int i = 0; i < 100; i++) {
325      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
326      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
327      assertEquals(HConstants.NO_SEQNUM,
328        STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
329    }
330  }
331}