001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseTestingUtil;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.client.Admin;
026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
032import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.util.ToolRunner;
035import org.junit.After;
036import org.junit.Before;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039
040public abstract class TestReplicationSyncUpToolBase {
041
042  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
043  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
044
045  protected static final TableName TN1 = TableName.valueOf("t1_syncup");
046  protected static final TableName TN2 = TableName.valueOf("t2_syncup");
047
048  protected static final byte[] FAMILY = Bytes.toBytes("cf1");
049  protected static final byte[] QUALIFIER = Bytes.toBytes("q1");
050
051  protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep");
052
053  protected TableDescriptor t1SyncupSource;
054  protected TableDescriptor t1SyncupTarget;
055  protected TableDescriptor t2SyncupSource;
056  protected TableDescriptor t2SyncupTarget;
057
058  protected Connection conn1;
059  protected Connection conn2;
060
061  protected Table ht1Source;
062  protected Table ht2Source;
063  protected Table ht1TargetAtPeer1;
064  protected Table ht2TargetAtPeer1;
065
066  protected void customizeClusterConf(Configuration conf) {
067  }
068
069  @Before
070  public void setUp() throws Exception {
071    customizeClusterConf(UTIL1.getConfiguration());
072    customizeClusterConf(UTIL2.getConfiguration());
073    TestReplicationBase.configureClusters(UTIL1, UTIL2);
074    UTIL1.startMiniZKCluster();
075    UTIL2.setZkCluster(UTIL1.getZkCluster());
076
077    UTIL1.startMiniCluster(2);
078    UTIL2.startMiniCluster(4);
079
080    t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1)
081      .setColumnFamily(
082        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
083      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
084
085    t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1)
086      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
087      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
088
089    t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2)
090      .setColumnFamily(
091        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
092      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
093
094    t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2)
095      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
096      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
097  }
098
099  @After
100  public void tearDown() throws Exception {
101    Closeables.close(ht1Source, true);
102    Closeables.close(ht2Source, true);
103    Closeables.close(ht1TargetAtPeer1, true);
104    Closeables.close(ht2TargetAtPeer1, true);
105    Closeables.close(conn1, true);
106    Closeables.close(conn2, true);
107    UTIL2.shutdownMiniCluster();
108    UTIL1.shutdownMiniCluster();
109  }
110
111  final void setupReplication() throws Exception {
112    Admin admin1 = UTIL1.getAdmin();
113    admin1.createTable(t1SyncupSource);
114    admin1.createTable(t2SyncupSource);
115
116    Admin admin2 = UTIL2.getAdmin();
117    admin2.createTable(t1SyncupTarget);
118    admin2.createTable(t2SyncupTarget);
119
120    // Get HTable from Master
121    Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
122    ht1Source = conn1.getTable(TN1);
123    ht2Source = conn1.getTable(TN2);
124
125    // Get HTable from Peer1
126    Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
127    ht1TargetAtPeer1 = conn2.getTable(TN1);
128    ht2TargetAtPeer1 = conn2.getTable(TN2);
129
130    /**
131     * set M-S : Master: utility1 Slave1: utility2
132     */
133    ReplicationPeerConfig rpc =
134      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
135    admin1.addReplicationPeer("1", rpc);
136  }
137
138  final void syncUp(HBaseTestingUtil util) throws Exception {
139    syncUp(util, new String[0]);
140  }
141
142  final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
143    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
144  }
145
146  // Utilities that manager shutdown / restart of source / sink clusters. They take care of
147  // invalidating stale connections after shutdown / restarts.
148  final void shutDownSourceHBaseCluster() throws Exception {
149    Closeables.close(ht1Source, true);
150    Closeables.close(ht2Source, true);
151    UTIL1.shutdownMiniHBaseCluster();
152  }
153
154  final void shutDownTargetHBaseCluster() throws Exception {
155    Closeables.close(ht1TargetAtPeer1, true);
156    Closeables.close(ht2TargetAtPeer1, true);
157    UTIL2.shutdownMiniHBaseCluster();
158  }
159
160  final void restartSourceHBaseCluster(int numServers) throws Exception {
161    Closeables.close(ht1Source, true);
162    Closeables.close(ht2Source, true);
163    UTIL1.restartHBaseCluster(numServers);
164    ht1Source = UTIL1.getConnection().getTable(TN1);
165    ht2Source = UTIL1.getConnection().getTable(TN2);
166  }
167
168  final void restartTargetHBaseCluster(int numServers) throws Exception {
169    Closeables.close(ht1TargetAtPeer1, true);
170    Closeables.close(ht2TargetAtPeer1, true);
171    UTIL2.restartHBaseCluster(numServers);
172    ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1);
173    ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2);
174  }
175}