001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellBuilderType;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.ExtendedCellBuilder;
028import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
029import org.apache.hadoop.hbase.wal.WALEdit;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
038
039@InterfaceAudience.Private
040public class BulkLoadCellFilter {
041  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCellFilter.class);
042
043  private final ExtendedCellBuilder cellBuilder =
044    ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
045
046  /**
047   * Filters the bulk load cell using the supplied predicate.
048   * @param cell         The WAL cell to filter.
049   * @param famPredicate Returns true of given family should be removed.
050   * @return The filtered cell.
051   */
052  public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
053    byte[] fam;
054    BulkLoadDescriptor bld = null;
055    try {
056      bld = WALEdit.getBulkLoadDescriptor(cell);
057    } catch (IOException e) {
058      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
059      return cell;
060    }
061    List<StoreDescriptor> storesList = bld.getStoresList();
062    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
063    List<StoreDescriptor> copiedStoresList = new ArrayList<>(storesList);
064    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
065    boolean anyStoreRemoved = false;
066    while (copiedStoresListIterator.hasNext()) {
067      StoreDescriptor sd = copiedStoresListIterator.next();
068      fam = sd.getFamilyName().toByteArray();
069      if (famPredicate.apply(fam)) {
070        copiedStoresListIterator.remove();
071        anyStoreRemoved = true;
072      }
073    }
074
075    if (!anyStoreRemoved) {
076      return cell;
077    } else if (copiedStoresList.isEmpty()) {
078      return null;
079    }
080    BulkLoadDescriptor.Builder newDesc = BulkLoadDescriptor.newBuilder()
081      .setTableName(bld.getTableName()).setEncodedRegionName(bld.getEncodedRegionName())
082      .setBulkloadSeqNum(bld.getBulkloadSeqNum());
083    newDesc.addAllStores(copiedStoresList);
084    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
085    return cellBuilder.clear().setRow(CellUtil.cloneRow(cell)).setFamily(WALEdit.METAFAMILY)
086      .setQualifier(WALEdit.BULK_LOAD).setTimestamp(cell.getTimestamp()).setType(cell.getTypeByte())
087      .setValue(newBulkLoadDescriptor.toByteArray()).build();
088  }
089}