001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.CompletionException;
031import java.util.concurrent.ForkJoinPool;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052
053/**
054 * Class to test asynchronous replication admin operations when more than 1 cluster
055 */
056@RunWith(Parameterized.class)
057@Category({ LargeTests.class, ClientTests.class })
058public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);
063
064  private final static String ID_SECOND = "2";
065
066  private static HBaseTestingUtility TEST_UTIL2;
067  private static Configuration conf2;
068  private static AsyncAdmin admin2;
069  private static AsyncConnection connection;
070
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
074    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
075    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
076    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
077    TEST_UTIL.startMiniCluster();
078    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
079
080    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
081    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
082    TEST_UTIL2 = new HBaseTestingUtility(conf2);
083    TEST_UTIL2.startMiniCluster();
084
085    connection = ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
086    admin2 = connection.getAdmin();
087
088    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
089    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
090    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
091  }
092
093  @AfterClass
094  public static void clearUp() throws IOException {
095    connection.close();
096  }
097
098  @Override
099  @After
100  public void tearDown() throws Exception {
101    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
102    cleanupTables(admin, pattern);
103    cleanupTables(admin2, pattern);
104  }
105
106  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
107    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
108      if (tables != null) {
109        tables.forEach(table -> {
110          try {
111            admin.disableTable(table).join();
112          } catch (Exception e) {
113            LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
114          }
115          admin.deleteTable(table).join();
116        });
117      }
118    }, ForkJoinPool.commonPool()).join();
119  }
120
121  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
122    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
123    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
124    admin.createTable(builder.build()).join();
125  }
126
127  @Test
128  public void testEnableAndDisableTableReplication() throws Exception {
129    // default replication scope is local
130    createTableWithDefaultConf(tableName);
131    admin.enableTableReplication(tableName).join();
132    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
133    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
134      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
135    }
136
137    admin.disableTableReplication(tableName).join();
138    tableDesc = admin.getDescriptor(tableName).get();
139    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
140      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
141    }
142  }
143
144  @Test
145  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
146    // Only create table in source cluster
147    createTableWithDefaultConf(tableName);
148    assertFalse(admin2.tableExists(tableName).get());
149    admin.enableTableReplication(tableName).join();
150    assertTrue(admin2.tableExists(tableName).get());
151  }
152
153  @Test
154  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
155    createTableWithDefaultConf(admin, tableName);
156    createTableWithDefaultConf(admin2, tableName);
157    TableDescriptorBuilder builder =
158      TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
159    builder.setColumnFamily(
160      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily")).build());
161    admin2.disableTable(tableName).join();
162    admin2.modifyTable(builder.build()).join();
163    admin2.enableTable(tableName).join();
164
165    try {
166      admin.enableTableReplication(tableName).join();
167      fail("Exception should be thrown if table descriptors in the clusters are not same.");
168    } catch (Exception ignored) {
169      // ok
170    }
171
172    admin.disableTable(tableName).join();
173    admin.modifyTable(builder.build()).join();
174    admin.enableTable(tableName).join();
175    admin.enableTableReplication(tableName).join();
176    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
177    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
178      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
179    }
180  }
181
182  @Test
183  public void testDisableReplicationForNonExistingTable() throws Exception {
184    try {
185      admin.disableTableReplication(tableName).join();
186    } catch (CompletionException e) {
187      assertTrue(e.getCause() instanceof TableNotFoundException);
188    }
189  }
190
191  @Test
192  public void testEnableReplicationForNonExistingTable() throws Exception {
193    try {
194      admin.enableTableReplication(tableName).join();
195    } catch (CompletionException e) {
196      assertTrue(e.getCause() instanceof TableNotFoundException);
197    }
198  }
199
200  @Test
201  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
202    try {
203      admin.disableTableReplication(null).join();
204    } catch (CompletionException e) {
205      assertTrue(e.getCause() instanceof IllegalArgumentException);
206    }
207  }
208
209  @Test
210  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
211    try {
212      admin.enableTableReplication(null).join();
213    } catch (CompletionException e) {
214      assertTrue(e.getCause() instanceof IllegalArgumentException);
215    }
216  }
217
218  /*
219   * Test enable table replication should create table only in user explicit specified table-cfs.
220   * HBASE-14717
221   */
222  @Test
223  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
224    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
225    // Only create table in source cluster
226    createTableWithDefaultConf(tableName);
227    createTableWithDefaultConf(tableName2);
228    assertFalse("Table should not exists in the peer cluster", admin2.tableExists(tableName).get());
229    assertFalse("Table should not exists in the peer cluster",
230      admin2.tableExists(tableName2).get());
231
232    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
233    tableCfs.put(tableName, null);
234    ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
235    rpc.setReplicateAllUserTables(false);
236    rpc.setTableCFsMap(tableCfs);
237    try {
238      // Only add tableName to replication peer config
239      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
240      admin.enableTableReplication(tableName2).join();
241      assertFalse("Table should not be created if user has set table cfs explicitly for the "
242        + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
243
244      // Add tableName2 to replication peer config, too
245      tableCfs.put(tableName2, null);
246      rpc.setTableCFsMap(tableCfs);
247      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
248      admin.enableTableReplication(tableName2).join();
249      assertTrue(
250        "Table should be created if user has explicitly added table into table cfs collection",
251        admin2.tableExists(tableName2).get());
252    } finally {
253      rpc.setTableCFsMap(null);
254      rpc.setReplicateAllUserTables(true);
255      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
256    }
257  }
258}