View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Map;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.wal.WAL.Entry;
30  import org.apache.hadoop.hbase.util.Bytes;
31  
32  public class TableCfWALEntryFilter implements WALEntryFilter {
33  
34    private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
35    private final ReplicationPeer peer;
36  
37    public TableCfWALEntryFilter(ReplicationPeer peer) {
38      this.peer = peer;
39    }
40  
41    @Override
42    public Entry filter(Entry entry) {
43      TableName tabName = entry.getKey().getTablename();
44      ArrayList<Cell> cells = entry.getEdit().getCells();
45      Map<TableName, List<String>> tableCFs = null;
46  
47      try {
48        tableCFs = this.peer.getTableCFs();
49      } catch (IllegalArgumentException e) {
50        LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
51            ", degenerate as if it's not configured by keeping tableCFs==null");
52      }
53      int size = cells.size();
54  
55      // return null(prevent replicating) if logKey's table isn't in this peer's
56      // replicable table list (empty tableCFs means all table are replicable)
57      if (tableCFs != null && !tableCFs.containsKey(tabName)) {
58        return null;
59      } else {
60        List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
61        for (int i = size - 1; i >= 0; i--) {
62          Cell cell = cells.get(i);
63          // ignore(remove) kv if its cf isn't in the replicable cf list
64          // (empty cfs means all cfs of this table are replicable)
65          if ((cfs != null && !cfs.contains(Bytes.toString(cell.getFamily())))) {
66            cells.remove(i);
67          }
68        }
69      }
70      if (cells.size() < size/2) {
71        cells.trimToSize();
72      }
73      return entry;
74    }
75  
76  }