/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * A file based store file tracker.
 * <p/>
 * With this tracking approach the store file list is persisted into a file, so we can write new
 * store files directly to the final data directory, since broken (partially written) files will
 * never be loaded. This greatly reduces the time for flush and compaction on some object storages,
 * where a rename is actually a copy. It also avoids listing the directory when loading the store
 * file list, which speeds up opening the store since listing is not a fast operation on most
 * object storages either.
 */
@InterfaceAudience.Private
class FileBasedStoreFileTracker extends StoreFileTrackerBase {

  private final StoreFileListFile backedFile;

  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    super(conf, isPrimaryReplica, ctx);
    // CreateTableProcedure needs to instantiate the configured SFT impl in order to update table
    // descriptors with the SFT impl specific configs. By the time this happens, the table has
    // neither regions nor stores yet, so it cannot create a proper StoreContext.
    if (ctx != null) {
      backedFile = new StoreFileListFile(ctx);
    } else {
      backedFile = null;
    }
  }
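
  // Rebuilds the set of live store files from the persisted protobuf StoreFileList: each entry is
  // resolved against this store's family directory and cached in the in-memory map before being
  // returned to the caller.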
  @Override
  public List<StoreFileInfo> load() throws IOException {
    StoreFileList list = backedFile.load();
    if (list == null) {
      return Collections.emptyList();
    }
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    List<StoreFileInfo> infos = new ArrayList<>();
    for (StoreFileEntry entry : list.getStoreFileList()) {
      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
    }
    // In general, for the primary replica the load method should only be called once during
    // initialization, so we do not need to synchronize here. And for secondary replicas, though
    // the load method could be called multiple times, we never call the other methods, so skipping
    // synchronization would also be fine.
    // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
    // and we have a RefreshHFilesEndpoint example that exposes the refreshStoreFiles method as an
    // RPC, so for safety let's still keep the synchronization here.
    synchronized (storefiles) {
      for (StoreFileInfo info : infos) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
    return infos;
  }

  @Override
  public boolean requireWritingToTmpDirFirst() {
    // New files can be written straight into the data directory: only files recorded in the
    // tracker file are considered live, so a partially written file will never be loaded.
    return false;
  }

  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
    return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
      .build();
  }

  @Override
  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : storefiles.values()) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

  @Override
  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    Set<String> compactedFileNames =
      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      storefiles.forEach((name, info) -> {
        if (compactedFileNames.contains(name)) {
          return;
        }
        builder.addStoreFile(toStoreFileEntry(info));
      });
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      for (String name : compactedFileNames) {
        storefiles.remove(name);
      }
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }
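
  // Replaces both the in-memory map and the backing file with exactly the given list of store
  // files, discarding whatever was tracked before.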
  @Override
  public void set(List<StoreFileInfo> files) throws IOException {
    synchronized (storefiles) {
      storefiles.clear();
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : files) {
        storefiles.put(info.getPath().getName(), info);
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
    }
  }
}
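
// A minimal usage sketch, not part of the tracker itself: the tracker implementation is normally
// selected through the "hbase.store.file-tracker.impl" property (value "FILE" for this class),
// either cluster wide in hbase-site.xml or per table as sketched below with the standard client
// API. The property name and value are taken from the store file tracking documentation and
// should be verified against the HBase version in use; the table and family names are
// placeholders.
//
//   TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("my_table"))
//     .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
//     .setValue("hbase.store.file-tracker.impl", "FILE")
//     .build();
//   admin.createTable(td);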