001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.HFileLink.LINK_NAME_PATTERN;
021
022import edu.umd.cs.findbugs.annotations.Nullable;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.UUID;
034import java.util.regex.Matcher;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataInputStream;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.FileUtil;
041import org.apache.hadoop.fs.LocatedFileStatus;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.ExtendedCell;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.backup.HFileArchiver;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.fs.HFileSystem;
054import org.apache.hadoop.hbase.io.HFileLink;
055import org.apache.hadoop.hbase.io.Reference;
056import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
057import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
058import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.CommonFSUtils;
061import org.apache.hadoop.hbase.util.FSUtils;
062import org.apache.hadoop.hbase.util.Pair;
063import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
069
070/**
071 * View to an on-disk Region. Provides the set of methods necessary to interact with the on-disk
072 * region data.
073 */
074@InterfaceAudience.Private
075public class HRegionFileSystem {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionFileSystem.class);

  /** Name of the region info file that resides just under the region directory. */
  public final static String REGION_INFO_FILE = ".regioninfo";

  /** Temporary subdirectory of the region directory used for compaction output. */
  static final String REGION_TEMP_DIR = ".tmp";

  // The region info this view was constructed with.
  private final RegionInfo regionInfo;
  // regionInfo for interacting with FS (getting encodedName, etc)
  final RegionInfo regionInfoForFs;
  final Configuration conf;
  // Directory of the table that contains this region.
  private final Path tableDir;
  final FileSystem fs;
  // Directory of this region, derived from tableDir and regionInfo in the constructor.
  private final Path regionDir;

  /**
   * In order to handle NN connectivity hiccups, one needs to retry non-idempotent operations at
   * the client level.
   */
  // Read from "hdfs.client.retries.number" in the constructor.
  private final int hdfsClientRetriesNumber;
  // Read from "hdfs.client.sleep.before.retries" in the constructor.
  private final int baseSleepBeforeRetries;
  private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
  private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
100
101  /**
102   * Create a view to the on-disk region
103   * @param conf       the {@link Configuration} to use
104   * @param fs         {@link FileSystem} that contains the region
105   * @param tableDir   {@link Path} to where the table is being stored
106   * @param regionInfo {@link RegionInfo} for region
107   */
108  HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir,
109    final RegionInfo regionInfo) {
110    this.fs = fs;
111    this.conf = conf;
112    this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
113    this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
114    this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
115    this.regionDir = FSUtils.getRegionDirFromTableDir(tableDir, regionInfo);
116    this.hdfsClientRetriesNumber =
117      conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
118    this.baseSleepBeforeRetries =
119      conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
120  }
121
122  /** Returns the underlying {@link FileSystem} */
123  public FileSystem getFileSystem() {
124    return this.fs;
125  }
126
127  /** Returns the {@link RegionInfo} that describe this on-disk region view */
128  public RegionInfo getRegionInfo() {
129    return this.regionInfo;
130  }
131
132  public RegionInfo getRegionInfoForFS() {
133    return this.regionInfoForFs;
134  }
135
136  /** Returns {@link Path} to the region's root directory. */
137  public Path getTableDir() {
138    return this.tableDir;
139  }
140
141  /** Returns {@link Path} to the region directory. */
142  public Path getRegionDir() {
143    return regionDir;
144  }
145
146  // ===========================================================================
147  // Temp Helpers
148  // ===========================================================================
149  /** Returns {@link Path} to the region's temp directory, used for file creations */
150  public Path getTempDir() {
151    return new Path(getRegionDir(), REGION_TEMP_DIR);
152  }
153
154  /**
155   * Clean up any temp detritus that may have been left around from previous operation attempts.
156   */
157  void cleanupTempDir() throws IOException {
158    deleteDir(getTempDir());
159  }
160
161  // ===========================================================================
162  // Store/StoreFile Helpers
163  // ===========================================================================
164  /**
165   * Returns the directory path of the specified family
166   * @param familyName Column Family Name
167   * @return {@link Path} to the directory of the specified family
168   */
169  public Path getStoreDir(final String familyName) {
170    return new Path(this.getRegionDir(), familyName);
171  }
172
173  /**
174   * @param tabledir {@link Path} to where the table is being stored
175   * @param hri      {@link RegionInfo} for the region.
176   * @param family   {@link ColumnFamilyDescriptor} describing the column family
177   * @return Path to family/Store home directory.
178   */
179  public static Path getStoreHomedir(final Path tabledir, final RegionInfo hri,
180    final byte[] family) {
181    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
182  }
183
184  /**
185   * @param tabledir    {@link Path} to where the table is being stored
186   * @param encodedName Encoded region name.
187   * @param family      {@link ColumnFamilyDescriptor} describing the column family
188   * @return Path to family/Store home directory.
189   */
190  public static Path getStoreHomedir(final Path tabledir, final String encodedName,
191    final byte[] family) {
192    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
193  }
194
195  /**
196   * Create the store directory for the specified family name
197   * @param familyName Column Family Name
198   * @return {@link Path} to the directory of the specified family
199   * @throws IOException if the directory creation fails.
200   */
201  Path createStoreDir(final String familyName) throws IOException {
202    Path storeDir = getStoreDir(familyName);
203    if (!fs.exists(storeDir) && !createDir(storeDir))
204      throw new IOException("Failed creating " + storeDir);
205    return storeDir;
206  }
207
208  /**
209   * Set the directory of CF to the specified storage policy. <br>
210   * <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
211   * <i>"COLD"</i> <br>
212   * <br>
213   * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
214   * @param familyName The name of column family.
215   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See hadoop 2.6+
216   *                   org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g 'COLD',
217   *                   'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
218   */
219  public void setStoragePolicy(String familyName, String policyName) {
220    CommonFSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
221  }
222
223  /**
224   * Set storage policy for a whole region. <br>
225   * <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
226   * <i>"COLD"</i> <br>
227   * <br>
228   * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
229   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See hadoop 2.6+
230   *                   org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g 'COLD',
231   *                   'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
232   */
233  public void setStoragePolicy(String policyName) {
234    CommonFSUtils.setStoragePolicy(this.fs, getRegionDir(), policyName);
235  }
236
237  /**
238   * Get the storage policy of the directory of CF.
239   * @param familyName The name of column family.
240   * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
241   *         thrown when trying to get policy
242   */
243  @Nullable
244  public String getStoragePolicyName(String familyName) {
245    if (this.fs instanceof HFileSystem) {
246      Path storeDir = getStoreDir(familyName);
247      return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
248    }
249
250    return null;
251  }
252
253  /**
254   * Returns the store files' LocatedFileStatus which available for the family. This methods
255   * performs the filtering based on the valid store files.
256   * @param familyName Column Family Name
257   * @return a list of store files' LocatedFileStatus for the specified family.
258   */
259  public static List<LocatedFileStatus> getStoreFilesLocatedStatus(final HRegionFileSystem regionfs,
260    final String familyName, final boolean validate) throws IOException {
261    Path familyDir = regionfs.getStoreDir(familyName);
262    List<LocatedFileStatus> locatedFileStatuses =
263      CommonFSUtils.listLocatedStatus(regionfs.getFileSystem(), familyDir);
264    if (locatedFileStatuses == null) {
265      if (LOG.isTraceEnabled()) {
266        LOG.trace("No StoreFiles for: " + familyDir);
267      }
268      return null;
269    }
270
271    List<LocatedFileStatus> validStoreFiles = Lists.newArrayList();
272    for (LocatedFileStatus status : locatedFileStatuses) {
273      if (validate && !StoreFileInfo.isValid(status)) {
274        // recovered.hfiles directory is expected inside CF path when hbase.wal.split.to.hfile to
275        // true, refer HBASE-23740
276        if (!HConstants.RECOVERED_HFILES_DIR.equals(status.getPath().getName())) {
277          LOG.warn("Invalid StoreFile: {}", status.getPath());
278        }
279      } else {
280        validStoreFiles.add(status);
281      }
282    }
283    return validStoreFiles;
284  }
285
286  /**
287   * Return Qualified Path of the specified family/file
288   * @param familyName Column Family Name
289   * @param fileName   File Name
290   * @return The qualified Path for the specified family/file
291   */
292  Path getStoreFilePath(final String familyName, final String fileName) {
293    Path familyDir = getStoreDir(familyName);
294    return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
295  }
296
297  /**
298   * Return the store file information of the specified family/file.
299   * @param familyName Column Family Name
300   * @param fileName   File Name
301   * @return The {@link StoreFileInfo} for the specified family/file
302   */
303  StoreFileInfo getStoreFileInfo(final String familyName, final String fileName,
304    final StoreFileTracker tracker) throws IOException {
305    Path familyDir = getStoreDir(familyName);
306    return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, regionInfoForFs,
307      familyName, new Path(familyDir, fileName), tracker);
308  }
309
310  /** Returns the set of families present on disk n */
311  public Collection<String> getFamilies() throws IOException {
312    FileStatus[] fds =
313      CommonFSUtils.listStatus(fs, getRegionDir(), new FSUtils.FamilyDirFilter(fs));
314    if (fds == null) return null;
315
316    ArrayList<String> families = new ArrayList<>(fds.length);
317    for (FileStatus status : fds) {
318      families.add(status.getPath().getName());
319    }
320
321    return families;
322  }
323
324  /**
325   * Remove the region family from disk, archiving the store files.
326   * @param familyName Column Family Name
327   * @throws IOException if an error occours during the archiving
328   */
329  public void deleteFamily(final String familyName) throws IOException {
330    // archive family store files
331    HFileArchiver.archiveFamily(fs, conf, regionInfoForFs, tableDir, Bytes.toBytes(familyName));
332
333    // delete the family folder
334    Path familyDir = getStoreDir(familyName);
335    if (fs.exists(familyDir) && !deleteDir(familyDir))
336      throw new IOException("Could not delete family " + familyName + " from FileSystem for region "
337        + regionInfoForFs.getRegionNameAsString() + "(" + regionInfoForFs.getEncodedName() + ")");
338  }
339
340  /**
341   * Generate a unique file name, used by createTempName() and commitStoreFile()
342   * @param suffix extra information to append to the generated name
343   * @return Unique file name
344   */
345  private static String generateUniqueName(final String suffix) {
346    String name = UUID.randomUUID().toString().replaceAll("-", "");
347    if (suffix != null) name += suffix;
348    return name;
349  }
350
351  /**
352   * Generate a unique temporary Path. Used in conjuction with commitStoreFile() to get a safer file
353   * creation. <code>
354   * Path file = fs.createTempName();
355   * ...StoreFile.Writer(file)...
356   * fs.commitStoreFile("family", file);
357   * </code>
358   * @return Unique {@link Path} of the temporary file
359   */
360  public Path createTempName() {
361    return createTempName(null);
362  }
363
364  /**
365   * Generate a unique temporary Path. Used in conjuction with commitStoreFile() to get a safer file
366   * creation. <code>
367   * Path file = fs.createTempName();
368   * ...StoreFile.Writer(file)...
369   * fs.commitStoreFile("family", file);
370   * </code>
371   * @param suffix extra information to append to the generated name
372   * @return Unique {@link Path} of the temporary file
373   */
374  public Path createTempName(final String suffix) {
375    return new Path(getTempDir(), generateUniqueName(suffix));
376  }
377
378  /**
379   * Move the file from a build/temp location to the main family store directory.
380   * @param familyName Family that will gain the file
381   * @param buildPath  {@link Path} to the file to commit.
382   * @return The new {@link Path} of the committed file
383   */
384  public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
385    Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
386    return commitStoreFile(buildPath, dstPath);
387  }
388
389  /**
390   * Generate the filename in the main family store directory for moving the file from a build/temp
391   * location.
392   * @param familyName      Family that will gain the file
393   * @param buildPath       {@link Path} to the file to commit.
394   * @param seqNum          Sequence Number to append to the file name (less then 0 if no sequence
395   *                        number)
396   * @param generateNewName False if you want to keep the buildPath name
397   * @return The new {@link Path} of the to be committed file
398   */
399  private Path preCommitStoreFile(final String familyName, final Path buildPath, final long seqNum,
400    final boolean generateNewName) throws IOException {
401    Path storeDir = getStoreDir(familyName);
402    if (!fs.exists(storeDir) && !createDir(storeDir))
403      throw new IOException("Failed creating " + storeDir);
404
405    String name = buildPath.getName();
406    if (generateNewName) {
407      name = generateUniqueName((seqNum < 0) ? null : StoreFileInfo.formatBulkloadSeqId(seqNum));
408    }
409    Path dstPath = new Path(storeDir, name);
410    if (!fs.exists(buildPath)) {
411      throw new FileNotFoundException(buildPath.toString());
412    }
413    if (LOG.isDebugEnabled()) {
414      LOG.debug("Committing " + buildPath + " as " + dstPath);
415    }
416    return dstPath;
417  }
418
419  /*
420   * Moves file from staging dir to region dir
421   * @param buildPath {@link Path} to the file to commit.
422   * @param dstPath {@link Path} to the file under region dir
423   * @return The {@link Path} of the committed file
424   */
425  Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
426    // rename is not necessary in case of direct-insert stores
427    if (buildPath.equals(dstPath)) {
428      return dstPath;
429    }
430    // buildPath exists, therefore not doing an exists() check.
431    if (!rename(buildPath, dstPath)) {
432      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
433    }
434    return dstPath;
435  }
436
437  /**
438   * Bulk load: Add a specified store file to the specified family. If the source file is on the
439   * same different file-system is moved from the source location to the destination location,
440   * otherwise is copied over.
441   * @param familyName Family that will gain the file
442   * @param srcPath    {@link Path} to the file to import
443   * @param seqNum     Bulk Load sequence number
444   * @return The destination {@link Path} of the bulk loaded file
445   */
446  Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
447    throws IOException {
448    // Copy the file if it's on another filesystem
449    FileSystem srcFs = srcPath.getFileSystem(conf);
450    srcPath = srcFs.resolvePath(srcPath);
451    FileSystem realSrcFs = srcPath.getFileSystem(conf);
452    FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem) fs).getBackingFs() : fs;
453
454    // We can't compare FileSystem instances as equals() includes UGI instance
455    // as part of the comparison and won't work when doing SecureBulkLoad
456    // TODO deal with viewFS
457    if (!FSUtils.isSameHdfs(conf, realSrcFs, desFs)) {
458      LOG.info("Bulk-load file " + srcPath + " is on different filesystem than "
459        + "the destination store. Copying file over to destination filesystem.");
460      Path tmpPath = createTempName();
461      FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
462      LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
463      srcPath = tmpPath;
464    }
465
466    return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
467  }
468
469  // ===========================================================================
470  // Splits Helpers
471  // ===========================================================================
472
473  public Path getSplitsDir(final RegionInfo hri) {
474    return new Path(getTableDir(), hri.getEncodedName());
475  }
476
477  /**
478   * Remove daughter region
479   * @param regionInfo daughter {@link RegionInfo}
480   */
481  void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
482    Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
483    if (this.fs.exists(regionDir) && !deleteDir(regionDir)) {
484      throw new IOException("Failed delete of " + regionDir);
485    }
486  }
487
488  /**
489   * Commit a daughter region, moving it from the split temporary directory to the proper location
490   * in the filesystem.
491   * @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
492   */
493  public Path commitDaughterRegion(final RegionInfo regionInfo, List<StoreFileInfo> allRegionFiles,
494    MasterProcedureEnv env) throws IOException {
495    Path regionDir = this.getSplitsDir(regionInfo);
496    if (fs.exists(regionDir)) {
497      // Write HRI to a file in case we need to recover hbase:meta
498      Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
499      byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
500      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
501      HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
502        env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
503      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
504    }
505    return regionDir;
506  }
507
508  private void insertRegionFilesIntoStoreTracker(List<StoreFileInfo> allFiles,
509    MasterProcedureEnv env, HRegionFileSystem regionFs) throws IOException {
510    TableDescriptor tblDesc =
511      env.getMasterServices().getTableDescriptors().get(regionInfo.getTable());
512    // we need to map trackers per store
513    Map<String, StoreFileTracker> trackerMap = new HashMap<>();
514    // we need to map store files per store
515    Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
516    for (StoreFileInfo sfi : allFiles) {
517      Path file = sfi.getPath();
518      String familyName = file.getParent().getName();
519      trackerMap.computeIfAbsent(familyName, t -> StoreFileTrackerFactory.create(conf, tblDesc,
520        tblDesc.getColumnFamily(Bytes.toBytes(familyName)), regionFs));
521      fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
522      List<StoreFileInfo> infos = fileInfoMap.get(familyName);
523      infos.add(sfi);
524    }
525    for (Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
526      entry.getValue().add(fileInfoMap.get(entry.getKey()));
527    }
528  }
529
530  private void insertRegionfilePathsIntoStoreTracker(List<StoreFileInfo> allFiles,
531    MasterProcedureEnv env, HRegionFileSystem regionFs) throws IOException {
532    TableDescriptor tblDesc =
533      env.getMasterServices().getTableDescriptors().get(regionInfo.getTable());
534    // we need to map trackers per store
535    Map<String, StoreFileTracker> trackerMap = new HashMap<>();
536    // we need to map store files per store
537    Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
538    for (StoreFileInfo file : allFiles) {
539      String familyName = file.getPath().getParent().getName();
540      trackerMap.computeIfAbsent(familyName, t -> StoreFileTrackerFactory.create(conf, tblDesc,
541        tblDesc.getColumnFamily(familyName.getBytes()), regionFs));
542      fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
543      List<StoreFileInfo> infos = fileInfoMap.get(familyName);
544      infos.add(file);
545    }
546    for (Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
547      entry.getValue().add(fileInfoMap.get(entry.getKey()));
548    }
549  }
550
551  /**
552   * Creates region split daughter directories under the table dir. If the daughter regions already
553   * exist, for example, in the case of a recovery from a previous failed split procedure, this
554   * method deletes the given region dir recursively, then recreates it again.
555   */
556  public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
557    Path daughterADir = getSplitsDir(daughterA);
558    if (fs.exists(daughterADir) && !deleteDir(daughterADir)) {
559      throw new IOException("Failed deletion of " + daughterADir + " before creating them again.");
560
561    }
562    if (!createDir(daughterADir)) {
563      throw new IOException("Failed create of " + daughterADir);
564    }
565    Path daughterBDir = getSplitsDir(daughterB);
566    if (fs.exists(daughterBDir) && !deleteDir(daughterBDir)) {
567      throw new IOException("Failed deletion of " + daughterBDir + " before creating them again.");
568
569    }
570    if (!createDir(daughterBDir)) {
571      throw new IOException("Failed create of " + daughterBDir);
572    }
573  }
574
575  /**
576   * Write out a split reference. Package local so it doesnt leak out of regionserver.
577   * @param hri         {@link RegionInfo} of the destination
578   * @param familyName  Column Family Name
579   * @param f           File to split.
580   * @param splitRow    Split Row
581   * @param top         True if we are referring to the top half of the hfile.
582   * @param splitPolicy A split policy instance; be careful! May not be full populated; e.g. if this
583   *                    method is invoked on the Master side, then the RegionSplitPolicy will NOT
584   *                    have a reference to a Region.
585   * @return Path to created reference.
586   */
587  public StoreFileInfo splitStoreFile(RegionInfo hri, String familyName, HStoreFile f,
588    byte[] splitRow, boolean top, RegionSplitPolicy splitPolicy, StoreFileTracker tracker)
589    throws IOException {
590    Path splitDir = new Path(getSplitsDir(hri), familyName);
591    // Add the referred-to regions name as a dot separated suffix.
592    // See REF_NAME_REGEX regex above. The referred-to regions name is
593    // up in the path of the passed in <code>f</code> -- parentdir is family,
594    // then the directory above is the region name.
595    String parentRegionName = regionInfoForFs.getEncodedName();
596    // Write reference with same file id only with the other region name as
597    // suffix and into the new region location (under same family).
598    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
599    if (fs.exists(p)) {
600      LOG.warn("Found an already existing split file for {}. Assuming this is a recovery.", p);
601      return tracker.getStoreFileInfo(fs.getFileStatus(p), p, true);
602    }
603    boolean createLinkFile = false;
604    if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
605      // Check whether the split row lies in the range of the store file
606      // If it is outside the range, return directly.
607      f.initReader();
608      try {
609        Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
610        Optional<ExtendedCell> lastKey = f.getLastKey();
611        Optional<ExtendedCell> firstKey = f.getFirstKey();
612        if (top) {
613          // check if larger than last key.
614          // If lastKey is null means storefile is empty.
615          if (!lastKey.isPresent()) {
616            return null;
617          }
618          if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
619            return null;
620          }
621          if (firstKey.isPresent() && f.getComparator().compare(splitKey, firstKey.get()) <= 0) {
622            LOG.debug("Will create HFileLink file for {}, top=true", f.getPath());
623            createLinkFile = true;
624          }
625        } else {
626          // check if smaller than first key
627          // If firstKey is null means storefile is empty.
628          if (!firstKey.isPresent()) {
629            return null;
630          }
631          if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
632            return null;
633          }
634          if (lastKey.isPresent() && f.getComparator().compare(splitKey, lastKey.get()) >= 0) {
635            LOG.debug("Will create HFileLink file for {}, top=false", f.getPath());
636            createLinkFile = true;
637          }
638        }
639      } finally {
640        f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
641      }
642    }
643    if (createLinkFile) {
644      // create HFileLink file instead of Reference file for child
645      String hfileName = f.getPath().getName();
646      TableName linkedTable = regionInfoForFs.getTable();
647      String linkedRegion = regionInfoForFs.getEncodedName();
648      try {
649        if (HFileLink.isHFileLink(hfileName)) {
650          Matcher m = LINK_NAME_PATTERN.matcher(hfileName);
651          if (!m.matches()) {
652            throw new IllegalArgumentException(hfileName + " is not a valid HFileLink name!");
653          }
654          linkedTable = TableName.valueOf(m.group(1), m.group(2));
655          linkedRegion = m.group(3);
656          hfileName = m.group(4);
657        }
658        // must create back reference here
659        HFileLink hFileLink = tracker.createHFileLink(linkedTable, linkedRegion, hfileName, true);
660        Path path =
661          new Path(splitDir, HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
662        LOG.info("Created linkFile:" + path.toString() + " for child: " + hri.getEncodedName()
663          + ", parent: " + regionInfoForFs.getEncodedName());
664        return new StoreFileInfo(conf, fs, path, hFileLink);
665      } catch (IOException e) {
666        // if create HFileLink file failed, then just skip the error and create Reference file
667        LOG.error("Create link file for " + hfileName + " for child " + hri.getEncodedName()
668          + "failed, will create Reference file", e);
669      }
670    }
671    // A reference to the bottom half of the hsf store file.
672    Reference r =
673      top ? Reference.createTopReference(splitRow) : Reference.createBottomReference(splitRow);
674    tracker.createReference(r, p);
675    return new StoreFileInfo(conf, fs, p, r);
676  }
677
678  // ===========================================================================
679  // Merge Helpers
680  // ===========================================================================
681
682  Path getMergesDir(final RegionInfo hri) {
683    return new Path(getTableDir(), hri.getEncodedName());
684  }
685
686  /**
687   * Remove merged region
688   * @param mergedRegion {@link RegionInfo}
689   */
690  public void cleanupMergedRegion(final RegionInfo mergedRegion) throws IOException {
691    Path regionDir = new Path(this.tableDir, mergedRegion.getEncodedName());
692    if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
693      throw new IOException("Failed delete of " + regionDir);
694    }
695  }
696
697  static boolean mkdirs(FileSystem fs, Configuration conf, Path dir) throws IOException {
698    if (
699      FSUtils.isDistributedFileSystem(fs)
700        || !conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false)
701    ) {
702      return fs.mkdirs(dir);
703    }
704    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
705    return fs.mkdirs(dir, perms);
706  }
707
708  /**
709   * Write out a merge reference under the given merges directory.
710   * @param mergingRegion {@link RegionInfo} for one of the regions being merged.
711   * @param familyName    Column Family Name
712   * @param f             File to create reference.
713   * @return Path to created reference.
714   * @throws IOException if the merge write fails.
715   */
716  public StoreFileInfo mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFile f,
717    StoreFileTracker tracker) throws IOException {
718    Path referenceDir = new Path(getMergesDir(regionInfoForFs), familyName);
719    // A whole reference to the store file.
720    Reference r = Reference.createTopReference(mergingRegion.getStartKey());
721    // Add the referred-to regions name as a dot separated suffix.
722    // See REF_NAME_REGEX regex above. The referred-to regions name is
723    // up in the path of the passed in <code>f</code> -- parentdir is family,
724    // then the directory above is the region name.
725    String mergingRegionName = mergingRegion.getEncodedName();
726    // Write reference with same file id only with the other region name as
727    // suffix and into the new region location (under same family).
728    Path p = new Path(referenceDir, f.getPath().getName() + "." + mergingRegionName);
729    tracker.createReference(r, p);
730    StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, p, r);
731    return storeFileInfo;
732  }
733
734  /**
735   * Commit a merged region, making it ready for use.
736   */
737  public void commitMergedRegion(List<StoreFileInfo> allMergedFiles, MasterProcedureEnv env)
738    throws IOException {
739    Path regionDir = getMergesDir(regionInfoForFs);
740    if (regionDir != null && fs.exists(regionDir)) {
741      // Write HRI to a file in case we need to recover hbase:meta
742      Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
743      byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
744      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
745      insertRegionfilePathsIntoStoreTracker(allMergedFiles, env, this);
746    }
747  }
748
749  // ===========================================================================
750  // Create/Open/Delete Helpers
751  // ===========================================================================
752
753  /** Returns Content of the file we write out to the filesystem under a region */
754  private static byte[] getRegionInfoFileContent(final RegionInfo hri) throws IOException {
755    return RegionInfo.toDelimitedByteArray(hri);
756  }
757
758  /**
759   * Create a {@link RegionInfo} from the serialized version on-disk.
760   * @param fs        {@link FileSystem} that contains the Region Info file
761   * @param regionDir {@link Path} to the Region Directory that contains the Info file
762   * @return An {@link RegionInfo} instance gotten from the Region Info file.
763   * @throws IOException if an error occurred during file open/read operation.
764   */
765  public static RegionInfo loadRegionInfoFileContent(final FileSystem fs, final Path regionDir)
766    throws IOException {
767    FSDataInputStream in = fs.open(new Path(regionDir, REGION_INFO_FILE));
768    try {
769      return RegionInfo.parseFrom(in);
770    } finally {
771      in.close();
772    }
773  }
774
775  /**
776   * Write the .regioninfo file on-disk.
777   * <p/>
778   * Overwrites if exists already.
779   */
780  private static void writeRegionInfoFileContent(final Configuration conf, final FileSystem fs,
781    final Path regionInfoFile, final byte[] content) throws IOException {
782    // First check to get the permissions
783    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
784    // Write the RegionInfo file content
785    // HBASE-29662: Fail .regioninfo file creation, if the region directory doesn't exist,
786    // avoiding silent masking of missing region directories during region initialization.
787    // The region directory should already exist when this method is called.
788    try (FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null, false)) {
789      out.write(content);
790    }
791  }
792
793  /**
794   * Write out an info file under the stored region directory. Useful recovering mangled regions. If
795   * the regionInfo already exists on-disk, then we fast exit.
796   */
797  void checkRegionInfoOnFilesystem() throws IOException {
798    // Compose the content of the file so we can compare to length in filesystem. If not same,
799    // rewrite it (it may have been written in the old format using Writables instead of pb). The
800    // pb version is much shorter -- we write now w/o the toString version -- so checking length
801    // only should be sufficient. I don't want to read the file every time to check if it pb
802    // serialized.
803    byte[] content = getRegionInfoFileContent(regionInfoForFs);
804
805    // Verify if the region directory exists before opening a region. We need to do this since if
806    // the region directory doesn't exist we will re-create the region directory and a new HRI
807    // when HRegion.openHRegion() is called.
808    try {
809      FileStatus status = fs.getFileStatus(getRegionDir());
810    } catch (FileNotFoundException e) {
811      LOG.warn(getRegionDir() + " doesn't exist for region: " + regionInfoForFs.getEncodedName()
812        + " on table " + regionInfo.getTable());
813    }
814
815    try {
816      Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
817      FileStatus status = fs.getFileStatus(regionInfoFile);
818      if (status != null && status.getLen() == content.length) {
819        // Then assume the content good and move on.
820        // NOTE: that the length is not sufficient to define the the content matches.
821        return;
822      }
823
824      LOG.info("Rewriting .regioninfo file at: " + regionInfoFile);
825      if (!fs.delete(regionInfoFile, false)) {
826        throw new IOException("Unable to remove existing " + regionInfoFile);
827      }
828    } catch (FileNotFoundException e) {
829      LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfoForFs.getEncodedName()
830        + " on table " + regionInfo.getTable());
831    }
832
833    // Write HRI to a file in case we need to recover hbase:meta
834    writeRegionInfoOnFilesystem(content, true);
835  }
836
837  /**
838   * Write out an info file under the region directory. Useful recovering mangled regions.
839   * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
840   */
841  private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException {
842    byte[] content = getRegionInfoFileContent(regionInfoForFs);
843    writeRegionInfoOnFilesystem(content, useTempDir);
844  }
845
846  /**
847   * Write out an info file under the region directory. Useful recovering mangled regions.
848   * @param regionInfoContent serialized version of the {@link RegionInfo}
849   * @param useTempDir        indicate whether or not using the region .tmp dir for a safer file
850   *                          creation.
851   */
852  private void writeRegionInfoOnFilesystem(final byte[] regionInfoContent, final boolean useTempDir)
853    throws IOException {
854    Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
855    if (useTempDir) {
856      // Create in tmpDir and then move into place in case we crash after
857      // create but before close. If we don't successfully close the file,
858      // subsequent region reopens will fail the below because create is
859      // registered in NN.
860
861      // And then create the file
862      Path tmpPath = new Path(getTempDir(), REGION_INFO_FILE);
863
864      // If datanode crashes or if the RS goes down just before the close is called while trying to
865      // close the created regioninfo file in the .tmp directory then on next
866      // creation we will be getting AlreadyCreatedException.
867      // Hence delete and create the file if exists.
868      if (CommonFSUtils.isExists(fs, tmpPath)) {
869        CommonFSUtils.delete(fs, tmpPath, true);
870      }
871
872      // Check parent (region) directory exists first to maintain HBASE-29662 protection
873      if (!fs.exists(getRegionDir())) {
874        throw new IOException("Region directory does not exist: " + getRegionDir());
875      }
876      if (!fs.exists(getTempDir())) {
877        fs.mkdirs(getTempDir());
878      }
879
880      // Write HRI to a file in case we need to recover hbase:meta
881      writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
882
883      // Move the created file to the original path
884      if (fs.exists(tmpPath) && !rename(tmpPath, regionInfoFile)) {
885        throw new IOException("Unable to rename " + tmpPath + " to " + regionInfoFile);
886      }
887    } else {
888      // Write HRI to a file in case we need to recover hbase:meta
889      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
890    }
891  }
892
893  /**
894   * Create a new Region on file-system.
895   * @param conf       the {@link Configuration} to use
896   * @param fs         {@link FileSystem} from which to add the region
897   * @param tableDir   {@link Path} to where the table is being stored
898   * @param regionInfo {@link RegionInfo} for region to be added
899   * @throws IOException if the region creation fails due to a FileSystem exception.
900   */
901  public static HRegionFileSystem createRegionOnFileSystem(final Configuration conf,
902    final FileSystem fs, final Path tableDir, final RegionInfo regionInfo) throws IOException {
903    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
904
905    // We only create a .regioninfo and the region directory if this is the default region replica
906    if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
907      Path regionDir = regionFs.getRegionDir();
908      if (fs.exists(regionDir)) {
909        LOG.warn("Trying to create a region that already exists on disk: " + regionDir);
910      } else {
911        // Create the region directory
912        if (!createDirOnFileSystem(fs, conf, regionDir)) {
913          LOG.warn("Unable to create the region directory: " + regionDir);
914          throw new IOException("Unable to create region directory: " + regionDir);
915        }
916      }
917
918      // Write HRI to a file in case we need to recover hbase:meta
919      regionFs.writeRegionInfoOnFilesystem(false);
920    } else {
921      if (LOG.isDebugEnabled())
922        LOG.debug("Skipping creation of .regioninfo file for " + regionInfo);
923    }
924    return regionFs;
925  }
926
927  /**
928   * Open Region from file-system.
929   * @param conf       the {@link Configuration} to use
930   * @param fs         {@link FileSystem} from which to add the region
931   * @param tableDir   {@link Path} to where the table is being stored
932   * @param regionInfo {@link RegionInfo} for region to be added
933   * @param readOnly   True if you don't want to edit the region data
934   * @throws IOException if the region creation fails due to a FileSystem exception.
935   */
936  public static HRegionFileSystem openRegionFromFileSystem(final Configuration conf,
937    final FileSystem fs, final Path tableDir, final RegionInfo regionInfo, boolean readOnly)
938    throws IOException {
939    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
940    Path regionDir = regionFs.getRegionDir();
941
942    if (!fs.exists(regionDir)) {
943      LOG.warn("Trying to open a region that do not exists on disk: " + regionDir);
944      throw new IOException("The specified region do not exists on disk: " + regionDir);
945    }
946
947    if (!readOnly) {
948      // Cleanup temporary directories
949      regionFs.cleanupTempDir();
950
951      // If it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
952      // Only create HRI if we are the default replica
953      if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
954        regionFs.checkRegionInfoOnFilesystem();
955      } else {
956        if (LOG.isDebugEnabled()) {
957          LOG.debug("Skipping creation of .regioninfo file for " + regionInfo);
958        }
959      }
960    }
961
962    return regionFs;
963  }
964
965  /**
966   * Remove the region from the table directory, archiving the region's hfiles.
967   * @param conf       the {@link Configuration} to use
968   * @param fs         {@link FileSystem} from which to remove the region
969   * @param tableDir   {@link Path} to where the table is being stored
970   * @param regionInfo {@link RegionInfo} for region to be deleted
971   * @throws IOException if the request cannot be completed
972   */
973  public static void deleteRegionFromFileSystem(final Configuration conf, final FileSystem fs,
974    final Path tableDir, final RegionInfo regionInfo) throws IOException {
975    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
976    Path regionDir = regionFs.getRegionDir();
977
978    if (!fs.exists(regionDir)) {
979      LOG.warn("Trying to delete a region that do not exists on disk: " + regionDir);
980      return;
981    }
982
983    if (LOG.isDebugEnabled()) {
984      LOG.debug("DELETING region " + regionDir);
985    }
986
987    // Archive region
988    Path rootDir = CommonFSUtils.getRootDir(conf);
989    HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
990
991    // Delete empty region dir
992    if (!fs.delete(regionDir, true)) {
993      LOG.warn("Failed delete of " + regionDir);
994    }
995  }
996
997  /**
998   * Creates a directory. Assumes the user has already checked for this directory existence.
999   * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
1000   *         whether the directory exists or not, and returns true if it exists.
1001   */
1002  boolean createDir(Path dir) throws IOException {
1003    int i = 0;
1004    IOException lastIOE = null;
1005    do {
1006      try {
1007        return mkdirs(fs, conf, dir);
1008      } catch (IOException ioe) {
1009        lastIOE = ioe;
1010        if (fs.exists(dir)) return true; // directory is present
1011        try {
1012          sleepBeforeRetry("Create Directory", i + 1);
1013        } catch (InterruptedException e) {
1014          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1015        }
1016      }
1017    } while (++i <= hdfsClientRetriesNumber);
1018    throw new IOException("Exception in createDir", lastIOE);
1019  }
1020
1021  /**
1022   * Renames a directory. Assumes the user has already checked for this directory existence.
1023   * @return true if rename is successful.
1024   */
1025  boolean rename(Path srcpath, Path dstPath) throws IOException {
1026    IOException lastIOE = null;
1027    int i = 0;
1028    do {
1029      try {
1030        return fs.rename(srcpath, dstPath);
1031      } catch (IOException ioe) {
1032        lastIOE = ioe;
1033        if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
1034        // dir is not there, retry after some time.
1035        try {
1036          sleepBeforeRetry("Rename Directory", i + 1);
1037        } catch (InterruptedException e) {
1038          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1039        }
1040      }
1041    } while (++i <= hdfsClientRetriesNumber);
1042
1043    throw new IOException("Exception in rename", lastIOE);
1044  }
1045
1046  /**
1047   * Deletes a directory. Assumes the user has already checked for this directory existence.
1048   * @return true if the directory is deleted.
1049   */
1050  boolean deleteDir(Path dir) throws IOException {
1051    IOException lastIOE = null;
1052    int i = 0;
1053    do {
1054      try {
1055        return fs.delete(dir, true);
1056      } catch (IOException ioe) {
1057        lastIOE = ioe;
1058        if (!fs.exists(dir)) return true;
1059        // dir is there, retry deleting after some time.
1060        try {
1061          sleepBeforeRetry("Delete Directory", i + 1);
1062        } catch (InterruptedException e) {
1063          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1064        }
1065      }
1066    } while (++i <= hdfsClientRetriesNumber);
1067
1068    throw new IOException("Exception in DeleteDir", lastIOE);
1069  }
1070
1071  /**
1072   * sleeping logic; handles the interrupt exception.
1073   */
1074  private void sleepBeforeRetry(String msg, int sleepMultiplier) throws InterruptedException {
1075    sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
1076  }
1077
1078  /**
1079   * Creates a directory for a filesystem and configuration object. Assumes the user has already
1080   * checked for this directory existence.
1081   * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
1082   *         whether the directory exists or not, and returns true if it exists.
1083   */
1084  private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
1085    throws IOException {
1086    int i = 0;
1087    IOException lastIOE = null;
1088    int hdfsClientRetriesNumber =
1089      conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
1090    int baseSleepBeforeRetries =
1091      conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
1092    do {
1093      try {
1094        return fs.mkdirs(dir);
1095      } catch (IOException ioe) {
1096        lastIOE = ioe;
1097        if (fs.exists(dir)) return true; // directory is present
1098        try {
1099          sleepBeforeRetry("Create Directory", i + 1, baseSleepBeforeRetries,
1100            hdfsClientRetriesNumber);
1101        } catch (InterruptedException e) {
1102          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1103        }
1104      }
1105    } while (++i <= hdfsClientRetriesNumber);
1106
1107    throw new IOException("Exception in createDir", lastIOE);
1108  }
1109
1110  /**
1111   * sleeping logic for static methods; handles the interrupt exception. Keeping a static version
1112   * for this to avoid re-looking for the integer values.
1113   */
1114  private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
1115    int hdfsClientRetriesNumber) throws InterruptedException {
1116    if (sleepMultiplier > hdfsClientRetriesNumber) {
1117      if (LOG.isDebugEnabled()) {
1118        LOG.debug(msg + ", retries exhausted");
1119      }
1120      return;
1121    }
1122    if (LOG.isDebugEnabled()) {
1123      LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
1124    }
1125    Thread.sleep((long) baseSleepBeforeRetries * sleepMultiplier);
1126  }
1127
1128  public static HRegionFileSystem create(final Configuration conf, final FileSystem fs,
1129    final Path tableDir, final RegionInfo regionInfo) throws IOException {
1130    return new HRegionFileSystem(conf, fs, tableDir, regionInfo);
1131  }
1132}