001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.storefiletracker; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.regionserver.StoreContext; 033import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 034import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; 041 042/** 043 * A file based store file tracker. 044 * <p/> 045 * For this tracking way, the store file list will be persistent into a file, so we can write the 046 * new store files directly to the final data directory, as we will not load the broken files. This 047 * will greatly reduce the time for flush and compaction on some object storages as a rename is 048 * actual a copy on them. And it also avoid listing when loading store file list, which could also 049 * speed up the loading of store files as listing is also not a fast operation on most object 050 * storages. 051 */ 052@InterfaceAudience.Private 053class FileBasedStoreFileTracker extends StoreFileTrackerBase { 054 055 private final StoreFileListFile backedFile; 056 057 private final Map<String, StoreFileInfo> storefiles = new HashMap<>(); 058 private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileTracker.class); 059 060 public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 061 super(conf, isPrimaryReplica, ctx); 062 // CreateTableProcedure needs to instantiate the configured SFT impl, in order to update table 063 // descriptors with the SFT impl specific configs. By the time this happens, the table has no 064 // regions nor stores yet, so it can't create a proper StoreContext. 065 if (ctx != null) { 066 backedFile = new StoreFileListFile(ctx); 067 } else { 068 backedFile = null; 069 } 070 } 071 072 @Override 073 protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException { 074 StoreFileList list = backedFile.load(readOnly); 075 if (LOG.isTraceEnabled()) { 076 LOG.trace("Loaded file list backed file, containing " + list.getStoreFileList().size() 077 + " store file entries"); 078 } 079 if (list == null) { 080 return Collections.emptyList(); 081 } 082 FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); 083 List<StoreFileInfo> infos = new ArrayList<>(); 084 for (StoreFileEntry entry : list.getStoreFileList()) { 085 infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(), 086 ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(), 087 new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()), this)); 088 } 089 // In general, for primary replica, the load method should only be called once when 090 // initialization, so we do not need synchronized here. And for secondary replicas, though the 091 // load method could be called multiple times, we will never call other methods so no 092 // synchronized is also fine. 093 // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs, 094 // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so 095 // for safety, let's still keep the synchronized here. 096 synchronized (storefiles) { 097 for (StoreFileInfo info : infos) { 098 storefiles.put(info.getPath().getName(), info); 099 } 100 } 101 return infos; 102 } 103 104 @Override 105 public boolean requireWritingToTmpDirFirst() { 106 return false; 107 } 108 109 private StoreFileEntry toStoreFileEntry(StoreFileInfo info) { 110 return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize()) 111 .build(); 112 } 113 114 @Override 115 protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException { 116 synchronized (storefiles) { 117 StoreFileList.Builder builder = StoreFileList.newBuilder(); 118 for (StoreFileInfo info : storefiles.values()) { 119 builder.addStoreFile(toStoreFileEntry(info)); 120 } 121 for (StoreFileInfo info : newFiles) { 122 builder.addStoreFile(toStoreFileEntry(info)); 123 } 124 backedFile.update(builder); 125 if (LOG.isTraceEnabled()) { 126 LOG.trace(newFiles.size() + " store files added to store file list file: " + newFiles); 127 } 128 for (StoreFileInfo info : newFiles) { 129 storefiles.put(info.getPath().getName(), info); 130 } 131 } 132 } 133 134 @Override 135 protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles, 136 Collection<StoreFileInfo> newFiles) throws IOException { 137 Set<String> compactedFileNames = 138 compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet()); 139 synchronized (storefiles) { 140 StoreFileList.Builder builder = StoreFileList.newBuilder(); 141 storefiles.forEach((name, info) -> { 142 if (compactedFileNames.contains(name)) { 143 return; 144 } 145 builder.addStoreFile(toStoreFileEntry(info)); 146 }); 147 for (StoreFileInfo info : newFiles) { 148 builder.addStoreFile(toStoreFileEntry(info)); 149 } 150 backedFile.update(builder); 151 if (LOG.isTraceEnabled()) { 152 LOG.trace( 153 "replace compacted files: " + compactedFileNames + " with new store files: " + newFiles); 154 } 155 for (String name : compactedFileNames) { 156 storefiles.remove(name); 157 } 158 for (StoreFileInfo info : newFiles) { 159 storefiles.put(info.getPath().getName(), info); 160 } 161 } 162 } 163 164 @Override 165 protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException { 166 synchronized (storefiles) { 167 storefiles.clear(); 168 StoreFileList.Builder builder = StoreFileList.newBuilder(); 169 for (StoreFileInfo info : files) { 170 storefiles.put(info.getPath().getName(), info); 171 builder.addStoreFile(toStoreFileEntry(info)); 172 } 173 backedFile.update(builder); 174 if (LOG.isTraceEnabled()) { 175 LOG.trace("Set store files in store file list file: " + files); 176 } 177 } 178 } 179}