/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Rewrites a bulk load WAL cell so that it no longer lists the stores (column families) rejected
 * by a caller-supplied predicate.
 */
@InterfaceAudience.Private
public class BulkLoadCellFilter {
  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCellFilter.class);

  // Reused for every rewritten cell; it is cleared before each use in filterCell.
  // NOTE(review): the builder is shared instance state, so this presumably expects one filter
  // instance not to be used by several threads at once -- confirm against callers.
  private final ExtendedCellBuilder cellBuilder =
    ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);

  /**
   * Filters the bulk load cell using the supplied predicate.
   * @param cell         The WAL cell to filter.
   * @param famPredicate Returns true if the given family should be removed.
   * @return The original {@code cell} when no store was removed or when its bulk load descriptor
   *         could not be obtained; {@code null} when every store was removed; otherwise a new
   *         cell with the same row, timestamp and type whose value is the descriptor restricted
   *         to the remaining stores.
   */
  public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
    BulkLoadDescriptor bld;
    try {
      bld = WALEdit.getBulkLoadDescriptor(cell);
    } catch (IOException e) {
      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
      return cell;
    }
    if (bld == null) {
      // No descriptor could be extracted from this cell (WALEdit.getBulkLoadDescriptor is
      // understood to return null for cells that are not bulk load markers). There is nothing
      // to filter, so hand the cell back untouched instead of dereferencing null below.
      return cell;
    }

    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
    List<StoreDescriptor> copiedStoresList = new ArrayList<>(bld.getStoresList());
    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
    boolean anyStoreRemoved = false;
    while (copiedStoresListIterator.hasNext()) {
      StoreDescriptor sd = copiedStoresListIterator.next();
      byte[] fam = sd.getFamilyName().toByteArray();
      if (famPredicate.apply(fam)) {
        copiedStoresListIterator.remove();
        anyStoreRemoved = true;
      }
    }

    if (!anyStoreRemoved) {
      // Nothing was filtered out: avoid re-serializing and return the cell as it came in.
      return cell;
    }
    if (copiedStoresList.isEmpty()) {
      // Every store was filtered out, so no bulk load event is left to describe.
      return null;
    }

    // Derive the builder from the original descriptor so that every field other than the stores
    // list is carried over unchanged; only the stores are replaced with the surviving ones.
    BulkLoadDescriptor newBulkLoadDescriptor =
      bld.toBuilder().clearStores().addAllStores(copiedStoresList).build();
    return cellBuilder.clear().setRow(CellUtil.cloneRow(cell)).setFamily(WALEdit.METAFAMILY)
      .setQualifier(WALEdit.BULK_LOAD).setTimestamp(cell.getTimestamp())
      .setType(cell.getTypeByte()).setValue(newBulkLoadDescriptor.toByteArray()).build();
  }
}