001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.wal;
020
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.locks.Lock;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.RegionInfo;
034// imports for classes still in regionserver.wal
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.KeyLocker;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
/**
 * A WAL Provider that returns a WAL per group of regions.
 *
 * This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
 * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
 *
 * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
 * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
 * <ul>
 *   <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. Currently
 *                                  "bounded".</li>
 *   <li><em>identity</em> : each region belongs to its own group.</li>
 *   <li><em>bounded</em> : bounded number of groups and regions evenly assigned to each group.</li>
 *   <li><em>namespace</em> : grouping is delegated to {@link NamespaceGroupingStrategy}.</li>
 * </ul>
 * Optionally, a FQCN to a custom implementation may be given.
 */
058@InterfaceAudience.Private
059public class RegionGroupingProvider implements WALProvider {
060  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
061
062  /**
063   * Map identifiers to a group number.
064   */
065  public static interface RegionGroupingStrategy {
066    String GROUP_NAME_DELIMITER = ".";
067
068    /**
069     * Given an identifier and a namespace, pick a group.
070     */
071    String group(final byte[] identifier, byte[] namespace);
072    void init(Configuration config, String providerId);
073  }
074
075  /**
076   * Maps between configuration names for strategies and implementation classes.
077   */
078  static enum Strategies {
079    defaultStrategy(BoundedGroupingStrategy.class),
080    identity(IdentityGroupingStrategy.class),
081    bounded(BoundedGroupingStrategy.class),
082    namespace(NamespaceGroupingStrategy.class);
083
084    final Class<? extends RegionGroupingStrategy> clazz;
085    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
086      this.clazz = clazz;
087    }
088  }
089
090  /**
091   * instantiate a strategy from a config property.
092   * requires conf to have already been set (as well as anything the provider might need to read).
093   */
094  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
095      final String defaultValue) throws IOException {
096    Class<? extends RegionGroupingStrategy> clazz;
097    try {
098      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
099    } catch (IllegalArgumentException exception) {
100      // Fall back to them specifying a class name
101      // Note that the passed default class shouldn't actually be used, since the above only fails
102      // when there is a config value present.
103      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
104    }
105    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
106    try {
107      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
108      result.init(conf, providerId);
109      return result;
110    } catch (Exception e) {
111      LOG.error("couldn't set up region grouping strategy, check config key " +
112          REGION_GROUPING_STRATEGY);
113      LOG.debug("Exception details for failure to load region grouping strategy.", e);
114      throw new IOException("couldn't set up region grouping strategy", e);
115    }
116  }
117
118  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
119  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
120
121  /** delegate provider for WAL creation/roll/close, but not support multiwal */
122  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
123  public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
124      .name();
125
126  private static final String META_WAL_GROUP_NAME = "meta";
127
128  /** A group-provider mapping, make sure one-one rather than many-one mapping */
129  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
130
131  private final KeyLocker<String> createLock = new KeyLocker<>();
132
133  private RegionGroupingStrategy strategy;
134  private WALFactory factory;
135  private List<WALActionsListener> listeners = new ArrayList<>();
136  private String providerId;
137  private Class<? extends WALProvider> providerClass;
138
139  @Override
140  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
141    if (null != strategy) {
142      throw new IllegalStateException("WALProvider.init should only be called once.");
143    }
144    this.factory = factory;
145
146    if (META_WAL_PROVIDER_ID.equals(providerId)) {
147      // do not change the provider id if it is for meta
148      this.providerId = providerId;
149    } else {
150      StringBuilder sb = new StringBuilder().append(factory.factoryId);
151      if (providerId != null) {
152        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
153          sb.append(providerId);
154        } else {
155          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
156        }
157      }
158      this.providerId = sb.toString();
159    }
160    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
161    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
162    if (providerClass.equals(this.getClass())) {
163      LOG.warn("delegate provider not support multiwal, falling back to defaultProvider.");
164      providerClass = factory.getDefaultProvider().clazz;
165    }
166  }
167
168  private WALProvider createProvider(String group) throws IOException {
169    if (META_WAL_PROVIDER_ID.equals(providerId)) {
170      return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
171    } else {
172      return factory.createProvider(providerClass, group);
173    }
174  }
175
176  @Override
177  public List<WAL> getWALs() {
178    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
179  }
180
181  private WAL getWAL(String group) throws IOException {
182    WALProvider provider = cached.get(group);
183    if (provider == null) {
184      Lock lock = createLock.acquireLock(group);
185      try {
186        provider = cached.get(group);
187        if (provider == null) {
188          provider = createProvider(group);
189          listeners.forEach(provider::addWALActionsListener);
190          cached.put(group, provider);
191        }
192      } finally {
193        lock.unlock();
194      }
195    }
196    return provider.getWAL(null);
197  }
198
199  @Override
200  public WAL getWAL(RegionInfo region) throws IOException {
201    String group;
202    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
203      group = META_WAL_GROUP_NAME;
204    } else {
205      byte[] id;
206      byte[] namespace;
207      if (region != null) {
208        id = region.getEncodedNameAsBytes();
209        namespace = region.getTable().getNamespace();
210      } else {
211        id = HConstants.EMPTY_BYTE_ARRAY;
212        namespace = null;
213      }
214      group = strategy.group(id, namespace);
215    }
216    return getWAL(group);
217  }
218
219  @Override
220  public void shutdown() throws IOException {
221    // save the last exception and rethrow
222    IOException failure = null;
223    for (WALProvider provider: cached.values()) {
224      try {
225        provider.shutdown();
226      } catch (IOException e) {
227        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
228        if (LOG.isDebugEnabled()) {
229          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
230        }
231        failure = e;
232      }
233    }
234    if (failure != null) {
235      throw failure;
236    }
237  }
238
239  @Override
240  public void close() throws IOException {
241    // save the last exception and rethrow
242    IOException failure = null;
243    for (WALProvider provider : cached.values()) {
244      try {
245        provider.close();
246      } catch (IOException e) {
247        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
248        if (LOG.isDebugEnabled()) {
249          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
250        }
251        failure = e;
252      }
253    }
254    if (failure != null) {
255      throw failure;
256    }
257  }
258
259  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
260    @Override
261    public void init(Configuration config, String providerId) {}
262    @Override
263    public String group(final byte[] identifier, final byte[] namespace) {
264      return Bytes.toString(identifier);
265    }
266  }
267
268  @Override
269  public long getNumLogFiles() {
270    long numLogFiles = 0;
271    for (WALProvider provider : cached.values()) {
272      numLogFiles += provider.getNumLogFiles();
273    }
274    return numLogFiles;
275  }
276
277  @Override
278  public long getLogFileSize() {
279    long logFileSize = 0;
280    for (WALProvider provider : cached.values()) {
281      logFileSize += provider.getLogFileSize();
282    }
283    return logFileSize;
284  }
285
286  @Override
287  public void addWALActionsListener(WALActionsListener listener) {
288    // Notice that there is an assumption that this method must be called before the getWAL above,
289    // so we can make sure there is no sub WALProvider yet, so we only add the listener to our
290    // listeners list without calling addWALActionListener for each WALProvider. Although it is no
291    // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
292    // extra code actually works, then we will have other big problems. So leave it as is.
293    listeners.add(listener);
294  }
295}