/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link RecoverStandbyProcedure} on a mini cluster. The test writes {@code WAL_NUMBER} WAL
 * files of {@code ROW_COUNT} entries each into the remote WAL directory of peer {@code PEER_ID},
 * submits the procedure to the master's procedure executor, and then reads every row back from the
 * test table.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
public class TestRecoverStandbyProcedure {

  private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class);

  // Table, region and cell coordinates shared by the generated WAL entries and the read-back.
  private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure");

  private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();

  private static final byte[] family = Bytes.toBytes("CF");

  private static final byte[] qualifier = Bytes.toBytes("q");

  // Captured once at class load; every generated cell and every Get uses this exact timestamp.
  private static final long timestamp = EnvironmentEdgeManager.currentTime();

  // Number of entries written to each WAL file.
  private static final int ROW_COUNT = 1000;

  // Number of WAL files written for the peer.
  private static final int WAL_NUMBER = 10;

  // Value passed to startMiniCluster.
  private static final int RS_NUMBER = 3;

  private static final String PEER_ID = "1";

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  // Handles obtained from the active master in setupCluster().
  private static SyncReplicationReplayWALManager syncReplicationReplayWALManager;

  private static ProcedureExecutor<MasterProcedureEnv> procExec;

  private static FileSystem fs;

  private static Configuration conf;

  /**
   * Starts the mini cluster, waits for an active and ready master, and caches the configuration
   * together with the master's WAL file system, replay WAL manager and procedure executor.
   */
  @BeforeAll
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster(RS_NUMBER);
    UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    conf = UTIL.getConfiguration();
    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
    fs = activeMaster.getMasterFileSystem().getWALFileSystem();
    syncReplicationReplayWALManager = activeMaster.getSyncReplicationReplayWALManager();
    procExec = activeMaster.getMasterProcedureExecutor();
  }

  /** Shuts the mini cluster down once all tests in this class have run. */
  @AfterAll
  public static void cleanupTest() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /** Creates the test table with a single column family before each test. */
  @BeforeEach
  public void setupBeforeTest() throws IOException {
    UTIL.createTable(tableName, family);
  }

  /** Disables the test table if it is still enabled, then deletes it. */
  @AfterEach
  public void tearDownAfterTest() throws IOException {
    try (Admin admin = UTIL.getAdmin()) {
      boolean stillEnabled = admin.isTableEnabled(tableName);
      if (stillEnabled) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }
  }

  /**
   * Writes the WAL files, runs the recover-standby procedure to completion, and checks that each
   * of the {@code WAL_NUMBER * ROW_COUNT} rows holds its own row index as the cell value.
   */
  @Test
  public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
    setupSyncReplicationWALs();

    RecoverStandbyProcedure recoverProcedure = new RecoverStandbyProcedure(PEER_ID, false);
    long procId = procExec.submitProcedure(recoverProcedure);
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);

    final int expectedRows = WAL_NUMBER * ROW_COUNT;
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int row = 0; row < expectedRows; row++) {
        Get get = new Get(Bytes.toBytes(row)).setTimestamp(timestamp);
        Result result = table.get(get);
        assertNotNull(result);
        byte[] stored = result.getValue(family, qualifier);
        assertEquals(row, Bytes.toInt(stored));
      }
    }
  }

  /**
   * Ensures the peer's remote WAL directory exists and fills it with {@code WAL_NUMBER} WAL
   * files.
   */
  private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
    Path peerRemoteWALDir = ReplicationUtils
      .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID);
    boolean dirPresent = fs.exists(peerRemoteWALDir);
    if (!dirPresent) {
      fs.mkdirs(peerRemoteWALDir);
    }
    for (int walIndex = 0; walIndex < WAL_NUMBER; walIndex++) {
      writeSyncReplicationWAL(peerRemoteWALDir, walIndex);
    }
  }

  /**
   * Writes one WAL file named {@code srv1,8888.<walIndex>.syncrep} into the given directory. The
   * file holds the entries for rows {@code [walIndex * ROW_COUNT, (walIndex + 1) * ROW_COUNT)}.
   */
  private void writeSyncReplicationWAL(Path peerRemoteWALDir, int walIndex)
    throws IOException, StreamLacksCapabilityException {
    try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
      Path wal = new Path(peerRemoteWALDir, "srv1,8888." + walIndex + ".syncrep");
      long blockSize = WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir);
      StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "defaultMonitor");
      writer.init(fs, wal, conf, true, blockSize, monitor);

      int firstRow = walIndex * ROW_COUNT;
      for (Entry entry : setupWALEntries(firstRow, firstRow + ROW_COUNT)) {
        writer.append(entry);
      }
      writer.sync(false);
      LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID);
    }
  }

  /**
   * Builds one WAL entry per row index in {@code [startRow, endRow)}; the row key and the cell
   * value are both the 4-byte encoding of the index.
   */
  private List<Entry> setupWALEntries(int startRow, int endRow) {
    return IntStream.range(startRow, endRow).mapToObj(rowIndex -> {
      byte[] rowKey = Bytes.toBytes(rowIndex);
      byte[] cellValue = Bytes.toBytes(rowIndex);
      return createWALEntry(rowKey, cellValue);
    }).collect(Collectors.toList());
  }

  /**
   * Wraps a single {@link KeyValue} for the test family/qualifier at the shared timestamp in a
   * WAL entry keyed by the test region and table.
   */
  private Entry createWALEntry(byte[] row, byte[] value) {
    WALKeyImpl walKey = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1);
    KeyValue cell = new KeyValue(row, family, qualifier, timestamp, value);
    WALEdit walEdit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(walEdit, cell);
    return new Entry(walKey, walEdit);
  }
}