001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.stream.Collectors; 026import java.util.stream.IntStream; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 041import org.apache.hadoop.hbase.master.HMaster; 042import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 043import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure; 044import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; 045import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 046import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 047import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 048import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 049import org.apache.hadoop.hbase.replication.ReplicationUtils; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.testclassification.MasterTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.wal.WAL.Entry; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.hbase.wal.WALKeyImpl; 058import org.junit.After; 059import org.junit.AfterClass; 060import org.junit.Before; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068@Category({ MasterTests.class, LargeTests.class }) 069public class TestRecoverStandbyProcedure { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class); 076 077 private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure"); 078 079 private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 080 081 private static final byte[] family = Bytes.toBytes("CF"); 082 083 private static final byte[] qualifier = Bytes.toBytes("q"); 084 085 private static final long timestamp = EnvironmentEdgeManager.currentTime(); 086 087 private static final int ROW_COUNT = 1000; 088 089 private static final int WAL_NUMBER = 10; 090 091 private static final int RS_NUMBER = 3; 092 093 private static final String PEER_ID = "1"; 094 095 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 096 097 private static SyncReplicationReplayWALManager syncReplicationReplayWALManager; 098 099 private static ProcedureExecutor<MasterProcedureEnv> procExec; 100 101 private static FileSystem fs; 102 103 private static Configuration conf; 104 105 @BeforeClass 106 public static void setupCluster() throws Exception { 107 UTIL.startMiniCluster(RS_NUMBER); 108 UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 109 conf = UTIL.getConfiguration(); 110 HMaster master = UTIL.getHBaseCluster().getMaster(); 111 fs = master.getMasterFileSystem().getWALFileSystem(); 112 syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager(); 113 procExec = master.getMasterProcedureExecutor(); 114 } 115 116 @AfterClass 117 public static void cleanupTest() throws Exception { 118 try { 119 UTIL.shutdownMiniCluster(); 120 } catch (Exception e) { 121 LOG.warn("failure shutting down cluster", e); 122 } 123 } 124 125 @Before 126 public void setupBeforeTest() throws IOException { 127 UTIL.createTable(tableName, family); 128 } 129 130 @After 131 public void tearDownAfterTest() throws IOException { 132 try (Admin admin = UTIL.getAdmin()) { 133 if (admin.isTableEnabled(tableName)) { 134 admin.disableTable(tableName); 135 } 136 admin.deleteTable(tableName); 137 } 138 } 139 140 @Test 141 public void testRecoverStandby() throws IOException, StreamLacksCapabilityException { 142 setupSyncReplicationWALs(); 143 long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false)); 144 ProcedureTestingUtility.waitProcedure(procExec, procId); 145 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 146 147 try (Table table = UTIL.getConnection().getTable(tableName)) { 148 for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) { 149 Result result = table.get(new Get(Bytes.toBytes(i)).setTimestamp(timestamp)); 150 assertNotNull(result); 151 assertEquals(i, Bytes.toInt(result.getValue(family, qualifier))); 152 } 153 } 154 } 155 156 private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { 157 Path peerRemoteWALDir = ReplicationUtils 158 .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID); 159 if (!fs.exists(peerRemoteWALDir)) { 160 fs.mkdirs(peerRemoteWALDir); 161 } 162 for (int i = 0; i < WAL_NUMBER; i++) { 163 try (ProtobufLogWriter writer = new ProtobufLogWriter()) { 164 Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep"); 165 writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir), 166 StreamSlowMonitor.create(conf, "defaultMonitor")); 167 List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT); 168 for (Entry entry : entries) { 169 writer.append(entry); 170 } 171 writer.sync(false); 172 LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID); 173 } 174 } 175 } 176 177 private List<Entry> setupWALEntries(int startRow, int endRow) { 178 return IntStream.range(startRow, endRow) 179 .mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i))) 180 .collect(Collectors.toList()); 181 } 182 183 private Entry createWALEntry(byte[] row, byte[] value) { 184 WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1); 185 WALEdit edit = new WALEdit(); 186 edit.add(new KeyValue(row, family, qualifier, timestamp, value)); 187 return new Entry(key, edit); 188 } 189}