
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;

import com.google.common.annotations.VisibleForTesting;

/**
 * This class abstracts a bunch of operations the HMaster needs to interact with
 * the underlying file system, including splitting log files, checking file
 * system status, etc.
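 * <p>
 * A minimal usage sketch ({@code server}, {@code services} and
 * {@code deadServerName} are placeholders for values a running master holds):
 * <pre>
 *   MasterFileSystem mfs = new MasterFileSystem(server, services);
 *   Path rootDir = mfs.getRootDir();  // hbase.rootdir, checked/bootstrapped
 *   mfs.splitLog(deadServerName);     // split the WALs of a dead region server
 * </pre>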
 */
@InterfaceAudience.Private
public class MasterFileSystem {
  private static final Log LOG = LogFactory.getLog(MasterFileSystem.class.getName());
  // HBase configuration
  Configuration conf;
  // master status
  Server master;
  // metrics for master
  private final MetricsMasterFileSystem metricsMasterFilesystem = new MetricsMasterFileSystem();
  // Persisted unique cluster ID
  private ClusterId clusterId;
  // Keep around for convenience.
  private final FileSystem fs;
  // Is the filesystem ok?
  private volatile boolean fsOk = true;
  // The Path to the old logs dir
  private final Path oldLogDir;
  // root hbase directory on the FS
  private final Path rootdir;
  // hbase temp directory used for table construction and deletion
  private final Path tempdir;
  // Guards WAL splitting during master initialization; see getLogDirs()
  final Lock splitLogLock = new ReentrantLock();
  final boolean distributedLogReplay;
  final SplitLogManager splitLogManager;
  private final MasterServices services;

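  // WAL path filters used when splitting: meta WALs are handled separately
  // (see splitMetaLog) so hbase:meta can be recovered ahead of user regions.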
  final static PathFilter META_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return DefaultWALProvider.isMetaFile(p);
    }
  };

  final static PathFilter NON_META_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return !DefaultWALProvider.isMetaFile(p);
    }
  };

  public MasterFileSystem(Server master, MasterServices services)
  throws IOException {
    this.conf = master.getConfiguration();
    this.master = master;
    this.services = services;
    // Set filesystem to be that of this.rootdir else we get complaints about
    // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
    // default localfs.  Presumption is that rootdir is fully-qualified before
    // we get to here with appropriate fs scheme.
    this.rootdir = FSUtils.getRootDir(conf);
    this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
    // Cover both bases, the old way of setting default fs and the new.
    // We're supposed to run on 0.20 and 0.21 anyway.
    this.fs = this.rootdir.getFileSystem(conf);
    FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
    // make sure the fs has the same conf
    fs.setConf(conf);
    // set up the archived logs path
    this.oldLogDir = createInitialFileSystemLayout();
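    // Reorder HDFS block locations so that reads of a dead server's WALs try
    // the replica written locally by that server last (see HBASE-6435).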
    HFileSystem.addLocationsOrderInterceptor(conf);
    this.splitLogManager =
        new SplitLogManager(master, master.getConfiguration(), master, services,
            master.getServerName());
    this.distributedLogReplay = this.splitLogManager.isLogReplaying();
  }

  @VisibleForTesting
  SplitLogManager getSplitLogManager() {
    return this.splitLogManager;
  }

  /**
   * Create initial layout in filesystem.
   * <ol>
   * <li>Check if the meta region exists and is readable; if not, create it.
   * Create hbase.version and the hbase:meta directory if they do not already exist.
   * </li>
   * <li>Create a log archive directory for RS to put archived logs</li>
   * </ol>
   * Idempotent.
   */
  private Path createInitialFileSystemLayout() throws IOException {
    // check if the root directory exists
    checkRootDir(this.rootdir, conf, this.fs);

    // check if temp directory exists and clean it
    checkTempDir(this.tempdir, conf, this.fs);

    Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);

    // Make sure the region servers can archive their old logs
    if (!this.fs.exists(oldLogDir)) {
      this.fs.mkdirs(oldLogDir);
    }

    return oldLogDir;
  }

  public FileSystem getFileSystem() {
    return this.fs;
  }

  /**
   * Get the directory where old logs go
   * @return the dir
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Checks to see if the file system is still accessible. If not, aborts the
   * master and marks the filesystem as unavailable.
   * @return false if file system is not available
   */
  public boolean checkFileSystem() {
    if (this.fsOk) {
      try {
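        // Both calls throw IOException on failure; an HDFS stuck in safe
        // mode is treated the same as an unavailable filesystem.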
        FSUtils.checkFileSystemAvailable(this.fs);
        FSUtils.checkDfsSafeMode(this.conf);
      } catch (IOException e) {
        master.abort("Shutting down HBase cluster: file system not available", e);
        this.fsOk = false;
      }
    }
    return this.fsOk;
  }

  /**
   * @return HBase root dir.
   */
  public Path getRootDir() {
    return this.rootdir;
  }

  /**
   * @return HBase temp dir.
   */
  public Path getTempDir() {
    return this.tempdir;
  }

  /**
   * @return The unique identifier generated for this cluster
   */
  public ClusterId getClusterId() {
    return clusterId;
  }

  /**
   * Inspect the log directory to find dead servers which need recovery work
   * @return A set of ServerNames which aren't running but still have WAL files left in file system
   */
  Set<ServerName> getFailedServersFromLogFolders() {
    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
        WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

    Set<ServerName> serverNames = new HashSet<ServerName>();
    Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);

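    // Retry the whole scan on transient IO errors (unless configured to skip
    // split errors); a filesystem that fails the health check halts the JVM.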
    do {
      if (master.isStopped()) {
        LOG.warn("Master stopped while trying to get failed servers.");
        break;
      }
      try {
        if (!this.fs.exists(logsDirPath)) return serverNames;
        FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
        // Get online servers after getting log folders to avoid log folder deletion of newly
        // checked in region servers. See HBASE-5916.
        Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
            .keySet();

        if (logFolders == null || logFolders.length == 0) {
          LOG.debug("No log files to split, proceeding...");
          return serverNames;
        }
        for (FileStatus status : logFolders) {
          FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
          if (curLogFiles == null || curLogFiles.length == 0) {
            // Empty log folder. No recovery needed
            continue;
          }
          final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
              status.getPath());
          if (null == serverName) {
            LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
                "region server name; leaving in place. If you see later errors about missing " +
                "write ahead logs they may be saved in this location.");
          } else if (!onlineServers.contains(serverName)) {
            LOG.info("Log folder " + status.getPath() + " doesn't belong "
                + "to a known region server, splitting");
            serverNames.add(serverName);
          } else {
            LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
          }
        }
        retrySplitting = false;
      } catch (IOException ioe) {
        LOG.warn("Failed getting failed servers to be recovered.", ioe);
        if (!checkFileSystem()) {
          LOG.warn("Bad Filesystem, exiting");
          Runtime.getRuntime().halt(1);
        }
        try {
          if (retrySplitting) {
            Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted, aborting since cannot return w/o splitting");
          Thread.currentThread().interrupt();
          retrySplitting = false;
          Runtime.getRuntime().halt(1);
        }
      }
    } while (retrySplitting);

    return serverNames;
  }

  public void splitLog(final ServerName serverName) throws IOException {
    Set<ServerName> serverNames = new HashSet<ServerName>();
    serverNames.add(serverName);
    splitLog(serverNames);
  }

  /**
   * Specialized method to handle the splitting for meta WAL
   * @param serverName server whose meta WAL is to be split
   * @throws IOException if the split fails
   */
  public void splitMetaLog(final ServerName serverName) throws IOException {
    Set<ServerName> serverNames = new HashSet<ServerName>();
    serverNames.add(serverName);
    splitMetaLog(serverNames);
  }

  /**
   * Specialized method to handle the splitting for meta WAL
   * @param serverNames servers whose meta WALs are to be split
   * @throws IOException if the split fails
   */
  public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
    splitLog(serverNames, META_FILTER);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
      "We only release this lock when we set it. Updates to code that uses it should verify use " +
      "of the guard boolean.")
  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    boolean needReleaseLock = false;
    if (!this.services.isInitialized()) {
      // during master initialization, we could have multiple places splitting the same wal
      this.splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      for (ServerName serverName : serverNames) {
        Path logDir = new Path(this.rootdir,
            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
        // Rename the directory so a rogue RS doesn't create more WALs
        if (fs.exists(logDir)) {
          if (!this.fs.rename(logDir, splitDir)) {
            throw new IOException("Failed fs.rename for log split: " + logDir);
          }
          logDir = splitDir;
          LOG.debug("Renamed log directory: " + splitDir);
        } else if (!fs.exists(splitDir)) {
          LOG.info("Log dir for server " + serverName + " does not exist");
          continue;
        }
        logDirs.add(splitDir);
      }
    } finally {
      if (needReleaseLock) {
        this.splitLogLock.unlock();
      }
    }
    return logDirs;
  }

  /**
   * Mark regions in recovering state when distributedLogReplay is enabled
   * @param serverName Failed region server whose WALs are to be replayed
   * @param regions Set of regions to be recovered
   * @throws IOException if the regions cannot be marked as recovering
   */
  public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
    if (!this.distributedLogReplay) {
      return;
    }
    // mark regions in recovering state
    if (regions == null || regions.isEmpty()) {
      return;
    }
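    // Recorded via ZooKeeper so the regions are not served normally until
    // their edits have been replayed.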
    this.splitLogManager.markRegionsRecovering(serverName, regions);
  }

  public void splitLog(final Set<ServerName> serverNames) throws IOException {
    splitLog(serverNames, NON_META_FILTER);
  }

  /**
   * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
   * @param failedServers servers whose stale recovering regions should be removed
   * @throws IOException
   */
  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
      throws IOException, InterruptedIOException {
    this.splitLogManager.removeStaleRecoveringRegions(failedServers);
  }

  /**
   * This method is the base split method that splits WAL files matching a filter. Callers should
   * pass the appropriate filter for meta and non-meta WALs.
   * @param serverNames logs belonging to these servers will be split; this will rename the log
   *                    directory out from under a soft-failed server
   * @param filter selects which WAL files in each log directory to split
   * @throws IOException if the distributed split fails
   */
  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
    long splitTime = 0, splitLogSize = 0;
    List<Path> logDirs = getLogDirs(serverNames);

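    // Tell the SplitLogManager these workers are dead so any split tasks they
    // had claimed can be resubmitted to live workers.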
    splitLogManager.handleDeadWorkers(serverNames);
    splitTime = EnvironmentEdgeManager.currentTime();
    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
    splitTime = EnvironmentEdgeManager.currentTime() - splitTime;

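    // Record split duration and size in master metrics, bucketed by whether
    // this was a meta or a user-region WAL split.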
    if (this.metricsMasterFilesystem != null) {
      if (filter == META_FILTER) {
        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
      } else {
        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
      }
    }
  }

  /**
   * Get the rootdir. Make sure it is wholesome and exists before returning.
   * @param rd the configured hbase.rootdir path
   * @param c configuration to use
   * @param fs filesystem the rootdir lives on
   * @return hbase.rootdir (after checks for existence and bootstrapping if
   * needed, populating the directory with necessary bootup files).
   * @throws IOException if filesystem checks or bootstrapping fail
   */
  @SuppressWarnings("deprecation")
  private Path checkRootDir(final Path rd, final Configuration c,
    final FileSystem fs)
  throws IOException {
    // If FS is in safe mode wait till out of it.
    FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));

    boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
    FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700"));
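    // With Kerberos security on, the rootdir is created with (and kept at)
    // hbase.rootdir.perms, defaulting to 700; see the checks below.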

    // Filesystem is good. Go ahead and check for hbase.rootdir.
    try {
      if (!fs.exists(rd)) {
        if (isSecurityEnabled) {
          fs.mkdirs(rd, rootDirPerms);
        } else {
          fs.mkdirs(rd);
        }
        // DFS leaves safe mode with 0 DNs when there are 0 blocks.
        // We used to handle this by checking the current DN count and waiting until
        // it is nonzero. With security, the check for datanode count doesn't work --
        // it is a privileged op. So instead we adopt the strategy of the jobtracker
        // and simply retry file creation during bootstrap indefinitely. As soon as
        // there is one datanode it will succeed. Permission problems should have
        // already been caught by mkdirs above.
        FSUtils.setVersion(fs, rd, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          10 * 1000), c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS,
            HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
      } else {
        if (!fs.isDirectory(rd)) {
          throw new IllegalArgumentException(rd.toString() + " is not a directory");
        }
        if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) {
          // check whether the permissions match
          LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for "
              + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
              + " and \"hbase.rootdir.perms\" configured as "
              + c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. You"
              + " can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
              + "and restarting the master");
          fs.setPermission(rd, rootDirPerms);
        }
        // as above
        FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          10 * 1000), c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS,
            HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
      }
    } catch (DeserializationException de) {
      LOG.fatal("Please fix invalid configuration for " + HConstants.HBASE_DIR, de);
      IOException ioe = new IOException();
      ioe.initCause(de);
      throw ioe;
    } catch (IllegalArgumentException iae) {
      LOG.fatal("Please fix invalid configuration for "
        + HConstants.HBASE_DIR + " " + rd.toString(), iae);
      throw iae;
    }
    // Make sure cluster ID exists
    if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
        HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
      FSUtils.setClusterId(fs, rd, new ClusterId(),
          c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
    }
    clusterId = FSUtils.getClusterId(fs, rd);

    // Make sure the meta region directory exists!
    if (!FSUtils.metaRegionExists(fs, rd)) {
      bootstrap(rd, c);
    } else {
      // Migrate table descriptor files if necessary
      org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
        .migrateFSTableDescriptorsIfNecessary(fs, rd);
    }

    // Create tableinfo-s for hbase:meta if not already there.
    // Assume the created table descriptor is for an enabled table.
    // The meta table is a system table, so its descriptors are predefined;
    // we should get them from the registry.
    FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
    fsd.createTableDescriptor(
        new TableDescriptor(fsd.get(TableName.META_TABLE_NAME)));

    return rd;
  }

  /**
   * Make sure the hbase temp directory exists and is empty.
   * NOTE that this method is only executed once just after the master becomes the active one.
   */
  private void checkTempDir(final Path tmpdir, final Configuration c, final FileSystem fs)
      throws IOException {
    // If the temp directory exists, clear the content (left over from the previous run)
    if (fs.exists(tmpdir)) {
      // Archive regions of any tables left in temp (e.g. from a failed
      // deletion); once archived, the cleaner will take care of the files.
      for (Path tabledir: FSUtils.getTableDirs(fs, tmpdir)) {
        for (Path regiondir: FSUtils.getRegionDirs(fs, tabledir)) {
          HFileArchiver.archiveRegion(fs, this.rootdir, tabledir, regiondir);
        }
      }
      if (!fs.delete(tmpdir, true)) {
        throw new IOException("Unable to clean the temp directory: " + tmpdir);
      }
    }

    // Create the temp directory
    if (!fs.mkdirs(tmpdir)) {
      throw new IOException("HBase temp directory '" + tmpdir + "' creation failure.");
    }
  }

  private static void bootstrap(final Path rd, final Configuration c)
  throws IOException {
    LOG.info("BOOTSTRAP: creating hbase:meta region");
    try {
      // Bootstrapping, make sure blockcache is off.  Else, one will be
      // created here in bootstrap and it'll need to be cleaned up.  Better to
      // not make it in first place.  Turn off block caching for bootstrap.
      // Enable after.
      HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
      HTableDescriptor metaDescriptor = new FSTableDescriptors(c).get(TableName.META_TABLE_NAME);
      setInfoFamilyCachingForMeta(metaDescriptor, false);
      HRegion meta = HRegion.createHRegion(metaHRI, rd, c, metaDescriptor, null);
      setInfoFamilyCachingForMeta(metaDescriptor, true);
      meta.close();
    } catch (IOException e) {
      e = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
      LOG.error("bootstrap", e);
      throw e;
    }
  }

  /**
   * Enable or disable block caching and in-memory caching for the catalog family of hbase:meta
   */
  public static void setInfoFamilyCachingForMeta(HTableDescriptor metaDescriptor, final boolean b) {
    for (HColumnDescriptor hcd: metaDescriptor.getColumnFamilies()) {
      if (Bytes.equals(hcd.getName(), HConstants.CATALOG_FAMILY)) {
        hcd.setBlockCacheEnabled(b);
        hcd.setInMemory(b);
      }
    }
  }

  public void deleteRegion(HRegionInfo region) throws IOException {
    HFileArchiver.archiveRegion(conf, fs, region);
  }

  public void deleteTable(TableName tableName) throws IOException {
    fs.delete(FSUtils.getTableDir(rootdir, tableName), true);
  }

  /**
   * Move the specified table to the hbase temp directory
   * @param tableName Table name to move
   * @return The temp location of the table moved
   * @throws IOException in case of file-system failure
   */
  public Path moveTableToTemp(TableName tableName) throws IOException {
    Path srcPath = FSUtils.getTableDir(rootdir, tableName);
    Path tempPath = FSUtils.getTableDir(this.tempdir, tableName);

    // Ensure temp exists
    if (!fs.exists(tempPath.getParent()) && !fs.mkdirs(tempPath.getParent())) {
      throw new IOException("HBase temp directory '" + tempPath.getParent()
          + "' creation failure.");
    }

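    // On HDFS this rename is atomic, so the table either moves to temp in
    // one step or the operation fails outright.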
    if (!fs.rename(srcPath, tempPath)) {
      throw new IOException("Unable to move '" + srcPath + "' to temp '" + tempPath + "'");
    }

    return tempPath;
  }

  public void updateRegionInfo(HRegionInfo region) {
    // TODO implement this. I think this is currently broken in trunk; I don't
    //      see this getting updated.
    //      @see HRegion.checkRegioninfoOnFilesystem()
  }

  public void deleteFamilyFromFS(HRegionInfo region, byte[] familyName, boolean hasMob)
      throws IOException {
    // archive family store files
    Path tableDir = FSUtils.getTableDir(rootdir, region.getTable());
    HFileArchiver.archiveFamily(fs, conf, region, tableDir, familyName);

    // delete the family folder
    Path familyDir = new Path(tableDir,
      new Path(region.getEncodedName(), Bytes.toString(familyName)));
    if (!fs.delete(familyDir, true)) {
      if (fs.exists(familyDir)) {
        throw new IOException("Could not delete family "
            + Bytes.toString(familyName) + " from FileSystem for region "
            + region.getRegionNameAsString() + "(" + region.getEncodedName()
            + ")");
      }
    }

    // archive and delete mob files
    if (hasMob) {
      Path mobTableDir =
          FSUtils.getTableDir(new Path(getRootDir(), MobConstants.MOB_DIR_NAME), region.getTable());
      HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(region.getTable());
      Path mobFamilyDir =
          new Path(mobTableDir,
              new Path(mobRegionInfo.getEncodedName(), Bytes.toString(familyName)));
      // archive mob family store files
      MobUtils.archiveMobStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, familyName);

      if (!fs.delete(mobFamilyDir, true)) {
        throw new IOException("Could not delete mob store files for family "
            + Bytes.toString(familyName) + " from FileSystem for region "
            + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")");
      }
    }
  }

  public void stop() {
    if (splitLogManager != null) {
      this.splitLogManager.stop();
    }
  }

  /**
   * Delete a column family from a table
   * @param tableName table from which the family is removed
   * @param familyName name of the family to remove
   * @return Modified HTableDescriptor with the requested family deleted.
   * @throws IOException if the descriptor cannot be read or written
   */
  public HTableDescriptor deleteColumn(TableName tableName, byte[] familyName)
      throws IOException {
    LOG.info("DeleteColumn. Table = " + tableName
        + " family = " + Bytes.toString(familyName));
    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    htd.removeFamily(familyName);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }

  /**
   * Modify a column family of a table
   * @param tableName table whose family is modified
   * @param hcd HColumnDescriptor carrying the new family settings
   * @return Modified HTableDescriptor with the column modified.
   * @throws IOException if the descriptor cannot be read or written
   */
  public HTableDescriptor modifyColumn(TableName tableName, HColumnDescriptor hcd)
      throws IOException {
    LOG.info("AddModifyColumn. Table = " + tableName
        + " HCD = " + hcd.toString());

    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    byte [] familyName = hcd.getName();
    if (!htd.hasFamily(familyName)) {
      throw new InvalidFamilyOperationException("Family '" +
        Bytes.toString(familyName) + "' doesn't exist, so cannot be modified");
    }
    htd.modifyFamily(hcd);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }

  /**
   * Add a column family to a table
   * @param tableName table to which the family is added
   * @param hcd HColumnDescriptor describing the new family
   * @return Modified HTableDescriptor with new column added.
   * @throws IOException if the descriptor cannot be read or written
   */
  public HTableDescriptor addColumn(TableName tableName, HColumnDescriptor hcd)
      throws IOException {
    LOG.info("AddColumn. Table = " + tableName + " HCD = " +
      hcd.toString());
    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    if (htd == null) {
      throw new InvalidFamilyOperationException("Family '" +
        hcd.getNameAsString() + "' cannot be added as HTD is null");
    }
    htd.addFamily(hcd);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }

  /**
   * Used in SSH to set the recovery mode based on configuration after all
   * outstanding log split tasks have drained.
   * @throws IOException if the recovery mode cannot be updated
   */
  public void setLogRecoveryMode() throws IOException {
    this.splitLogManager.setRecoveryMode(false);
  }

  public RecoveryMode getLogRecoveryMode() {
    return this.splitLogManager.getRecoveryMode();
  }
}