001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.UUID;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HColumnDescriptor;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HTableDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.TableNotFoundException;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
044import org.apache.hadoop.hbase.replication.TestReplicationBase;
045import org.apache.hadoop.hbase.testclassification.ClientTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054
055/**
056 * Unit testing of ReplicationAdmin with clusters
057 */
058@Category({ MediumTests.class, ClientTests.class })
059public class TestReplicationAdminWithClusters extends TestReplicationBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);
064
065  static Connection connection1;
066  static Connection connection2;
067  static Admin admin1;
068  static Admin admin2;
069  static ReplicationAdmin adminExt;
070
071  @Rule
072  public TestName name = new TestName();
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    TestReplicationBase.setUpBeforeClass();
077    connection1 = ConnectionFactory.createConnection(conf1);
078    connection2 = ConnectionFactory.createConnection(conf2);
079    admin1 = connection1.getAdmin();
080    admin2 = connection2.getAdmin();
081    adminExt = new ReplicationAdmin(conf1);
082  }
083
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    admin1.close();
087    admin2.close();
088    adminExt.close();
089    connection1.close();
090    connection2.close();
091    TestReplicationBase.tearDownAfterClass();
092  }
093
094  @Test
095  public void disableNotFullReplication() throws Exception {
096    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
097    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
098    table.addFamily(f);
099    admin1.disableTable(tableName);
100    admin1.modifyTable(tableName, table);
101    admin1.enableTable(tableName);
102
103
104    admin1.disableTableReplication(tableName);
105    table = admin1.getTableDescriptor(tableName);
106    for (HColumnDescriptor fam : table.getColumnFamilies()) {
107      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
108    }
109  }
110
111  @Test
112  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
113    admin1.disableTableReplication(tableName);
114    admin2.disableTable(tableName);
115    admin2.deleteTable(tableName);
116    assertFalse(admin2.tableExists(tableName));
117    admin1.enableTableReplication(tableName);
118    assertTrue(admin2.tableExists(tableName));
119  }
120
121  @Test
122  public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
123    HTableDescriptor table = new HTableDescriptor(admin1.getTableDescriptor(tableName));
124    for (HColumnDescriptor fam : table.getColumnFamilies()) {
125      fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
126    }
127    admin1.disableTable(tableName);
128    admin1.modifyTable(tableName, table);
129    admin1.enableTable(tableName);
130
131    admin2.disableTable(tableName);
132    admin2.modifyTable(tableName, table);
133    admin2.enableTable(tableName);
134
135    admin1.enableTableReplication(tableName);
136    table = admin1.getTableDescriptor(tableName);
137    for (HColumnDescriptor fam : table.getColumnFamilies()) {
138      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
139    }
140  }
141
142  @Test
143  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
144    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
145    HColumnDescriptor f = new HColumnDescriptor("newFamily");
146    table.addFamily(f);
147    admin2.disableTable(tableName);
148    admin2.modifyTable(tableName, table);
149    admin2.enableTable(tableName);
150
151    try {
152      admin1.enableTableReplication(tableName);
153      fail("Exception should be thrown if table descriptors in the clusters are not same.");
154    } catch (RuntimeException ignored) {
155
156    }
157    admin1.disableTable(tableName);
158    admin1.modifyTable(tableName, table);
159    admin1.enableTable(tableName);
160    admin1.enableTableReplication(tableName);
161    table = admin1.getTableDescriptor(tableName);
162    for (HColumnDescriptor fam : table.getColumnFamilies()) {
163      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
164    }
165  }
166
167  @Test
168  public void testDisableAndEnableReplication() throws Exception {
169    admin1.disableTableReplication(tableName);
170    HTableDescriptor table = admin1.getTableDescriptor(tableName);
171    for (HColumnDescriptor fam : table.getColumnFamilies()) {
172      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
173    }
174    admin1.enableTableReplication(tableName);
175    table = admin1.getTableDescriptor(tableName);
176    for (HColumnDescriptor fam : table.getColumnFamilies()) {
177      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
178    }
179  }
180
181  @Test
182  public void testEnableReplicationForTableWithRegionReplica() throws Exception {
183    TableName tn = TableName.valueOf(name.getMethodName());
184    TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
185        .setRegionReplication(5)
186        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build())
187        .build();
188
189    admin1.createTable(td);
190
191    try {
192      admin1.enableTableReplication(tn);
193      td = admin1.getDescriptor(tn);
194      for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
195        assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
196      }
197    } finally {
198      utility1.deleteTable(tn);
199      utility2.deleteTable(tn);
200    }
201  }
202
203  @Test(expected = TableNotFoundException.class)
204  public void testDisableReplicationForNonExistingTable() throws Exception {
205    admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
206  }
207
208  @Test(expected = TableNotFoundException.class)
209  public void testEnableReplicationForNonExistingTable() throws Exception {
210    admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
211  }
212
213  @Test(expected = IllegalArgumentException.class)
214  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
215    admin1.disableTableReplication(null);
216  }
217
218  @Test(expected = IllegalArgumentException.class)
219  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
220    admin1.enableTableReplication(null);
221  }
222
223  /*
224   * Test enable table replication should create table only in user explicit specified table-cfs.
225   * HBASE-14717
226   */
227  @Test
228  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
229    final TableName tableName = TableName.valueOf(name.getMethodName());
230    String peerId = "2";
231    if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
232      admin2.disableTable(TestReplicationBase.tableName);
233      admin2.deleteTable(TestReplicationBase.tableName);
234    }
235    assertFalse("Table should not exists in the peer cluster",
236      admin2.isTableAvailable(TestReplicationBase.tableName));
237
238    // update peer config
239    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
240    rpc.setReplicateAllUserTables(false);
241    admin1.updateReplicationPeerConfig(peerId, rpc);
242
243    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
244    tableCfs.put(tableName, null);
245    try {
246      adminExt.setPeerTableCFs(peerId, tableCfs);
247      admin1.enableTableReplication(TestReplicationBase.tableName);
248      assertFalse("Table should not be created if user has set table cfs explicitly for the "
249          + "peer and this is not part of that collection",
250        admin2.isTableAvailable(TestReplicationBase.tableName));
251
252      tableCfs.put(TestReplicationBase.tableName, null);
253      adminExt.setPeerTableCFs(peerId, tableCfs);
254      admin1.enableTableReplication(TestReplicationBase.tableName);
255      assertTrue(
256        "Table should be created if user has explicitly added table into table cfs collection",
257        admin2.isTableAvailable(TestReplicationBase.tableName));
258    } finally {
259      adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
260      admin1.disableTableReplication(TestReplicationBase.tableName);
261
262      rpc = admin1.getReplicationPeerConfig(peerId);
263      rpc.setReplicateAllUserTables(true);
264      admin1.updateReplicationPeerConfig(peerId, rpc);
265    }
266  }
267
268  @Test
269  public void testReplicationPeerConfigUpdateCallback() throws Exception {
270    String peerId = "1";
271    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
272    rpc.setClusterKey(utility2.getClusterKey());
273    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
274    rpc.getConfiguration().put("key1", "value1");
275
276    admin1.addReplicationPeer(peerId, rpc);
277
278    rpc.getConfiguration().put("key1", "value2");
279    admin.updatePeerConfig(peerId, rpc);
280    if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
281      synchronized(TestUpdatableReplicationEndpoint.class) {
282        TestUpdatableReplicationEndpoint.class.wait(2000L);
283      }
284    }
285
286    assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
287  }
288
289  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
290    private static boolean calledBack = false;
291    public static boolean hasCalledBack(){
292      return calledBack;
293    }
294    @Override
295    public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){
296      calledBack = true;
297      notifyAll();
298    }
299
300    @Override
301    public void start() {
302      startAsync();
303    }
304
305    @Override
306    public void stop() {
307      stopAsync();
308    }
309
310    @Override
311    protected void doStart() {
312      notifyStarted();
313    }
314
315    @Override
316    protected void doStop() {
317      notifyStopped();
318    }
319
320
321    @Override
322    public UUID getPeerUUID() {
323      return UUID.randomUUID();
324    }
325
326    @Override
327    public boolean replicate(ReplicateContext replicateContext) {
328      return false;
329    }
330  }
331}