001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.io.Reference;
036import org.apache.hadoop.hbase.regionserver.StoreContext;
037import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
038import org.apache.hadoop.hbase.util.HFileArchiveUtil;
039import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
048
049/**
050 * A file based store file tracker.
051 * <p/>
052 * For this tracking way, the store file list will be persistent into a file, so we can write the
053 * new store files directly to the final data directory, as we will not load the broken files. This
054 * will greatly reduce the time for flush and compaction on some object storages as a rename is
055 * actual a copy on them. And it also avoid listing when loading store file list, which could also
056 * speed up the loading of store files as listing is also not a fast operation on most object
057 * storages.
058 */
059@InterfaceAudience.Private
060class FileBasedStoreFileTracker extends StoreFileTrackerBase {
061
062  private final StoreFileListFile backedFile;
063
064  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();
065  private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileTracker.class);
066
067  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
068    super(conf, isPrimaryReplica, ctx);
069    // CreateTableProcedure needs to instantiate the configured SFT impl, in order to update table
070    // descriptors with the SFT impl specific configs. By the time this happens, the table has no
071    // regions nor stores yet, so it can't create a proper StoreContext.
072    if (ctx != null) {
073      backedFile = new StoreFileListFile(ctx);
074    } else {
075      backedFile = null;
076    }
077  }
078
079  @Override
080  protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
081    StoreFileList list = backedFile.load(readOnly);
082    if (LOG.isTraceEnabled()) {
083      LOG.trace("Loaded file list backed file, containing " + list.getStoreFileList().size()
084        + " store file entries");
085    }
086    if (list == null) {
087      return Collections.emptyList();
088    }
089    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
090    List<StoreFileInfo> infos = new ArrayList<>();
091    for (StoreFileEntry entry : list.getStoreFileList()) {
092      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
093        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
094        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()), this));
095    }
096    // In general, for primary replica, the load method should only be called once when
097    // initialization, so we do not need synchronized here. And for secondary replicas, though the
098    // load method could be called multiple times, we will never call other methods so no
099    // synchronized is also fine.
100    // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
101    // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so
102    // for safety, let's still keep the synchronized here.
103    synchronized (storefiles) {
104      for (StoreFileInfo info : infos) {
105        storefiles.put(info.getPath().getName(), info);
106      }
107    }
108    return infos;
109  }
110
111  @Override
112  public boolean requireWritingToTmpDirFirst() {
113    return false;
114  }
115
116  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
117    org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry.Builder entryBuilder =
118      StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize());
119    if (info.isReference()) {
120      org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference reference =
121        org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.newBuilder()
122          .setSplitkey(ByteString.copyFrom(info.getReference().getSplitKey()))
123          .setRange(info.getReference().convert().getRange()).build();
124      entryBuilder.setReference(reference);
125    }
126    return entryBuilder.build();
127  }
128
129  @Override
130  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
131    synchronized (storefiles) {
132      StoreFileList.Builder builder = StoreFileList.newBuilder();
133      for (StoreFileInfo info : storefiles.values()) {
134        builder.addStoreFile(toStoreFileEntry(info));
135      }
136      for (StoreFileInfo info : newFiles) {
137        if (!storefiles.containsKey(info.getPath().getName()))
138          builder.addStoreFile(toStoreFileEntry(info));
139      }
140      backedFile.update(builder);
141      if (LOG.isTraceEnabled()) {
142        LOG.trace(newFiles.size() + " store files added to store file list file: " + newFiles);
143      }
144      for (StoreFileInfo info : newFiles) {
145        storefiles.put(info.getPath().getName(), info);
146      }
147    }
148  }
149
150  @Override
151  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
152    Collection<StoreFileInfo> newFiles) throws IOException {
153    Set<String> compactedFileNames =
154      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
155    synchronized (storefiles) {
156      StoreFileList.Builder builder = StoreFileList.newBuilder();
157      storefiles.forEach((name, info) -> {
158        if (compactedFileNames.contains(name)) {
159          return;
160        }
161        builder.addStoreFile(toStoreFileEntry(info));
162      });
163      for (StoreFileInfo info : newFiles) {
164        builder.addStoreFile(toStoreFileEntry(info));
165      }
166      backedFile.update(builder);
167      if (LOG.isTraceEnabled()) {
168        LOG.trace(
169          "replace compacted files: " + compactedFileNames + " with new store files: " + newFiles);
170      }
171      for (String name : compactedFileNames) {
172        storefiles.remove(name);
173      }
174      for (StoreFileInfo info : newFiles) {
175        storefiles.put(info.getPath().getName(), info);
176      }
177    }
178  }
179
180  @Override
181  protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
182    synchronized (storefiles) {
183      storefiles.clear();
184      StoreFileList.Builder builder = StoreFileList.newBuilder();
185      for (StoreFileInfo info : files) {
186        storefiles.put(info.getPath().getName(), info);
187        builder.addStoreFile(toStoreFileEntry(info));
188      }
189      backedFile.update(builder);
190      if (LOG.isTraceEnabled()) {
191        LOG.trace("Set store files in store file list file: " + files);
192      }
193    }
194  }
195
196  @Override
197  public Reference readReference(Path p) throws IOException {
198    String fileName = p.getName();
199    StoreFileList list = backedFile.load(true);
200    for (StoreFileEntry entry : list.getStoreFileList()) {
201      if (entry.getName().equals(fileName)) {
202        if (entry.hasReference()) {
203          return Reference.convert(entry.getReference());
204        } else {
205          LOG.debug(
206            "Fallback to reading reference from FS as it is not part of StoreFileEntry. This is when FSFT is reading older version of StoreFileListFile");
207          return super.readReference(p);
208        }
209      }
210    }
211    throw new FileNotFoundException("Reference does not exist for path : " + p);
212  }
213
214  @Override
215  public boolean hasReferences() throws IOException {
216    StoreFileList list = backedFile.load(true);
217    for (StoreFileEntry entry : list.getStoreFileList()) {
218      if (entry.hasReference() || HFileLink.isHFileLink(entry.getName())) {
219        return true;
220      }
221    }
222    return false;
223  }
224
225  @Override
226  public HFileLink createHFileLink(TableName linkedTable, String linkedRegion, String hfileName,
227    boolean createBackRef) throws IOException {
228    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
229    HFileLink hfileLink = HFileLink.build(conf, linkedTable, linkedRegion,
230      ctx.getFamily().getNameAsString(), hfileName);
231    Path backRefPath = null;
232    if (createBackRef) {
233      Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
234        ctx.getFamily().getNameAsString());
235      Path backRefssDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
236      fs.mkdirs(backRefssDir);
237      // Create the reference for the link
238      String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
239        ctx.getRegionInfo().getEncodedName());
240      backRefPath = new Path(backRefssDir, refName);
241      fs.createNewFile(backRefPath);
242    }
243    return hfileLink;
244  }
245
246  @Override
247  public Reference createReference(Reference reference, Path path) throws IOException {
248    return reference;
249  }
250
251  @Override
252  public Reference createAndCommitReference(Reference reference, Path path) throws IOException {
253    StoreFileInfo storeFileInfo =
254      new StoreFileInfo(ctx.getRegionFileSystem().getFileSystem().getConf(),
255        ctx.getRegionFileSystem().getFileSystem(), path, reference);
256    add(Collections.singleton(storeFileInfo));
257    return reference;
258  }
259}