001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.wal.WALEdit;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.wal.WAL.Entry;
034
035/**
036 * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
037 * exclude namespaces config, and exclude table-cfs config.
038 *
039 * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
040 * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
041 * Note: set a exclude namespace means that all tables in this namespace can't be replicated.
042 *
043 * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
044 * But you can set namespaces or table-cfs which will be replicated to peer cluster.
045 * Note: set a namespace means that all tables in this namespace will be replicated.
046 */
047@InterfaceAudience.Private
048public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
049
050  private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
051  private final ReplicationPeer peer;
052  private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
053
054  public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
055    this.peer = peer;
056  }
057
058  @Override
059  public Entry filter(Entry entry) {
060    TableName tabName = entry.getKey().getTableName();
061    String namespace = tabName.getNamespaceAsString();
062    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
063
064    if (peerConfig.replicateAllUserTables()) {
065      // replicate all user tables, but filter by exclude namespaces config
066      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
067
068      // return null(prevent replicating) if logKey's table is in this peer's
069      // exclude namespaces list
070      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
071        return null;
072      }
073
074      return entry;
075    } else {
076      // Not replicate all user tables, so filter by namespaces and table-cfs config
077      Set<String> namespaces = peerConfig.getNamespaces();
078      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
079
080      if (namespaces == null && tableCFs == null) {
081        return null;
082      }
083
084      // First filter by namespaces config
085      // If table's namespace in peer config, all the tables data are applicable for replication
086      if (namespaces != null && namespaces.contains(namespace)) {
087        return entry;
088      }
089
090      // Then filter by table-cfs config
091      // return null(prevent replicating) if logKey's table isn't in this peer's
092      // replicable tables list
093      if (tableCFs == null || !tableCFs.containsKey(tabName)) {
094        return null;
095      }
096
097      return entry;
098    }
099  }
100
101  @Override
102  public Cell filterCell(final Entry entry, Cell cell) {
103    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
104    if (peerConfig.replicateAllUserTables()) {
105      // replicate all user tables, but filter by exclude table-cfs config
106      final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
107      if (excludeTableCfs == null) {
108        return cell;
109      }
110
111      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
112        cell = bulkLoadFilter.filterCell(cell,
113          fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam),
114            excludeTableCfs));
115      } else {
116        if (filterByExcludeTableCfs(entry.getKey().getTableName(),
117          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
118          excludeTableCfs)) {
119          return null;
120        }
121      }
122
123      return cell;
124    } else {
125      // not replicate all user tables, so filter by table-cfs config
126      final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
127      if (tableCfs == null) {
128        return cell;
129      }
130
131      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
132        cell = bulkLoadFilter.filterCell(cell,
133          fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs));
134      } else {
135        if (filterByTableCfs(entry.getKey().getTableName(),
136          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
137          tableCfs)) {
138          return null;
139        }
140      }
141
142      return cell;
143    }
144  }
145
146  private boolean filterByExcludeTableCfs(TableName tableName, String family,
147      Map<TableName, List<String>> excludeTableCfs) {
148    List<String> excludeCfs = excludeTableCfs.get(tableName);
149    if (excludeCfs != null) {
150      // empty cfs means all cfs of this table are excluded
151      if (excludeCfs.isEmpty()) {
152        return true;
153      }
154      // ignore(remove) kv if its cf is in the exclude cfs list
155      if (excludeCfs.contains(family)) {
156        return true;
157      }
158    }
159    return false;
160  }
161
162  private boolean filterByTableCfs(TableName tableName, String family,
163      Map<TableName, List<String>> tableCfs) {
164    List<String> cfs = tableCfs.get(tableName);
165    // ignore(remove) kv if its cf isn't in the replicable cf list
166    // (empty cfs means all cfs of this table are replicable)
167    if (cfs != null && !cfs.contains(family)) {
168      return true;
169    }
170    return false;
171  }
172}