001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.wal;
020
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.locks.Lock;
030import java.util.stream.Collectors;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.client.RegionInfo;
036// imports for classes still in regionserver.wal
037import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.KeyLocker;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * A WAL Provider that returns a WAL per group of regions.
046 *
047 * This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
048 * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
049 *
050 * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
051 * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
052 * <ul>
053 *   <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
054 *                                  "bounded".</li>
055 *   <li><em>identity</em> : each region belongs to its own group.</li>
056 *   <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
057 * </ul>
058 * Optionally, a FQCN to a custom implementation may be given.
059 */
060@InterfaceAudience.Private
061public class RegionGroupingProvider implements WALProvider {
062  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
063
064  /**
065   * Map identifiers to a group number.
066   */
067  public static interface RegionGroupingStrategy {
068    String GROUP_NAME_DELIMITER = ".";
069
070    /**
071     * Given an identifier and a namespace, pick a group.
072     */
073    String group(final byte[] identifier, byte[] namespace);
074    void init(Configuration config, String providerId);
075  }
076
077  /**
078   * Maps between configuration names for strategies and implementation classes.
079   */
080  static enum Strategies {
081    defaultStrategy(BoundedGroupingStrategy.class),
082    identity(IdentityGroupingStrategy.class),
083    bounded(BoundedGroupingStrategy.class),
084    namespace(NamespaceGroupingStrategy.class);
085
086    final Class<? extends RegionGroupingStrategy> clazz;
087    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
088      this.clazz = clazz;
089    }
090  }
091
092  /**
093   * instantiate a strategy from a config property.
094   * requires conf to have already been set (as well as anything the provider might need to read).
095   */
096  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
097      final String defaultValue) throws IOException {
098    Class<? extends RegionGroupingStrategy> clazz;
099    try {
100      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
101    } catch (IllegalArgumentException exception) {
102      // Fall back to them specifying a class name
103      // Note that the passed default class shouldn't actually be used, since the above only fails
104      // when there is a config value present.
105      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
106    }
107    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
108    try {
109      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
110      result.init(conf, providerId);
111      return result;
112    } catch (Exception e) {
113      LOG.error("couldn't set up region grouping strategy, check config key " +
114          REGION_GROUPING_STRATEGY);
115      LOG.debug("Exception details for failure to load region grouping strategy.", e);
116      throw new IOException("couldn't set up region grouping strategy", e);
117    }
118  }
119
120  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
121  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
122
123  /** delegate provider for WAL creation/roll/close, but not support multiwal */
124  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
125  public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
126      .name();
127
128  private static final String META_WAL_GROUP_NAME = "meta";
129
130  /** A group-provider mapping, make sure one-one rather than many-one mapping */
131  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
132
133  private final KeyLocker<String> createLock = new KeyLocker<>();
134
135  private RegionGroupingStrategy strategy;
136  private WALFactory factory;
137  private List<WALActionsListener> listeners = new ArrayList<>();
138  private String providerId;
139  private Class<? extends WALProvider> providerClass;
140
141  @Override
142  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
143      throws IOException {
144    if (null != strategy) {
145      throw new IllegalStateException("WALProvider.init should only be called once.");
146    }
147    this.factory = factory;
148
149    if (META_WAL_PROVIDER_ID.equals(providerId)) {
150      // do not change the provider id if it is for meta
151      this.providerId = providerId;
152    } else {
153      StringBuilder sb = new StringBuilder().append(factory.factoryId);
154      if (providerId != null) {
155        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
156          sb.append(providerId);
157        } else {
158          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
159        }
160      }
161      this.providerId = sb.toString();
162    }
163    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
164    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
165    if (providerClass.equals(this.getClass())) {
166      LOG.warn("delegate provider not support multiwal, falling back to defaultProvider.");
167      providerClass = factory.getDefaultProvider().clazz;
168    }
169  }
170
171  private WALProvider createProvider(String group) throws IOException {
172    if (META_WAL_PROVIDER_ID.equals(providerId)) {
173      return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
174    } else {
175      return factory.createProvider(providerClass, group);
176    }
177  }
178
179  @Override
180  public List<WAL> getWALs() {
181    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
182  }
183
184  private WAL getWAL(String group) throws IOException {
185    WALProvider provider = cached.get(group);
186    if (provider == null) {
187      Lock lock = createLock.acquireLock(group);
188      try {
189        provider = cached.get(group);
190        if (provider == null) {
191          provider = createProvider(group);
192          listeners.forEach(provider::addWALActionsListener);
193          cached.put(group, provider);
194        }
195      } finally {
196        lock.unlock();
197      }
198    }
199    return provider.getWAL(null);
200  }
201
202  @Override
203  public WAL getWAL(RegionInfo region) throws IOException {
204    String group;
205    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
206      group = META_WAL_GROUP_NAME;
207    } else {
208      byte[] id;
209      byte[] namespace;
210      if (region != null) {
211        id = region.getEncodedNameAsBytes();
212        namespace = region.getTable().getNamespace();
213      } else {
214        id = HConstants.EMPTY_BYTE_ARRAY;
215        namespace = null;
216      }
217      group = strategy.group(id, namespace);
218    }
219    return getWAL(group);
220  }
221
222  @Override
223  public void shutdown() throws IOException {
224    // save the last exception and rethrow
225    IOException failure = null;
226    for (WALProvider provider: cached.values()) {
227      try {
228        provider.shutdown();
229      } catch (IOException e) {
230        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
231        if (LOG.isDebugEnabled()) {
232          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
233        }
234        failure = e;
235      }
236    }
237    if (failure != null) {
238      throw failure;
239    }
240  }
241
242  @Override
243  public void close() throws IOException {
244    // save the last exception and rethrow
245    IOException failure = null;
246    for (WALProvider provider : cached.values()) {
247      try {
248        provider.close();
249      } catch (IOException e) {
250        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
251        if (LOG.isDebugEnabled()) {
252          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
253        }
254        failure = e;
255      }
256    }
257    if (failure != null) {
258      throw failure;
259    }
260  }
261
262  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
263    @Override
264    public void init(Configuration config, String providerId) {}
265    @Override
266    public String group(final byte[] identifier, final byte[] namespace) {
267      return Bytes.toString(identifier);
268    }
269  }
270
271  @Override
272  public long getNumLogFiles() {
273    long numLogFiles = 0;
274    for (WALProvider provider : cached.values()) {
275      numLogFiles += provider.getNumLogFiles();
276    }
277    return numLogFiles;
278  }
279
280  @Override
281  public long getLogFileSize() {
282    long logFileSize = 0;
283    for (WALProvider provider : cached.values()) {
284      logFileSize += provider.getLogFileSize();
285    }
286    return logFileSize;
287  }
288
289  @Override
290  public void addWALActionsListener(WALActionsListener listener) {
291    // Notice that there is an assumption that this method must be called before the getWAL above,
292    // so we can make sure there is no sub WALProvider yet, so we only add the listener to our
293    // listeners list without calling addWALActionListener for each WALProvider. Although it is no
294    // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
295    // extra code actually works, then we will have other big problems. So leave it as is.
296    listeners.add(listener);
297  }
298}