/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;

/**
 * Filters a WAL entry according to the peer config: the replicate_all flag, the namespaces
 * config, the table-cfs config, the exclude namespaces config and the exclude table-cfs config.
 * <p>
 * If the replicate_all flag is true, all user tables are replicated to the peer cluster, except
 * those excluded by the exclude namespaces or exclude table-cfs config. Excluding a namespace
 * excludes every table in that namespace.
 * <p>
 * If the replicate_all flag is false, no user table is replicated to the peer cluster unless it
 * is selected by the namespaces or table-cfs config. Selecting a namespace selects every table in
 * that namespace.
 * <p>
 * In both the table-cfs and the exclude table-cfs maps, a table mapped to {@code null} or to an
 * empty list stands for all column families of that table.
 */
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {

  private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
  private final ReplicationPeer peer;
  private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();

  public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
    this.peer = peer;
  }

  /**
   * Decides, per entry, whether the entry's table may be replicated to the peer. Column-family
   * level filtering is done separately by {@link #filterCell(Entry, Cell)}.
   *
   * @param entry the WAL entry to check
   * @return {@code entry} if its table may be replicated, {@code null} to drop it
   */
  @Override
  public Entry filter(Entry entry) {
    TableName tabName = entry.getKey().getTableName();
    String namespace = tabName.getNamespaceAsString();
    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();

    if (peerConfig.replicateAllUserTables()) {
      // replicate all user tables, but drop the entry if its namespace is excluded
      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
        return null;
      }
      return entry;
    }

    // Not replicating all user tables: the table must be selected either by the namespaces
    // config (which selects every table in the namespace) or by the table-cfs config.
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null && namespaces.contains(namespace)) {
      return entry;
    }
    Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
    if (tableCFs != null && tableCFs.containsKey(tabName)) {
      return entry;
    }
    return null;
  }

  /**
   * Filters a single cell of an entry by column family. With replicate_all, cells whose family
   * is excluded by the exclude table-cfs config are dropped; otherwise, cells whose family is not
   * listed for their table in the table-cfs config are dropped. Bulk load marker cells are passed
   * to the bulk load cell filter with the same family predicate.
   *
   * @param entry the WAL entry the cell belongs to
   * @param cell the cell to check
   * @return the cell to replicate (for bulk load markers, whatever the bulk load filter returns),
   *         or {@code null} to drop it
   */
  @Override
  public Cell filterCell(final Entry entry, Cell cell) {
    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
    final TableName tableName = entry.getKey().getTableName();
    final boolean isBulkLoadMarker =
        CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD);

    if (peerConfig.replicateAllUserTables()) {
      // replicate all user tables, but filter by exclude table-cfs config
      final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
      if (excludeTableCfs == null) {
        return cell;
      }
      if (isBulkLoadMarker) {
        return bulkLoadFilter.filterCell(cell,
          fam -> filterByExcludeTableCfs(tableName, Bytes.toString(fam), excludeTableCfs));
      }
      return filterByExcludeTableCfs(tableName, familyOf(cell), excludeTableCfs) ? null : cell;
    }

    // not replicating all user tables, so filter by table-cfs config
    final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
    if (tableCfs == null) {
      return cell;
    }
    if (isBulkLoadMarker) {
      return bulkLoadFilter.filterCell(cell,
        fam -> filterByTableCfs(tableName, Bytes.toString(fam), tableCfs));
    }
    return filterByTableCfs(tableName, familyOf(cell), tableCfs) ? null : cell;
  }

  /** Returns the column family of {@code cell} decoded with {@link Bytes#toString}. */
  private static String familyOf(Cell cell) {
    return Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
  }

  /**
   * @return true if {@code family} of {@code tableName} is excluded by the exclude table-cfs
   *         config, i.e. the cell must be removed
   */
  private boolean filterByExcludeTableCfs(TableName tableName, String family,
      Map<TableName, List<String>> excludeTableCfs) {
    // Check key existence first: a table may be mapped to a null value, so get() alone cannot
    // distinguish "not excluded" from "all cfs excluded".
    if (!excludeTableCfs.containsKey(tableName)) {
      return false;
    }
    List<String> excludeCfs = excludeTableCfs.get(tableName);
    // null or empty cfs means all cfs of this table are excluded; otherwise ignore(remove) the kv
    // only if its cf is in the exclude cfs list
    return excludeCfs == null || excludeCfs.isEmpty() || excludeCfs.contains(family);
  }

  /**
   * @return true if {@code family} of {@code tableName} is not replicable according to the
   *         table-cfs config, i.e. the cell must be removed
   */
  private boolean filterByTableCfs(TableName tableName, String family,
      Map<TableName, List<String>> tableCfs) {
    List<String> cfs = tableCfs.get(tableName);
    // A table that is absent (e.g. selected only via its namespace) or mapped to null/empty cfs
    // has all cfs replicable; otherwise ignore(remove) the kv if its cf isn't in the list.
    return cfs != null && !cfs.isEmpty() && !cfs.contains(family);
  }
}