001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellBuilderType;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.ExtendedCellBuilder;
028import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
029import org.apache.hadoop.hbase.wal.WALEdit;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
038
039@InterfaceAudience.Private
040public class BulkLoadCellFilter {
041  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCellFilter.class);
042
043  private final ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
044  /**
045   * Filters the bulk load cell using the supplied predicate.
046   * @param cell The WAL cell to filter.
047   * @param famPredicate Returns true of given family should be removed.
048   * @return The filtered cell.
049   */
050  public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
051    byte[] fam;
052    BulkLoadDescriptor bld = null;
053    try {
054      bld = WALEdit.getBulkLoadDescriptor(cell);
055    } catch (IOException e) {
056      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
057      return cell;
058    }
059    List<StoreDescriptor> storesList = bld.getStoresList();
060    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
061    List<StoreDescriptor> copiedStoresList = new ArrayList<>(storesList);
062    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
063    boolean anyStoreRemoved = false;
064    while (copiedStoresListIterator.hasNext()) {
065      StoreDescriptor sd = copiedStoresListIterator.next();
066      fam = sd.getFamilyName().toByteArray();
067      if (famPredicate.apply(fam)) {
068        copiedStoresListIterator.remove();
069        anyStoreRemoved = true;
070      }
071    }
072
073    if (!anyStoreRemoved) {
074      return cell;
075    } else if (copiedStoresList.isEmpty()) {
076      return null;
077    }
078    BulkLoadDescriptor.Builder newDesc =
079        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
080            .setEncodedRegionName(bld.getEncodedRegionName())
081            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
082    newDesc.addAllStores(copiedStoresList);
083    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
084    return cellBuilder.clear()
085            .setRow(CellUtil.cloneRow(cell))
086            .setFamily(WALEdit.METAFAMILY)
087            .setQualifier(WALEdit.BULK_LOAD)
088            .setTimestamp(cell.getTimestamp())
089            .setType(cell.getTypeByte())
090            .setValue(newBulkLoadDescriptor.toByteArray())
091            .build();
092  }
093}