/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;

import com.google.common.annotations.VisibleForTesting;

/**
 * This class abstracts a bunch of operations the HMaster needs to interact with
 * the underlying file system, including splitting log files, checking file
 * system status, etc.
 */
@InterfaceAudience.Private
public class MasterFileSystem {
  private static final Log LOG = LogFactory.getLog(MasterFileSystem.class.getName());
  // HBase configuration
  Configuration conf;
  // master status
  Server master;
  // metrics for master
  private final MetricsMasterFileSystem metricsMasterFilesystem = new MetricsMasterFileSystem();
  // Persisted unique cluster ID
  private ClusterId clusterId;
  // Keep around for convenience.
  private final FileSystem fs;
  // Is the filesystem ok?
  private volatile boolean fsOk = true;
  // The Path to the old logs dir
  private final Path oldLogDir;
  // root hbase directory on the FS
  private final Path rootdir;
  // hbase temp directory used for table construction and deletion
  private final Path tempdir;
  // lock used to serialize log splitting
  final Lock splitLogLock = new ReentrantLock();
  final boolean distributedLogReplay;
  final SplitLogManager splitLogManager;
  private final MasterServices services;

  final static PathFilter META_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return DefaultWALProvider.isMetaFile(p);
    }
  };

  final static PathFilter NON_META_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return !DefaultWALProvider.isMetaFile(p);
    }
  };

  public MasterFileSystem(Server master, MasterServices services)
  throws IOException {
    this.conf = master.getConfiguration();
    this.master = master;
    this.services = services;
    // Set filesystem to be that of this.rootdir else we get complaints about
    // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
    // default localfs.  Presumption is that rootdir is fully-qualified before
    // we get to here with appropriate fs scheme.
    this.rootdir = FSUtils.getRootDir(conf);
    this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
    // Cover both bases, the old way of setting the default fs and the new;
    // we have to be compatible with both Hadoop config keys.
    this.fs = this.rootdir.getFileSystem(conf);
    FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
    // make sure the fs has the same conf
    fs.setConf(conf);
    // create the initial filesystem layout and set up the archived logs path
    this.oldLogDir = createInitialFileSystemLayout();
    HFileSystem.addLocationsOrderInterceptor(conf);
    this.splitLogManager =
        new SplitLogManager(master, master.getConfiguration(), master, services,
            master.getServerName());
    this.distributedLogReplay = this.splitLogManager.isLogReplaying();
  }
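
  /*
   * A minimal sketch of how a caller might set up the configuration this
   * constructor expects, assuming a master context where Server and
   * MasterServices are already available; the rootdir value is illustrative:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // hbase.rootdir must be fully qualified with an fs scheme before we get here
   *   conf.set(HConstants.HBASE_DIR, "hdfs://namenode:8020/hbase");
   *   MasterFileSystem mfs = new MasterFileSystem(master, services);
   *   FileSystem fs = mfs.getFileSystem();
   */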

  @VisibleForTesting
  SplitLogManager getSplitLogManager() {
    return this.splitLogManager;
  }

  /**
   * Create initial layout in filesystem.
   * <ol>
   * <li>Check if the meta region exists and is readable; if not, create it.
   * Create hbase.version and the hbase:meta directory if they are missing.
   * </li>
   * <li>Create a log archive directory for RS to put archived logs</li>
   * </ol>
   * Idempotent.
   */
  private Path createInitialFileSystemLayout() throws IOException {
    // check if the root directory exists
    checkRootDir(this.rootdir, conf, this.fs);

    // check if temp directory exists and clean it
    checkTempDir(this.tempdir, conf, this.fs);

    Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);

    // Make sure the region servers can archive their old logs
    if (!this.fs.exists(oldLogDir)) {
      this.fs.mkdirs(oldLogDir);
    }

    return oldLogDir;
  }
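
  /*
   * After this runs, the layout under hbase.rootdir looks roughly like the
   * following (entry names come from the HConstants used above; the rootdir
   * itself is deployment specific, so /hbase here is illustrative):
   *
   *   /hbase/hbase.version   cluster file-format version
   *   /hbase/hbase.id        persisted unique cluster ID
   *   /hbase/.tmp            temp dir used for table construction and deletion
   *   /hbase/oldWALs         archive dir region servers move their old WALs into
   */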

  public FileSystem getFileSystem() {
    return this.fs;
  }

  /**
   * Get the directory where old logs go
   * @return the dir
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Checks to see if the file system is still accessible.
   * If not, sets the fsOk flag to false and aborts the master.
   * @return false if file system is not available
   */
  public boolean checkFileSystem() {
    if (this.fsOk) {
      try {
        FSUtils.checkFileSystemAvailable(this.fs);
        FSUtils.checkDfsSafeMode(this.conf);
      } catch (IOException e) {
        master.abort("Shutting down HBase cluster: file system not available", e);
        this.fsOk = false;
      }
    }
    return this.fsOk;
  }

  /**
   * @return HBase root dir.
   */
  public Path getRootDir() {
    return this.rootdir;
  }

  /**
   * @return HBase temp dir.
   */
  public Path getTempDir() {
    return this.tempdir;
  }

  /**
   * @return The unique identifier generated for this cluster
   */
  public ClusterId getClusterId() {
    return clusterId;
  }

  /**
   * Inspect the log directory to find dead servers which need recovery work
   * @return A set of ServerNames which aren't running but still have WAL files left in file system
   */
  Set<ServerName> getFailedServersFromLogFolders() {
    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
        WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

    Set<ServerName> serverNames = new HashSet<ServerName>();
    Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);

    do {
      if (master.isStopped()) {
        LOG.warn("Master stopped while trying to get failed servers.");
        break;
      }
      try {
        if (!this.fs.exists(logsDirPath)) return serverNames;
        FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
        // Get online servers after getting log folders to avoid log folder deletion of newly
        // checked-in region servers. See HBASE-5916.
        Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
            .keySet();

        if (logFolders == null || logFolders.length == 0) {
          LOG.debug("No log files to split, proceeding...");
          return serverNames;
        }
        for (FileStatus status : logFolders) {
          FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
          if (curLogFiles == null || curLogFiles.length == 0) {
            // Empty log folder. No recovery needed
            continue;
          }
          final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
              status.getPath());
          if (null == serverName) {
            LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
                "region server name; leaving in place. If you see later errors about missing " +
                "write ahead logs they may be saved in this location.");
          } else if (!onlineServers.contains(serverName)) {
            LOG.info("Log folder " + status.getPath() + " doesn't belong "
                + "to a known region server, splitting");
            serverNames.add(serverName);
          } else {
            LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
          }
        }
        retrySplitting = false;
      } catch (IOException ioe) {
        LOG.warn("Failed getting failed servers to be recovered.", ioe);
        if (!checkFileSystem()) {
          LOG.warn("Bad Filesystem, exiting");
          Runtime.getRuntime().halt(1);
        }
        try {
          if (retrySplitting) {
            Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted, aborting since cannot return w/o splitting");
          Thread.currentThread().interrupt();
          retrySplitting = false;
          Runtime.getRuntime().halt(1);
        }
      }
    } while (retrySplitting);

    return serverNames;
  }
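
  /*
   * For illustration: WAL folders under the log dir are named after the server
   * that wrote them, e.g. (host, port and startcode below are made up)
   *
   *   /hbase/WALs/regionserver-1.example.com,16020,1431024123456
   *
   * DefaultWALProvider.getServerNameFromWALDirectoryName() parses such a path
   * back into a ServerName, which is then checked against the online-server set
   * above to decide whether the folder needs splitting.
   */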

  public void splitLog(final ServerName serverName) throws IOException {
    Set<ServerName> serverNames = new HashSet<ServerName>();
    serverNames.add(serverName);
    splitLog(serverNames);
  }
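
  /*
   * A sketch of typical use, assuming a server-shutdown-handler context (the
   * masterFileSystem and deadServer names are illustrative): when a region
   * server dies, its WALs are split before its regions are reassigned so no
   * edits are lost, with the meta WAL handled first:
   *
   *   masterFileSystem.splitMetaLog(deadServer);  // recover hbase:meta edits first
   *   masterFileSystem.splitLog(deadServer);      // then the remaining WALs
   */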

  /**
   * Specialized method to handle the splitting for meta WAL
   * @param serverName server whose meta WAL is to be split
   * @throws IOException if the WAL cannot be split
   */
  public void splitMetaLog(final ServerName serverName) throws IOException {
    Set<ServerName> serverNames = new HashSet<ServerName>();
    serverNames.add(serverName);
    splitMetaLog(serverNames);
  }

  /**
   * Specialized method to handle the splitting for meta WAL
   * @param serverNames servers whose meta WALs are to be split
   * @throws IOException if the WALs cannot be split
   */
  public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
    splitLog(serverNames, META_FILTER);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
      "We only release this lock when we set it. Updates to code that uses it should verify use " +
      "of the guard boolean.")
  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    boolean needReleaseLock = false;
    if (!this.services.isInitialized()) {
      // during master initialization, we could have multiple places splitting the same wal
      this.splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      for (ServerName serverName : serverNames) {
        Path logDir = new Path(this.rootdir,
            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
        // Rename the directory so a rogue RS doesn't create more WALs
        if (fs.exists(logDir)) {
          if (!this.fs.rename(logDir, splitDir)) {
            throw new IOException("Failed fs.rename for log split: " + logDir);
          }
          logDir = splitDir;
          LOG.debug("Renamed log directory: " + splitDir);
        } else if (!fs.exists(splitDir)) {
          LOG.info("Log dir for server " + serverName + " does not exist");
          continue;
        }
        logDirs.add(splitDir);
      }
    } finally {
      if (needReleaseLock) {
        this.splitLogLock.unlock();
      }
    }
    return logDirs;
  }
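
  /*
   * For illustration, the rename above turns (server name values made up)
   *
   *   /hbase/WALs/regionserver-1.example.com,16020,1431024123456
   * into
   *   /hbase/WALs/regionserver-1.example.com,16020,1431024123456-splitting
   *
   * A region server that comes back from, say, a long GC pause can no longer
   * append to its old WAL directory, so no edits can arrive that the split
   * would miss.
   */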

  /**
   * Mark regions in recovering state when distributedLogReplay is set to true
   * @param serverName Failed region server whose WALs are to be replayed
   * @param regions Set of regions to be recovered
   * @throws IOException if the regions cannot be marked as recovering
   */
  public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
    if (!this.distributedLogReplay) {
      return;
    }
    // mark regions in recovering state
    if (regions == null || regions.isEmpty()) {
      return;
    }
    this.splitLogManager.markRegionsRecovering(serverName, regions);
  }

  public void splitLog(final Set<ServerName> serverNames) throws IOException {
    splitLog(serverNames, NON_META_FILTER);
  }

  /**
   * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
   * @param failedServers servers whose stale recovering regions should be removed
   * @throws IOException if cleanup fails
   */
  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
      throws IOException, InterruptedIOException {
    this.splitLogManager.removeStaleRecoveringRegions(failedServers);
  }

  /**
   * This method is the base split method that splits WAL files matching a filter. Callers should
   * pass the appropriate filter for meta and non-meta WALs.
   * @param serverNames logs belonging to these servers will be split; this will rename the log
   *                    directory out from under a soft-failed server
   * @param filter only WAL files matching this {@link PathFilter} will be split
   * @throws IOException if splitting fails
   */
  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
    long splitTime = 0, splitLogSize = 0;
    List<Path> logDirs = getLogDirs(serverNames);

    splitLogManager.handleDeadWorkers(serverNames);
    splitTime = EnvironmentEdgeManager.currentTime();
    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
    splitTime = EnvironmentEdgeManager.currentTime() - splitTime;

    if (this.metricsMasterFilesystem != null) {
      if (filter == META_FILTER) {
        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
      } else {
        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
      }
    }
  }
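
  /*
   * A minimal sketch of how callers within this package select which WALs get
   * split; the two built-in filters partition a server's WAL files into meta
   * and non-meta (the mfs and serverNames variables are illustrative):
   *
   *   mfs.splitLog(serverNames, MasterFileSystem.META_FILTER);      // hbase:meta WALs only
   *   mfs.splitLog(serverNames, MasterFileSystem.NON_META_FILTER);  // everything else
   */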

  /**
   * Get the rootdir.  Make sure it's wholesome and exists before returning.
   * @param rd the root directory path
   * @param c configuration to use
   * @param fs filesystem the root directory lives on
   * @return hbase.rootdir (after checks for existence and, if needed, bootstrapping by
   * populating the directory with the necessary bootup files).
   * @throws IOException if the root directory cannot be checked or created
   */
  @SuppressWarnings("deprecation")
  private Path checkRootDir(final Path rd, final Configuration c,
    final FileSystem fs)
  throws IOException {
    // If FS is in safe mode wait till out of it.
    FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));

    boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
    FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700"));

    // Filesystem is good. Go ahead and check for hbase.rootdir.
    try {
      if (!fs.exists(rd)) {
        if (isSecurityEnabled) {
          fs.mkdirs(rd, rootDirPerms);
        } else {
          fs.mkdirs(rd);
        }
        // DFS leaves safe mode with 0 DNs when there are 0 blocks.
        // We used to handle this by checking the current DN count and waiting until
        // it is nonzero. With security, the check for datanode count doesn't work --
        // it is a privileged op. So instead we adopt the strategy of the jobtracker
        // and simply retry file creation during bootstrap indefinitely. As soon as
        // there is one datanode it will succeed. Permission problems should have
        // already been caught by mkdirs above.
        FSUtils.setVersion(fs, rd, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          10 * 1000), c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS,
            HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
      } else {
        if (!fs.isDirectory(rd)) {
          throw new IllegalArgumentException(rd.toString() + " is not a directory");
        }
        if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) {
          // check whether the permissions match
          LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for "
              + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
              + " and \"hbase.rootdir.perms\" configured as "
              + c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. "
              + "You can change the permissions by setting \"hbase.rootdir.perms\" in "
              + "hbase-site.xml and restarting the master.");
          fs.setPermission(rd, rootDirPerms);
        }
        // as above
        FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          10 * 1000), c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS,
            HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
      }
    } catch (DeserializationException de) {
      LOG.fatal("Please fix invalid configuration for " + HConstants.HBASE_DIR, de);
      IOException ioe = new IOException();
      ioe.initCause(de);
      throw ioe;
    } catch (IllegalArgumentException iae) {
      LOG.fatal("Please fix invalid configuration for "
        + HConstants.HBASE_DIR + " " + rd.toString(), iae);
      throw iae;
    }
    // Make sure cluster ID exists
    if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
        HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
      FSUtils.setClusterId(fs, rd, new ClusterId(),
        c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
    }
    clusterId = FSUtils.getClusterId(fs, rd);

    // Make sure the meta region directory exists!
    if (!FSUtils.metaRegionExists(fs, rd)) {
      bootstrap(rd, c);
    } else {
      // Migrate table descriptor files if necessary
      org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
        .migrateFSTableDescriptorsIfNecessary(fs, rd);
    }

    // Create tableinfo for hbase:meta if not already there. The created descriptor
    // is assumed to be for an enabled table. The meta table is a system table, so
    // its descriptor is predefined; we get it from the FSTableDescriptors registry
    // rather than building one here.
    FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
    fsd.createTableDescriptor(
        new TableDescriptor(fsd.get(TableName.META_TABLE_NAME)));

    return rd;
  }

  /**
   * Make sure the hbase temp directory exists and is empty.
   * NOTE that this method is only executed once just after the master becomes the active one.
   */
  private void checkTempDir(final Path tmpdir, final Configuration c, final FileSystem fs)
      throws IOException {
    // If the temp directory exists, clear the content (left over from the previous run)
    if (fs.exists(tmpdir)) {
      // Archive tables in temp, maybe left over from a failed deletion;
      // if not, the cleaner will take care of them.
      for (Path tabledir: FSUtils.getTableDirs(fs, tmpdir)) {
        for (Path regiondir: FSUtils.getRegionDirs(fs, tabledir)) {
          HFileArchiver.archiveRegion(fs, this.rootdir, tabledir, regiondir);
        }
      }
      if (!fs.delete(tmpdir, true)) {
        throw new IOException("Unable to clean the temp directory: " + tmpdir);
      }
    }

    // Create the temp directory
    if (!fs.mkdirs(tmpdir)) {
      throw new IOException("HBase temp directory '" + tmpdir + "' creation failure.");
    }
  }

  private static void bootstrap(final Path rd, final Configuration c)
  throws IOException {
    LOG.info("BOOTSTRAP: creating hbase:meta region");
    try {
      // Bootstrapping, make sure blockcache is off.  Else, one will be
      // created here in bootstrap and it'll need to be cleaned up.  Better to
      // not make it in first place.  Turn off block caching for bootstrap.
      // Enable after.
      HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
      HTableDescriptor metaDescriptor = new FSTableDescriptors(c).get(TableName.META_TABLE_NAME);
      setInfoFamilyCachingForMeta(metaDescriptor, false);
      HRegion meta = HRegion.createHRegion(metaHRI, rd, c, metaDescriptor, null);
      setInfoFamilyCachingForMeta(metaDescriptor, true);
      meta.close();
    } catch (IOException e) {
      e = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
      LOG.error("bootstrap", e);
      throw e;
    }
  }

  /**
   * Enable or disable, according to {@code b}, block caching and in-memory
   * caching for the catalog family of hbase:meta.
   */
  public static void setInfoFamilyCachingForMeta(HTableDescriptor metaDescriptor, final boolean b) {
    for (HColumnDescriptor hcd: metaDescriptor.getColumnFamilies()) {
      if (Bytes.equals(hcd.getName(), HConstants.CATALOG_FAMILY)) {
        hcd.setBlockCacheEnabled(b);
        hcd.setInMemory(b);
      }
    }
  }

  public void deleteRegion(HRegionInfo region) throws IOException {
    HFileArchiver.archiveRegion(conf, fs, region);
  }

  public void deleteTable(TableName tableName) throws IOException {
    fs.delete(FSUtils.getTableDir(rootdir, tableName), true);
  }

  /**
   * Move the specified table to the hbase temp directory
   * @param tableName Table name to move
   * @return The temp location of the table moved
   * @throws IOException in case of file-system failure
   */
  public Path moveTableToTemp(TableName tableName) throws IOException {
    Path srcPath = FSUtils.getTableDir(rootdir, tableName);
    Path tempPath = FSUtils.getTableDir(this.tempdir, tableName);

    // Ensure temp exists
    if (!fs.exists(tempPath.getParent()) && !fs.mkdirs(tempPath.getParent())) {
      throw new IOException("HBase temp directory '" + tempPath.getParent() + "' creation failure.");
    }

    if (!fs.rename(srcPath, tempPath)) {
      throw new IOException("Unable to move '" + srcPath + "' to temp '" + tempPath + "'");
    }

    return tempPath;
  }
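
  /*
   * A sketch of how table deletion might use this, assuming a delete-table
   * handler context (masterFileSystem and tableName are illustrative): the
   * table directory is first moved out of the way so clients see the table
   * disappear in one step, then cleaned up from temp:
   *
   *   Path tempTableDir = masterFileSystem.moveTableToTemp(tableName);
   *   // ... archive regions under tempTableDir, then delete it ...
   */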

  public void updateRegionInfo(HRegionInfo region) {
    // TODO: implement this. I think this is currently broken in trunk; I don't
    //       see this getting updated.
    //       @see HRegion.checkRegioninfoOnFilesystem()
  }

  public void deleteFamilyFromFS(HRegionInfo region, byte[] familyName)
      throws IOException {
    // archive family store files
    Path tableDir = FSUtils.getTableDir(rootdir, region.getTable());
    HFileArchiver.archiveFamily(fs, conf, region, tableDir, familyName);

    // delete the family folder
    Path familyDir = new Path(tableDir,
      new Path(region.getEncodedName(), Bytes.toString(familyName)));
    if (!fs.delete(familyDir, true)) {
      if (fs.exists(familyDir)) {
        throw new IOException("Could not delete family "
            + Bytes.toString(familyName) + " from FileSystem for region "
            + region.getRegionNameAsString() + "(" + region.getEncodedName()
            + ")");
      }
    }
  }

  public void stop() {
    if (splitLogManager != null) {
      this.splitLogManager.stop();
    }
  }

  /**
   * Delete a column family from a table
   * @param tableName name of the table the family belongs to
   * @param familyName name of the column family to delete
   * @return Modified HTableDescriptor with requested column deleted.
   * @throws IOException if the descriptor cannot be read or updated
   */
  public HTableDescriptor deleteColumn(TableName tableName, byte[] familyName)
      throws IOException {
    LOG.info("DeleteColumn. Table = " + tableName
        + " family = " + Bytes.toString(familyName));
    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    htd.removeFamily(familyName);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }

  /**
   * Modify a column family of a table
   * @param tableName name of the table the family belongs to
   * @param hcd HColumnDescriptor describing the new state of the family
   * @return Modified HTableDescriptor with the column modified.
   * @throws IOException if the descriptor cannot be read or updated
   */
  public HTableDescriptor modifyColumn(TableName tableName, HColumnDescriptor hcd)
      throws IOException {
    LOG.info("AddModifyColumn. Table = " + tableName
        + " HCD = " + hcd.toString());

    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    byte [] familyName = hcd.getName();
    if (!htd.hasFamily(familyName)) {
      throw new InvalidFamilyOperationException("Family '" +
        Bytes.toString(familyName) + "' doesn't exist so cannot be modified");
    }
    htd.modifyFamily(hcd);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }

  /**
   * Add a column family to a table
   * @param tableName name of the table the family is added to
   * @param hcd HColumnDescriptor describing the family to add
   * @return Modified HTableDescriptor with new column added.
   * @throws IOException if the descriptor cannot be read or updated
   */
  public HTableDescriptor addColumn(TableName tableName, HColumnDescriptor hcd)
      throws IOException {
    LOG.info("AddColumn. Table = " + tableName + " HCD = " +
      hcd.toString());
    HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
    if (htd == null) {
      throw new InvalidFamilyOperationException("Family '" +
        hcd.getNameAsString() + "' cannot be added as HTD is null");
    }
    htd.addFamily(hcd);
    this.services.getTableDescriptors().add(htd);
    return htd;
  }
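
  /*
   * A minimal sketch of the read-modify-write pattern the three schema methods
   * above share (table and family names are illustrative only):
   *
   *   HColumnDescriptor hcd = new HColumnDescriptor("cf");
   *   hcd.setMaxVersions(3);
   *   masterFileSystem.addColumn(TableName.valueOf("t1"), hcd);
   *   // each method loads the HTableDescriptor, mutates it, and persists it
   *   // back via services.getTableDescriptors().add(htd)
   */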

  /**
   * Used in SSH (server shutdown handling) to set the recovery mode based on configuration
   * once all outstanding log split tasks have drained.
   * @throws IOException if the recovery mode cannot be set
   */
  public void setLogRecoveryMode() throws IOException {
    this.splitLogManager.setRecoveryMode(false);
  }

  public RecoveryMode getLogRecoveryMode() {
    return this.splitLogManager.getRecoveryMode();
  }
}