001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import java.util.List;
022import java.util.Map;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellUtil;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.wal.WAL.Entry;
028import org.apache.hadoop.hbase.wal.WALEdit;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
033 * exclude namespaces config, and exclude table-cfs config.
034 *
035 * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
036 * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
037 * Note: set a exclude namespace means that all tables in this namespace can't be replicated.
038 *
039 * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
040 * But you can set namespaces or table-cfs which will be replicated to peer cluster.
041 * Note: set a namespace means that all tables in this namespace will be replicated.
042 */
043@InterfaceAudience.Private
044public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
045
046  private final ReplicationPeer peer;
047  private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
048
049  public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
050    this.peer = peer;
051  }
052
053  @Override
054  public Entry filter(Entry entry) {
055    if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) {
056      return entry;
057    } else {
058      return null;
059    }
060  }
061
062  @Override
063  public Cell filterCell(final Entry entry, Cell cell) {
064    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
065    if (peerConfig.replicateAllUserTables()) {
066      // replicate all user tables, but filter by exclude table-cfs config
067      final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
068      if (excludeTableCfs == null) {
069        return cell;
070      }
071
072      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
073        cell = bulkLoadFilter.filterCell(cell,
074          fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam),
075            excludeTableCfs));
076      } else {
077        if (filterByExcludeTableCfs(entry.getKey().getTableName(),
078          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
079          excludeTableCfs)) {
080          return null;
081        }
082      }
083
084      return cell;
085    } else {
086      // not replicate all user tables, so filter by table-cfs config
087      final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
088      if (tableCfs == null) {
089        return cell;
090      }
091
092      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
093        cell = bulkLoadFilter.filterCell(cell,
094          fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs));
095      } else {
096        if (filterByTableCfs(entry.getKey().getTableName(),
097          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
098          tableCfs)) {
099          return null;
100        }
101      }
102
103      return cell;
104    }
105  }
106
107  private boolean filterByExcludeTableCfs(TableName tableName, String family,
108      Map<TableName, List<String>> excludeTableCfs) {
109    List<String> excludeCfs = excludeTableCfs.get(tableName);
110    if (excludeCfs != null) {
111      // empty cfs means all cfs of this table are excluded
112      if (excludeCfs.isEmpty()) {
113        return true;
114      }
115      // ignore(remove) kv if its cf is in the exclude cfs list
116      if (excludeCfs.contains(family)) {
117        return true;
118      }
119    }
120    return false;
121  }
122
123  private boolean filterByTableCfs(TableName tableName, String family,
124      Map<TableName, List<String>> tableCfs) {
125    List<String> cfs = tableCfs.get(tableName);
126    // ignore(remove) kv if its cf isn't in the replicable cf list
127    // (empty cfs means all cfs of this table are replicable)
128    if (cfs != null && !cfs.contains(family)) {
129      return true;
130    }
131    return false;
132  }
133}