001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.master; 020 021import java.io.IOException; 022import java.util.List; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.Abortable; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.client.Admin; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 031import org.apache.hadoop.hbase.exceptions.DeserializationException; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; 034import org.apache.hadoop.hbase.zookeeper.ZKUtil; 035import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.yetus.audience.InterfaceStability; 038import org.apache.zookeeper.KeeperException; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 042 043/** 044 * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. 045 * It will be removed in HBase 3.x. See HBASE-11393 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Unstable 049public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { 050 051 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class); 052 053 public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, 054 Configuration conf, Abortable abortable) { 055 super(zookeeper, conf, abortable); 056 } 057 058 public void upgrade() throws Exception { 059 try (Connection conn = ConnectionFactory.createConnection(conf)) { 060 Admin admin = conn.getAdmin(); 061 admin.listReplicationPeers().forEach( 062 (peerDesc) -> { 063 String peerId = peerDesc.getPeerId(); 064 ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); 065 if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 066 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { 067 peerConfig.setReplicateAllUserTables(false); 068 try { 069 admin.updateReplicationPeerConfig(peerId, peerConfig); 070 } catch (Exception e) { 071 LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); 072 } 073 } 074 }); 075 } 076 } 077 078 public void copyTableCFs() { 079 List<String> znodes = null; 080 try { 081 znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); 082 } catch (KeeperException e) { 083 LOG.error("Failed to get peers znode", e); 084 } 085 if (znodes != null) { 086 for (String peerId : znodes) { 087 if (!copyTableCFs(peerId)) { 088 LOG.error("upgrade tableCFs failed for peerId=" + peerId); 089 } 090 } 091 } 092 } 093 094 public boolean copyTableCFs(String peerId) { 095 String tableCFsNode = getTableCFsNode(peerId); 096 try { 097 if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { 098 String peerNode = getPeerNode(peerId); 099 ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode); 100 // We only need to copy data from tableCFs node to rpc Node the first time hmaster start. 101 if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) { 102 // we copy TableCFs node into PeerNode 103 LOG.info("Copy table ColumnFamilies into peer=" + peerId); 104 ReplicationProtos.TableCF[] tableCFs = 105 ReplicationPeerConfigUtil.parseTableCFs( 106 ZKUtil.getData(this.zookeeper, tableCFsNode)); 107 if (tableCFs != null && tableCFs.length > 0) { 108 rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); 109 ZKUtil.setData(this.zookeeper, peerNode, 110 ReplicationPeerConfigUtil.toByteArray(rpc)); 111 } 112 } else { 113 LOG.info("No tableCFs in peerNode:" + peerId); 114 } 115 } 116 } catch (KeeperException e) { 117 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 118 return false; 119 } catch (InterruptedException e) { 120 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 121 return false; 122 } catch (IOException e) { 123 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 124 return false; 125 } 126 return true; 127 } 128 129 private ReplicationPeerConfig getReplicationPeerConig(String peerNode) 130 throws KeeperException, InterruptedException { 131 byte[] data = null; 132 data = ZKUtil.getData(this.zookeeper, peerNode); 133 if (data == null) { 134 LOG.error("Could not get configuration for " + 135 "peer because it doesn't exist. peer=" + peerNode); 136 return null; 137 } 138 try { 139 return ReplicationPeerConfigUtil.parsePeerFrom(data); 140 } catch (DeserializationException e) { 141 LOG.warn("Failed to parse cluster key from peer=" + peerNode); 142 return null; 143 } 144 } 145 146 private static void printUsageAndExit() { 147 System.err.printf( 148 "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader" 149 + " [options]"); 150 System.err.println(" where [options] are:"); 151 System.err.println(" -h|-help Show this help and exit."); 152 System.err.println(" copyTableCFs Copy table-cfs to replication peer config"); 153 System.err.println(" upgrade Upgrade replication peer config to new format"); 154 System.err.println(); 155 System.exit(1); 156 } 157 158 public static void main(String[] args) throws Exception { 159 if (args.length != 1) { 160 printUsageAndExit(); 161 } 162 if (args[0].equals("-help") || args[0].equals("-h")) { 163 printUsageAndExit(); 164 } else if (args[0].equals("copyTableCFs")) { 165 Configuration conf = HBaseConfiguration.create(); 166 ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); 167 try { 168 ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw, 169 conf, null); 170 tableCFsUpdater.copyTableCFs(); 171 } finally { 172 zkw.close(); 173 } 174 } else if (args[0].equals("upgrade")) { 175 Configuration conf = HBaseConfiguration.create(); 176 ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); 177 ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null); 178 upgrader.upgrade(); 179 } else { 180 printUsageAndExit(); 181 } 182 } 183}