View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.util.List;
21  import java.util.concurrent.Callable;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.errorhandling.ForeignException;
28  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29  import org.apache.hadoop.hbase.procedure.ProcedureMember;
30  import org.apache.hadoop.hbase.procedure.Subprocedure;
31  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32  import org.apache.hadoop.hbase.regionserver.HRegion;
33  import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
34  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
35  
36  /**
37   * This online snapshot implementation uses the distributed procedure framework to force a
38   * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
39   * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
40   * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
41   * rename of the working dir into the proper snapshot directory.
42   */
43  @InterfaceAudience.Private
44  @InterfaceStability.Unstable
45  public class FlushSnapshotSubprocedure extends Subprocedure {
46    private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
47  
48    private final List<HRegion> regions;
49    private final SnapshotDescription snapshot;
50    private final SnapshotSubprocedurePool taskManager;
51  
52    public FlushSnapshotSubprocedure(ProcedureMember member,
53        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
54        List<HRegion> regions, SnapshotDescription snapshot,
55        SnapshotSubprocedurePool taskManager) {
56      super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
57      this.snapshot = snapshot;
58      this.regions = regions;
59      this.taskManager = taskManager;
60    }
61  
62    /**
63     * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
64     */
65    private class RegionSnapshotTask implements Callable<Void> {
66      HRegion region;
67      RegionSnapshotTask(HRegion region) {
68        this.region = region;
69      }
70  
71      @Override
72      public Void call() throws Exception {
73        // Taking the region read lock prevents the individual region from being closed while a
74        // snapshot is in progress.  This is helpful but not sufficient for preventing races with
75        // snapshots that involve multiple regions and regionservers.  It is still possible to have
76        // an interleaving such that globally regions are missing, so we still need the verification
77        // step.
78        LOG.debug("Starting region operation on " + region);
79        region.startRegionOperation();
80        try {
81          LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
82          region.flushcache();
83          region.addRegionToSnapshot(snapshot, monitor);
84          LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
85        } finally {
86          LOG.debug("Closing region operation on " + region);
87          region.closeRegionOperation();
88        }
89        return null;
90      }
91    }
92  
93    private void flushSnapshot() throws ForeignException {
94      if (regions.isEmpty()) {
95        // No regions on this RS, we are basically done.
96        return;
97      }
98  
99      monitor.rethrowException();
100 
101     // assert that the taskManager is empty.
102     if (taskManager.hasTasks()) {
103       throw new IllegalStateException("Attempting to take snapshot "
104           + ClientSnapshotDescriptionUtils.toString(snapshot)
105           + " but we currently have outstanding tasks");
106     }
107 
108     // Add all hfiles already existing in region.
109     for (HRegion region : regions) {
110       // submit one task per region for parallelize by region.
111       taskManager.submitTask(new RegionSnapshotTask(region));
112       monitor.rethrowException();
113     }
114 
115     // wait for everything to complete.
116     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
117     try {
118       taskManager.waitForOutstandingTasks();
119     } catch (InterruptedException e) {
120       throw new ForeignException(getMemberName(), e);
121     }
122   }
123 
124   /**
125    * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
126    */
127   @Override
128   public void acquireBarrier() throws ForeignException {
129     // NO OP
130   }
131 
132   /**
133    * do a flush snapshot of every region on this rs from the target table.
134    */
135   @Override
136   public void insideBarrier() throws ForeignException {
137     flushSnapshot();
138   }
139 
140   /**
141    * Cancel threads if they haven't finished.
142    */
143   @Override
144   public void cleanup(Exception e) {
145     LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
146         + snapshot.getName() + "' due to error", e);
147     try {
148       taskManager.cancelTasks();
149     } catch (InterruptedException e1) {
150       Thread.currentThread().interrupt();
151     }
152   }
153 
154   /**
155    * Hooray!
156    */
157   public void releaseBarrier() {
158     // NO OP
159   }
160 
161 }