/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * A file based store file tracker.
 * <p/>
 * With this tracking approach, the store file list is persisted into a file, so new store files
 * can be written directly to the final data directory: broken or half-written files are never
 * added to the list, and thus never loaded. This greatly reduces the time for flush and
 * compaction on some object storages, where a rename is actually a copy. It also avoids listing
 * the directory when loading store files, which speeds up loading further, as listing is not a
 * fast operation on most object storages either.
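 * <p/>
 * A minimal usage sketch, assuming the {@code hbase.store.file-tracker.impl} key understood by
 * {@code StoreFileTrackerFactory} is used to select the tracker implementation per table:
 *
 * <pre>
 * TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("my_table"))
 *   .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
 *   .setValue("hbase.store.file-tracker.impl", "FILE")
 *   .build();
 * admin.createTable(td);
 * </pre>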
 */
@InterfaceAudience.Private
class FileBasedStoreFileTracker extends StoreFileTrackerBase {

  private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileTracker.class);

  private final StoreFileListFile backedFile;

  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    super(conf, isPrimaryReplica, ctx);
    // CreateTableProcedure needs to instantiate the configured SFT impl, in order to update table
    // descriptors with the SFT impl specific configs. By the time this happens, the table has no
    // regions nor stores yet, so it can't create a proper StoreContext.
    if (ctx != null) {
      backedFile = new StoreFileListFile(ctx);
    } else {
      backedFile = null;
    }
  }

  @Override
  protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
    StoreFileList list = backedFile.load(readOnly);
    if (list == null) {
      return Collections.emptyList();
    }
    LOG.trace("Loaded file list backing file, containing {} store file entries",
      list.getStoreFileList().size());
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    List<StoreFileInfo> infos = new ArrayList<>();
    for (StoreFileEntry entry : list.getStoreFileList()) {
      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()), this));
    }
    // In general, for the primary replica, the load method should only be called once during
    // initialization, so we do not need synchronized here. And for secondary replicas, though the
    // load method could be called multiple times, we will never call other methods, so skipping
    // synchronized would also be fine.
    // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
    // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so
    // for safety, let's still keep the synchronized here.
    synchronized (storefiles) {
      for (StoreFileInfo info : infos) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
    return infos;
  }

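  /**
   * New files can be written straight into the data directory: commit happens by rewriting the
   * tracking file, not by renaming files out of a temporary directory.
   */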
  @Override
  public boolean requireWritingToTmpDirFirst() {
    return false;
  }

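  // Converts an in-memory StoreFileInfo to its protobuf form for persistence; only the file name
  // and size are recorded in the backing file.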
  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
    return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
      .build();
  }

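  /**
   * Rewrites the backing file with the union of the currently tracked files and {@code newFiles};
   * the in-memory view is only updated once the file update has succeeded.
   */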
  @Override
  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : storefiles.values()) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      LOG.trace("{} store files added to store file list file: {}", newFiles.size(), newFiles);
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

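  /**
   * Rewrites the backing file with {@code compactedFiles} removed and {@code newFiles} added, then
   * applies the same replacement to the in-memory view once the file update has succeeded.
   */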
  @Override
  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    Set<String> compactedFileNames =
      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      storefiles.forEach((name, info) -> {
        if (compactedFileNames.contains(name)) {
          return;
        }
        builder.addStoreFile(toStoreFileEntry(info));
      });
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      LOG.trace("Replaced compacted files: {} with new store files: {}", compactedFileNames,
        newFiles);
      for (String name : compactedFileNames) {
        storefiles.remove(name);
      }
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

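  /**
   * Replaces the whole tracked file set with {@code files} and persists it to the backing file,
   * e.g. when migrating from another tracker implementation.
   */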
  @Override
  protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    synchronized (storefiles) {
      storefiles.clear();
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : files) {
        storefiles.put(info.getPath().getName(), info);
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      LOG.trace("Set store files in store file list file: {}", files);
    }
  }
}