001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.CompletionException;
031import java.util.concurrent.ForkJoinPool;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
041import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.After;
046import org.junit.AfterClass;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053
054/**
055 * Class to test asynchronous replication admin operations when more than 1 cluster
056 */
057@RunWith(Parameterized.class)
058@Category({LargeTests.class, ClientTests.class})
059public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);
064
065  private final static String ID_SECOND = "2";
066
067  private static HBaseTestingUtility TEST_UTIL2;
068  private static Configuration conf2;
069  private static AsyncAdmin admin2;
070  private static AsyncConnection connection;
071
072  @BeforeClass
073  public static void setUpBeforeClass() throws Exception {
074    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
075    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
076    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
077    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
078    TEST_UTIL.startMiniCluster();
079    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
080
081    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
082    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
083    TEST_UTIL2 = new HBaseTestingUtility(conf2);
084    TEST_UTIL2.startMiniCluster();
085
086    connection =
087      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
088    admin2 = connection.getAdmin();
089
090    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
091      .setClusterKey(TEST_UTIL2.getClusterKey()).build();
092    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
093  }
094
095  @AfterClass
096  public static void clearUp() throws IOException {
097    connection.close();
098  }
099
100  @Override
101  @After
102  public void tearDown() throws Exception {
103    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
104    cleanupTables(admin, pattern);
105    cleanupTables(admin2, pattern);
106  }
107
108  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
109    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
110      if (tables != null) {
111        tables.forEach(table -> {
112          try {
113            admin.disableTable(table).join();
114          } catch (Exception e) {
115            LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
116          }
117          admin.deleteTable(table).join();
118        });
119      }
120    }, ForkJoinPool.commonPool()).join();
121  }
122
123  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
124    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
125    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
126    admin.createTable(builder.build()).join();
127  }
128
129  @Test
130  public void testEnableAndDisableTableReplication() throws Exception {
131    // default replication scope is local
132    createTableWithDefaultConf(tableName);
133    admin.enableTableReplication(tableName).join();
134    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
135    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
136      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
137    }
138
139    admin.disableTableReplication(tableName).join();
140    tableDesc = admin.getDescriptor(tableName).get();
141    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
142      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
143    }
144  }
145
146  @Test
147  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
148    // Only create table in source cluster
149    createTableWithDefaultConf(tableName);
150    assertFalse(admin2.tableExists(tableName).get());
151    admin.enableTableReplication(tableName).join();
152    assertTrue(admin2.tableExists(tableName).get());
153  }
154
155  @Test
156  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
157    createTableWithDefaultConf(admin, tableName);
158    createTableWithDefaultConf(admin2, tableName);
159    TableDescriptorBuilder builder =
160        TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
161    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
162        .build());
163    admin2.disableTable(tableName).join();
164    admin2.modifyTable(builder.build()).join();
165    admin2.enableTable(tableName).join();
166
167    try {
168      admin.enableTableReplication(tableName).join();
169      fail("Exception should be thrown if table descriptors in the clusters are not same.");
170    } catch (Exception ignored) {
171      // ok
172    }
173
174    admin.disableTable(tableName).join();
175    admin.modifyTable(builder.build()).join();
176    admin.enableTable(tableName).join();
177    admin.enableTableReplication(tableName).join();
178    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
179    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
180      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
181    }
182  }
183
184  @Test
185  public void testDisableReplicationForNonExistingTable() throws Exception {
186    try {
187      admin.disableTableReplication(tableName).join();
188    } catch (CompletionException e) {
189      assertTrue(e.getCause() instanceof TableNotFoundException);
190    }
191  }
192
193  @Test
194  public void testEnableReplicationForNonExistingTable() throws Exception {
195    try {
196      admin.enableTableReplication(tableName).join();
197    } catch (CompletionException e) {
198      assertTrue(e.getCause() instanceof TableNotFoundException);
199    }
200  }
201
202  @Test
203  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
204    try {
205      admin.disableTableReplication(null).join();
206    } catch (CompletionException e) {
207      assertTrue(e.getCause() instanceof IllegalArgumentException);
208    }
209  }
210
211  @Test
212  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
213    try {
214      admin.enableTableReplication(null).join();
215    } catch (CompletionException e) {
216      assertTrue(e.getCause() instanceof IllegalArgumentException);
217    }
218  }
219
220  /*
221   * Test enable table replication should create table only in user explicit specified table-cfs.
222   * HBASE-14717
223   */
224  @Test
225  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
226    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
227    // Only create table in source cluster
228    createTableWithDefaultConf(tableName);
229    createTableWithDefaultConf(tableName2);
230    assertFalse("Table should not exists in the peer cluster",
231      admin2.tableExists(tableName).get());
232    assertFalse("Table should not exists in the peer cluster",
233      admin2.tableExists(tableName2).get());
234
235    Map<TableName, List<String>> tableCfs = new HashMap<>();
236    tableCfs.put(tableName, null);
237    ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig
238      .newBuilder(admin.getReplicationPeerConfig(ID_SECOND).get())
239      .setReplicateAllUserTables(false)
240      .setTableCFsMap(tableCfs);
241    try {
242      // Only add tableName to replication peer config
243      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
244      admin.enableTableReplication(tableName2).join();
245      assertFalse("Table should not be created if user has set table cfs explicitly for the "
246          + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
247
248      // Add tableName2 to replication peer config, too
249      tableCfs.put(tableName2, null);
250      rpcBuilder.setTableCFsMap(tableCfs);
251      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
252      admin.enableTableReplication(tableName2).join();
253      assertTrue(
254        "Table should be created if user has explicitly added table into table cfs collection",
255        admin2.tableExists(tableName2).get());
256    } finally {
257      rpcBuilder.setTableCFsMap(null).setReplicateAllUserTables(true).build();
258      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
259    }
260  }
261}