001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.regionserver.StoreContext;
033import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
034import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
039
040/**
041 * A file based store file tracker.
042 * <p/>
043 * For this tracking way, the store file list will be persistent into a file, so we can write the
044 * new store files directly to the final data directory, as we will not load the broken files. This
045 * will greatly reduce the time for flush and compaction on some object storages as a rename is
046 * actual a copy on them. And it also avoid listing when loading store file list, which could also
047 * speed up the loading of store files as listing is also not a fast operation on most object
048 * storages.
049 */
050@InterfaceAudience.Private
051class FileBasedStoreFileTracker extends StoreFileTrackerBase {
052
053  private final StoreFileListFile backedFile;
054
055  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();
056
057  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
058    super(conf, isPrimaryReplica, ctx);
059    // CreateTableProcedure needs to instantiate the configured SFT impl, in order to update table
060    // descriptors with the SFT impl specific configs. By the time this happens, the table has no
061    // regions nor stores yet, so it can't create a proper StoreContext.
062    if (ctx != null) {
063      backedFile = new StoreFileListFile(ctx);
064    } else {
065      backedFile = null;
066    }
067  }
068
069  @Override
070  public List<StoreFileInfo> load() throws IOException {
071    StoreFileList list = backedFile.load();
072    if (list == null) {
073      return Collections.emptyList();
074    }
075    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
076    List<StoreFileInfo> infos = new ArrayList<>();
077    for (StoreFileEntry entry : list.getStoreFileList()) {
078      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
079        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
080        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
081    }
082    // In general, for primary replica, the load method should only be called once when
083    // initialization, so we do not need synchronized here. And for secondary replicas, though the
084    // load method could be called multiple times, we will never call other methods so no
085    // synchronized is also fine.
086    // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
087    // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so
088    // for safety, let's still keep the synchronized here.
089    synchronized (storefiles) {
090      for (StoreFileInfo info : infos) {
091        storefiles.put(info.getPath().getName(), info);
092      }
093    }
094    return infos;
095  }
096
097  @Override
098  public boolean requireWritingToTmpDirFirst() {
099    return false;
100  }
101
102  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
103    return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
104      .build();
105  }
106
107  @Override
108  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
109    synchronized (storefiles) {
110      StoreFileList.Builder builder = StoreFileList.newBuilder();
111      for (StoreFileInfo info : storefiles.values()) {
112        builder.addStoreFile(toStoreFileEntry(info));
113      }
114      for (StoreFileInfo info : newFiles) {
115        builder.addStoreFile(toStoreFileEntry(info));
116      }
117      backedFile.update(builder);
118      for (StoreFileInfo info : newFiles) {
119        storefiles.put(info.getPath().getName(), info);
120      }
121    }
122  }
123
124  @Override
125  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
126    Collection<StoreFileInfo> newFiles) throws IOException {
127    Set<String> compactedFileNames =
128      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
129    synchronized (storefiles) {
130      StoreFileList.Builder builder = StoreFileList.newBuilder();
131      storefiles.forEach((name, info) -> {
132        if (compactedFileNames.contains(name)) {
133          return;
134        }
135        builder.addStoreFile(toStoreFileEntry(info));
136      });
137      for (StoreFileInfo info : newFiles) {
138        builder.addStoreFile(toStoreFileEntry(info));
139      }
140      backedFile.update(builder);
141      for (String name : compactedFileNames) {
142        storefiles.remove(name);
143      }
144      for (StoreFileInfo info : newFiles) {
145        storefiles.put(info.getPath().getName(), info);
146      }
147    }
148  }
149
150  @Override
151  public void set(List<StoreFileInfo> files) throws IOException {
152    synchronized (storefiles) {
153      storefiles.clear();
154      StoreFileList.Builder builder = StoreFileList.newBuilder();
155      for (StoreFileInfo info : files) {
156        storefiles.put(info.getPath().getName(), info);
157        builder.addStoreFile(toStoreFileEntry(info));
158      }
159      backedFile.update(builder);
160    }
161  }
162}