/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * A file based store file tracker.
 * <p/>
 * With this tracking mode the store file list is persisted into a file, so new store files can be
 * written directly to the final data directory: broken files are simply never added to the list,
 * so they will not be loaded. This greatly reduces the time for flush and compaction on some
 * object storages, where a rename is actually a copy. It also avoids listing the directory when
 * loading the store file list, which speeds up the loading of store files since listing is not a
 * fast operation on most object storages either.
 */
@InterfaceAudience.Private
class FileBasedStoreFileTracker extends StoreFileTrackerBase {

  private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileTracker.class);

  private final StoreFileListFile backedFile;

  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    super(conf, isPrimaryReplica, ctx);
    // CreateTableProcedure needs to instantiate the configured SFT impl, in order to update table
    // descriptors with the SFT impl specific configs. By the time this happens, the table has no
    // regions nor stores yet, so it can't create a proper StoreContext.
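    // In that case backedFile stays null, so only methods that never touch the backing file may
    // be called on such an instance.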
    if (ctx != null) {
      backedFile = new StoreFileListFile(ctx);
    } else {
      backedFile = null;
    }
  }

  @Override
  protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
    StoreFileList list = backedFile.load(readOnly);
    // Check for null before logging, as load may return null when there is no list file yet
    if (list == null) {
      return Collections.emptyList();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Loaded file list backed file, containing " + list.getStoreFileList().size()
        + " store file entries");
    }
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    List<StoreFileInfo> infos = new ArrayList<>();
    for (StoreFileEntry entry : list.getStoreFileList()) {
      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()), this));
    }
    // In general, for a primary replica the load method should only be called once, on
    // initialization, so we would not need to synchronize here. And for secondary replicas,
    // though the load method could be called multiple times, no other methods are ever called,
    // so skipping synchronization would also be fine.
    // But the Region interface has a refreshStoreFiles method which can be called by CPs, and the
    // RefreshHFilesEndpoint example exposes refreshStoreFiles as an RPC, so for safety let's
    // still keep the synchronized here.
    synchronized (storefiles) {
      for (StoreFileInfo info : infos) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
    return infos;
  }
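  // The store file list is the commit point: new files become visible by being added to the
  // list, so they can be written directly to the data directory without a tmp dir + rename step.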
  @Override
  public boolean requireWritingToTmpDirFirst() {
    return false;
  }

  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
    StoreFileEntry.Builder entryBuilder =
      StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize());
    if (info.isReference()) {
      FSProtos.Reference reference = FSProtos.Reference.newBuilder()
        .setSplitkey(ByteString.copyFrom(info.getReference().getSplitKey()))
        .setRange(info.getReference().convert().getRange()).build();
      entryBuilder.setReference(reference);
    }
    return entryBuilder.build();
  }

  @Override
  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : storefiles.values()) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      for (StoreFileInfo info : newFiles) {
        if (!storefiles.containsKey(info.getPath().getName())) {
          builder.addStoreFile(toStoreFileEntry(info));
        }
      }
      backedFile.update(builder);
      if (LOG.isTraceEnabled()) {
        LOG.trace(newFiles.size() + " store files added to store file list file: " + newFiles);
      }
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

  @Override
  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    Set<String> compactedFileNames =
      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
    synchronized (storefiles) {
      // Rebuild the list: keep every tracked file that was not compacted away, then append the
      // compaction outputs.
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      storefiles.forEach((name, info) -> {
        if (compactedFileNames.contains(name)) {
          return;
        }
        builder.addStoreFile(toStoreFileEntry(info));
      });
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Replaced compacted files: " + compactedFileNames + " with new store files: " + newFiles);
      }
      for (String name : compactedFileNames) {
        storefiles.remove(name);
      }
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

  @Override
  protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    synchronized (storefiles) {
      storefiles.clear();
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : files) {
        storefiles.put(info.getPath().getName(), info);
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Set store files in store file list file: " + files);
      }
    }
  }

  @Override
  public Reference readReference(Path p) throws IOException {
    String fileName = p.getName();
    StoreFileList list = backedFile.load(true);
    // Guard against a missing list file, as load may return null
    if (list != null) {
      for (StoreFileEntry entry : list.getStoreFileList()) {
        if (entry.getName().equals(fileName)) {
          if (entry.hasReference()) {
            return Reference.convert(entry.getReference());
          } else {
            LOG.debug("Falling back to reading the reference from the filesystem, as it is not"
              + " part of the StoreFileEntry. This happens when reading an older version of the"
              + " StoreFileListFile.");
            return super.readReference(p);
          }
        }
      }
    }
    throw new FileNotFoundException("Reference does not exist for path: " + p);
  }

  @Override
  public boolean hasReferences() throws IOException {
    StoreFileList list = backedFile.load(true);
    if (list == null) {
      return false;
    }
    for (StoreFileEntry entry : list.getStoreFileList()) {
      if (entry.hasReference() || HFileLink.isHFileLink(entry.getName())) {
        return true;
      }
    }
    return false;
  }

  @Override
  public HFileLink createHFileLink(TableName linkedTable, String linkedRegion, String hfileName,
    boolean createBackRef) throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    HFileLink hfileLink = HFileLink.build(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString(), hfileName);
    if (createBackRef) {
      Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
        ctx.getFamily().getNameAsString());
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);
      // Create the back-reference file for the link
      String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
        ctx.getRegionInfo().getEncodedName());
      Path backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    return hfileLink;
  }

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    // Nothing to write here: with file-based tracking the reference is only persisted once it is
    // committed to the store file list.
    return reference;
  }

  @Override
  public Reference createAndCommitReference(Reference reference, Path path) throws IOException {
    StoreFileInfo storeFileInfo =
      new StoreFileInfo(ctx.getRegionFileSystem().getFileSystem().getConf(),
        ctx.getRegionFileSystem().getFileSystem(), path, reference);
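    // Committing the reference means persisting it in the store file list: toStoreFileEntry
    // embeds the reference in the entry, so no separate reference file is written.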
    add(Collections.singleton(storeFileInfo));
    return reference;
  }
}