001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import java.util.List; 022import java.util.Map; 023import org.apache.hadoop.hbase.Cell; 024import org.apache.hadoop.hbase.CellUtil; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.util.Bytes; 027import org.apache.hadoop.hbase.wal.WAL.Entry; 028import org.apache.hadoop.hbase.wal.WALEdit; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config, 033 * exclude namespaces config, and exclude table-cfs config. 034 * 035 * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But 036 * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster. 037 * Note: set a exclude namespace means that all tables in this namespace can't be replicated. 038 * 039 * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. 040 * But you can set namespaces or table-cfs which will be replicated to peer cluster. 041 * Note: set a namespace means that all tables in this namespace will be replicated. 042 */ 043@InterfaceAudience.Private 044public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { 045 046 private final ReplicationPeer peer; 047 private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); 048 049 public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) { 050 this.peer = peer; 051 } 052 053 @Override 054 public Entry filter(Entry entry) { 055 if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) { 056 return entry; 057 } else { 058 return null; 059 } 060 } 061 062 @Override 063 public Cell filterCell(final Entry entry, Cell cell) { 064 ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); 065 if (peerConfig.replicateAllUserTables()) { 066 // replicate all user tables, but filter by exclude table-cfs config 067 final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap(); 068 if (excludeTableCfs == null) { 069 return cell; 070 } 071 072 if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { 073 cell = bulkLoadFilter.filterCell(cell, 074 fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), 075 excludeTableCfs)); 076 } else { 077 if (filterByExcludeTableCfs(entry.getKey().getTableName(), 078 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), 079 excludeTableCfs)) { 080 return null; 081 } 082 } 083 084 return cell; 085 } else { 086 // not replicate all user tables, so filter by table-cfs config 087 final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap(); 088 if (tableCfs == null) { 089 return cell; 090 } 091 092 if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { 093 cell = bulkLoadFilter.filterCell(cell, 094 fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs)); 095 } else { 096 if (filterByTableCfs(entry.getKey().getTableName(), 097 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), 098 tableCfs)) { 099 return null; 100 } 101 } 102 103 return cell; 104 } 105 } 106 107 private boolean filterByExcludeTableCfs(TableName tableName, String family, 108 Map<TableName, List<String>> excludeTableCfs) { 109 List<String> excludeCfs = excludeTableCfs.get(tableName); 110 if (excludeCfs != null) { 111 // empty cfs means all cfs of this table are excluded 112 if (excludeCfs.isEmpty()) { 113 return true; 114 } 115 // ignore(remove) kv if its cf is in the exclude cfs list 116 if (excludeCfs.contains(family)) { 117 return true; 118 } 119 } 120 return false; 121 } 122 123 private boolean filterByTableCfs(TableName tableName, String family, 124 Map<TableName, List<String>> tableCfs) { 125 List<String> cfs = tableCfs.get(tableName); 126 // ignore(remove) kv if its cf isn't in the replicable cf list 127 // (empty cfs means all cfs of this table are replicable) 128 if (cfs != null && !cfs.contains(family)) { 129 return true; 130 } 131 return false; 132 } 133}