001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
026import org.apache.hadoop.hbase.wal.WAL.Entry;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * A {@link WALEntryFilter} which contains multiple filters and applies them in chain order
031 */
032@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
033public class ChainWALEntryFilter implements WALEntryFilter {
034
035  private final WALEntryFilter[] filters;
036  private WALCellFilter[] cellFilters;
037
038  public ChainWALEntryFilter(WALEntryFilter... filters) {
039    this.filters = filters;
040    initCellFilters();
041  }
042
043  public ChainWALEntryFilter(List<WALEntryFilter> filters) {
044    ArrayList<WALEntryFilter> rawFilters = new ArrayList<>(filters.size());
045    // flatten the chains
046    for (WALEntryFilter filter : filters) {
047      if (filter instanceof ChainWALEntryFilter) {
048        Collections.addAll(rawFilters, ((ChainWALEntryFilter) filter).filters);
049      } else {
050        rawFilters.add(filter);
051      }
052    }
053    this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
054    initCellFilters();
055  }
056
057  @Override
058  public void setSerial(boolean serial) {
059    for (WALEntryFilter filter : filters) {
060      filter.setSerial(serial);
061    }
062  }
063
064  public void initCellFilters() {
065    ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
066    for (WALEntryFilter filter : filters) {
067      if (filter instanceof WALCellFilter) {
068        cellFilters.add((WALCellFilter) filter);
069      }
070    }
071    this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]);
072  }
073
074  @Override
075  public Entry filter(Entry entry) {
076    entry = filterEntry(entry);
077    if (entry == null) {
078      return null;
079    }
080
081    filterCells(entry);
082    return entry;
083  }
084
085  protected Entry filterEntry(Entry entry) {
086    for (WALEntryFilter filter : filters) {
087      if (entry == null) {
088        return null;
089      }
090      entry = filter.filter(entry);
091    }
092    return entry;
093  }
094
095  protected void filterCells(Entry entry) {
096    if (entry == null || cellFilters.length == 0) {
097      return;
098    }
099    WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c));
100  }
101
102  private Cell filterCell(Entry entry, Cell cell) {
103    for (WALCellFilter filter : cellFilters) {
104      cell = filter.filterCell(entry, cell);
105      if (cell == null) {
106        break;
107      }
108    }
109    return cell;
110  }
111}