001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.verify;
027
028import java.io.IOException;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentMap;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.TimeUnit;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.TableNameTestExtension;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
048import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
050import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
051import org.apache.hadoop.hbase.replication.ReplicationQueueData;
052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
054import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
055import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
056import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
057import org.apache.hadoop.hbase.testclassification.MasterTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.junit.jupiter.api.AfterAll;
062import org.junit.jupiter.api.BeforeAll;
063import org.junit.jupiter.api.BeforeEach;
064import org.junit.jupiter.api.Tag;
065import org.junit.jupiter.api.Test;
066import org.junit.jupiter.api.extension.RegisterExtension;
067
068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
069
070@Tag(MasterTests.TAG)
071@Tag(MediumTests.TAG)
072public class TestReplicationPeerManagerMigrateQueuesFromZk {
073
074  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  private static ExecutorService EXECUTOR;
077
078  ConcurrentMap<String, ReplicationPeerDescription> peers;
079
080  private ReplicationPeerStorage peerStorage;
081
082  private ReplicationQueueStorage queueStorage;
083
084  private ReplicationQueueStorageInitializer queueStorageInitializer;
085
086  private ReplicationPeerManager manager;
087
088  private int nServers = 10;
089
090  private int nPeers = 10;
091
092  private int nRegions = 100;
093
094  private ServerName deadServerName;
095
096  @RegisterExtension
097  public final TableNameTestExtension tableNameExtension = new TableNameTestExtension();
098
099  @BeforeAll
100  public static void setUpBeforeClass() throws Exception {
101    UTIL.startMiniCluster(1);
102    EXECUTOR = Executors.newFixedThreadPool(3,
103      new ThreadFactoryBuilder().setDaemon(true)
104        .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
105        .build());
106  }
107
108  @AfterAll
109  public static void tearDownAfterClass() throws Exception {
110    EXECUTOR.shutdownNow();
111    UTIL.shutdownMiniCluster();
112  }
113
114  @BeforeEach
115  public void setUp() throws IOException {
116    Configuration conf = UTIL.getConfiguration();
117    peerStorage = mock(ReplicationPeerStorage.class);
118    TableName tableName = tableNameExtension.getTableName();
119    UTIL.getAdmin()
120      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
121    queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
122    queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
123    peers = new ConcurrentHashMap<>();
124    deadServerName =
125      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
126    manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
127      peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer);
128  }
129
130  private Map<String, Set<String>> prepareData() throws Exception {
131    ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
132      UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
133    TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
134    Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
135      .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
136    TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
137    return encodedName2PeerIds;
138  }
139
140  @Test
141  public void testNoPeers() throws Exception {
142    prepareData();
143    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
144    // should have called initializer
145    verify(queueStorageInitializer).initialize();
146    // should have not migrated any data since there is no peer
147    try (Table table = UTIL.getConnection().getTable(tableNameExtension.getTableName())) {
148      assertEquals(0, HBaseTestingUtil.countRows(table));
149    }
150  }
151
152  @Test
153  public void testMigrate() throws Exception {
154    Map<String, Set<String>> encodedName2PeerIds = prepareData();
155    // add all peers so we will migrate them all
156    for (int i = 0; i < nPeers; i++) {
157      // value is not used in this test, so just add a mock
158      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
159    }
160    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
161    // should have called initializer
162    verify(queueStorageInitializer).initialize();
163    List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
164    // there should be two empty queues so minus 2
165    assertEquals(2 * nServers - 2, queueDatas.size());
166    for (ReplicationQueueData queueData : queueDatas) {
167      assertEquals("peer_0", queueData.getId().getPeerId());
168      assertEquals(1, queueData.getOffsets().size());
169      String walGroup = queueData.getId().getServerWALsBelongTo().toString();
170      ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
171      assertEquals(0, offset.getOffset());
172      assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
173    }
174    // there is no method in ReplicationQueueStorage can list all the last pushed sequence ids
175    try (Table table = UTIL.getConnection().getTable(tableNameExtension.getTableName());
176      ResultScanner scanner =
177        table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
178      for (int i = 0; i < 2; i++) {
179        Result result = scanner.next();
180        String peerId = Bytes.toString(result.getRow());
181        assertEquals(nRegions, result.size());
182        for (Cell cell : result.rawCells()) {
183          String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
184            cell.getQualifierOffset(), cell.getQualifierLength());
185          encodedName2PeerIds.get(encodedRegionName).remove(peerId);
186          long seqId =
187            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
188          assertEquals(i + 1, seqId);
189        }
190      }
191      encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
192        assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
193      });
194      assertNull(scanner.next());
195    }
196    for (int i = 0; i < nPeers; i++) {
197      List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
198      assertEquals(i, refs.size());
199      Set<String> refsSet = new HashSet<>(refs);
200      for (int j = 0; j < i; j++) {
201        assertTrue(refsSet.remove("hfile-" + j));
202      }
203      assertThat(refsSet, empty());
204    }
205  }
206}