001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
029
030/**
031 * This class is used to track the active procedures when loading procedures from proc wal file.
032 * <p/>
033 * We will read proc wal files from new to old, but when reading a proc wal file, we will still read
034 * from top to bottom, so there are two groups of methods for this class.
035 * <p/>
036 * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used
037 * when reading a proc wal file. In these methods, for the same procedure, typically the one comes
038 * later should win, please see the comment for
039 * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the
040 * exceptions.
041 * <p/>
042 * The second group is {@link #merge(WALProcedureMap)}. We will have a global
043 * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap}
044 * to hold the active procedures for the current proc wal file. And when we finish reading a proc
045 * wal file, we will merge the local one into the global one, by calling the
046 * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
047 * method, for the same procedure, the one comes earlier will win, as we read the proc wal files
048 * from new to old(the reverse order).
049 */
050@InterfaceAudience.Private
051class WALProcedureMap {
052
053  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
054
055  private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>();
056
057  private long minModifiedProcId = Long.MAX_VALUE;
058
059  private long maxModifiedProcId = Long.MIN_VALUE;
060
061  private void trackProcId(long procId) {
062    minModifiedProcId = Math.min(minModifiedProcId, procId);
063    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
064  }
065
066  /**
067   * @return True if this new procedure is 'richer' than the current one else false and we log this
068   *         incidence where it appears that the WAL has older entries appended after newer ones.
069   *         See HBASE-18152.
070   */
071  private static boolean isIncreasing(ProcedureProtos.Procedure current,
072      ProcedureProtos.Procedure candidate) {
073    // Check that the procedures we see are 'increasing'. We used to compare
074    // procedure id first and then update time but it can legitimately go backwards if the
075    // procedure is failed or rolled back so that was unreliable. Was going to compare
076    // state but lets see if comparing update time enough (unfortunately this issue only
077    // seen under load...)
078    boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
079    if (!increasing) {
080      LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
081    }
082    return increasing;
083  }
084
085  public void add(ProcedureProtos.Procedure proc) {
086    procMap.compute(proc.getProcId(), (procId, existingProc) -> {
087      if (existingProc == null || isIncreasing(existingProc, proc)) {
088        return proc;
089      } else {
090        return existingProc;
091      }
092    });
093    trackProcId(proc.getProcId());
094  }
095
096  public void remove(long procId) {
097    procMap.remove(procId);
098  }
099
100  public boolean isEmpty() {
101    return procMap.isEmpty();
102  }
103
104  public boolean contains(long procId) {
105    return procMap.containsKey(procId);
106  }
107
108  /**
109   * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in
110   * will be cleared after merging.
111   */
112  public void merge(WALProcedureMap other) {
113    other.procMap.forEach(procMap::putIfAbsent);
114    maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
115    minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
116    other.procMap.clear();
117    other.maxModifiedProcId = Long.MIN_VALUE;
118    other.minModifiedProcId = Long.MAX_VALUE;
119  }
120
121  public Collection<ProcedureProtos.Procedure> getProcedures() {
122    return Collections.unmodifiableCollection(procMap.values());
123  }
124
125  public long getMinModifiedProcId() {
126    return minModifiedProcId;
127  }
128
129  public long getMaxModifiedProcId() {
130    return maxModifiedProcId;
131  }
132}