001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.locks.Lock;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.KeyLocker;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A WAL Provider that returns a WAL per group of regions. This provider follows the decorator
044 * pattern and mainly holds the logic for WAL grouping. WAL creation/roll/close is delegated to
045 * {@link #DELEGATE_PROVIDER} Region grouping is handled via {@link RegionGroupingStrategy} and can
046 * be configured via the property "hbase.wal.regiongrouping.strategy". Current strategy choices are
047 * <ul>
048 * <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
049 * "bounded".</li>
050 * <li><em>identity</em> : each region belongs to its own group.</li>
051 * <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
052 * </ul>
053 * Optionally, a FQCN to a custom implementation may be given.
054 */
055@InterfaceAudience.Private
056public class RegionGroupingProvider implements WALProvider {
057  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
058
059  /**
060   * Map identifiers to a group number.
061   */
062  public static interface RegionGroupingStrategy {
063    String GROUP_NAME_DELIMITER = ".";
064
065    /**
066     * Given an identifier and a namespace, pick a group.
067     */
068    String group(final byte[] identifier, byte[] namespace);
069
070    void init(Configuration config, String providerId);
071  }
072
073  /**
074   * Maps between configuration names for strategies and implementation classes.
075   */
076  static enum Strategies {
077    defaultStrategy(BoundedGroupingStrategy.class),
078    identity(IdentityGroupingStrategy.class),
079    bounded(BoundedGroupingStrategy.class),
080    namespace(NamespaceGroupingStrategy.class);
081
082    final Class<? extends RegionGroupingStrategy> clazz;
083
084    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
085      this.clazz = clazz;
086    }
087  }
088
089  /**
090   * instantiate a strategy from a config property. requires conf to have already been set (as well
091   * as anything the provider might need to read).
092   */
093  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
094    final String defaultValue) throws IOException {
095    Class<? extends RegionGroupingStrategy> clazz;
096    try {
097      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
098    } catch (IllegalArgumentException exception) {
099      // Fall back to them specifying a class name
100      // Note that the passed default class shouldn't actually be used, since the above only fails
101      // when there is a config value present.
102      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
103    }
104    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
105    try {
106      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
107      result.init(conf, providerId);
108      return result;
109    } catch (Exception e) {
110      LOG.error(
111        "couldn't set up region grouping strategy, check config key " + REGION_GROUPING_STRATEGY);
112      LOG.debug("Exception details for failure to load region grouping strategy.", e);
113      throw new IOException("couldn't set up region grouping strategy", e);
114    }
115  }
116
117  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
118  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
119
120  /** delegate provider for WAL creation/roll/close, but not support multiwal */
121  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
122  public static final String DEFAULT_DELEGATE_PROVIDER =
123    WALFactory.Providers.defaultProvider.name();
124
125  private static final String META_WAL_GROUP_NAME = "meta";
126
127  /** A group-provider mapping, make sure one-one rather than many-one mapping */
128  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
129
130  private final KeyLocker<String> createLock = new KeyLocker<>();
131
132  private RegionGroupingStrategy strategy;
133  private WALFactory factory;
134  private Configuration conf;
135  private List<WALActionsListener> listeners = new ArrayList<>();
136  private String providerId;
137  private Class<? extends WALProvider> providerClass;
138  private Abortable abortable;
139
140  @Override
141  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
142    throws IOException {
143    if (null != strategy) {
144      throw new IllegalStateException("WALProvider.init should only be called once.");
145    }
146    this.conf = conf;
147    this.factory = factory;
148    this.abortable = abortable;
149
150    if (META_WAL_PROVIDER_ID.equals(providerId)) {
151      // do not change the provider id if it is for meta
152      this.providerId = providerId;
153    } else {
154      StringBuilder sb = new StringBuilder().append(factory.factoryId);
155      if (providerId != null) {
156        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
157          sb.append(providerId);
158        } else {
159          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
160        }
161      }
162      this.providerId = sb.toString();
163    }
164    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
165    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
166    if (providerClass.equals(this.getClass())) {
167      LOG.warn("delegate provider not support multiwal, falling back to defaultProvider.");
168      providerClass = factory.getDefaultProvider().clazz;
169    }
170  }
171
172  private WALProvider createProvider(String group) throws IOException {
173    WALProvider provider = WALFactory.createProvider(providerClass);
174    provider.init(factory, conf,
175      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group, this.abortable);
176    provider.addWALActionsListener(new MetricsWAL());
177    return provider;
178  }
179
180  @Override
181  public List<WAL> getWALs() {
182    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
183  }
184
185  private WAL getWAL(String group) throws IOException {
186    WALProvider provider = cached.get(group);
187    if (provider == null) {
188      Lock lock = createLock.acquireLock(group);
189      try {
190        provider = cached.get(group);
191        if (provider == null) {
192          provider = createProvider(group);
193          listeners.forEach(provider::addWALActionsListener);
194          cached.put(group, provider);
195        }
196      } finally {
197        lock.unlock();
198      }
199    }
200    return provider.getWAL(null);
201  }
202
203  @Override
204  public WAL getWAL(RegionInfo region) throws IOException {
205    String group;
206    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
207      group = META_WAL_GROUP_NAME;
208    } else {
209      byte[] id;
210      byte[] namespace;
211      if (region != null) {
212        id = region.getEncodedNameAsBytes();
213        namespace = region.getTable().getNamespace();
214      } else {
215        id = HConstants.EMPTY_BYTE_ARRAY;
216        namespace = null;
217      }
218      group = strategy.group(id, namespace);
219    }
220    return getWAL(group);
221  }
222
223  @Override
224  public void shutdown() throws IOException {
225    // save the last exception and rethrow
226    IOException failure = null;
227    for (WALProvider provider : cached.values()) {
228      try {
229        provider.shutdown();
230      } catch (IOException e) {
231        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
232        if (LOG.isDebugEnabled()) {
233          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
234        }
235        failure = e;
236      }
237    }
238    if (failure != null) {
239      throw failure;
240    }
241  }
242
243  @Override
244  public void close() throws IOException {
245    // save the last exception and rethrow
246    IOException failure = null;
247    for (WALProvider provider : cached.values()) {
248      try {
249        provider.close();
250      } catch (IOException e) {
251        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
252        if (LOG.isDebugEnabled()) {
253          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
254        }
255        failure = e;
256      }
257    }
258    if (failure != null) {
259      throw failure;
260    }
261  }
262
263  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
264    @Override
265    public void init(Configuration config, String providerId) {
266    }
267
268    @Override
269    public String group(final byte[] identifier, final byte[] namespace) {
270      return Bytes.toString(identifier);
271    }
272  }
273
274  @Override
275  public long getNumLogFiles() {
276    long numLogFiles = 0;
277    for (WALProvider provider : cached.values()) {
278      numLogFiles += provider.getNumLogFiles();
279    }
280    return numLogFiles;
281  }
282
283  @Override
284  public long getLogFileSize() {
285    long logFileSize = 0;
286    for (WALProvider provider : cached.values()) {
287      logFileSize += provider.getLogFileSize();
288    }
289    return logFileSize;
290  }
291
292  @Override
293  public void addWALActionsListener(WALActionsListener listener) {
294    // Notice that there is an assumption that this method must be called before the getWAL above,
295    // so we can make sure there is no sub WALProvider yet, so we only add the listener to our
296    // listeners list without calling addWALActionListener for each WALProvider. Although it is no
297    // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
298    // extra code actually works, then we will have other big problems. So leave it as is.
299    listeners.add(listener);
300  }
301}