001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.List;
027import java.util.Objects;
028import java.util.Optional;
029import java.util.UUID;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.FileUtil;
036import org.apache.hadoop.fs.LocatedFileStatus;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.permission.FsPermission;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.PrivateCellUtil;
042import org.apache.hadoop.hbase.backup.HFileArchiver;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.fs.HFileSystem;
047import org.apache.hadoop.hbase.io.Reference;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.FSUtils;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
058import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
059
060/**
061 * View to an on-disk Region.
062 * Provides the set of methods necessary to interact with the on-disk region data.
063 */
064@InterfaceAudience.Private
065public class HRegionFileSystem {
066  private static final Logger LOG = LoggerFactory.getLogger(HRegionFileSystem.class);
067
068  /** Name of the region info file that resides just under the region directory. */
069  public final static String REGION_INFO_FILE = ".regioninfo";
070
071  /** Temporary subdirectory of the region directory used for merges. */
072  public static final String REGION_MERGES_DIR = ".merges";
073
074  /** Temporary subdirectory of the region directory used for splits. */
075  public static final String REGION_SPLITS_DIR = ".splits";
076
077  /** Temporary subdirectory of the region directory used for compaction output. */
078  @VisibleForTesting static final String REGION_TEMP_DIR = ".tmp";
079
080  private final RegionInfo regionInfo;
081  //regionInfo for interacting with FS (getting encodedName, etc)
082  final RegionInfo regionInfoForFs;
083  final Configuration conf;
084  private final Path tableDir;
085  final FileSystem fs;
086  private final Path regionDir;
087
088  /**
089   * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the
090   * client level.
091   */
092  private final int hdfsClientRetriesNumber;
093  private final int baseSleepBeforeRetries;
094  private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
095  private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
096
097  /**
098   * Create a view to the on-disk region
099   * @param conf the {@link Configuration} to use
100   * @param fs {@link FileSystem} that contains the region
101   * @param tableDir {@link Path} to where the table is being stored
102   * @param regionInfo {@link RegionInfo} for region
103   */
104  HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir,
105      final RegionInfo regionInfo) {
106    this.fs = fs;
107    this.conf = conf;
108    this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
109    this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
110    this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
111    this.regionDir = FSUtils.getRegionDirFromTableDir(tableDir, regionInfo);
112    this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
113      DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
114    this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
115      DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
116 }
117
118  /** @return the underlying {@link FileSystem} */
119  public FileSystem getFileSystem() {
120    return this.fs;
121  }
122
123  /** @return the {@link RegionInfo} that describe this on-disk region view */
124  public RegionInfo getRegionInfo() {
125    return this.regionInfo;
126  }
127
128  public RegionInfo getRegionInfoForFS() {
129    return this.regionInfoForFs;
130  }
131
132  /** @return {@link Path} to the region's root directory. */
133  public Path getTableDir() {
134    return this.tableDir;
135  }
136
137  /** @return {@link Path} to the region directory. */
138  public Path getRegionDir() {
139    return regionDir;
140  }
141
142  // ===========================================================================
143  //  Temp Helpers
144  // ===========================================================================
145  /** @return {@link Path} to the region's temp directory, used for file creations */
146  Path getTempDir() {
147    return new Path(getRegionDir(), REGION_TEMP_DIR);
148  }
149
150  /**
151   * Clean up any temp detritus that may have been left around from previous operation attempts.
152   */
153  void cleanupTempDir() throws IOException {
154    deleteDir(getTempDir());
155  }
156
157  // ===========================================================================
158  //  Store/StoreFile Helpers
159  // ===========================================================================
160  /**
161   * Returns the directory path of the specified family
162   * @param familyName Column Family Name
163   * @return {@link Path} to the directory of the specified family
164   */
165  public Path getStoreDir(final String familyName) {
166    return new Path(this.getRegionDir(), familyName);
167  }
168
169  /**
170   * Create the store directory for the specified family name
171   * @param familyName Column Family Name
172   * @return {@link Path} to the directory of the specified family
173   * @throws IOException if the directory creation fails.
174   */
175  Path createStoreDir(final String familyName) throws IOException {
176    Path storeDir = getStoreDir(familyName);
177    if(!fs.exists(storeDir) && !createDir(storeDir))
178      throw new IOException("Failed creating "+storeDir);
179    return storeDir;
180  }
181
182  /**
183   * Set the directory of CF to the specified storage policy. <br>
184   * <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
185   * <i>"COLD"</i> <br>
186   * <br>
187   * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
188   * @param familyName The name of column family.
189   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
190   * See see hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
191   * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
192   */
193  public void setStoragePolicy(String familyName, String policyName) {
194    CommonFSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
195  }
196
197  /**
198   * Get the storage policy of the directory of CF.
199   * @param familyName The name of column family.
200   * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
201   *         thrown when trying to get policy
202   */
203  @Nullable
204  public String getStoragePolicyName(String familyName) {
205    if (this.fs instanceof HFileSystem) {
206      Path storeDir = getStoreDir(familyName);
207      return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
208    }
209
210    return null;
211  }
212
213  /**
214   * Returns the store files available for the family.
215   * This methods performs the filtering based on the valid store files.
216   * @param familyName Column Family Name
217   * @return a set of {@link StoreFileInfo} for the specified family.
218   */
219  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
220    return getStoreFiles(Bytes.toString(familyName));
221  }
222
223  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
224    return getStoreFiles(familyName, true);
225  }
226
227  /**
228   * Returns the store files available for the family.
229   * This methods performs the filtering based on the valid store files.
230   * @param familyName Column Family Name
231   * @return a set of {@link StoreFileInfo} for the specified family.
232   */
233  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
234      throws IOException {
235    Path familyDir = getStoreDir(familyName);
236    FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);
237    if (files == null) {
238      if (LOG.isTraceEnabled()) {
239        LOG.trace("No StoreFiles for: " + familyDir);
240      }
241      return null;
242    }
243
244    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>(files.length);
245    for (FileStatus status: files) {
246      if (validate && !StoreFileInfo.isValid(status)) {
247        // recovered.hfiles directory is expected inside CF path when hbase.wal.split.to.hfile to
248        // true, refer HBASE-23740
249        if (!HConstants.RECOVERED_HFILES_DIR.equals(status.getPath().getName())) {
250          LOG.warn("Invalid StoreFile: {}", status.getPath());
251        }
252        continue;
253      }
254      StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
255        regionInfoForFs, familyName, status.getPath());
256      storeFiles.add(info);
257
258    }
259    return storeFiles;
260  }
261
262  /**
263   * Returns the store files' LocatedFileStatus which available for the family.
264   * This methods performs the filtering based on the valid store files.
265   * @param familyName Column Family Name
266   * @return a list of store files' LocatedFileStatus for the specified family.
267   */
268  public static List<LocatedFileStatus> getStoreFilesLocatedStatus(
269      final HRegionFileSystem regionfs, final String familyName,
270      final boolean validate) throws IOException {
271    Path familyDir = regionfs.getStoreDir(familyName);
272    List<LocatedFileStatus> locatedFileStatuses = CommonFSUtils.listLocatedStatus(
273        regionfs.getFileSystem(), familyDir);
274    if (locatedFileStatuses == null) {
275      if (LOG.isTraceEnabled()) {
276        LOG.trace("No StoreFiles for: " + familyDir);
277      }
278      return null;
279    }
280
281    List<LocatedFileStatus> validStoreFiles = Lists.newArrayList();
282    for (LocatedFileStatus status : locatedFileStatuses) {
283      if (validate && !StoreFileInfo.isValid(status)) {
284        // recovered.hfiles directory is expected inside CF path when hbase.wal.split.to.hfile to
285        // true, refer HBASE-23740
286        if (!HConstants.RECOVERED_HFILES_DIR.equals(status.getPath().getName())) {
287          LOG.warn("Invalid StoreFile: {}", status.getPath());
288        }
289      } else {
290        validStoreFiles.add(status);
291      }
292    }
293    return validStoreFiles;
294  }
295
296  /**
297   * Return Qualified Path of the specified family/file
298   *
299   * @param familyName Column Family Name
300   * @param fileName File Name
301   * @return The qualified Path for the specified family/file
302   */
303  Path getStoreFilePath(final String familyName, final String fileName) {
304    Path familyDir = getStoreDir(familyName);
305    return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
306  }
307
308  /**
309   * Return the store file information of the specified family/file.
310   *
311   * @param familyName Column Family Name
312   * @param fileName File Name
313   * @return The {@link StoreFileInfo} for the specified family/file
314   */
315  StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
316      throws IOException {
317    Path familyDir = getStoreDir(familyName);
318    return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
319      regionInfoForFs, familyName, new Path(familyDir, fileName));
320  }
321
322  /**
323   * Returns true if the specified family has reference files
324   * @param familyName Column Family Name
325   * @return true if family contains reference files
326   * @throws IOException
327   */
328  public boolean hasReferences(final String familyName) throws IOException {
329    Path storeDir = getStoreDir(familyName);
330    FileStatus[] files = CommonFSUtils.listStatus(fs, storeDir);
331    if (files != null) {
332      for(FileStatus stat: files) {
333        if(stat.isDirectory()) {
334          continue;
335        }
336        if (StoreFileInfo.isReference(stat.getPath())) {
337          LOG.trace("Reference {}", stat.getPath());
338          return true;
339        }
340      }
341    }
342    return false;
343  }
344
345  /**
346   * Check whether region has Reference file
347   * @param htd table desciptor of the region
348   * @return true if region has reference file
349   * @throws IOException
350   */
351  public boolean hasReferences(final TableDescriptor htd) throws IOException {
352    for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
353      if (hasReferences(family.getNameAsString())) {
354        return true;
355      }
356    }
357    return false;
358  }
359
360  /**
361   * @return the set of families present on disk
362   * @throws IOException
363   */
364  public Collection<String> getFamilies() throws IOException {
365    FileStatus[] fds =
366      CommonFSUtils.listStatus(fs, getRegionDir(), new FSUtils.FamilyDirFilter(fs));
367    if (fds == null) return null;
368
369    ArrayList<String> families = new ArrayList<>(fds.length);
370    for (FileStatus status : fds) {
371      families.add(status.getPath().getName());
372    }
373
374    return families;
375  }
376
377  /**
378   * Remove the region family from disk, archiving the store files.
379   * @param familyName Column Family Name
380   * @throws IOException if an error occours during the archiving
381   */
382  public void deleteFamily(final String familyName) throws IOException {
383    // archive family store files
384    HFileArchiver.archiveFamily(fs, conf, regionInfoForFs, tableDir, Bytes.toBytes(familyName));
385
386    // delete the family folder
387    Path familyDir = getStoreDir(familyName);
388    if(fs.exists(familyDir) && !deleteDir(familyDir))
389      throw new IOException("Could not delete family " + familyName
390          + " from FileSystem for region " + regionInfoForFs.getRegionNameAsString() + "("
391          + regionInfoForFs.getEncodedName() + ")");
392  }
393
394  /**
395   * Generate a unique file name, used by createTempName() and commitStoreFile()
396   * @param suffix extra information to append to the generated name
397   * @return Unique file name
398   */
399  private static String generateUniqueName(final String suffix) {
400    String name = UUID.randomUUID().toString().replaceAll("-", "");
401    if (suffix != null) name += suffix;
402    return name;
403  }
404
405  /**
406   * Generate a unique temporary Path. Used in conjuction with commitStoreFile()
407   * to get a safer file creation.
408   * <code>
409   * Path file = fs.createTempName();
410   * ...StoreFile.Writer(file)...
411   * fs.commitStoreFile("family", file);
412   * </code>
413   *
414   * @return Unique {@link Path} of the temporary file
415   */
416  public Path createTempName() {
417    return createTempName(null);
418  }
419
420  /**
421   * Generate a unique temporary Path. Used in conjuction with commitStoreFile()
422   * to get a safer file creation.
423   * <code>
424   * Path file = fs.createTempName();
425   * ...StoreFile.Writer(file)...
426   * fs.commitStoreFile("family", file);
427   * </code>
428   *
429   * @param suffix extra information to append to the generated name
430   * @return Unique {@link Path} of the temporary file
431   */
432  public Path createTempName(final String suffix) {
433    return new Path(getTempDir(), generateUniqueName(suffix));
434  }
435
436  /**
437   * Move the file from a build/temp location to the main family store directory.
438   * @param familyName Family that will gain the file
439   * @param buildPath {@link Path} to the file to commit.
440   * @return The new {@link Path} of the committed file
441   * @throws IOException
442   */
443  public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
444    Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
445    return commitStoreFile(buildPath, dstPath);
446  }
447
448  /**
449   * Generate the filename in the main family store directory for moving the file from a build/temp
450   *  location.
451   * @param familyName Family that will gain the file
452   * @param buildPath {@link Path} to the file to commit.
453   * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
454   * @param generateNewName False if you want to keep the buildPath name
455   * @return The new {@link Path} of the to be committed file
456   * @throws IOException
457   */
458  private Path preCommitStoreFile(final String familyName, final Path buildPath,
459      final long seqNum, final boolean generateNewName) throws IOException {
460    Path storeDir = getStoreDir(familyName);
461    if(!fs.exists(storeDir) && !createDir(storeDir))
462      throw new IOException("Failed creating " + storeDir);
463
464    String name = buildPath.getName();
465    if (generateNewName) {
466      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
467    }
468    Path dstPath = new Path(storeDir, name);
469    if (!fs.exists(buildPath)) {
470      throw new FileNotFoundException(buildPath.toString());
471    }
472    if (LOG.isDebugEnabled()) {
473      LOG.debug("Committing " + buildPath + " as " + dstPath);
474    }
475    return dstPath;
476  }
477
478  /*
479   * Moves file from staging dir to region dir
480   * @param buildPath {@link Path} to the file to commit.
481   * @param dstPath {@link Path} to the file under region dir
482   * @return The {@link Path} of the committed file
483   * @throws IOException
484   */
485  Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
486    // buildPath exists, therefore not doing an exists() check.
487    if (!rename(buildPath, dstPath)) {
488      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
489    }
490    return dstPath;
491  }
492
493  /**
494   * Archives the specified store file from the specified family.
495   * @param familyName Family that contains the store files
496   * @param filePath {@link Path} to the store file to remove
497   * @throws IOException if the archiving fails
498   */
499  public void removeStoreFile(final String familyName, final Path filePath)
500      throws IOException {
501    HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfoForFs,
502        this.tableDir, Bytes.toBytes(familyName), filePath);
503  }
504
505  /**
506   * Closes and archives the specified store files from the specified family.
507   * @param familyName Family that contains the store files
508   * @param storeFiles set of store files to remove
509   * @throws IOException if the archiving fails
510   */
511  public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFiles)
512      throws IOException {
513    HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs,
514        this.tableDir, Bytes.toBytes(familyName), storeFiles);
515  }
516
517  /**
518   * Bulk load: Add a specified store file to the specified family.
519   * If the source file is on the same different file-system is moved from the
520   * source location to the destination location, otherwise is copied over.
521   *
522   * @param familyName Family that will gain the file
523   * @param srcPath {@link Path} to the file to import
524   * @param seqNum Bulk Load sequence number
525   * @return The destination {@link Path} of the bulk loaded file
526   * @throws IOException
527   */
528  Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
529      throws IOException {
530    // Copy the file if it's on another filesystem
531    FileSystem srcFs = srcPath.getFileSystem(conf);
532    srcPath = srcFs.resolvePath(srcPath);
533    FileSystem realSrcFs = srcPath.getFileSystem(conf);
534    FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
535
536    // We can't compare FileSystem instances as equals() includes UGI instance
537    // as part of the comparison and won't work when doing SecureBulkLoad
538    // TODO deal with viewFS
539    if (!FSUtils.isSameHdfs(conf, realSrcFs, desFs)) {
540      LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
541          "the destination store. Copying file over to destination filesystem.");
542      Path tmpPath = createTempName();
543      FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
544      LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
545      srcPath = tmpPath;
546    }
547
548    return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
549  }
550
551  // ===========================================================================
552  //  Splits Helpers
553  // ===========================================================================
554  /** @return {@link Path} to the temp directory used during split operations */
555  Path getSplitsDir() {
556    return new Path(getRegionDir(), REGION_SPLITS_DIR);
557  }
558
559  public Path getSplitsDir(final RegionInfo hri) {
560    return new Path(getSplitsDir(), hri.getEncodedName());
561  }
562
563  /**
564   * Clean up any split detritus that may have been left around from previous split attempts.
565   */
566  void cleanupSplitsDir() throws IOException {
567    deleteDir(getSplitsDir());
568  }
569
570  /**
571   * Clean up any split detritus that may have been left around from previous
572   * split attempts.
573   * Call this method on initial region deploy.
574   * @throws IOException
575   */
576  void cleanupAnySplitDetritus() throws IOException {
577    Path splitdir = this.getSplitsDir();
578    if (!fs.exists(splitdir)) return;
579    // Look at the splitdir.  It could have the encoded names of the daughter
580    // regions we tried to make.  See if the daughter regions actually got made
581    // out under the tabledir.  If here under splitdir still, then the split did
582    // not complete.  Try and do cleanup.  This code WILL NOT catch the case
583    // where we successfully created daughter a but regionserver crashed during
584    // the creation of region b.  In this case, there'll be an orphan daughter
585    // dir in the filesystem.  TOOD: Fix.
586    FileStatus[] daughters = CommonFSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
587    if (daughters != null) {
588      for (FileStatus daughter: daughters) {
589        Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
590        if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
591          throw new IOException("Failed delete of " + daughterDir);
592        }
593      }
594    }
595    cleanupSplitsDir();
596    LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
597  }
598
599  /**
600   * Remove daughter region
601   * @param regionInfo daughter {@link RegionInfo}
602   * @throws IOException
603   */
604  void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
605    Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
606    if (this.fs.exists(regionDir) && !deleteDir(regionDir)) {
607      throw new IOException("Failed delete of " + regionDir);
608    }
609  }
610
611  /**
612   * Commit a daughter region, moving it from the split temporary directory
613   * to the proper location in the filesystem.
614   *
615   * @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
616   * @throws IOException
617   */
618  public Path commitDaughterRegion(final RegionInfo regionInfo)
619      throws IOException {
620    Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
621    Path daughterTmpDir = this.getSplitsDir(regionInfo);
622
623    if (fs.exists(daughterTmpDir)) {
624
625      // Write HRI to a file in case we need to recover hbase:meta
626      Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
627      byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
628      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
629
630      // Move the daughter temp dir to the table dir
631      if (!rename(daughterTmpDir, regionDir)) {
632        throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
633      }
634    }
635
636    return regionDir;
637  }
638
639  /**
640   * Create the region splits directory.
641   */
642  public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
643    Path splitdir = getSplitsDir();
644    if (fs.exists(splitdir)) {
645      LOG.info("The " + splitdir + " directory exists.  Hence deleting it to recreate it");
646      if (!deleteDir(splitdir)) {
647        throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
648      }
649    }
650    // splitDir doesn't exists now. No need to do an exists() call for it.
651    if (!createDir(splitdir)) {
652      throw new IOException("Failed create of " + splitdir);
653    }
654    Path daughterATmpDir = getSplitsDir(daughterA);
655    if (!createDir(daughterATmpDir)) {
656      throw new IOException("Failed create of " + daughterATmpDir);
657    }
658    Path daughterBTmpDir = getSplitsDir(daughterB);
659    if (!createDir(daughterBTmpDir)) {
660      throw new IOException("Failed create of " + daughterBTmpDir);
661    }
662  }
663
664  /**
665   * Write out a split reference. Package local so it doesnt leak out of
666   * regionserver.
667   * @param hri {@link RegionInfo} of the destination
668   * @param familyName Column Family Name
669   * @param f File to split.
670   * @param splitRow Split Row
671   * @param top True if we are referring to the top half of the hfile.
672   * @param splitPolicy A split policy instance; be careful! May not be full populated; e.g. if
673   *                    this method is invoked on the Master side, then the RegionSplitPolicy will
674   *                    NOT have a reference to a Region.
675   * @return Path to created reference.
676   * @throws IOException
677   */
678  public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
679      boolean top, RegionSplitPolicy splitPolicy) throws IOException {
680    if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
681      // Check whether the split row lies in the range of the store file
682      // If it is outside the range, return directly.
683      f.initReader();
684      try {
685        if (top) {
686          //check if larger than last key.
687          Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
688          Optional<Cell> lastKey = f.getLastKey();
689          // If lastKey is null means storefile is empty.
690          if (!lastKey.isPresent()) {
691            return null;
692          }
693          if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
694            return null;
695          }
696        } else {
697          //check if smaller than first key
698          Cell splitKey = PrivateCellUtil.createLastOnRow(splitRow);
699          Optional<Cell> firstKey = f.getFirstKey();
700          // If firstKey is null means storefile is empty.
701          if (!firstKey.isPresent()) {
702            return null;
703          }
704          if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
705            return null;
706          }
707        }
708      } finally {
709        f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
710      }
711    }
712
713    Path splitDir = new Path(getSplitsDir(hri), familyName);
714    // A reference to the bottom half of the hsf store file.
715    Reference r =
716      top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
717    // Add the referred-to regions name as a dot separated suffix.
718    // See REF_NAME_REGEX regex above.  The referred-to regions name is
719    // up in the path of the passed in <code>f</code> -- parentdir is family,
720    // then the directory above is the region name.
721    String parentRegionName = regionInfoForFs.getEncodedName();
722    // Write reference with same file id only with the other region name as
723    // suffix and into the new region location (under same family).
724    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
725    return r.write(fs, p);
726  }
727
728  // ===========================================================================
729  //  Merge Helpers
730  // ===========================================================================
731  /** @return {@link Path} to the temp directory used during merge operations */
732  public Path getMergesDir() {
733    return new Path(getRegionDir(), REGION_MERGES_DIR);
734  }
735
736  Path getMergesDir(final RegionInfo hri) {
737    return new Path(getMergesDir(), hri.getEncodedName());
738  }
739
740  /**
741   * Clean up any merge detritus that may have been left around from previous merge attempts.
742   */
743  void cleanupMergesDir() throws IOException {
744    deleteDir(getMergesDir());
745  }
746
747  /**
748   * Remove merged region
749   * @param mergedRegion {@link RegionInfo}
750   * @throws IOException
751   */
752  public void cleanupMergedRegion(final RegionInfo mergedRegion) throws IOException {
753    Path regionDir = new Path(this.tableDir, mergedRegion.getEncodedName());
754    if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
755      throw new IOException("Failed delete of " + regionDir);
756    }
757  }
758
759  static boolean mkdirs(FileSystem fs, Configuration conf, Path dir) throws IOException {
760    if (FSUtils.isDistributedFileSystem(fs) ||
761      !conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false)) {
762      return fs.mkdirs(dir);
763    }
764    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
765    return fs.mkdirs(dir, perms);
766  }
767
768  /**
769   * Create the region merges directory, a temporary directory to accumulate
770   * merges in.
771   * @throws IOException If merges dir already exists or we fail to create it.
772   * @see HRegionFileSystem#cleanupMergesDir()
773   */
774  public void createMergesDir() throws IOException {
775    Path mergesdir = getMergesDir();
776    if (fs.exists(mergesdir)) {
777      LOG.info("{} directory exists. Deleting it to recreate it anew", mergesdir);
778      if (!fs.delete(mergesdir, true)) {
779        throw new IOException("Failed deletion of " + mergesdir + " before recreate.");
780      }
781    }
782    if (!mkdirs(fs, conf, mergesdir)) {
783      throw new IOException("Failed create of " + mergesdir);
784    }
785  }
786
787  /**
788   * Write out a merge reference under the given merges directory. Package local
789   * so it doesnt leak out of regionserver.
790   * @param mergedRegion {@link RegionInfo} of the merged region
791   * @param familyName Column Family Name
792   * @param f File to create reference.
793   * @param mergedDir
794   * @return Path to created reference.
795   * @throws IOException
796   */
797  public Path mergeStoreFile(RegionInfo mergedRegion, String familyName, HStoreFile f,
798      Path mergedDir) throws IOException {
799    Path referenceDir = new Path(new Path(mergedDir,
800        mergedRegion.getEncodedName()), familyName);
801    // A whole reference to the store file.
802    Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
803    // Add the referred-to regions name as a dot separated suffix.
804    // See REF_NAME_REGEX regex above. The referred-to regions name is
805    // up in the path of the passed in <code>f</code> -- parentdir is family,
806    // then the directory above is the region name.
807    String mergingRegionName = regionInfoForFs.getEncodedName();
808    // Write reference with same file id only with the other region name as
809    // suffix and into the new region location (under same family).
810    Path p = new Path(referenceDir, f.getPath().getName() + "."
811        + mergingRegionName);
812    return r.write(fs, p);
813  }
814
815  /**
816   * Commit a merged region, moving it from the merges temporary directory to
817   * the proper location in the filesystem.
818   * @param mergedRegionInfo merged region {@link RegionInfo}
819   * @throws IOException
820   */
821  public void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException {
822    Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
823    Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
824    // Move the tmp dir to the expected location
825    if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
826
827      // Write HRI to a file in case we need to recover hbase:meta
828      Path regionInfoFile = new Path(mergedRegionTmpDir, REGION_INFO_FILE);
829      byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
830      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
831
832      if (!fs.rename(mergedRegionTmpDir, regionDir)) {
833        throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
834            + regionDir);
835      }
836    }
837  }
838
839  // ===========================================================================
840  //  Create/Open/Delete Helpers
841  // ===========================================================================
842  /**
843   * Log the current state of the region
844   * @param LOG log to output information
845   * @throws IOException if an unexpected exception occurs
846   */
847  void logFileSystemState(final Logger LOG) throws IOException {
848    CommonFSUtils.logFileSystemState(fs, this.getRegionDir(), LOG);
849  }
850
851  /**
852   * @param hri
853   * @return Content of the file we write out to the filesystem under a region
854   * @throws IOException
855   */
856  private static byte[] getRegionInfoFileContent(final RegionInfo hri) throws IOException {
857    return RegionInfo.toDelimitedByteArray(hri);
858  }
859
860  /**
861   * Create a {@link RegionInfo} from the serialized version on-disk.
862   * @param fs {@link FileSystem} that contains the Region Info file
863   * @param regionDir {@link Path} to the Region Directory that contains the Info file
864   * @return An {@link RegionInfo} instance gotten from the Region Info file.
865   * @throws IOException if an error occurred during file open/read operation.
866   */
867  public static RegionInfo loadRegionInfoFileContent(final FileSystem fs, final Path regionDir)
868      throws IOException {
869    FSDataInputStream in = fs.open(new Path(regionDir, REGION_INFO_FILE));
870    try {
871      return RegionInfo.parseFrom(in);
872    } finally {
873      in.close();
874    }
875  }
876
877  /**
878   * Write the .regioninfo file on-disk.
879   * <p/>
880   * Overwrites if exists already.
881   */
882  private static void writeRegionInfoFileContent(final Configuration conf, final FileSystem fs,
883    final Path regionInfoFile, final byte[] content) throws IOException {
884    // First check to get the permissions
885    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
886    // Write the RegionInfo file content
887    try (FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null)) {
888      out.write(content);
889    }
890  }
891
892  /**
893   * Write out an info file under the stored region directory. Useful recovering mangled regions.
894   * If the regionInfo already exists on-disk, then we fast exit.
895   */
896  void checkRegionInfoOnFilesystem() throws IOException {
897    // Compose the content of the file so we can compare to length in filesystem. If not same,
898    // rewrite it (it may have been written in the old format using Writables instead of pb). The
899    // pb version is much shorter -- we write now w/o the toString version -- so checking length
900    // only should be sufficient. I don't want to read the file every time to check if it pb
901    // serialized.
902    byte[] content = getRegionInfoFileContent(regionInfoForFs);
903
904    // Verify if the region directory exists before opening a region. We need to do this since if
905    // the region directory doesn't exist we will re-create the region directory and a new HRI
906    // when HRegion.openHRegion() is called.
907    try {
908      FileStatus status = fs.getFileStatus(getRegionDir());
909    } catch (FileNotFoundException e) {
910      LOG.warn(getRegionDir() + " doesn't exist for region: " + regionInfoForFs.getEncodedName() +
911          " on table " + regionInfo.getTable());
912    }
913
914    try {
915      Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
916      FileStatus status = fs.getFileStatus(regionInfoFile);
917      if (status != null && status.getLen() == content.length) {
918        // Then assume the content good and move on.
919        // NOTE: that the length is not sufficient to define the the content matches.
920        return;
921      }
922
923      LOG.info("Rewriting .regioninfo file at: " + regionInfoFile);
924      if (!fs.delete(regionInfoFile, false)) {
925        throw new IOException("Unable to remove existing " + regionInfoFile);
926      }
927    } catch (FileNotFoundException e) {
928      LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfoForFs.getEncodedName() +
929          " on table " + regionInfo.getTable());
930    }
931
932    // Write HRI to a file in case we need to recover hbase:meta
933    writeRegionInfoOnFilesystem(content, true);
934  }
935
936  /**
937   * Write out an info file under the region directory. Useful recovering mangled regions.
938   * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
939   */
940  private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException {
941    byte[] content = getRegionInfoFileContent(regionInfoForFs);
942    writeRegionInfoOnFilesystem(content, useTempDir);
943  }
944
945  /**
946   * Write out an info file under the region directory. Useful recovering mangled regions.
947   * @param regionInfoContent serialized version of the {@link RegionInfo}
948   * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
949   */
950  private void writeRegionInfoOnFilesystem(final byte[] regionInfoContent,
951      final boolean useTempDir) throws IOException {
952    Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
953    if (useTempDir) {
954      // Create in tmpDir and then move into place in case we crash after
955      // create but before close. If we don't successfully close the file,
956      // subsequent region reopens will fail the below because create is
957      // registered in NN.
958
959      // And then create the file
960      Path tmpPath = new Path(getTempDir(), REGION_INFO_FILE);
961
962      // If datanode crashes or if the RS goes down just before the close is called while trying to
963      // close the created regioninfo file in the .tmp directory then on next
964      // creation we will be getting AlreadyCreatedException.
965      // Hence delete and create the file if exists.
966      if (CommonFSUtils.isExists(fs, tmpPath)) {
967        CommonFSUtils.delete(fs, tmpPath, true);
968      }
969
970      // Write HRI to a file in case we need to recover hbase:meta
971      writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
972
973      // Move the created file to the original path
974      if (fs.exists(tmpPath) &&  !rename(tmpPath, regionInfoFile)) {
975        throw new IOException("Unable to rename " + tmpPath + " to " + regionInfoFile);
976      }
977    } else {
978      // Write HRI to a file in case we need to recover hbase:meta
979      writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
980    }
981  }
982
983  /**
984   * Create a new Region on file-system.
985   * @param conf the {@link Configuration} to use
986   * @param fs {@link FileSystem} from which to add the region
987   * @param tableDir {@link Path} to where the table is being stored
988   * @param regionInfo {@link RegionInfo} for region to be added
989   * @throws IOException if the region creation fails due to a FileSystem exception.
990   */
991  public static HRegionFileSystem createRegionOnFileSystem(final Configuration conf,
992      final FileSystem fs, final Path tableDir, final RegionInfo regionInfo) throws IOException {
993    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
994
995    // We only create a .regioninfo and the region directory if this is the default region replica
996    if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
997      Path regionDir = regionFs.getRegionDir();
998      if (fs.exists(regionDir)) {
999        LOG.warn("Trying to create a region that already exists on disk: " + regionDir);
1000      } else {
1001        // Create the region directory
1002        if (!createDirOnFileSystem(fs, conf, regionDir)) {
1003          LOG.warn("Unable to create the region directory: " + regionDir);
1004          throw new IOException("Unable to create region directory: " + regionDir);
1005        }
1006      }
1007
1008      // Write HRI to a file in case we need to recover hbase:meta
1009      regionFs.writeRegionInfoOnFilesystem(false);
1010    } else {
1011      if (LOG.isDebugEnabled())
1012        LOG.debug("Skipping creation of .regioninfo file for " + regionInfo);
1013    }
1014    return regionFs;
1015  }
1016
1017  /**
1018   * Open Region from file-system.
1019   * @param conf the {@link Configuration} to use
1020   * @param fs {@link FileSystem} from which to add the region
1021   * @param tableDir {@link Path} to where the table is being stored
1022   * @param regionInfo {@link RegionInfo} for region to be added
1023   * @param readOnly True if you don't want to edit the region data
1024   * @throws IOException if the region creation fails due to a FileSystem exception.
1025   */
1026  public static HRegionFileSystem openRegionFromFileSystem(final Configuration conf,
1027      final FileSystem fs, final Path tableDir, final RegionInfo regionInfo, boolean readOnly)
1028      throws IOException {
1029    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
1030    Path regionDir = regionFs.getRegionDir();
1031
1032    if (!fs.exists(regionDir)) {
1033      LOG.warn("Trying to open a region that do not exists on disk: " + regionDir);
1034      throw new IOException("The specified region do not exists on disk: " + regionDir);
1035    }
1036
1037    if (!readOnly) {
1038      // Cleanup temporary directories
1039      regionFs.cleanupTempDir();
1040      regionFs.cleanupSplitsDir();
1041      regionFs.cleanupMergesDir();
1042
1043      // If it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
1044      // Only create HRI if we are the default replica
1045      if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
1046        regionFs.checkRegionInfoOnFilesystem();
1047      } else {
1048        if (LOG.isDebugEnabled()) {
1049          LOG.debug("Skipping creation of .regioninfo file for " + regionInfo);
1050        }
1051      }
1052    }
1053
1054    return regionFs;
1055  }
1056
1057  /**
1058   * Remove the region from the table directory, archiving the region's hfiles.
1059   * @param conf the {@link Configuration} to use
1060   * @param fs {@link FileSystem} from which to remove the region
1061   * @param tableDir {@link Path} to where the table is being stored
1062   * @param regionInfo {@link RegionInfo} for region to be deleted
1063   * @throws IOException if the request cannot be completed
1064   */
1065  public static void deleteRegionFromFileSystem(final Configuration conf,
1066      final FileSystem fs, final Path tableDir, final RegionInfo regionInfo) throws IOException {
1067    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
1068    Path regionDir = regionFs.getRegionDir();
1069
1070    if (!fs.exists(regionDir)) {
1071      LOG.warn("Trying to delete a region that do not exists on disk: " + regionDir);
1072      return;
1073    }
1074
1075    if (LOG.isDebugEnabled()) {
1076      LOG.debug("DELETING region " + regionDir);
1077    }
1078
1079    // Archive region
1080    Path rootDir = CommonFSUtils.getRootDir(conf);
1081    HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
1082
1083    // Delete empty region dir
1084    if (!fs.delete(regionDir, true)) {
1085      LOG.warn("Failed delete of " + regionDir);
1086    }
1087  }
1088
1089  /**
1090   * Creates a directory. Assumes the user has already checked for this directory existence.
1091   * @param dir
1092   * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
1093   *         whether the directory exists or not, and returns true if it exists.
1094   * @throws IOException
1095   */
1096  boolean createDir(Path dir) throws IOException {
1097    int i = 0;
1098    IOException lastIOE = null;
1099    do {
1100      try {
1101        return mkdirs(fs, conf, dir);
1102      } catch (IOException ioe) {
1103        lastIOE = ioe;
1104        if (fs.exists(dir)) return true; // directory is present
1105        try {
1106          sleepBeforeRetry("Create Directory", i+1);
1107        } catch (InterruptedException e) {
1108          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1109        }
1110      }
1111    } while (++i <= hdfsClientRetriesNumber);
1112    throw new IOException("Exception in createDir", lastIOE);
1113  }
1114
1115  /**
1116   * Renames a directory. Assumes the user has already checked for this directory existence.
1117   * @param srcpath
1118   * @param dstPath
1119   * @return true if rename is successful.
1120   * @throws IOException
1121   */
1122  boolean rename(Path srcpath, Path dstPath) throws IOException {
1123    IOException lastIOE = null;
1124    int i = 0;
1125    do {
1126      try {
1127        return fs.rename(srcpath, dstPath);
1128      } catch (IOException ioe) {
1129        lastIOE = ioe;
1130        if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
1131        // dir is not there, retry after some time.
1132        try {
1133          sleepBeforeRetry("Rename Directory", i+1);
1134        } catch (InterruptedException e) {
1135          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1136        }
1137      }
1138    } while (++i <= hdfsClientRetriesNumber);
1139
1140    throw new IOException("Exception in rename", lastIOE);
1141  }
1142
1143  /**
1144   * Deletes a directory. Assumes the user has already checked for this directory existence.
1145   * @param dir
1146   * @return true if the directory is deleted.
1147   * @throws IOException
1148   */
1149  boolean deleteDir(Path dir) throws IOException {
1150    IOException lastIOE = null;
1151    int i = 0;
1152    do {
1153      try {
1154        return fs.delete(dir, true);
1155      } catch (IOException ioe) {
1156        lastIOE = ioe;
1157        if (!fs.exists(dir)) return true;
1158        // dir is there, retry deleting after some time.
1159        try {
1160          sleepBeforeRetry("Delete Directory", i+1);
1161        } catch (InterruptedException e) {
1162          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1163        }
1164      }
1165    } while (++i <= hdfsClientRetriesNumber);
1166
1167    throw new IOException("Exception in DeleteDir", lastIOE);
1168  }
1169
1170  /**
1171   * sleeping logic; handles the interrupt exception.
1172   */
1173  private void sleepBeforeRetry(String msg, int sleepMultiplier) throws InterruptedException {
1174    sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
1175  }
1176
1177  /**
1178   * Creates a directory for a filesystem and configuration object. Assumes the user has already
1179   * checked for this directory existence.
1180   * @param fs
1181   * @param conf
1182   * @param dir
1183   * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
1184   *         whether the directory exists or not, and returns true if it exists.
1185   * @throws IOException
1186   */
1187  private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
1188      throws IOException {
1189    int i = 0;
1190    IOException lastIOE = null;
1191    int hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
1192      DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
1193    int baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
1194      DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
1195    do {
1196      try {
1197        return fs.mkdirs(dir);
1198      } catch (IOException ioe) {
1199        lastIOE = ioe;
1200        if (fs.exists(dir)) return true; // directory is present
1201        try {
1202          sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
1203        } catch (InterruptedException e) {
1204          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1205        }
1206      }
1207    } while (++i <= hdfsClientRetriesNumber);
1208
1209    throw new IOException("Exception in createDir", lastIOE);
1210  }
1211
1212  /**
1213   * sleeping logic for static methods; handles the interrupt exception. Keeping a static version
1214   * for this to avoid re-looking for the integer values.
1215   */
1216  private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
1217      int hdfsClientRetriesNumber) throws InterruptedException {
1218    if (sleepMultiplier > hdfsClientRetriesNumber) {
1219      if (LOG.isDebugEnabled()) {
1220        LOG.debug(msg + ", retries exhausted");
1221      }
1222      return;
1223    }
1224    if (LOG.isDebugEnabled()) {
1225      LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
1226    }
1227    Thread.sleep((long)baseSleepBeforeRetries * sleepMultiplier);
1228  }
1229}