001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.util.List;
021import java.util.concurrent.Callable;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026import org.apache.hadoop.hbase.errorhandling.ForeignException;
027import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
028import org.apache.hadoop.hbase.procedure.ProcedureMember;
029import org.apache.hadoop.hbase.procedure.Subprocedure;
030import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032
033/**
034 * This flush region implementation uses the distributed procedure framework to flush
035 * table regions.
036 * Its acquireBarrier stage does nothing.  Its insideBarrier stage flushes the regions.
037 */
038@InterfaceAudience.Private
039public class FlushTableSubprocedure extends Subprocedure {
040  private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
041
042  private final String table;
043  private final List<HRegion> regions;
044  private final FlushTableSubprocedurePool taskManager;
045
046  public FlushTableSubprocedure(ProcedureMember member,
047      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
048      List<HRegion> regions, String table,
049      FlushTableSubprocedurePool taskManager) {
050    super(member, table, errorListener, wakeFrequency, timeout);
051    this.table = table;
052    this.regions = regions;
053    this.taskManager = taskManager;
054  }
055
056  private static class RegionFlushTask implements Callable<Void> {
057    HRegion region;
058    RegionFlushTask(HRegion region) {
059      this.region = region;
060    }
061
062    @Override
063    public Void call() throws Exception {
064      LOG.debug("Starting region operation on " + region);
065      region.startRegionOperation();
066      try {
067        LOG.debug("Flush region " + region.toString() + " started...");
068        region.flush(true);
069        // TODO: flush result is not checked?
070      } finally {
071        LOG.debug("Closing region operation on " + region);
072        region.closeRegionOperation();
073      }
074      return null;
075    }
076  }
077
078  private void flushRegions() throws ForeignException {
079    if (regions.isEmpty()) {
080      // No regions on this RS, we are basically done.
081      return;
082    }
083
084    monitor.rethrowException();
085
086    // assert that the taskManager is empty.
087    if (taskManager.hasTasks()) {
088      throw new IllegalStateException("Attempting to flush "
089          + table + " but we currently have outstanding tasks");
090    }
091
092    // Add all hfiles already existing in region.
093    for (HRegion region : regions) {
094      // submit one task per region for parallelize by region.
095      taskManager.submitTask(new RegionFlushTask(region));
096      monitor.rethrowException();
097    }
098
099    // wait for everything to complete.
100    LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
101    try {
102      taskManager.waitForOutstandingTasks();
103    } catch (InterruptedException e) {
104      throw new ForeignException(getMemberName(), e);
105    }
106  }
107
108  /**
109   * Flush the online regions on this rs for the target table.
110   */
111  @Override
112  public void acquireBarrier() throws ForeignException {
113    flushRegions();
114  }
115
116  @Override
117  public byte[] insideBarrier() throws ForeignException {
118    // No-Op
119    return new byte[0];
120  }
121
122  /**
123   * Cancel threads if they haven't finished.
124   */
125  @Override
126  public void cleanup(Exception e) {
127    LOG.info("Aborting all flush region subprocedure task threads for '"
128        + table + "' due to error", e);
129    try {
130      taskManager.cancelTasks();
131    } catch (InterruptedException e1) {
132      Thread.currentThread().interrupt();
133    }
134  }
135
136  public void releaseBarrier() {
137    // NO OP
138  }
139
140}