001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.CompletionException;
031import java.util.concurrent.ForkJoinPool;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052
053/**
054 * Class to test asynchronous replication admin operations when more than 1 cluster
055 */
056@RunWith(Parameterized.class)
057@Category({LargeTests.class, ClientTests.class})
058public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);
063
064  private final static String ID_SECOND = "2";
065
066  private static HBaseTestingUtility TEST_UTIL2;
067  private static Configuration conf2;
068  private static AsyncAdmin admin2;
069  private static AsyncConnection connection;
070
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
074    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
075    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
076    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
077    TEST_UTIL.startMiniCluster();
078    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
079
080    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
081    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
082    TEST_UTIL2 = new HBaseTestingUtility(conf2);
083    TEST_UTIL2.startMiniCluster();
084
085    connection =
086      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
087    admin2 = connection.getAdmin();
088
089    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
090    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
091    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
092  }
093
094  @AfterClass
095  public static void clearUp() throws IOException {
096    connection.close();
097  }
098
099  @Override
100  @After
101  public void tearDown() throws Exception {
102    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
103    cleanupTables(admin, pattern);
104    cleanupTables(admin2, pattern);
105  }
106
107  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
108    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
109      if (tables != null) {
110        tables.forEach(table -> {
111          try {
112            admin.disableTable(table).join();
113          } catch (Exception e) {
114            LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
115          }
116          admin.deleteTable(table).join();
117        });
118      }
119    }, ForkJoinPool.commonPool()).join();
120  }
121
122  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
123    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
124    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
125    admin.createTable(builder.build()).join();
126  }
127
128  @Test
129  public void testEnableAndDisableTableReplication() throws Exception {
130    // default replication scope is local
131    createTableWithDefaultConf(tableName);
132    admin.enableTableReplication(tableName).join();
133    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
134    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
135      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
136    }
137
138    admin.disableTableReplication(tableName).join();
139    tableDesc = admin.getDescriptor(tableName).get();
140    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
141      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
142    }
143  }
144
145  @Test
146  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
147    // Only create table in source cluster
148    createTableWithDefaultConf(tableName);
149    assertFalse(admin2.tableExists(tableName).get());
150    admin.enableTableReplication(tableName).join();
151    assertTrue(admin2.tableExists(tableName).get());
152  }
153
154  @Test
155  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
156    createTableWithDefaultConf(admin, tableName);
157    createTableWithDefaultConf(admin2, tableName);
158    TableDescriptorBuilder builder =
159        TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
160    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
161        .build());
162    admin2.disableTable(tableName).join();
163    admin2.modifyTable(builder.build()).join();
164    admin2.enableTable(tableName).join();
165
166    try {
167      admin.enableTableReplication(tableName).join();
168      fail("Exception should be thrown if table descriptors in the clusters are not same.");
169    } catch (Exception ignored) {
170      // ok
171    }
172
173    admin.disableTable(tableName).join();
174    admin.modifyTable(builder.build()).join();
175    admin.enableTable(tableName).join();
176    admin.enableTableReplication(tableName).join();
177    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
178    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
179      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
180    }
181  }
182
183  @Test
184  public void testDisableReplicationForNonExistingTable() throws Exception {
185    try {
186      admin.disableTableReplication(tableName).join();
187    } catch (CompletionException e) {
188      assertTrue(e.getCause() instanceof TableNotFoundException);
189    }
190  }
191
192  @Test
193  public void testEnableReplicationForNonExistingTable() throws Exception {
194    try {
195      admin.enableTableReplication(tableName).join();
196    } catch (CompletionException e) {
197      assertTrue(e.getCause() instanceof TableNotFoundException);
198    }
199  }
200
201  @Test
202  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
203    try {
204      admin.disableTableReplication(null).join();
205    } catch (CompletionException e) {
206      assertTrue(e.getCause() instanceof IllegalArgumentException);
207    }
208  }
209
210  @Test
211  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
212    try {
213      admin.enableTableReplication(null).join();
214    } catch (CompletionException e) {
215      assertTrue(e.getCause() instanceof IllegalArgumentException);
216    }
217  }
218
219  /*
220   * Test enable table replication should create table only in user explicit specified table-cfs.
221   * HBASE-14717
222   */
223  @Test
224  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
225    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
226    // Only create table in source cluster
227    createTableWithDefaultConf(tableName);
228    createTableWithDefaultConf(tableName2);
229    assertFalse("Table should not exists in the peer cluster",
230      admin2.tableExists(tableName).get());
231    assertFalse("Table should not exists in the peer cluster",
232      admin2.tableExists(tableName2).get());
233
234    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
235    tableCfs.put(tableName, null);
236    ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
237    rpc.setReplicateAllUserTables(false);
238    rpc.setTableCFsMap(tableCfs);
239    try {
240      // Only add tableName to replication peer config
241      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
242      admin.enableTableReplication(tableName2).join();
243      assertFalse("Table should not be created if user has set table cfs explicitly for the "
244          + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
245
246      // Add tableName2 to replication peer config, too
247      tableCfs.put(tableName2, null);
248      rpc.setTableCFsMap(tableCfs);
249      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
250      admin.enableTableReplication(tableName2).join();
251      assertTrue(
252        "Table should be created if user has explicitly added table into table cfs collection",
253        admin2.tableExists(tableName2).get());
254    } finally {
255      rpc.setTableCFsMap(null);
256      rpc.setReplicateAllUserTables(true);
257      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
258    }
259  }
260}