View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.util.List;
21  import java.util.concurrent.Callable;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.errorhandling.ForeignException;
28  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29  import org.apache.hadoop.hbase.procedure.ProcedureMember;
30  import org.apache.hadoop.hbase.procedure.Subprocedure;
31  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32  import org.apache.hadoop.hbase.regionserver.HRegion;
33  import org.apache.hadoop.hbase.regionserver.Region;
34  import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
35  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
36  
37  /**
38   * This online snapshot implementation uses the distributed procedure framework to force a
39   * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
40   * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
41   * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
42   * rename of the working dir into the proper snapshot directory.
43   */
44  @InterfaceAudience.Private
45  @InterfaceStability.Unstable
46  public class FlushSnapshotSubprocedure extends Subprocedure {
47    private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
48  
49    private final List<Region> regions;
50    private final SnapshotDescription snapshot;
51    private final SnapshotSubprocedurePool taskManager;
52    private boolean snapshotSkipFlush = false;
53  
54    public FlushSnapshotSubprocedure(ProcedureMember member,
55        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
56        List<Region> regions, SnapshotDescription snapshot,
57        SnapshotSubprocedurePool taskManager) {
58      super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
59      this.snapshot = snapshot;
60  
61      if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
62        snapshotSkipFlush = true;
63      }
64      this.regions = regions;
65      this.taskManager = taskManager;
66    }
67  
68    /**
69     * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
70     */
71    private class RegionSnapshotTask implements Callable<Void> {
72      Region region;
73      RegionSnapshotTask(Region region) {
74        this.region = region;
75      }
76  
77      @Override
78      public Void call() throws Exception {
79        // Taking the region read lock prevents the individual region from being closed while a
80        // snapshot is in progress.  This is helpful but not sufficient for preventing races with
81        // snapshots that involve multiple regions and regionservers.  It is still possible to have
82        // an interleaving such that globally regions are missing, so we still need the verification
83        // step.
84        LOG.debug("Starting region operation on " + region);
85        region.startRegionOperation();
86        try {
87          if (snapshotSkipFlush) {
88          /*
89           * This is to take an online-snapshot without force a coordinated flush to prevent pause
90           * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
91           * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
92           * turned on/off based on the flush type.
93           * To minimized the code change, class name is not changed.
94           */
95            LOG.debug("take snapshot without flush memstore first");
96          } else {
97            LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
98            region.flush(true);
99          }
100         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
101         if (snapshotSkipFlush) {
102           LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
103         } else {
104           LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
105         }
106       } finally {
107         LOG.debug("Closing region operation on " + region);
108         region.closeRegionOperation();
109       }
110       return null;
111     }
112   }
113 
114   private void flushSnapshot() throws ForeignException {
115     if (regions.isEmpty()) {
116       // No regions on this RS, we are basically done.
117       return;
118     }
119 
120     monitor.rethrowException();
121 
122     // assert that the taskManager is empty.
123     if (taskManager.hasTasks()) {
124       throw new IllegalStateException("Attempting to take snapshot "
125           + ClientSnapshotDescriptionUtils.toString(snapshot)
126           + " but we currently have outstanding tasks");
127     }
128 
129     // Add all hfiles already existing in region.
130     for (Region region : regions) {
131       // submit one task per region for parallelize by region.
132       taskManager.submitTask(new RegionSnapshotTask(region));
133       monitor.rethrowException();
134     }
135 
136     // wait for everything to complete.
137     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
138     try {
139       taskManager.waitForOutstandingTasks();
140     } catch (InterruptedException e) {
141       LOG.error("got interrupted exception for " + getMemberName());
142       throw new ForeignException(getMemberName(), e);
143     }
144   }
145 
146   /**
147    * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
148    */
149   @Override
150   public void acquireBarrier() throws ForeignException {
151     // NO OP
152   }
153 
154   /**
155    * do a flush snapshot of every region on this rs from the target table.
156    */
157   @Override
158   public byte[] insideBarrier() throws ForeignException {
159     flushSnapshot();
160     return new byte[0];
161   }
162 
163   /**
164    * Cancel threads if they haven't finished.
165    */
166   @Override
167   public void cleanup(Exception e) {
168     LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
169         + snapshot.getName() + "' due to error", e);
170     try {
171       taskManager.cancelTasks();
172     } catch (InterruptedException e1) {
173       Thread.currentThread().interrupt();
174     }
175   }
176 
177   /**
178    * Hooray!
179    */
180   public void releaseBarrier() {
181     // NO OP
182   }
183 
184 }