001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.verify;
027
028import java.io.IOException;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentMap;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.TimeUnit;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.TableNameTestRule;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
049import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
051import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
052import org.apache.hadoop.hbase.replication.ReplicationQueueData;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
055import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
056import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
057import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
058import org.apache.hadoop.hbase.testclassification.MasterTests;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.junit.AfterClass;
063import org.junit.Before;
064import org.junit.BeforeClass;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069
070import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
071
072@Category({ MasterTests.class, MediumTests.class })
073public class TestReplicationPeerManagerMigrateQueuesFromZk {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class);
078
079  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
080
081  private static ExecutorService EXECUTOR;
082
083  ConcurrentMap<String, ReplicationPeerDescription> peers;
084
085  private ReplicationPeerStorage peerStorage;
086
087  private ReplicationQueueStorage queueStorage;
088
089  private ReplicationQueueStorageInitializer queueStorageInitializer;
090
091  private ReplicationPeerManager manager;
092
093  private int nServers = 10;
094
095  private int nPeers = 10;
096
097  private int nRegions = 100;
098
099  private ServerName deadServerName;
100
101  @Rule
102  public final TableNameTestRule tableNameRule = new TableNameTestRule();
103
104  @BeforeClass
105  public static void setUpBeforeClass() throws Exception {
106    UTIL.startMiniCluster(1);
107    EXECUTOR = Executors.newFixedThreadPool(3,
108      new ThreadFactoryBuilder().setDaemon(true)
109        .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
110        .build());
111  }
112
113  @AfterClass
114  public static void tearDownAfterClass() throws Exception {
115    EXECUTOR.shutdownNow();
116    UTIL.shutdownMiniCluster();
117  }
118
119  @Before
120  public void setUp() throws IOException {
121    Configuration conf = UTIL.getConfiguration();
122    peerStorage = mock(ReplicationPeerStorage.class);
123    TableName tableName = tableNameRule.getTableName();
124    UTIL.getAdmin()
125      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
126    queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
127    queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
128    peers = new ConcurrentHashMap<>();
129    deadServerName =
130      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
131    manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
132      peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer);
133  }
134
135  private Map<String, Set<String>> prepareData() throws Exception {
136    ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
137      UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
138    TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
139    Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
140      .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
141    TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
142    return encodedName2PeerIds;
143  }
144
145  @Test
146  public void testNoPeers() throws Exception {
147    prepareData();
148    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
149    // should have called initializer
150    verify(queueStorageInitializer).initialize();
151    // should have not migrated any data since there is no peer
152    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) {
153      assertEquals(0, HBaseTestingUtil.countRows(table));
154    }
155  }
156
157  @Test
158  public void testMigrate() throws Exception {
159    Map<String, Set<String>> encodedName2PeerIds = prepareData();
160    // add all peers so we will migrate them all
161    for (int i = 0; i < nPeers; i++) {
162      // value is not used in this test, so just add a mock
163      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
164    }
165    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
166    // should have called initializer
167    verify(queueStorageInitializer).initialize();
168    List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
169    // there should be two empty queues so minus 2
170    assertEquals(2 * nServers - 2, queueDatas.size());
171    for (ReplicationQueueData queueData : queueDatas) {
172      assertEquals("peer_0", queueData.getId().getPeerId());
173      assertEquals(1, queueData.getOffsets().size());
174      String walGroup = queueData.getId().getServerWALsBelongTo().toString();
175      ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
176      assertEquals(0, offset.getOffset());
177      assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
178    }
179    // there is no method in ReplicationQueueStorage can list all the last pushed sequence ids
180    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName());
181      ResultScanner scanner =
182        table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
183      for (int i = 0; i < 2; i++) {
184        Result result = scanner.next();
185        String peerId = Bytes.toString(result.getRow());
186        assertEquals(nRegions, result.size());
187        for (Cell cell : result.rawCells()) {
188          String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
189            cell.getQualifierOffset(), cell.getQualifierLength());
190          encodedName2PeerIds.get(encodedRegionName).remove(peerId);
191          long seqId =
192            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
193          assertEquals(i + 1, seqId);
194        }
195      }
196      encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
197        assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
198      });
199      assertNull(scanner.next());
200    }
201    for (int i = 0; i < nPeers; i++) {
202      List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
203      assertEquals(i, refs.size());
204      Set<String> refsSet = new HashSet<>(refs);
205      for (int j = 0; j < i; j++) {
206        assertTrue(refsSet.remove("hfile-" + j));
207      }
208      assertThat(refsSet, empty());
209    }
210  }
211}