001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.IOException;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseIOException;
030import org.apache.hadoop.hbase.Server;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionInfoBuilder;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.log.HBaseMarkers;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
044import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
045import org.apache.hadoop.hbase.regionserver.RegionScanner;
046import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
047import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
050import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.FSTableDescriptors;
054import org.apache.hadoop.hbase.util.FSUtils;
055import org.apache.hadoop.hbase.util.HFileArchiveUtil;
056import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
057import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
058import org.apache.hadoop.hbase.wal.WAL;
059import org.apache.hadoop.hbase.wal.WALFactory;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
065
066/**
067 * A region that stores data in a separated directory, which can be used to store master local data.
068 * <p/>
069 * FileSystem layout:
070 *
071 * <pre>
072 * hbase
073 *   |
074 *   --&lt;region dir&gt;
075 *       |
076 *       --data
077 *       |  |
 *       |  --/&lt;ns&gt;/&lt;table&gt;/&lt;encoded-region-name&gt; <---- The region data
079 *       |      |
080 *       |      --replay <---- The edits to replay
081 *       |
082 *       --WALs
083 *          |
084 *          --&lt;master-server-name&gt; <---- The WAL dir for active master
085 *          |
086 *          --&lt;master-server-name&gt;-dead <---- The WAL dir for dead master
087 * </pre>
088 *
089 * Notice that, you can use different root file system and WAL file system. Then the above directory
090 * will be on two file systems, the root file system will have the data directory while the WAL
091 * filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
092 * archived directory with the {@link MasterRegionParams#archivedHFileSuffix()} suffix. The archived
093 * WAL will be moved to the global WAL archived directory with the
094 * {@link MasterRegionParams#archivedWalSuffix()} suffix.
095 */
096@InterfaceAudience.Private
097public final class MasterRegion {
098
  private static final Logger LOG = LoggerFactory.getLogger(MasterRegion.class);

  /**
   * Name of the directory, under the region directory on the WAL file system, into which WAL files
   * found in dead WAL directories are moved before the region is opened.
   */
  private static final String REPLAY_EDITS_DIR = "recovered.wals";

  /** Suffix appended to the name of a WAL directory once it has been picked up for replay. */
  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";

  /**
   * Name of a marker under the table directory. When it is present and {@link #INITIALIZED_FLAG}
   * is absent, {@link #create(MasterRegionParams)} wipes the table directory and bootstraps again.
   */
  static final String INITIALIZING_FLAG = ".initializing";

  /**
   * Name of a marker under the table directory, created by bootstrap (and by the upgrade path in
   * {@link #create(MasterRegionParams)}); when present the region is opened rather than
   * bootstrapped.
   */
  static final String INITIALIZED_FLAG = ".initialized";

  /** Region id used when building the {@link RegionInfo} of a newly bootstrapped region. */
  private static final int REGION_ID = 1;

  /** Aborted when a WAL sync timeout is hit during an update or a flush. */
  private final Server server;

  /** Source of the region's WAL; shut down in {@link #close(boolean)}. */
  private final WALFactory walFactory;

  /** The region that actually stores the data. */
  final HRegion region;

  /** Notified on every update and flush; closed first in {@link #close(boolean)}. */
  final MasterRegionFlusherAndCompactor flusherAndCompactor;

  /** Rolls the region's WAL; closed last in {@link #close(boolean)}. */
  private MasterRegionWALRoller walRoller;
120
121  private MasterRegion(Server server, HRegion region, WALFactory walFactory,
122    MasterRegionFlusherAndCompactor flusherAndCompactor, MasterRegionWALRoller walRoller) {
123    this.server = server;
124    this.region = region;
125    this.walFactory = walFactory;
126    this.flusherAndCompactor = flusherAndCompactor;
127    this.walRoller = walRoller;
128  }
129
130  private void closeRegion(boolean abort) {
131    try {
132      region.close(abort);
133    } catch (IOException e) {
134      LOG.warn("Failed to close region", e);
135    }
136  }
137
138  private void shutdownWAL() {
139    try {
140      walFactory.shutdown();
141    } catch (IOException e) {
142      LOG.warn("Failed to shutdown WAL", e);
143    }
144  }
145
146  public void update(UpdateMasterRegion action) throws IOException {
147    try {
148      action.update(region);
149      flusherAndCompactor.onUpdate();
150    } catch (WALSyncTimeoutIOException e) {
151      LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
152      server.abort("WAL sync timeout", e);
153      throw e;
154    }
155  }
156
157  public Result get(Get get) throws IOException {
158    return region.get(get);
159  }
160
161  public ResultScanner getScanner(Scan scan) throws IOException {
162    return new RegionScannerAsResultScanner(region.getScanner(scan));
163  }
164
165  public RegionScanner getRegionScanner(Scan scan) throws IOException {
166    return region.getScanner(scan);
167  }
168
169  public FlushResult flush(boolean force) throws IOException {
170    try {
171      flusherAndCompactor.resetChangesAfterLastFlush();
172      FlushResult flushResult = region.flush(force);
173      flusherAndCompactor.recordLastFlushTime();
174      return flushResult;
175    } catch (WALSyncTimeoutIOException e) {
176      LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
177      server.abort("WAL sync timeout", e);
178      throw e;
179    }
180  }
181
182  @RestrictedApi(explanation = "Should only be called in tests", link = "",
183      allowedOnPath = ".*/src/test/.*")
184  public void requestRollAll() {
185    walRoller.requestRollAll();
186  }
187
188  @RestrictedApi(explanation = "Should only be called in tests", link = "",
189      allowedOnPath = ".*/src/test/.*")
190  public void waitUntilWalRollFinished() throws InterruptedException {
191    walRoller.waitUntilWalRollFinished();
192  }
193
194  public void close(boolean abort) {
195    LOG.info("Closing local region {}, isAbort={}", region.getRegionInfo(), abort);
196    if (flusherAndCompactor != null) {
197      flusherAndCompactor.close();
198    }
199    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
200    // region, otherwise there will be dead lock.
201    if (abort) {
202      shutdownWAL();
203      closeRegion(true);
204    } else {
205      closeRegion(false);
206      shutdownWAL();
207    }
208
209    if (walRoller != null) {
210      walRoller.close();
211    }
212  }
213
214  private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,
215    String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
216    throws IOException {
217    String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
218    Path walDir = new Path(walRootDir, logName);
219    LOG.debug("WALDir={}", walDir);
220    if (walFs.exists(walDir)) {
221      throw new HBaseIOException(
222        "Already created wal directory at " + walDir + " for local region " + regionInfo);
223    }
224    if (!walFs.mkdirs(walDir)) {
225      throw new IOException(
226        "Can not create wal directory " + walDir + " for local region " + regionInfo);
227    }
228    WAL wal = walFactory.getWAL(regionInfo);
229    walRoller.addWAL(wal);
230    return wal;
231  }
232
233  private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
234    Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
235    MasterRegionWALRoller walRoller, String serverName, boolean touchInitializingFlag)
236    throws IOException {
237    TableName tn = td.getTableName();
238    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
239    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
240    // persist table descriptor
241    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, true);
242    HRegion.createHRegion(conf, regionInfo, fs, tableDir, td).close();
243    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
244    if (!fs.mkdirs(initializedFlag)) {
245      throw new IOException("Can not touch initialized flag: " + initializedFlag);
246    }
247    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
248    if (!fs.delete(initializingFlag, true)) {
249      LOG.warn("failed to clean up initializing flag: " + initializingFlag);
250    }
251    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
252    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
253  }
254
255  private static RegionInfo loadRegionInfo(FileSystem fs, Path tableDir) throws IOException {
256    Path regionDir =
257      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
258        .getPath();
259    return HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
260  }
261
262  private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo regionInfo,
263    FileSystem fs, Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
264    MasterRegionWALRoller walRoller, String serverName) throws IOException {
265    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
266    Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
267    Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
268    if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
269      throw new IOException("Failed to create replay directory: " + replayEditsDir);
270    }
271
272    // Replay any WALs for the Master Region before opening it.
273    Path walsDir = new Path(walRootDir, HREGION_LOGDIR_NAME);
274    // In open(...), we expect that the WAL directory for the MasterRegion to already exist.
275    // This is in contrast to bootstrap() where we create the MasterRegion data and WAL dir.
276    // However, it's possible that users directly remove the WAL directory. We expect walsDir
277    // to always exist in normal situations, but we should guard against users changing the
278    // filesystem outside of HBase's line of sight.
279    if (walFs.exists(walsDir)) {
280      replayWALs(conf, walFs, walRootDir, walsDir, regionInfo, serverName, replayEditsDir);
281    } else {
282      LOG.error(
283        "UNEXPECTED: WAL directory for MasterRegion is missing." + " {} is unexpectedly missing.",
284        walsDir);
285    }
286
287    // Create a new WAL
288    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
289    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
290      replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
291    // we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
292    // need to ignore EOFException.
293    conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
294    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
295  }
296
297  private static void replayWALs(Configuration conf, FileSystem walFs, Path walRootDir,
298    Path walsDir, RegionInfo regionInfo, String serverName, Path replayEditsDir)
299    throws IOException {
300    for (FileStatus walDir : walFs.listStatus(walsDir)) {
301      if (!walDir.isDirectory()) {
302        continue;
303      }
304      if (walDir.getPath().getName().startsWith(serverName)) {
305        LOG.warn("This should not happen in real production as we have not created our WAL "
306          + "directory yet, ignore if you are running a local region related UT");
307      }
308      Path deadWALDir;
309      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
310        deadWALDir =
311          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
312        if (!walFs.rename(walDir.getPath(), deadWALDir)) {
313          throw new IOException("Can not rename " + walDir + " to " + deadWALDir
314            + " when recovering lease of proc store");
315        }
316        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
317      } else {
318        deadWALDir = walDir.getPath();
319        LOG.info("{} is already marked as dead", deadWALDir);
320      }
321      for (FileStatus walFile : walFs.listStatus(deadWALDir)) {
322        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
323        RecoverLeaseFSUtils.recoverFileLease(walFs, walFile.getPath(), conf);
324        if (!walFs.rename(walFile.getPath(), replayEditsFile)) {
325          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile
326            + " when recovering lease for local region");
327        }
328        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
329      }
330      LOG.info("Delete empty local region wal dir {}", deadWALDir);
331      walFs.delete(deadWALDir, true);
332    }
333  }
334
335  private static void tryMigrate(Configuration conf, FileSystem fs, Path tableDir,
336    RegionInfo regionInfo, TableDescriptor oldTd, TableDescriptor newTd) throws IOException {
337    Class<? extends StoreFileTracker> oldSft =
338      StoreFileTrackerFactory.getTrackerClass(oldTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
339    Class<? extends StoreFileTracker> newSft =
340      StoreFileTrackerFactory.getTrackerClass(newTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
341    if (oldSft.equals(newSft)) {
342      LOG.debug("old store file tracker {} is the same with new store file tracker, skip migration",
343        StoreFileTrackerFactory.getStoreFileTrackerName(oldSft));
344      if (!oldTd.equals(newTd)) {
345        // we may change other things such as adding a new family, so here we still need to persist
346        // the new table descriptor
347        LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
348        FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
349      }
350      return;
351    }
352    LOG.info("Migrate store file tracker from {} to {}", oldSft.getSimpleName(),
353      newSft.getSimpleName());
354    HRegionFileSystem hfs =
355      HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false);
356    for (ColumnFamilyDescriptor oldCfd : oldTd.getColumnFamilies()) {
357      StoreFileTracker oldTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
358      StoreFileTracker newTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
359      List<StoreFileInfo> files = oldTracker.load();
360      LOG.debug("Store file list for {}: {}", oldCfd.getNameAsString(), files);
361      newTracker.set(oldTracker.load());
362    }
363    // persist the new table descriptor after migration
364    LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
365    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
366  }
367
368  public static MasterRegion create(MasterRegionParams params) throws IOException {
369    TableDescriptor td = params.tableDescriptor();
370    LOG.info("Create or load local region for table " + td);
371    Server server = params.server();
372    Configuration baseConf = server.getConfiguration();
373    FileSystem fs = CommonFSUtils.getRootDirFileSystem(baseConf);
374    FileSystem walFs = CommonFSUtils.getWALFileSystem(baseConf);
375    Path globalRootDir = CommonFSUtils.getRootDir(baseConf);
376    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
377    Path rootDir = new Path(globalRootDir, params.regionDirName());
378    Path walRootDir = new Path(globalWALRootDir, params.regionDirName());
379    // we will override some configurations so create a new one.
380    Configuration conf = new Configuration(baseConf);
381    CommonFSUtils.setRootDir(conf, rootDir);
382    CommonFSUtils.setWALRootDir(conf, walRootDir);
383    MasterRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
384      params.flushIntervalMs());
385    conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
386    if (params.useHsync() != null) {
387      conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
388    }
389    if (params.useMetaCellComparator() != null) {
390      conf.setBoolean(HRegion.USE_META_CELL_COMPARATOR, params.useMetaCellComparator());
391    }
392    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
393      IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
394
395    MasterRegionWALRoller walRoller = MasterRegionWALRoller.create(
396      td.getTableName() + "-WAL-Roller", conf, server, walFs, walRootDir, globalWALRootDir,
397      params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
398    walRoller.start();
399
400    WALFactory walFactory = new WALFactory(conf, server.getServerName(), server);
401    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
402    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
403    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
404    HRegion region;
405    if (!fs.exists(tableDir)) {
406      // bootstrap, no doubt
407      if (!fs.mkdirs(initializedFlag)) {
408        throw new IOException("Can not touch initialized flag");
409      }
410      region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
411        server.getServerName().toString(), true);
412    } else {
413      if (!fs.exists(initializedFlag)) {
414        if (!fs.exists(initializingFlag)) {
415          // should be old style, where we do not have the initializing or initialized file, persist
416          // the table descriptor, touch the initialized flag and then open the region.
417          // the store file tracker must be DEFAULT
418          LOG.info("No {} or {} file, try upgrading", INITIALIZING_FLAG, INITIALIZED_FLAG);
419          TableDescriptor oldTd =
420            TableDescriptorBuilder.newBuilder(td).setValue(StoreFileTrackerFactory.TRACKER_IMPL,
421              StoreFileTrackerFactory.Trackers.DEFAULT.name()).build();
422          FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, oldTd, true);
423          if (!fs.mkdirs(initializedFlag)) {
424            throw new IOException("Can not touch initialized flag: " + initializedFlag);
425          }
426          RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
427          tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
428          region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
429            server.getServerName().toString());
430        } else {
431          // delete all contents besides the initializing flag, here we can make sure tableDir
432          // exists(unless someone delete it manually...), so we do not do null check here.
433          for (FileStatus status : fs.listStatus(tableDir)) {
434            if (!status.getPath().getName().equals(INITIALIZING_FLAG)) {
435              fs.delete(status.getPath(), true);
436            }
437          }
438          region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
439            server.getServerName().toString(), false);
440        }
441      } else {
442        if (fs.exists(initializingFlag) && !fs.delete(initializingFlag, true)) {
443          LOG.warn("failed to clean up initializing flag: " + initializingFlag);
444        }
445        // open it, make sure to load the table descriptor from fs
446        TableDescriptor oldTd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
447        RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
448        tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
449        region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
450          server.getServerName().toString());
451      }
452    }
453
454    Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
455    MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
456      server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
457      params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
458    walRoller.setFlusherAndCompactor(flusherAndCompactor);
459    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
460    if (!fs.mkdirs(archiveDir)) {
461      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will"
462        + " be created again when we actually archive the hfiles later, so continue", archiveDir);
463    }
464    return new MasterRegion(server, region, walFactory, flusherAndCompactor, walRoller);
465  }
466}