001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.util.List; 025import java.util.Map; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; 034import org.apache.hadoop.hbase.testclassification.ReplicationTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.junit.AfterClass; 040import org.junit.BeforeClass; 041import org.junit.ClassRule; 042import org.junit.Rule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.junit.rules.TestName; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ ReplicationTests.class, SmallTests.class }) 050public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestTableCFsUpdater.class); 055 056 private static final Logger LOG = LoggerFactory.getLogger(TestTableCFsUpdater.class); 057 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 058 059 private static ZKWatcher zkw = null; 060 private static Abortable abortable = null; 061 private static ZKStorageUtil zkStorageUtil = null; 062 063 private static class ZKStorageUtil extends ZKReplicationPeerStorage { 064 public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) { 065 super(zookeeper, conf); 066 } 067 } 068 069 @Rule 070 public TestName name = new TestName(); 071 072 public TestTableCFsUpdater() { 073 super(zkw, TEST_UTIL.getConfiguration()); 074 } 075 076 @BeforeClass 077 public static void setUpBeforeClass() throws Exception { 078 TEST_UTIL.startMiniZKCluster(); 079 Configuration conf = TEST_UTIL.getConfiguration(); 080 abortable = new Abortable() { 081 @Override 082 public void abort(String why, Throwable e) { 083 LOG.info(why, e); 084 } 085 086 @Override 087 public boolean isAborted() { 088 return false; 089 } 090 }; 091 zkw = new ZKWatcher(conf, "TableCFs", abortable, true); 092 zkStorageUtil = new ZKStorageUtil(zkw, conf); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 TEST_UTIL.shutdownMiniZKCluster(); 098 } 099 100 @Test 101 public void testUpgrade() throws Exception { 102 String peerId = "1"; 103 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 104 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 105 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); 106 107 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 108 rpc.setClusterKey(zkw.getQuorum()); 109 String peerNode = zkStorageUtil.getPeerNode(peerId); 110 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 111 112 String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; 113 String tableCFsNode = getTableCFsNode(peerId); 114 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 115 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 116 117 ReplicationPeerConfig actualRpc = 118 ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 119 String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 120 121 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 122 assertNull(actualRpc.getTableCFsMap()); 123 assertEquals(tableCFs, actualTableCfs); 124 125 peerId = "2"; 126 rpc = new ReplicationPeerConfig(); 127 rpc.setClusterKey(zkw.getQuorum()); 128 peerNode = zkStorageUtil.getPeerNode(peerId); 129 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 130 131 tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; 132 tableCFsNode = getTableCFsNode(peerId); 133 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 134 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 135 136 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 137 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 138 139 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 140 assertNull(actualRpc.getTableCFsMap()); 141 assertEquals(tableCFs, actualTableCfs); 142 143 peerId = "3"; 144 rpc = new ReplicationPeerConfig(); 145 rpc.setClusterKey(zkw.getQuorum()); 146 peerNode = zkStorageUtil.getPeerNode(peerId); 147 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 148 149 tableCFs = ""; 150 tableCFsNode = getTableCFsNode(peerId); 151 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 152 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 153 154 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 155 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 156 157 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 158 assertNull(actualRpc.getTableCFsMap()); 159 assertEquals(tableCFs, actualTableCfs); 160 161 peerId = "4"; 162 rpc = new ReplicationPeerConfig(); 163 rpc.setClusterKey(zkw.getQuorum()); 164 peerNode = zkStorageUtil.getPeerNode(peerId); 165 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 166 167 tableCFsNode = getTableCFsNode(peerId); 168 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 169 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 170 171 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 172 assertNull(actualRpc.getTableCFsMap()); 173 assertNull(actualTableCfs); 174 175 copyTableCFs(); 176 177 peerId = "1"; 178 peerNode = zkStorageUtil.getPeerNode(peerId); 179 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 180 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 181 Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); 182 assertEquals(3, tableNameListMap.size()); 183 assertTrue(tableNameListMap.containsKey(tableName1)); 184 assertTrue(tableNameListMap.containsKey(tableName2)); 185 assertTrue(tableNameListMap.containsKey(tableName3)); 186 assertEquals(2, tableNameListMap.get(tableName1).size()); 187 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 188 assertEquals("cf2", tableNameListMap.get(tableName1).get(1)); 189 assertEquals(1, tableNameListMap.get(tableName2).size()); 190 assertEquals("cf3", tableNameListMap.get(tableName2).get(0)); 191 assertNull(tableNameListMap.get(tableName3)); 192 193 peerId = "2"; 194 peerNode = zkStorageUtil.getPeerNode(peerId); 195 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 196 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 197 tableNameListMap = actualRpc.getTableCFsMap(); 198 assertEquals(2, tableNameListMap.size()); 199 assertTrue(tableNameListMap.containsKey(tableName1)); 200 assertTrue(tableNameListMap.containsKey(tableName2)); 201 assertEquals(2, tableNameListMap.get(tableName1).size()); 202 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 203 assertEquals("cf3", tableNameListMap.get(tableName1).get(1)); 204 assertEquals(1, tableNameListMap.get(tableName2).size()); 205 assertEquals("cf2", tableNameListMap.get(tableName2).get(0)); 206 207 peerId = "3"; 208 peerNode = zkStorageUtil.getPeerNode(peerId); 209 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 210 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 211 tableNameListMap = actualRpc.getTableCFsMap(); 212 assertNull(tableNameListMap); 213 214 peerId = "4"; 215 peerNode = zkStorageUtil.getPeerNode(peerId); 216 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 217 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 218 tableNameListMap = actualRpc.getTableCFsMap(); 219 assertNull(tableNameListMap); 220 } 221}