001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.Collection;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.concurrent.CompletionException;
030import java.util.concurrent.ForkJoinPool;
031import java.util.regex.Pattern;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.TableNotFoundException;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.After;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.runner.RunWith;
049import org.junit.runners.Parameterized;
050
051/**
052 * Class to test asynchronous replication admin operations when more than 1 cluster
053 */
054@RunWith(Parameterized.class)
055@Category({LargeTests.class, ClientTests.class})
056public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);
061
062  private final static String ID_SECOND = "2";
063
064  private static HBaseTestingUtility TEST_UTIL2;
065  private static Configuration conf2;
066  private static AsyncAdmin admin2;
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
071    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
072    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
073    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
074    TEST_UTIL.startMiniCluster();
075    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
076
077    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
078    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
079    TEST_UTIL2 = new HBaseTestingUtility(conf2);
080    TEST_UTIL2.startMiniCluster();
081    admin2 =
082        ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
083
084    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
085    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
086    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
087  }
088
089  @Override
090  @After
091  public void tearDown() throws Exception {
092    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
093    cleanupTables(admin, pattern);
094    cleanupTables(admin2, pattern);
095  }
096
097  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
098    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
099      if (tables != null) {
100        tables.forEach(table -> {
101          try {
102            admin.disableTable(table).join();
103          } catch (Exception e) {
104            LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
105          }
106          admin.deleteTable(table).join();
107        });
108      }
109    }, ForkJoinPool.commonPool()).join();
110  }
111
112  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
113    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
114    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
115    admin.createTable(builder.build()).join();
116  }
117
118  @Test
119  public void testEnableAndDisableTableReplication() throws Exception {
120    // default replication scope is local
121    createTableWithDefaultConf(tableName);
122    admin.enableTableReplication(tableName).join();
123    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
124    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
125      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
126    }
127
128    admin.disableTableReplication(tableName).join();
129    tableDesc = admin.getDescriptor(tableName).get();
130    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
131      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
132    }
133  }
134
135  @Test
136  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
137    // Only create table in source cluster
138    createTableWithDefaultConf(tableName);
139    assertFalse(admin2.tableExists(tableName).get());
140    admin.enableTableReplication(tableName).join();
141    assertTrue(admin2.tableExists(tableName).get());
142  }
143
144  @Test
145  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
146    createTableWithDefaultConf(admin, tableName);
147    createTableWithDefaultConf(admin2, tableName);
148    TableDescriptorBuilder builder =
149        TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
150    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
151        .build());
152    admin2.disableTable(tableName).join();
153    admin2.modifyTable(builder.build()).join();
154    admin2.enableTable(tableName).join();
155
156    try {
157      admin.enableTableReplication(tableName).join();
158      fail("Exception should be thrown if table descriptors in the clusters are not same.");
159    } catch (Exception ignored) {
160      // ok
161    }
162
163    admin.disableTable(tableName).join();
164    admin.modifyTable(builder.build()).join();
165    admin.enableTable(tableName).join();
166    admin.enableTableReplication(tableName).join();
167    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
168    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
169      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
170    }
171  }
172
173  @Test
174  public void testDisableReplicationForNonExistingTable() throws Exception {
175    try {
176      admin.disableTableReplication(tableName).join();
177    } catch (CompletionException e) {
178      assertTrue(e.getCause() instanceof TableNotFoundException);
179    }
180  }
181
182  @Test
183  public void testEnableReplicationForNonExistingTable() throws Exception {
184    try {
185      admin.enableTableReplication(tableName).join();
186    } catch (CompletionException e) {
187      assertTrue(e.getCause() instanceof TableNotFoundException);
188    }
189  }
190
191  @Test
192  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
193    try {
194      admin.disableTableReplication(null).join();
195    } catch (CompletionException e) {
196      assertTrue(e.getCause() instanceof IllegalArgumentException);
197    }
198  }
199
200  @Test
201  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
202    try {
203      admin.enableTableReplication(null).join();
204    } catch (CompletionException e) {
205      assertTrue(e.getCause() instanceof IllegalArgumentException);
206    }
207  }
208
209  /*
210   * Test enable table replication should create table only in user explicit specified table-cfs.
211   * HBASE-14717
212   */
213  @Test
214  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
215    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
216    // Only create table in source cluster
217    createTableWithDefaultConf(tableName);
218    createTableWithDefaultConf(tableName2);
219    assertFalse("Table should not exists in the peer cluster",
220      admin2.tableExists(tableName).get());
221    assertFalse("Table should not exists in the peer cluster",
222      admin2.tableExists(tableName2).get());
223
224    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
225    tableCfs.put(tableName, null);
226    ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
227    rpc.setReplicateAllUserTables(false);
228    rpc.setTableCFsMap(tableCfs);
229    try {
230      // Only add tableName to replication peer config
231      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
232      admin.enableTableReplication(tableName2).join();
233      assertFalse("Table should not be created if user has set table cfs explicitly for the "
234          + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
235
236      // Add tableName2 to replication peer config, too
237      tableCfs.put(tableName2, null);
238      rpc.setTableCFsMap(tableCfs);
239      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
240      admin.enableTableReplication(tableName2).join();
241      assertTrue(
242        "Table should be created if user has explicitly added table into table cfs collection",
243        admin2.tableExists(tableName2).get());
244    } finally {
245      rpc.setTableCFsMap(null);
246      rpc.setReplicateAllUserTables(true);
247      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
248    }
249  }
250}