001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.hbase.ClientMetaTableAccessor;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.MetaTableAccessor;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.ResultScanner;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.master.RegionState;
045import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
046import org.apache.hadoop.hbase.replication.ReplicationException;
047import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.replication.ReplicationQueueId;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.testclassification.ReplicationTests;
054import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
055import org.junit.AfterClass;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
062
063@Category({ ReplicationTests.class, MediumTests.class })
064public class TestHBaseFsckCleanReplicationBarriers {
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class);
068
069  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
070
071  private static String PEER_1 = "1", PEER_2 = "2";
072
073  private static ReplicationQueueStorage QUEUE_STORAGE;
074
075  private static String WAL_FILE_NAME = "test.wal";
076
077  private static String TABLE_NAME = "test";
078
079  private static String COLUMN_FAMILY = "info";
080
081  @BeforeClass
082  public static void setUp() throws Exception {
083    UTIL.startMiniCluster(1);
084    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
085      UTIL.getConfiguration());
086    createPeer();
087  }
088
089  @AfterClass
090  public static void tearDown() throws Exception {
091    UTIL.shutdownMiniCluster();
092  }
093
094  @Test
095  public void testCleanReplicationBarrierWithNonExistTable()
096    throws ClassNotFoundException, IOException {
097    TableName tableName = TableName.valueOf(TABLE_NAME + "_non");
098    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
099    assertFalse(cleaned);
100  }
101
102  @Test
103  public void testCleanReplicationBarrierWithDeletedTable() throws Exception {
104    TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted");
105    List<RegionInfo> regionInfos = new ArrayList<>();
106    // only write some barriers into meta table
107
108    for (int i = 0; i < 110; i++) {
109      RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i))
110        .setEndKey(Bytes.toBytes(i + 1)).build();
111      regionInfos.add(regionInfo);
112      addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100);
113      updatePushedSeqId(regionInfo, 10);
114      assertEquals("check if there is lastPushedId", 10,
115        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
116      assertEquals("check if there is lastPushedId", 10,
117        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
118    }
119    Scan barrierScan = new Scan();
120    barrierScan.setCaching(100);
121    barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
122    barrierScan
123      .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
124        ClientMetaTableAccessor.QueryType.REGION))
125      .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
126        ClientMetaTableAccessor.QueryType.REGION));
127    Result result;
128    try (ResultScanner scanner =
129      MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) {
130      while ((result = scanner.next()) != null) {
131        assertTrue(ReplicationBarrierFamilyFormat.getReplicationBarriers(result).length > 0);
132      }
133    }
134    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
135    assertTrue(cleaned);
136    for (RegionInfo regionInfo : regionInfos) {
137      assertEquals("check if there is lastPushedId", -1,
138        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
139      assertEquals("check if there is lastPushedId", -1,
140        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
141    }
142    cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
143    assertFalse(cleaned);
144    for (RegionInfo region : regionInfos) {
145      assertEquals(0, ReplicationBarrierFamilyFormat.getReplicationBarriers(UTIL.getConnection(),
146        region.getRegionName()).length);
147    }
148  }
149
150  @Test
151  public void testCleanReplicationBarrierWithExistTable() throws Exception {
152    TableName tableName = TableName.valueOf(TABLE_NAME);
153    String cf = COLUMN_FAMILY;
154    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
155      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
156      .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
157    UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123));
158    assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0);
159    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
160      addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100);
161      updatePushedSeqId(region, 10);
162      assertEquals("check if there is lastPushedId", 10,
163        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
164      assertEquals("check if there is lastPushedId", 10,
165        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
166    }
167    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
168    assertTrue(cleaned);
169    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
170      assertEquals("check if there is lastPushedId", -1,
171        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
172      assertEquals("check if there is lastPushedId", -1,
173        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
174    }
175    cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
176    assertFalse(cleaned);
177    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
178      assertEquals(0, ReplicationBarrierFamilyFormat.getReplicationBarriers(UTIL.getConnection(),
179        region.getRegionName()).length);
180    }
181  }
182
183  public static void createPeer() throws IOException {
184    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
185      .setClusterKey(UTIL.getClusterKey() + "-test").setSerial(true).build();
186    UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
187    UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
188  }
189
190  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
191    throws IOException {
192    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
193    if (state != null) {
194      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
195        Bytes.toBytes(state.name()));
196    }
197    for (int i = 0; i < barriers.length; i++) {
198      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
199        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
200    }
201    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
202      table.put(put);
203    }
204  }
205
206  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
207    ServerName sn = UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
208    QUEUE_STORAGE.setOffset(new ReplicationQueueId(sn, PEER_1), "",
209      new ReplicationGroupOffset(WAL_FILE_NAME, 10),
210      ImmutableMap.of(region.getEncodedName(), seqId));
211    QUEUE_STORAGE.setOffset(new ReplicationQueueId(sn, PEER_2), "",
212      new ReplicationGroupOffset(WAL_FILE_NAME, 10),
213      ImmutableMap.of(region.getEncodedName(), seqId));
214  }
215}