001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.stream.Collectors;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.Predicate;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.ReplicationTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.JVMClusterUtil;
045import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.AfterEach;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055@Tag(ReplicationTests.TAG)
056@Tag(LargeTests.TAG)
057public class TestReplicationEditsDroppedWithDeletedTableCFs {
058
059  private static final Logger LOG =
060    LoggerFactory.getLogger(TestReplicationEditsDroppedWithDeletedTableCFs.class);
061
062  private static Configuration conf1 = HBaseConfiguration.create();
063  private static Configuration conf2 = HBaseConfiguration.create();
064
065  protected static HBaseTestingUtil utility1;
066  protected static HBaseTestingUtil utility2;
067
068  private static Admin admin1;
069  private static Admin admin2;
070
071  private static final TableName TABLE = TableName.valueOf("table");
072  private static final byte[] NORMAL_CF = Bytes.toBytes("normal_cf");
073  private static final byte[] DROPPED_CF = Bytes.toBytes("dropped_cf");
074
075  private static final byte[] ROW = Bytes.toBytes("row");
076  private static final byte[] QUALIFIER = Bytes.toBytes("q");
077  private static final byte[] VALUE = Bytes.toBytes("value");
078
079  private static final String PEER_ID = "1";
080  private static final long SLEEP_TIME = 1000;
081  private static final int NB_RETRIES = 10;
082
083  @BeforeAll
084  public static void setUpBeforeClass() throws Exception {
085    // Set true to filter replication edits for dropped table
086    conf1.setBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, true);
087    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
088    conf1.setInt("replication.source.nb.capacity", 1);
089    utility1 = new HBaseTestingUtil(conf1);
090    utility1.startMiniZKCluster();
091    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
092    conf1 = utility1.getConfiguration();
093
094    conf2 = HBaseConfiguration.create(conf1);
095    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
096    utility2 = new HBaseTestingUtil(conf2);
097    utility2.setZkCluster(miniZK);
098
099    utility1.startMiniCluster(1);
100    utility2.startMiniCluster(1);
101
102    admin1 = utility1.getAdmin();
103    admin2 = utility2.getAdmin();
104  }
105
106  @AfterAll
107  public static void tearDownAfterClass() throws Exception {
108    utility2.shutdownMiniCluster();
109    utility1.shutdownMiniCluster();
110  }
111
112  @BeforeEach
113  public void setup() throws Exception {
114    // Roll log
115    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
116      .getRegionServerThreads()) {
117      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
118    }
119    // add peer
120    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
121      .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build();
122    admin1.addReplicationPeer(PEER_ID, rpc);
123    // create table
124    createTable();
125  }
126
127  @AfterEach
128  public void tearDown() throws Exception {
129    // Remove peer
130    admin1.removeReplicationPeer(PEER_ID);
131    // Drop table
132    admin1.disableTable(TABLE);
133    admin1.deleteTable(TABLE);
134    admin2.disableTable(TABLE);
135    admin2.deleteTable(TABLE);
136  }
137
138  private void createTable() throws Exception {
139    TableDescriptor desc = createTableDescriptor(NORMAL_CF, DROPPED_CF);
140    admin1.createTable(desc);
141    admin2.createTable(desc);
142    utility1.waitUntilAllRegionsAssigned(desc.getTableName());
143    utility2.waitUntilAllRegionsAssigned(desc.getTableName());
144  }
145
146  @Test
147  public void testEditsDroppedWithDeleteCF() throws Exception {
148    admin1.disableReplicationPeer(PEER_ID);
149
150    try (Table table = utility1.getConnection().getTable(TABLE)) {
151      Put put = new Put(ROW);
152      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
153      table.put(put);
154    }
155
156    deleteCf(admin1);
157    deleteCf(admin2);
158
159    admin1.enableReplicationPeer(PEER_ID);
160
161    verifyReplicationProceeded();
162  }
163
164  @Test
165  public void testEditsBehindDeleteCFTiming() throws Exception {
166    admin1.disableReplicationPeer(PEER_ID);
167
168    try (Table table = utility1.getConnection().getTable(TABLE)) {
169      Put put = new Put(ROW);
170      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
171      table.put(put);
172    }
173
174    // Only delete cf from peer cluster
175    deleteCf(admin2);
176
177    admin1.enableReplicationPeer(PEER_ID);
178
179    // the source table's cf still exists, replication should be stalled
180    verifyReplicationStuck();
181    deleteCf(admin1);
182    // now the source table's cf is gone, replication should proceed, the
183    // offending edits be dropped
184    verifyReplicationProceeded();
185  }
186
187  private void verifyReplicationProceeded() throws Exception {
188    try (Table table = utility1.getConnection().getTable(TABLE)) {
189      Put put = new Put(ROW);
190      put.addColumn(NORMAL_CF, QUALIFIER, VALUE);
191      table.put(put);
192    }
193    utility2.waitFor(NB_RETRIES * SLEEP_TIME, (Predicate<Exception>) () -> {
194      try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
195        Result result = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
196        return result != null && !result.isEmpty()
197          && Bytes.equals(VALUE, result.getValue(NORMAL_CF, QUALIFIER));
198      }
199    });
200  }
201
202  private void verifyReplicationStuck() throws Exception {
203    try (Table table = utility1.getConnection().getTable(TABLE)) {
204      Put put = new Put(ROW);
205      put.addColumn(NORMAL_CF, QUALIFIER, VALUE);
206      table.put(put);
207    }
208    try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
209      for (int i = 0; i < NB_RETRIES; i++) {
210        Result result = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
211        if (result != null && !result.isEmpty()) {
212          fail("Edit should have been stuck behind dropped tables, but value is "
213            + Bytes.toString(result.getValue(NORMAL_CF, QUALIFIER)));
214        } else {
215          LOG.info("Row not replicated, let's wait a bit more...");
216          Thread.sleep(SLEEP_TIME);
217        }
218      }
219    }
220  }
221
222  private TableDescriptor createTableDescriptor(byte[]... cfs) {
223    return TableDescriptorBuilder.newBuilder(TABLE).setColumnFamilies(Arrays.stream(cfs).map(
224      cf -> ColumnFamilyDescriptorBuilder.newBuilder(cf).setScope(REPLICATION_SCOPE_GLOBAL).build())
225      .collect(Collectors.toList())).build();
226  }
227
228  private void deleteCf(Admin admin) throws IOException {
229    TableDescriptor desc = createTableDescriptor(NORMAL_CF);
230    admin.modifyTable(desc);
231  }
232}