/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

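/**
 * Tests that the replication source, with {@code REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY}
 * enabled, drops WAL edits that target a column family which has since been deleted from both the
 * source and the peer table, instead of blocking the replication queue indefinitely.
 */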
@Category({ LargeTests.class })
public class TestReplicationEditsDroppedWithDeletedTableCFs {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEditsDroppedWithDeletedTableCFs.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationEditsDroppedWithDeletedTableCFs.class);

  private static Configuration conf1 = HBaseConfiguration.create();
  private static Configuration conf2 = HBaseConfiguration.create();

  protected static HBaseTestingUtility utility1;
  protected static HBaseTestingUtility utility2;

  private static Admin admin1;
  private static Admin admin2;

  private static final TableName TABLE = TableName.valueOf("table");
  private static final byte[] NORMAL_CF = Bytes.toBytes("normal_cf");
  private static final byte[] DROPPED_CF = Bytes.toBytes("dropped_cf");

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE = Bytes.toBytes("value");

  private static final String PEER_ID = "1";
  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 10;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Set true to drop replication edits destined for a deleted column family
    conf1.setBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, true);
    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
    conf1.setInt("replication.source.nb.capacity", 1);
    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    conf1 = utility1.getConfiguration();

    conf2 = HBaseConfiguration.create(conf1);
    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);

    utility1.startMiniCluster(1);
    utility2.startMiniCluster(1);

    admin1 = utility1.getAdmin();
    admin2 = utility2.getAdmin();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    // Roll log
    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
        .getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // add peer
    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
        .setClusterKey(utility2.getClusterKey())
        .setReplicateAllUserTables(true).build();
    admin1.addReplicationPeer(PEER_ID, rpc);
    // create table
    createTable();
  }

  @After
  public void tearDown() throws Exception {
    // Remove peer
    admin1.removeReplicationPeer(PEER_ID);
    // Drop table
    admin1.disableTable(TABLE);
    admin1.deleteTable(TABLE);
    admin2.disableTable(TABLE);
    admin2.deleteTable(TABLE);
  }

  private void createTable() throws Exception {
    TableDescriptor desc = createTableDescriptor(NORMAL_CF, DROPPED_CF);
    admin1.createTable(desc);
    admin2.createTable(desc);
    utility1.waitUntilAllRegionsAssigned(desc.getTableName());
    utility2.waitUntilAllRegionsAssigned(desc.getTableName());
  }

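  /**
   * Write to {@link #DROPPED_CF} while the peer is disabled, delete that family from both
   * clusters, then re-enable the peer: the orphaned edits should be dropped and replication
   * should proceed for the remaining family.
   */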
  @Test
  public void testEditsDroppedWithDeleteCF() throws Exception {
    admin1.disableReplicationPeer(PEER_ID);

    try (Table table = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
      table.put(put);
    }

    deleteCf(admin1);
    deleteCf(admin2);

    admin1.enableReplicationPeer(PEER_ID);

    verifyReplicationProceeded();
  }

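  /**
   * Same as above, but the family is first deleted only on the peer cluster: replication should
   * stall while the family still exists on the source, and proceed (dropping the offending edits)
   * once it is deleted on the source as well.
   */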
  @Test
  public void testEditsBehindDeleteCFTiming() throws Exception {
    admin1.disableReplicationPeer(PEER_ID);

    try (Table table = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
      table.put(put);
    }

    // Only delete the cf from the peer cluster
    deleteCf(admin2);

    admin1.enableReplicationPeer(PEER_ID);

    // The source table's cf still exists, so replication should stall
    verifyReplicationStuck();
    deleteCf(admin1);
    // Now the source table's cf is gone too; replication should proceed and the
    // offending edits should be dropped
    verifyReplicationProceeded();
  }

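  /**
   * Puts a marker row into {@link #NORMAL_CF} on the source and waits until it is visible on the
   * peer, which proves the replication queue has drained past the edits for the deleted family.
   */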
  private void verifyReplicationProceeded() throws Exception {
    try (Table table = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(NORMAL_CF, QUALIFIER, VALUE);
      table.put(put);
    }
    utility2.waitFor(NB_RETRIES * SLEEP_TIME, (Predicate<Exception>) () -> {
      try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
        Result result = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
        return result != null && !result.isEmpty()
            && Bytes.equals(VALUE, result.getValue(NORMAL_CF, QUALIFIER));
      }
    });
  }

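  /**
   * Puts a marker row into {@link #NORMAL_CF} on the source and asserts that it does not appear
   * on the peer within the retry window, i.e. replication is currently blocked.
   */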
  private void verifyReplicationStuck() throws Exception {
    try (Table table = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(NORMAL_CF, QUALIFIER, VALUE);
      table.put(put);
    }
    try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
      for (int i = 0; i < NB_RETRIES; i++) {
        Result result = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
        if (result != null && !result.isEmpty()) {
          fail("Edit should have been stuck behind the deleted column family, but value is "
              + Bytes.toString(result.getValue(NORMAL_CF, QUALIFIER)));
        } else {
          LOG.info("Row not replicated, let's wait a bit more...");
          Thread.sleep(SLEEP_TIME);
        }
      }
    }
  }

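  /**
   * Builds a descriptor for {@link #TABLE} with the given column families, all scoped
   * {@code REPLICATION_SCOPE_GLOBAL} so that their edits are eligible for replication.
   */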
  private TableDescriptor createTableDescriptor(byte[]... cfs) {
    return TableDescriptorBuilder.newBuilder(TABLE)
        .setColumnFamilies(Arrays.stream(cfs)
            .map(cf -> ColumnFamilyDescriptorBuilder.newBuilder(cf)
                .setScope(REPLICATION_SCOPE_GLOBAL).build())
            .collect(Collectors.toList()))
        .build();
  }

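  /**
   * Removes {@link #DROPPED_CF} by modifying the table down to {@link #NORMAL_CF} only.
   */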
  private void deleteCf(Admin admin) throws IOException {
    TableDescriptor desc = createTableDescriptor(NORMAL_CF);
    admin.modifyTable(desc);
  }
}