001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license 003 * agreements. See the NOTICE file distributed with this work for additional information regarding 004 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the 005 * "License"); you may not use this file except in compliance with the License. You may obtain a 006 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable 007 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" 008 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License 009 * for the specific language governing permissions and limitations under the License. 010 */ 011package org.apache.hadoop.hbase.util; 012 013import static org.junit.Assert.assertEquals; 014import static org.junit.Assert.assertFalse; 015import static org.junit.Assert.assertTrue; 016 017import java.io.IOException; 018import java.util.ArrayList; 019import java.util.List; 020 021import org.apache.hadoop.hbase.HBaseClassTestRule; 022import org.apache.hadoop.hbase.HBaseTestingUtility; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.MetaTableAccessor; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 027import org.apache.hadoop.hbase.client.Put; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.client.RegionInfoBuilder; 030import org.apache.hadoop.hbase.client.Result; 031import org.apache.hadoop.hbase.client.ResultScanner; 032import org.apache.hadoop.hbase.client.Scan; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.client.TableDescriptor; 035import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 036import org.apache.hadoop.hbase.master.RegionState; 037import org.apache.hadoop.hbase.replication.ReplicationException; 038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 039import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 040import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.testclassification.ReplicationTests; 043import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 051 052@Category({ ReplicationTests.class, MediumTests.class }) 053public class TestHBaseFsckCleanReplicationBarriers { 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class); 057 058 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 059 060 private static String PEER_1 = "1", PEER_2 = "2"; 061 062 private static ReplicationQueueStorage QUEUE_STORAGE; 063 064 private static String WAL_FILE_NAME = "test.wal"; 065 066 private static String TABLE_NAME = "test"; 067 068 private static String COLUMN_FAMILY = "info"; 069 070 @BeforeClass 071 public static void setUp() throws Exception { 072 UTIL.startMiniCluster(1); 073 QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), 074 UTIL.getConfiguration()); 075 createPeer(); 076 QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1, 077 WAL_FILE_NAME); 078 QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2, 079 WAL_FILE_NAME); 080 } 081 082 @AfterClass 083 public static void tearDown() throws Exception { 084 UTIL.shutdownMiniCluster(); 085 } 086 087 @Test 088 public void testCleanReplicationBarrierWithNonExistTable() 089 throws ClassNotFoundException, IOException { 090 TableName tableName = TableName.valueOf(TABLE_NAME + "_non"); 091 boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); 092 assertFalse(cleaned); 093 } 094 095 @Test 096 public void testCleanReplicationBarrierWithDeletedTable() throws Exception { 097 TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted"); 098 List<RegionInfo> regionInfos = new ArrayList<>(); 099 // only write some barriers into meta table 100 101 for (int i = 0; i < 110; i++) { 102 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i)) 103 .setEndKey(Bytes.toBytes(i + 1)).build(); 104 regionInfos.add(regionInfo); 105 addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100); 106 updatePushedSeqId(regionInfo, 10); 107 assertEquals("check if there is lastPushedId", 10, 108 QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1)); 109 assertEquals("check if there is lastPushedId", 10, 110 QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2)); 111 } 112 Scan barrierScan = new Scan(); 113 barrierScan.setCaching(100); 114 barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); 115 barrierScan 116 .withStartRow( 117 MetaTableAccessor.getTableStartRowForMeta(tableName, MetaTableAccessor.QueryType.REGION)) 118 .withStopRow( 119 MetaTableAccessor.getTableStopRowForMeta(tableName, MetaTableAccessor.QueryType.REGION)); 120 Result result; 121 try (ResultScanner scanner = 122 MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) { 123 while ((result = scanner.next()) != null) { 124 assertTrue(MetaTableAccessor.getReplicationBarriers(result).length > 0); 125 } 126 } 127 boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); 128 assertTrue(cleaned); 129 for (RegionInfo regionInfo : regionInfos) { 130 assertEquals("check if there is lastPushedId", -1, 131 QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1)); 132 assertEquals("check if there is lastPushedId", -1, 133 QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2)); 134 } 135 cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); 136 assertFalse(cleaned); 137 for (RegionInfo region : regionInfos) { 138 assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), 139 region.getRegionName()).length); 140 } 141 } 142 143 @Test 144 public void testCleanReplicationBarrierWithExistTable() throws Exception { 145 TableName tableName = TableName.valueOf(TABLE_NAME); 146 String cf = COLUMN_FAMILY; 147 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 148 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()) 149 .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build(); 150 UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123)); 151 assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0); 152 for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { 153 addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100); 154 updatePushedSeqId(region, 10); 155 assertEquals("check if there is lastPushedId", 10, 156 QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1)); 157 assertEquals("check if there is lastPushedId", 10, 158 QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2)); 159 } 160 boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); 161 assertTrue(cleaned); 162 for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { 163 assertEquals("check if there is lastPushedId", -1, 164 QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1)); 165 assertEquals("check if there is lastPushedId", -1, 166 QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2)); 167 } 168 cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); 169 assertFalse(cleaned); 170 for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { 171 assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), 172 region.getRegionName()).length); 173 } 174 } 175 176 public static void createPeer() throws IOException { 177 ReplicationPeerConfig rpc = 178 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") 179 .setSerial(true).build(); 180 UTIL.getAdmin().addReplicationPeer(PEER_1, rpc); 181 UTIL.getAdmin().addReplicationPeer(PEER_2, rpc); 182 } 183 184 private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) 185 throws IOException { 186 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 187 if (state != null) { 188 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 189 Bytes.toBytes(state.name())); 190 } 191 for (int i = 0; i < barriers.length; i++) { 192 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, 193 put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); 194 } 195 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 196 table.put(put); 197 } 198 } 199 200 private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { 201 QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), 202 PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); 203 QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), 204 PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); 205 } 206}