1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.UUID;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FSDataInputStream;
35  import org.apache.hadoop.fs.FSDataOutputStream;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.FileUtil;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.fs.PathFilter;
41  import org.apache.hadoop.fs.permission.FsPermission;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.backup.HFileArchiver;
48  import org.apache.hadoop.hbase.fs.HFileSystem;
49  import org.apache.hadoop.hbase.io.Reference;
50  import org.apache.hadoop.hbase.util.FSUtils;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.Threads;
53  
54  /**
55   * View to an on-disk Region.
56   * Provides the set of methods necessary to interact with the on-disk region data.
57   */
58  @InterfaceAudience.Private
59  public class HRegionFileSystem {
  /** Logger for this class. */
  public static final Log LOG = LogFactory.getLog(HRegionFileSystem.class);

  /** Name of the region info file that resides just under the region directory. */
  public final static String REGION_INFO_FILE = ".regioninfo";

  /** Temporary subdirectory of the region directory used for merges. */
  public static final String REGION_MERGES_DIR = ".merges";

  /** Temporary subdirectory of the region directory used for splits. */
  public static final String REGION_SPLITS_DIR = ".splits";

  /** Temporary subdirectory of the region directory used for compaction output. */
  private static final String REGION_TEMP_DIR = ".tmp";

  /** Descriptor of the region this view points at; its encoded name is the region directory name. */
  private final HRegionInfo regionInfo;
  /** Configuration handed to the constructor; also the source of the retry settings below. */
  private final Configuration conf;
  /** Directory of the table that contains the region; parent of the region directory. */
  private final Path tableDir;
  /** File system on which the region lives. */
  private final FileSystem fs;
  
  /**
   * In order to handle NN connectivity hiccups, one needs to retry non-idempotent operations at
   * the client level.
   * Number of retries, read from "hdfs.client.retries.number".
   */
  private final int hdfsClientRetriesNumber;
  /**
   * Base sleep between retries, read from "hdfs.client.sleep.before.retries".
   * NOTE(review): presumably milliseconds -- confirm against the retry helper that consumes it.
   */
  private final int baseSleepBeforeRetries;
  /** Fallback for {@link #hdfsClientRetriesNumber} when the configuration key is not set. */
  private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
  /** Fallback for {@link #baseSleepBeforeRetries} when the configuration key is not set. */
  private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
87  
88    /**
89     * Create a view to the on-disk region
90     * @param conf the {@link Configuration} to use
91     * @param fs {@link FileSystem} that contains the region
92     * @param tableDir {@link Path} to where the table is being stored
93     * @param regionInfo {@link HRegionInfo} for region
94     */
95    HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir,
96        final HRegionInfo regionInfo) {
97      this.fs = fs;
98      this.conf = conf;
99      this.tableDir = tableDir;
100     this.regionInfo = regionInfo;
101     this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
102       DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
103     this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
104       DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
105  }
106 
107   /** @return the underlying {@link FileSystem} */
108   public FileSystem getFileSystem() {
109     return this.fs;
110   }
111 
112   /** @return the {@link HRegionInfo} that describe this on-disk region view */
113   public HRegionInfo getRegionInfo() {
114     return this.regionInfo;
115   }
116 
  /** @return {@link Path} to the directory of the table that contains this region. */
118   public Path getTableDir() {
119     return this.tableDir;
120   }
121 
122   /** @return {@link Path} to the region directory. */
123   public Path getRegionDir() {
124     return new Path(this.tableDir, this.regionInfo.getEncodedName());
125   }
126 
127   // ===========================================================================
128   //  Temp Helpers
129   // ===========================================================================
130   /** @return {@link Path} to the region's temp directory, used for file creations */
131   Path getTempDir() {
132     return new Path(getRegionDir(), REGION_TEMP_DIR);
133   }
134 
135   /**
136    * Clean up any temp detritus that may have been left around from previous operation attempts.
137    */
138   void cleanupTempDir() throws IOException {
139     deleteDir(getTempDir());
140   }
141 
142   // ===========================================================================
143   //  Store/StoreFile Helpers
144   // ===========================================================================
145   /**
146    * Returns the directory path of the specified family
147    * @param familyName Column Family Name
148    * @return {@link Path} to the directory of the specified family
149    */
150   Path getStoreDir(final String familyName) {
151     return new Path(this.getRegionDir(), familyName);
152   }
153 
154   /**
155    * Create the store directory for the specified family name
156    * @param familyName Column Family Name
157    * @return {@link Path} to the directory of the specified family
158    * @throws IOException if the directory creation fails.
159    */
160   Path createStoreDir(final String familyName) throws IOException {
161     Path storeDir = getStoreDir(familyName);
162     if(!fs.exists(storeDir) && !createDir(storeDir))
163       throw new IOException("Failed creating "+storeDir);
164     return storeDir;
165   }
166 
167   /**
168    * Returns the store files available for the family.
   * This method filters the listing down to the valid store files.
170    * @param familyName Column Family Name
171    * @return a set of {@link StoreFileInfo} for the specified family.
172    */
173   public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
174     return getStoreFiles(Bytes.toString(familyName));
175   }
176 
177   /**
178    * Returns the store files available for the family.
   * This method filters the listing down to the valid store files.
180    * @param familyName Column Family Name
181    * @return a set of {@link StoreFileInfo} for the specified family.
182    */
183   public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
184     Path familyDir = getStoreDir(familyName);
185     FileStatus[] files = FSUtils.listStatus(this.fs, familyDir);
186     if (files == null) return null;
187 
188     ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
189     for (FileStatus status: files) {
190       if (!StoreFileInfo.isValid(status)) continue;
191 
192       storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
193     }
194     return storeFiles;
195   }
196 
197   /**
198    * Returns true if the specified family has reference files
199    * @param familyName Column Family Name
200    * @return true if family contains reference files
201    * @throws IOException
202    */
203   public boolean hasReferences(final String familyName) throws IOException {
204     FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName),
205       new PathFilter () {
206         public boolean accept(Path path) {
207           return StoreFileInfo.isReference(path);
208         }
209       }
210     );
211     return files != null && files.length > 0;
212   }
213 
214   /**
215    * Check whether region has Reference file
   * @param htd table descriptor of the region
217    * @return true if region has reference file
218    * @throws IOException
219    */
220   public boolean hasReferences(final HTableDescriptor htd) throws IOException {
221     for (HColumnDescriptor family : htd.getFamilies()) {
222       if (hasReferences(family.getNameAsString())) {
223         return true;
224       }
225     }
226     return false;
227   }
228 
229   /**
230    * @return the set of families present on disk
231    * @throws IOException
232    */
233   public Collection<String> getFamilies() throws IOException {
234     FileStatus[] fds = FSUtils.listStatus(fs, getRegionDir(), new FSUtils.FamilyDirFilter(fs));
235     if (fds == null) return null;
236 
237     ArrayList<String> families = new ArrayList<String>(fds.length);
238     for (FileStatus status: fds) {
239       families.add(status.getPath().getName());
240     }
241 
242     return families;
243   }
244 
245   /**
246    * Remove the region family from disk, archiving the store files.
247    * @param familyName Column Family Name
   * @throws IOException if an error occurs during the archiving
249    */
250   public void deleteFamily(final String familyName) throws IOException {
251     // archive family store files
252     HFileArchiver.archiveFamily(fs, conf, regionInfo, tableDir, Bytes.toBytes(familyName));
253 
254     // delete the family folder
255     Path familyDir = getStoreDir(familyName);
256     if(fs.exists(familyDir) && !deleteDir(familyDir))
257       throw new IOException("Could not delete family " + familyName
258           + " from FileSystem for region " + regionInfo.getRegionNameAsString() + "("
259           + regionInfo.getEncodedName() + ")");
260   }
261 
262   /**
263    * Generate a unique file name, used by createTempName() and commitStoreFile()
264    * @param suffix extra information to append to the generated name
265    * @return Unique file name
266    */
267   private static String generateUniqueName(final String suffix) {
268     String name = UUID.randomUUID().toString().replaceAll("-", "");
269     if (suffix != null) name += suffix;
270     return name;
271   }
272 
273   /**
   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
275    * to get a safer file creation.
276    * <code>
277    * Path file = fs.createTempName();
278    * ...StoreFile.Writer(file)...
279    * fs.commitStoreFile("family", file);
280    * </code>
281    *
282    * @return Unique {@link Path} of the temporary file
283    */
284   public Path createTempName() {
285     return createTempName(null);
286   }
287 
288   /**
   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
290    * to get a safer file creation.
291    * <code>
292    * Path file = fs.createTempName();
293    * ...StoreFile.Writer(file)...
294    * fs.commitStoreFile("family", file);
295    * </code>
296    *
297    * @param suffix extra information to append to the generated name
298    * @return Unique {@link Path} of the temporary file
299    */
300   public Path createTempName(final String suffix) {
301     return new Path(getTempDir(), generateUniqueName(suffix));
302   }
303 
304   /**
305    * Move the file from a build/temp location to the main family store directory.
306    * @param familyName Family that will gain the file
307    * @param buildPath {@link Path} to the file to commit.
308    * @return The new {@link Path} of the committed file
309    * @throws IOException
310    */
311   public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
312     return commitStoreFile(familyName, buildPath, -1, false);
313   }
314 
315   /**
316    * Move the file from a build/temp location to the main family store directory.
317    * @param familyName Family that will gain the file
318    * @param buildPath {@link Path} to the file to commit.
   * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
320    * @param generateNewName False if you want to keep the buildPath name
321    * @return The new {@link Path} of the committed file
322    * @throws IOException
323    */
324   private Path commitStoreFile(final String familyName, final Path buildPath,
325       final long seqNum, final boolean generateNewName) throws IOException {
326     Path storeDir = getStoreDir(familyName);
327     if(!fs.exists(storeDir) && !createDir(storeDir))
328       throw new IOException("Failed creating " + storeDir);
329     
330     String name = buildPath.getName();
331     if (generateNewName) {
332       name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
333     }
334     Path dstPath = new Path(storeDir, name);
335     if (!fs.exists(buildPath)) {
336       throw new FileNotFoundException(buildPath.toString());
337     }
338     LOG.debug("Committing store file " + buildPath + " as " + dstPath);
339     // buildPath exists, therefore not doing an exists() check.
340     if (!rename(buildPath, dstPath)) {
341       throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
342     }
343     return dstPath;
344   }
345 
346 
347   /**
348    * Moves multiple store files to the relative region's family store directory.
349    * @param storeFiles list of store files divided by family
350    * @throws IOException
351    */
352   void commitStoreFiles(final Map<byte[], List<StoreFile>> storeFiles) throws IOException {
353     for (Map.Entry<byte[], List<StoreFile>> es: storeFiles.entrySet()) {
354       String familyName = Bytes.toString(es.getKey());
355       for (StoreFile sf: es.getValue()) {
356         commitStoreFile(familyName, sf.getPath());
357       }
358     }
359   }
360 
361   /**
362    * Archives the specified store file from the specified family.
363    * @param familyName Family that contains the store files
364    * @param filePath {@link Path} to the store file to remove
365    * @throws IOException if the archiving fails
366    */
367   public void removeStoreFile(final String familyName, final Path filePath)
368       throws IOException {
369     HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfo,
370         this.tableDir, Bytes.toBytes(familyName), filePath);
371   }
372 
373   /**
374    * Closes and archives the specified store files from the specified family.
375    * @param familyName Family that contains the store files
376    * @param storeFiles set of store files to remove
377    * @throws IOException if the archiving fails
378    */
379   public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
380       throws IOException {
381     HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo,
382         this.tableDir, Bytes.toBytes(familyName), storeFiles);
383   }
384 
385   /**
386    * Bulk load: Add a specified store file to the specified family.
   * If the source file is on the same file-system as the destination it is moved from the
   * source location to the destination location; otherwise it is copied over first.
389    *
390    * @param familyName Family that will gain the file
391    * @param srcPath {@link Path} to the file to import
392    * @param seqNum Bulk Load sequence number
393    * @return The destination {@link Path} of the bulk loaded file
394    * @throws IOException
395    */
396   Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
397       throws IOException {
398     // Copy the file if it's on another filesystem
399     FileSystem srcFs = srcPath.getFileSystem(conf);
400     FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
401 
402     // We can't compare FileSystem instances as equals() includes UGI instance
403     // as part of the comparison and won't work when doing SecureBulkLoad
404     // TODO deal with viewFS
405     if (!srcFs.getUri().equals(desFs.getUri())) {
406       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
407           "the destination store. Copying file over to destination filesystem.");
408       Path tmpPath = createTempName();
409       FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
410       LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
411       srcPath = tmpPath;
412     }
413 
414     return commitStoreFile(familyName, srcPath, seqNum, true);
415   }
416 
417   // ===========================================================================
418   //  Splits Helpers
419   // ===========================================================================
420   /** @return {@link Path} to the temp directory used during split operations */
421   Path getSplitsDir() {
422     return new Path(getRegionDir(), REGION_SPLITS_DIR);
423   }
424 
425   Path getSplitsDir(final HRegionInfo hri) {
426     return new Path(getSplitsDir(), hri.getEncodedName());
427   }
428 
429   /**
430    * Clean up any split detritus that may have been left around from previous split attempts.
431    */
432   void cleanupSplitsDir() throws IOException {
433     deleteDir(getSplitsDir());
434   }
435 
436   /**
437    * Clean up any split detritus that may have been left around from previous
438    * split attempts.
439    * Call this method on initial region deploy.
440    * @throws IOException
441    */
442   void cleanupAnySplitDetritus() throws IOException {
443     Path splitdir = this.getSplitsDir();
444     if (!fs.exists(splitdir)) return;
445     // Look at the splitdir.  It could have the encoded names of the daughter
446     // regions we tried to make.  See if the daughter regions actually got made
447     // out under the tabledir.  If here under splitdir still, then the split did
448     // not complete.  Try and do cleanup.  This code WILL NOT catch the case
449     // where we successfully created daughter a but regionserver crashed during
450     // the creation of region b.  In this case, there'll be an orphan daughter
451     // dir in the filesystem.  TOOD: Fix.
452     FileStatus[] daughters = FSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
453     if (daughters != null) {
454       for (FileStatus daughter: daughters) {
455         Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
456         if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
457           throw new IOException("Failed delete of " + daughterDir);
458         }
459       }
460     }
461     cleanupSplitsDir();
462     LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
463   }
464 
465   /**
466    * Remove daughter region
467    * @param regionInfo daughter {@link HRegionInfo}
468    * @throws IOException
469    */
470   void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
471     Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
472     if (this.fs.exists(regionDir) && !deleteDir(regionDir)) {
473       throw new IOException("Failed delete of " + regionDir);
474     }
475   }
476 
477   /**
478    * Commit a daughter region, moving it from the split temporary directory
479    * to the proper location in the filesystem.
480    * @param regionInfo daughter {@link HRegionInfo}
481    * @throws IOException
482    */
483   Path commitDaughterRegion(final HRegionInfo regionInfo) throws IOException {
484     Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
485     Path daughterTmpDir = this.getSplitsDir(regionInfo);
486     if (fs.exists(daughterTmpDir) && !rename(daughterTmpDir, regionDir)) {
487       throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
488     }
489     return regionDir;
490   }
491 
492   /**
493    * Create the region splits directory.
494    */
495   void createSplitsDir() throws IOException {
496     Path splitdir = getSplitsDir();
497     if (fs.exists(splitdir)) {
498       LOG.info("The " + splitdir + " directory exists.  Hence deleting it to recreate it");
499       if (!deleteDir(splitdir)) {
500         throw new IOException("Failed deletion of " + splitdir
501             + " before creating them again.");
502       }
503     }
504     // splitDir doesn't exists now. No need to do an exists() call for it.
505     if (!createDir(splitdir)) {
506       throw new IOException("Failed create of " + splitdir);
507     }
508   }
509 
510   /**
   * Write out a split reference. Package local so it doesn't leak out of
   * regionserver.
513    * @param hri {@link HRegionInfo} of the destination
514    * @param familyName Column Family Name
515    * @param f File to split.
516    * @param splitRow Split Row
517    * @param top True if we are referring to the top half of the hfile.
518    * @return Path to created reference.
519    * @throws IOException
520    */
521   Path splitStoreFile(final HRegionInfo hri, final String familyName,
522       final StoreFile f, final byte[] splitRow, final boolean top) throws IOException {
523     
524     // Check whether the split row lies in the range of the store file
525     // If it is outside the range, return directly.
526     if (top) {
527       //check if larger than last key.
528       KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
529       byte[] lastKey = f.createReader().getLastKey();      
530       if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
531           splitKey.getKeyOffset(), splitKey.getKeyLength(), lastKey, 0, lastKey.length) > 0) {
532         return null;
533       }
534     } else {
535       //check if smaller than first key
536       KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
537       byte[] firstKey = f.createReader().getFirstKey();
538       if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
539           splitKey.getKeyOffset(), splitKey.getKeyLength(), firstKey, 0, firstKey.length) < 0) {
540         return null;
541       }      
542     }
543  
544     f.getReader().close(true);
545     
546     Path splitDir = new Path(getSplitsDir(hri), familyName);
547     // A reference to the bottom half of the hsf store file.
548     Reference r =
549       top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
550     // Add the referred-to regions name as a dot separated suffix.
551     // See REF_NAME_REGEX regex above.  The referred-to regions name is
552     // up in the path of the passed in <code>f</code> -- parentdir is family,
553     // then the directory above is the region name.
554     String parentRegionName = regionInfo.getEncodedName();
555     // Write reference with same file id only with the other region name as
556     // suffix and into the new region location (under same family).
557     Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
558     return r.write(fs, p);
559   }
560 
561   // ===========================================================================
562   //  Merge Helpers
563   // ===========================================================================
564   /** @return {@link Path} to the temp directory used during merge operations */
565   Path getMergesDir() {
566     return new Path(getRegionDir(), REGION_MERGES_DIR);
567   }
568 
569   Path getMergesDir(final HRegionInfo hri) {
570     return new Path(getMergesDir(), hri.getEncodedName());
571   }
572 
573   /**
574    * Clean up any merge detritus that may have been left around from previous merge attempts.
575    */
576   void cleanupMergesDir() throws IOException {
577     deleteDir(getMergesDir());
578   }
579 
580   /**
581    * Remove merged region
582    * @param mergedRegion {@link HRegionInfo}
583    * @throws IOException
584    */
585   void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException {
586     Path regionDir = new Path(this.tableDir, mergedRegion.getEncodedName());
587     if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
588       throw new IOException("Failed delete of " + regionDir);
589     }
590   }
591 
592   /**
593    * Create the region merges directory.
594    * @throws IOException If merges dir already exists or we fail to create it.
595    * @see HRegionFileSystem#cleanupMergesDir()
596    */
597   void createMergesDir() throws IOException {
598     Path mergesdir = getMergesDir();
599     if (fs.exists(mergesdir)) {
600       LOG.info("The " + mergesdir
601           + " directory exists.  Hence deleting it to recreate it");
602       if (!fs.delete(mergesdir, true)) {
603         throw new IOException("Failed deletion of " + mergesdir
604             + " before creating them again.");
605       }
606     }
607     if (!fs.mkdirs(mergesdir))
608       throw new IOException("Failed create of " + mergesdir);
609   }
610 
611   /**
612    * Write out a merge reference under the given merges directory. Package local
613    * so it doesnt leak out of regionserver.
614    * @param mergedRegion {@link HRegionInfo} of the merged region
615    * @param familyName Column Family Name
616    * @param f File to create reference.
617    * @param mergedDir
618    * @return Path to created reference.
619    * @throws IOException
620    */
621   Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName,
622       final StoreFile f, final Path mergedDir)
623       throws IOException {
624     Path referenceDir = new Path(new Path(mergedDir,
625         mergedRegion.getEncodedName()), familyName);
626     // A whole reference to the store file.
627     Reference r = Reference.createTopReference(regionInfo.getStartKey());
628     // Add the referred-to regions name as a dot separated suffix.
629     // See REF_NAME_REGEX regex above. The referred-to regions name is
630     // up in the path of the passed in <code>f</code> -- parentdir is family,
631     // then the directory above is the region name.
632     String mergingRegionName = regionInfo.getEncodedName();
633     // Write reference with same file id only with the other region name as
634     // suffix and into the new region location (under same family).
635     Path p = new Path(referenceDir, f.getPath().getName() + "."
636         + mergingRegionName);
637     return r.write(fs, p);
638   }
639 
640   /**
641    * Commit a merged region, moving it from the merges temporary directory to
642    * the proper location in the filesystem.
643    * @param mergedRegionInfo merged region {@link HRegionInfo}
644    * @throws IOException 
645    */
646   void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
647     Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
648     Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
649     // Move the tmp dir in the expected location
650     if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
651       if (!fs.rename(mergedRegionTmpDir, regionDir)) {
652         throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
653             + regionDir);
654       }
655     }
656   }
657 
658   // ===========================================================================
659   //  Create/Open/Delete Helpers
660   // ===========================================================================
661   /**
662    * Log the current state of the region
663    * @param LOG log to output information
664    * @throws IOException if an unexpected exception occurs
665    */
666   void logFileSystemState(final Log LOG) throws IOException {
667     FSUtils.logFileSystemState(fs, this.getRegionDir(), LOG);
668   }
669 
670   /**
671    * @param hri
672    * @return Content of the file we write out to the filesystem under a region
673    * @throws IOException
674    */
675   private static byte[] getRegionInfoFileContent(final HRegionInfo hri) throws IOException {
676     return hri.toDelimitedByteArray();
677   }
678 
679   /**
680    * Create a {@link HRegionInfo} from the serialized version on-disk.
681    * @param fs {@link FileSystem} that contains the Region Info file
682    * @param regionDir {@link Path} to the Region Directory that contains the Info file
683    * @return An {@link HRegionInfo} instance gotten from the Region Info file.
684    * @throws IOException if an error occurred during file open/read operation.
685    */
686   public static HRegionInfo loadRegionInfoFileContent(final FileSystem fs, final Path regionDir)
687       throws IOException {
688     FSDataInputStream in = fs.open(new Path(regionDir, REGION_INFO_FILE));
689     try {
690       return HRegionInfo.parseFrom(in);
691     } finally {
692       in.close();
693     }
694   }
695 
696   /**
697    * Write the .regioninfo file on-disk.
698    */
699   private static void writeRegionInfoFileContent(final Configuration conf, final FileSystem fs,
700       final Path regionInfoFile, final byte[] content) throws IOException {
701     // First check to get the permissions
702     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
703     // Write the RegionInfo file content
704     FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
705     try {
706       out.write(content);
707     } finally {
708       out.close();
709     }
710   }
711 
712   /**
713    * Write out an info file under the stored region directory. Useful recovering mangled regions.
714    * If the regionInfo already exists on-disk, then we fast exit.
715    */
716   void checkRegionInfoOnFilesystem() throws IOException {
717     // Compose the content of the file so we can compare to length in filesystem. If not same,
718     // rewrite it (it may have been written in the old format using Writables instead of pb). The
719     // pb version is much shorter -- we write now w/o the toString version -- so checking length
720     // only should be sufficient. I don't want to read the file every time to check if it pb
721     // serialized.
722     byte[] content = getRegionInfoFileContent(regionInfo);
723     try {
724       Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
725 
726       FileStatus status = fs.getFileStatus(regionInfoFile);
727       if (status != null && status.getLen() == content.length) {
728         // Then assume the content good and move on.
729         // NOTE: that the length is not sufficient to define the the content matches.
730         return;
731       }
732 
733       LOG.info("Rewriting .regioninfo file at: " + regionInfoFile);
734       if (!fs.delete(regionInfoFile, false)) {
735         throw new IOException("Unable to remove existing " + regionInfoFile);
736       }
737     } catch (FileNotFoundException e) {
738       LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfo.getEncodedName());
739     }
740 
741     // Write HRI to a file in case we need to recover .META.
742     writeRegionInfoOnFilesystem(content, true);
743   }
744 
745   /**
746    * Write out an info file under the region directory. Useful recovering mangled regions.
747    * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
748    */
749   private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException {
750     byte[] content = getRegionInfoFileContent(regionInfo);
751     writeRegionInfoOnFilesystem(content, useTempDir);
752   }
753 
754   /**
755    * Write out an info file under the region directory. Useful recovering mangled regions.
756    * @param regionInfoContent serialized version of the {@link HRegionInfo}
757    * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
758    */
759   private void writeRegionInfoOnFilesystem(final byte[] regionInfoContent,
760       final boolean useTempDir) throws IOException {
761     Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
762     if (useTempDir) {
763       // Create in tmpDir and then move into place in case we crash after
764       // create but before close. If we don't successfully close the file,
765       // subsequent region reopens will fail the below because create is
766       // registered in NN.
767 
768       // And then create the file
769       Path tmpPath = new Path(getTempDir(), REGION_INFO_FILE);
770 
771       // If datanode crashes or if the RS goes down just before the close is called while trying to
772       // close the created regioninfo file in the .tmp directory then on next
773       // creation we will be getting AlreadyCreatedException.
774       // Hence delete and create the file if exists.
775       if (FSUtils.isExists(fs, tmpPath)) {
776         FSUtils.delete(fs, tmpPath, true);
777       }
778 
779       // Write HRI to a file in case we need to recover .META.
780       writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
781 
782       // Move the created file to the original path
783       if (fs.exists(tmpPath) &&  !rename(tmpPath, regionInfoFile)) {
784         throw new IOException("Unable to rename " + tmpPath + " to " + regionInfoFile);
785       }
786     } else {
787       // Write HRI to a file in case we need to recover .META.
788       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
789     }
790   }
791 
792   /**
793    * Create a new Region on file-system.
794    * @param conf the {@link Configuration} to use
795    * @param fs {@link FileSystem} from which to add the region
796    * @param tableDir {@link Path} to where the table is being stored
797    * @param regionInfo {@link HRegionInfo} for region to be added
798    * @throws IOException if the region creation fails due to a FileSystem exception.
799    */
800   public static HRegionFileSystem createRegionOnFileSystem(final Configuration conf,
801       final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) throws IOException {
802     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
803     Path regionDir = regionFs.getRegionDir();
804 
805     if (fs.exists(regionDir)) {
806       LOG.warn("Trying to create a region that already exists on disk: " + regionDir);
807       throw new IOException("The specified region already exists on disk: " + regionDir);
808     }
809 
810     // Create the region directory
811     if (!createDirOnFileSystem(fs, conf, regionDir)) {
812       LOG.warn("Unable to create the region directory: " + regionDir);
813       throw new IOException("Unable to create region directory: " + regionDir);
814     }
815 
816     // Write HRI to a file in case we need to recover .META.
817     regionFs.writeRegionInfoOnFilesystem(false);
818     return regionFs;
819   }
820 
821   /**
822    * Open Region from file-system.
823    * @param conf the {@link Configuration} to use
824    * @param fs {@link FileSystem} from which to add the region
825    * @param tableDir {@link Path} to where the table is being stored
826    * @param regionInfo {@link HRegionInfo} for region to be added
827    * @param readOnly True if you don't want to edit the region data
828    * @throws IOException if the region creation fails due to a FileSystem exception.
829    */
830   public static HRegionFileSystem openRegionFromFileSystem(final Configuration conf,
831       final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo, boolean readOnly)
832       throws IOException {
833     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
834     Path regionDir = regionFs.getRegionDir();
835 
836     if (!fs.exists(regionDir)) {
837       LOG.warn("Trying to open a region that do not exists on disk: " + regionDir);
838       throw new IOException("The specified region do not exists on disk: " + regionDir);
839     }
840 
841     if (!readOnly) {
842       // Cleanup temporary directories
843       regionFs.cleanupTempDir();
844       regionFs.cleanupSplitsDir();
845       regionFs.cleanupMergesDir();
846 
847       // if it doesn't exists, Write HRI to a file, in case we need to recover .META.
848       regionFs.checkRegionInfoOnFilesystem();
849     }
850 
851     return regionFs;
852   }
853 
854   /**
855    * Remove the region from the table directory, archiving the region's hfiles.
856    * @param conf the {@link Configuration} to use
857    * @param fs {@link FileSystem} from which to remove the region
858    * @param tableDir {@link Path} to where the table is being stored
859    * @param regionInfo {@link HRegionInfo} for region to be deleted
860    * @throws IOException if the request cannot be completed
861    */
862   public static void deleteRegionFromFileSystem(final Configuration conf,
863       final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) throws IOException {
864     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
865     Path regionDir = regionFs.getRegionDir();
866 
867     if (!fs.exists(regionDir)) {
868       LOG.warn("Trying to delete a region that do not exists on disk: " + regionDir);
869       return;
870     }
871 
872     if (LOG.isDebugEnabled()) {
873       LOG.debug("DELETING region " + regionDir);
874     }
875 
876     // Archive region
877     Path rootDir = FSUtils.getRootDir(conf);
878     HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
879 
880     // Delete empty region dir
881     if (!fs.delete(regionDir, true)) {
882       LOG.warn("Failed delete of " + regionDir);
883     }
884   }
885 
886   /**
887    * Creates a directory. Assumes the user has already checked for this directory existence.
888    * @param dir
889    * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
890    *         whether the directory exists or not, and returns true if it exists.
891    * @throws IOException
892    */
893   boolean createDir(Path dir) throws IOException {
894     int i = 0;
895     IOException lastIOE = null;
896     do {
897       try {
898         return fs.mkdirs(dir);
899       } catch (IOException ioe) {
900         lastIOE = ioe;
901         if (fs.exists(dir)) return true; // directory is present
902         sleepBeforeRetry("Create Directory", i+1);
903       }
904     } while (++i <= hdfsClientRetriesNumber);
905     throw new IOException("Exception in createDir", lastIOE);
906   }
907 
908   /**
909    * Renames a directory. Assumes the user has already checked for this directory existence.
910    * @param srcpath
911    * @param dstPath
912    * @return true if rename is successful.
913    * @throws IOException
914    */
915   boolean rename(Path srcpath, Path dstPath) throws IOException {
916     IOException lastIOE = null;
917     int i = 0;
918     do {
919       try {
920         return fs.rename(srcpath, dstPath);
921       } catch (IOException ioe) {
922         lastIOE = ioe;
923         if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
924         // dir is not there, retry after some time.
925         sleepBeforeRetry("Rename Directory", i+1);
926       }
927     } while (++i <= hdfsClientRetriesNumber);
928     throw new IOException("Exception in rename", lastIOE);
929   }
930 
931   /**
932    * Deletes a directory. Assumes the user has already checked for this directory existence.
933    * @param dir
934    * @return true if the directory is deleted.
935    * @throws IOException
936    */
937   boolean deleteDir(Path dir) throws IOException {
938     IOException lastIOE = null;
939     int i = 0;
940     do {
941       try {
942         return fs.delete(dir, true);
943       } catch (IOException ioe) {
944         lastIOE = ioe;
945         if (!fs.exists(dir)) return true;
946         // dir is there, retry deleting after some time.
947         sleepBeforeRetry("Delete Directory", i+1);
948       }
949     } while (++i <= hdfsClientRetriesNumber);
950     throw new IOException("Exception in DeleteDir", lastIOE);
951   }
952 
953   /**
954    * sleeping logic; handles the interrupt exception.
955    */
956   private void sleepBeforeRetry(String msg, int sleepMultiplier) {
957     sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
958   }
959 
960   /**
961    * Creates a directory for a filesystem and configuration object. Assumes the user has already
962    * checked for this directory existence.
963    * @param fs
964    * @param conf
965    * @param dir
966    * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
967    *         whether the directory exists or not, and returns true if it exists.
968    * @throws IOException
969    */
970   private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
971       throws IOException {
972     int i = 0;
973     IOException lastIOE = null;
974     int hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
975       DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
976     int baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
977       DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
978     do {
979       try {
980         return fs.mkdirs(dir);
981       } catch (IOException ioe) {
982         lastIOE = ioe;
983         if (fs.exists(dir)) return true; // directory is present
984         sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
985       }
986     } while (++i <= hdfsClientRetriesNumber);
987     throw new IOException("Exception in createDir", lastIOE);
988   }
989 
990   /**
991    * sleeping logic for static methods; handles the interrupt exception. Keeping a static version
992    * for this to avoid re-looking for the integer values.
993    */
994   private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
995       int hdfsClientRetriesNumber) {
996     if (sleepMultiplier > hdfsClientRetriesNumber) {
997       LOG.debug(msg + ", retries exhausted");
998       return;
999     }
1000     LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
1001     Threads.sleep(baseSleepBeforeRetries * sleepMultiplier);
1002   }
1003 }