/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * Master observer supporting backup operations. After a table is deleted or truncated, or after
 * column families are removed from it, the matching bulk load entries read through
 * {@link BackupSystemTable} are deleted. Every hook is a no-op (apart from a debug log line) when
 * {@link BackupManager#isBackupEnabled(Configuration)} reports that backup is disabled.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupMasterObserver implements MasterCoprocessor, MasterObserver {
  private static final Logger LOG = LoggerFactory.getLogger(BackupMasterObserver.class);

  /**
   * Exposes this instance as the master observer of the coprocessor.
   * @return an {@link Optional} wrapping {@code this}
   */
  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  /**
   * Removes every bulk load entry of the deleted table.
   * @param ctx       observer context, used to obtain the configuration
   * @param tableName the table that was deleted
   * @throws IOException if the bulk load entries cannot be read or deleted
   */
  @Override
  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableName tableName) throws IOException {
    Configuration conf = ctx.getEnvironment().getConfiguration();
    if (BackupManager.isBackupEnabled(conf)) {
      // Every entry of the table qualifies, so the predicate accepts everything.
      deleteBulkLoads(conf, tableName, anyLoad -> true);
    } else {
      LOG.debug("Skipping postDeleteTable hook since backup is disabled");
    }
  }

  /**
   * Removes every bulk load entry of the truncated table.
   * @param ctx       observer context, used to obtain the configuration
   * @param tableName the table that was truncated
   * @throws IOException if the bulk load entries cannot be read or deleted
   */
  @Override
  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableName tableName) throws IOException {
    Configuration conf = ctx.getEnvironment().getConfiguration();
    if (BackupManager.isBackupEnabled(conf)) {
      // Every entry of the table qualifies, so the predicate accepts everything.
      deleteBulkLoads(conf, tableName, anyLoad -> true);
    } else {
      LOG.debug("Skipping postTruncateTable hook since backup is disabled");
    }
  }

  /**
   * Removes the bulk load entries that belong to column families present in the old descriptor
   * but absent from the current one. Nothing is deleted when no family was removed.
   * @param ctx               observer context, used to obtain the configuration
   * @param tableName         the table that was modified
   * @param oldDescriptor     table descriptor before the modification
   * @param currentDescriptor table descriptor after the modification
   * @throws IOException if the bulk load entries cannot be read or deleted
   */
  @Override
  public void postModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
    throws IOException {
    Configuration conf = ctx.getEnvironment().getConfiguration();
    if (!BackupManager.isBackupEnabled(conf)) {
      LOG.debug("Skipping postModifyTable hook since backup is disabled");
      return;
    }

    Set<String> familiesBefore = familyNames(oldDescriptor);
    Set<String> familiesAfter = familyNames(currentDescriptor);

    // Families that existed before the modification and are gone afterwards.
    Set<String> droppedFamilies = Sets.difference(familiesBefore, familiesAfter);
    if (droppedFamilies.isEmpty()) {
      return;
    }
    deleteBulkLoads(conf, tableName, load -> droppedFamilies.contains(load.getColumnFamily()));
  }

  /**
   * Collects the names of all column families declared by the given table descriptor.
   */
  private static Set<String> familyNames(TableDescriptor descriptor) {
    return Arrays.stream(descriptor.getColumnFamilies())
      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
  }

  /**
   * Reads the bulk load entries of the given table and deletes the rows of those accepted by the
   * predicate. A connection is created from the configuration for the duration of the call and
   * closed afterwards.
   */
  private void deleteBulkLoads(Configuration config, TableName tableName,
    Predicate<BulkLoad> filter) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(config);
      BackupSystemTable systemTable = new BackupSystemTable(conn)) {
      List<byte[]> matchingRowKeys = systemTable.readBulkloadRows(List.of(tableName)).stream()
        .filter(filter).map(BulkLoad::getRowKey).toList();
      systemTable.deleteBulkLoadedRows(matchingRowKeys);
    }
  }
}