/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Rewrites a bulk load WAL cell so that its descriptor only lists the stores (column families)
 * that a caller-supplied predicate does not reject.
 */
@InterfaceAudience.Private
public class BulkLoadCellFilter {
  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCellFilter.class);

  // Single builder instance reused by every filterCell call; it is cleared before each use.
  // NOTE(review): this is shared mutable state with no synchronization in this class --
  // presumably one filter instance is not used from several threads at once; confirm.
  private final ExtendedCellBuilder cellBuilder =
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);

  /**
   * Filters the bulk load cell using the supplied predicate.
   * @param cell The WAL cell to filter.
   * @param famPredicate Returns true if given family should be removed.
   * @return The original {@code cell} if its bulk load descriptor could not be read or if no
   *         store was removed; {@code null} if every store was removed; otherwise a new cell
   *         whose value is a descriptor containing only the remaining stores.
   */
  public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
    final BulkLoadDescriptor bld;
    try {
      bld = WALEdit.getBulkLoadDescriptor(cell);
    } catch (IOException e) {
      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
      return cell;
    }
    if (bld == null) {
      // getBulkLoadDescriptor may hand back null instead of a descriptor; there is nothing to
      // filter in that case, so pass the cell through untouched rather than fail with an NPE.
      return cell;
    }

    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
    List<StoreDescriptor> copiedStoresList = new ArrayList<>(bld.getStoresList());
    boolean anyStoreRemoved = false;
    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
    while (copiedStoresListIterator.hasNext()) {
      StoreDescriptor sd = copiedStoresListIterator.next();
      byte[] fam = sd.getFamilyName().toByteArray();
      if (famPredicate.apply(fam)) {
        copiedStoresListIterator.remove();
        anyStoreRemoved = true;
      }
    }

    if (!anyStoreRemoved) {
      // Nothing was filtered out: hand back the very same cell instead of rebuilding it.
      return cell;
    }
    if (copiedStoresList.isEmpty()) {
      // Every store was filtered out, so there is no bulk load event left to represent.
      return null;
    }

    // NOTE(review): only table name, encoded region name and bulk load sequence number are
    // carried over next to the remaining stores; any other field the descriptor may hold is
    // not copied -- confirm this is intended.
    BulkLoadDescriptor.Builder newDesc =
        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
            .setEncodedRegionName(bld.getEncodedRegionName())
            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
    newDesc.addAllStores(copiedStoresList);
    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();

    // Rebuild the marker cell with the original row, timestamp and type, but with the
    // filtered descriptor as its value.
    return cellBuilder.clear()
        .setRow(CellUtil.cloneRow(cell))
        .setFamily(WALEdit.METAFAMILY)
        .setQualifier(WALEdit.BULK_LOAD)
        .setTimestamp(cell.getTimestamp())
        .setType(cell.getTypeByte())
        .setValue(newBulkLoadDescriptor.toByteArray())
        .build();
  }
}