001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
029
030/**
031 * This class is used to track the active procedures when loading procedures from proc wal file.
032 * <p/>
033 * We will read proc wal files from new to old, but when reading a proc wal file, we will still read
034 * from top to bottom, so there are two groups of methods for this class.
035 * <p/>
036 * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used
037 * when reading a proc wal file. In these methods, for the same procedure, typically the one comes
038 * later should win, please see the comment for
039 * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the
040 * exceptions.
041 * <p/>
042 * The second group is {@link #merge(WALProcedureMap)}. We will have a global
043 * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap}
044 * to hold the active procedures for the current proc wal file. And when we finish reading a proc
045 * wal file, we will merge the local one into the global one, by calling the
046 * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
047 * method, for the same procedure, the one comes earlier will win, as we read the proc wal files
048 * from new to old(the reverse order).
049 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
050 *             use the new region based procedure store.
051 */
052@Deprecated
053@InterfaceAudience.Private
054class WALProcedureMap {
055
056  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
057
058  private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>();
059
060  private long minModifiedProcId = Long.MAX_VALUE;
061
062  private long maxModifiedProcId = Long.MIN_VALUE;
063
064  private void trackProcId(long procId) {
065    minModifiedProcId = Math.min(minModifiedProcId, procId);
066    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
067  }
068
069  /**
070   * @return True if this new procedure is 'richer' than the current one else false and we log this
071   *         incidence where it appears that the WAL has older entries appended after newer ones.
072   *         See HBASE-18152.
073   */
074  private static boolean isIncreasing(ProcedureProtos.Procedure current,
075    ProcedureProtos.Procedure candidate) {
076    // Check that the procedures we see are 'increasing'. We used to compare
077    // procedure id first and then update time but it can legitimately go backwards if the
078    // procedure is failed or rolled back so that was unreliable. Was going to compare
079    // state but lets see if comparing update time enough (unfortunately this issue only
080    // seen under load...)
081    boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
082    if (!increasing) {
083      LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
084    }
085    return increasing;
086  }
087
088  public void add(ProcedureProtos.Procedure proc) {
089    procMap.compute(proc.getProcId(), (procId, existingProc) -> {
090      if (existingProc == null || isIncreasing(existingProc, proc)) {
091        return proc;
092      } else {
093        return existingProc;
094      }
095    });
096    trackProcId(proc.getProcId());
097  }
098
099  public void remove(long procId) {
100    procMap.remove(procId);
101  }
102
103  public boolean isEmpty() {
104    return procMap.isEmpty();
105  }
106
107  public boolean contains(long procId) {
108    return procMap.containsKey(procId);
109  }
110
111  /**
112   * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in
113   * will be cleared after merging.
114   */
115  public void merge(WALProcedureMap other) {
116    other.procMap.forEach(procMap::putIfAbsent);
117    maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
118    minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
119    other.procMap.clear();
120    other.maxModifiedProcId = Long.MIN_VALUE;
121    other.minModifiedProcId = Long.MAX_VALUE;
122  }
123
124  public Collection<ProcedureProtos.Procedure> getProcedures() {
125    return Collections.unmodifiableCollection(procMap.values());
126  }
127
128  public long getMinModifiedProcId() {
129    return minModifiedProcId;
130  }
131
132  public long getMaxModifiedProcId() {
133    return maxModifiedProcId;
134  }
135}