/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Objects;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AntiEntropyService {
    private static final Logger logger = LoggerFactory.getLogger(AntiEntropyService.class);
    public static final AntiEntropyService instance = new AntiEntropyService();
    public static final long REQUEST_TIMEOUT = 172800000L;
    private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests = new ExpiringMap(172800000L);
    private final ConcurrentMap<String, RepairSession.Callback> sessions = new ConcurrentHashMap<String, RepairSession.Callback>();

    protected AntiEntropyService() {
    }

    public RepairSession getRepairSession(Range range, String tablename, String ... cfnames) {
        return new RepairSession(range, tablename, cfnames);
    }

    RepairSession getArtificialRepairSession(TreeRequest req, String tablename, String ... cfnames) {
        return new RepairSession(req, tablename, cfnames);
    }

    void completedRequest(TreeRequest request) {
        ((RepairSession.Callback)this.sessions.get(request.sessionid)).completed(request);
    }

    private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid) {
        Map<TreeRequest, TreePair> ctrees = this.requests.get(sessionid);
        if (ctrees == null) {
            ctrees = new HashMap<TreeRequest, TreePair>();
            this.requests.put(sessionid, ctrees);
        }
        return ctrees;
    }

    static Set<InetAddress> getNeighbors(String table, Range range) {
        StorageService ss = StorageService.instance;
        Map<Range, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
        if (!replicaSets.containsKey(range)) {
            return Collections.emptySet();
        }
        HashSet<InetAddress> neighbors = new HashSet<InetAddress>((Collection)replicaSets.get(range));
        neighbors.remove(FBUtilities.getLocalAddress());
        Iterator iter = neighbors.iterator();
        while (iter.hasNext()) {
            InetAddress endpoint = (InetAddress)iter.next();
            if (Gossiper.instance.getVersion(endpoint) > 1) continue;
            logger.info("Excluding " + endpoint + " from repair because it is on version 0.7 or sooner. You should consider updating this node before running repair again.");
            iter.remove();
        }
        return neighbors;
    }

    private void rendezvous(TreeRequest request, MerkleTree tree) {
        InetAddress LOCAL = FBUtilities.getLocalAddress();
        Map<TreeRequest, TreePair> ctrees = this.rendezvousPairs(request.sessionid);
        ArrayList<Differencer> differencers = new ArrayList<Differencer>();
        if (LOCAL.equals(request.endpoint)) {
            for (InetAddress neighbor : AntiEntropyService.getNeighbors((String)request.cf.left, request.range)) {
                TreeRequest remotereq = new TreeRequest(request.sessionid, neighbor, request.range, request.cf);
                TreePair waiting = ctrees.remove(remotereq);
                if (waiting != null && waiting.right != null) {
                    differencers.add(new Differencer(remotereq, tree, (MerkleTree)waiting.right));
                    continue;
                }
                ctrees.put(remotereq, new TreePair(tree, null));
                logger.debug("Stored local tree for " + request + " to wait for " + remotereq);
            }
        } else {
            TreePair waiting = ctrees.remove(request);
            if (waiting != null && waiting.left != null) {
                differencers.add(new Differencer(request, (MerkleTree)waiting.left, tree));
            } else {
                ctrees.put(request, new TreePair(null, tree));
                logger.debug("Stored remote tree for " + request + " to wait for local tree.");
            }
        }
        for (Differencer differencer : differencers) {
            logger.info("Queueing comparison " + differencer);
            StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
        }
    }

    TreeRequest request(String sessionid, InetAddress remote, Range range, String ksname, String cfname) {
        TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
        return request;
    }

    void respond(Validator validator, InetAddress local) {
        MessagingService ms = MessagingService.instance();
        try {
            Message message = TreeResponseVerbHandler.makeVerb(local, validator);
            logger.info("Sending AEService tree for " + validator.request);
            ms.sendOneWay(message, validator.request.endpoint);
        }
        catch (Exception e) {
            logger.error("Could not send valid tree for request " + validator.request, (Throwable)e);
        }
    }

    class RepairSession
    extends Thread {
        private final String tablename;
        private final String[] cfnames;
        private final SimpleCondition requestsMade;
        private final ConcurrentHashMap<TreeRequest, Object> requests;
        private final Range range;

        public RepairSession(TreeRequest req, String tablename, String ... cfnames) {
            super(req.sessionid);
            this.range = req.range;
            this.tablename = tablename;
            this.cfnames = cfnames;
            this.requestsMade = new SimpleCondition();
            this.requests = new ConcurrentHashMap();
            this.requests.put(req, this);
            Callback callback = new Callback();
            instance.sessions.put(this.getName(), callback);
        }

        public RepairSession(Range range, String tablename, String ... cfnames) {
            super("manual-repair-" + UUID.randomUUID());
            this.tablename = tablename;
            this.cfnames = cfnames;
            this.range = range;
            this.requestsMade = new SimpleCondition();
            this.requests = new ConcurrentHashMap();
        }

        public void blockUntilRunning() throws InterruptedException {
            this.requestsMade.await();
        }

        @Override
        public void run() {
            Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(this.tablename, this.range);
            if (endpoints.isEmpty()) {
                this.requestsMade.signalAll();
                logger.info("No neighbors to repair with for " + this.tablename + " on " + this.range + ": " + this.getName() + " completed.");
                return;
            }
            for (InetAddress endpoint : endpoints) {
                if (FailureDetector.instance.isAlive(endpoint)) continue;
                logger.info("Could not proceed on repair because a neighbor (" + endpoint + ") is dead: " + this.getName() + " failed.");
                return;
            }
            Callback callback = new Callback();
            instance.sessions.put(this.getName(), callback);
            try {
                for (String cfname : this.cfnames) {
                    for (InetAddress endpoint : endpoints) {
                        this.requests.put(instance.request(this.getName(), endpoint, this.range, this.tablename, cfname), this);
                    }
                    instance.request(this.getName(), FBUtilities.getLocalAddress(), this.range, this.tablename, cfname);
                }
                logger.info("Waiting for repair requests: " + this.requests.keySet());
                this.requestsMade.signalAll();
                callback.completed.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
            }
        }

        class Callback {
            public final SimpleCondition completed = new SimpleCondition();

            Callback() {
            }

            public void completed(TreeRequest request) {
                try {
                    RepairSession.this.blockUntilRunning();
                }
                catch (InterruptedException e) {
                    throw new AssertionError((Object)e);
                }
                RepairSession.this.requests.remove(request);
                logger.info("{} completed successfully: {} outstanding.", (Object)request, (Object)RepairSession.this.requests.size());
                if (!RepairSession.this.requests.isEmpty()) {
                    return;
                }
                logger.info("Repair session " + RepairSession.this.getName() + " completed successfully.");
                instance.sessions.remove(RepairSession.this.getName());
                this.completed.signalAll();
            }
        }
    }

    public static class TreeRequest {
        public final String sessionid;
        public final InetAddress endpoint;
        public final Range range;
        public final CFPair cf;

        public TreeRequest(String sessionid, InetAddress endpoint, Range range, CFPair cf) {
            this.sessionid = sessionid;
            this.endpoint = endpoint;
            this.cf = cf;
            this.range = range;
        }

        public final int hashCode() {
            return Objects.hashCode((Object[])new Object[]{this.sessionid, this.endpoint, this.cf, this.range});
        }

        public final boolean equals(Object o) {
            if (!(o instanceof TreeRequest)) {
                return false;
            }
            TreeRequest that = (TreeRequest)o;
            return Objects.equal((Object)this.sessionid, (Object)that.sessionid) && Objects.equal((Object)this.endpoint, (Object)that.endpoint) && Objects.equal((Object)this.cf, (Object)that.cf) && Objects.equal((Object)this.range, (Object)that.range);
        }

        public String toString() {
            return "#<TreeRequest " + this.sessionid + ", " + this.endpoint + ", " + this.cf + ", " + this.range + ">";
        }
    }

    static class TreePair
    extends Pair<MerkleTree, MerkleTree> {
        public TreePair(MerkleTree local, MerkleTree remote) {
            super(local, remote);
        }
    }

    static class CFPair
    extends Pair<String, String> {
        public CFPair(String table, String cf) {
            super(table, cf);
            assert (table != null && cf != null);
        }
    }

    public static class TreeResponseVerbHandler
    implements IVerbHandler,
    ICompactSerializer<Validator> {
        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();

        static Message makeVerb(InetAddress local, Validator validator) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(validator, dos, (int)Gossiper.instance.getVersion(validator.request.endpoint));
                return new Message(local, StorageService.Verb.TREE_RESPONSE, bos.toByteArray(), Gossiper.instance.getVersion(validator.request.endpoint));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(Validator v, DataOutputStream dos, int version) throws IOException {
            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos, version);
            MerkleTree.serializer.serialize(v.tree, dos, version);
            dos.flush();
        }

        @Override
        public Validator deserialize(DataInputStream dis, int version) throws IOException {
            TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis, version);
            try {
                return new Validator(request, MerkleTree.serializer.deserialize(dis, version));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void doVerb(Message message, String id) {
            byte[] bytes = message.getMessageBody();
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                Validator response = this.deserialize(buffer, message.getVersion());
                TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.range, response.request.cf);
                instance.rendezvous(request, response.tree);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class TreeRequestVerbHandler
    implements IVerbHandler,
    ICompactSerializer<TreeRequest> {
        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();

        static Message makeVerb(TreeRequest request, int version) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(request, dos, version);
                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(TreeRequest request, DataOutputStream dos, int version) throws IOException {
            dos.writeUTF(request.sessionid);
            CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
            dos.writeUTF((String)request.cf.left);
            dos.writeUTF((String)request.cf.right);
            if (version > 1) {
                AbstractBounds.serializer().serialize(request.range, dos);
            }
        }

        @Override
        public TreeRequest deserialize(DataInputStream dis, int version) throws IOException {
            String sessId = dis.readUTF();
            InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
            CFPair cfpair = new CFPair(dis.readUTF(), dis.readUTF());
            Range range = version > 1 ? (Range)AbstractBounds.serializer().deserialize(dis) : new Range((Token)StorageService.getPartitioner().getMinimumToken(), (Token)StorageService.getPartitioner().getMinimumToken());
            return new TreeRequest(sessId, endpoint, range, cfpair);
        }

        @Override
        public void doVerb(Message message, String id) {
            byte[] bytes = message.getMessageBody();
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
                TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.range, remotereq.cf);
                ColumnFamilyStore store = Table.open((String)request.cf.left).getColumnFamilyStore((String)request.cf.right);
                Validator validator = new Validator(request);
                logger.debug("Queueing validation compaction for " + request);
                CompactionManager.instance.submitValidation(store, validator);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class Differencer
    implements Runnable {
        public final TreeRequest request;
        public final MerkleTree ltree;
        public final MerkleTree rtree;
        public List<Range> differences;

        public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree rtree) {
            this.request = request;
            this.ltree = ltree;
            this.rtree = rtree;
            this.differences = new ArrayList<Range>();
        }

        @Override
        public void run() {
            InetAddress local = FBUtilities.getLocalAddress();
            if (this.ltree.partitioner() == null) {
                this.ltree.partitioner(StorageService.getPartitioner());
            }
            if (this.rtree.partitioner() == null) {
                this.rtree.partitioner(StorageService.getPartitioner());
            }
            this.differences.addAll(MerkleTree.difference(this.ltree, this.rtree));
            String format = "Endpoints " + local + " and " + this.request.endpoint + " %s for " + this.request.cf + " on " + this.request.range;
            if (this.differences.isEmpty()) {
                logger.info(String.format(format, "are consistent"));
                instance.completedRequest(this.request);
                return;
            }
            logger.info(String.format(format, "have " + this.differences.size() + " range(s) out of sync"));
            try {
                this.performStreamingRepair();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void performStreamingRepair() throws IOException {
            logger.info("Performing streaming repair of " + this.differences.size() + " ranges for " + this.request);
            ColumnFamilyStore cfstore = Table.open((String)this.request.cf.left).getColumnFamilyStore((String)this.request.cf.right);
            try {
                Collection<SSTableReader> sstables = cfstore.getSSTables();
                Callback callback = new Callback();
                StreamOutSession outsession = StreamOutSession.create((String)this.request.cf.left, this.request.endpoint, callback);
                StreamOut.transferSSTables(outsession, sstables, this.differences, OperationType.AES);
                StreamIn.requestRanges(this.request.endpoint, (String)this.request.cf.left, this.differences, callback, OperationType.AES);
            }
            catch (Exception e) {
                throw new IOException("Streaming repair failed.", e);
            }
        }

        public String toString() {
            return "#<Differencer " + this.request + ">";
        }

        class Callback
        extends WrappedRunnable {
            private final AtomicInteger outstanding = new AtomicInteger(2);

            Callback() {
            }

            @Override
            protected void runMayThrow() throws Exception {
                if (this.outstanding.decrementAndGet() > 0) {
                    return;
                }
                logger.info("Finished streaming repair for " + Differencer.this.request);
                instance.completedRequest(Differencer.this.request);
            }
        }
    }

    public static class Validator
    implements Runnable {
        public final TreeRequest request;
        public final MerkleTree tree;
        private transient List<MerkleTree.RowHash> minrows;
        private transient long validated;
        private transient MerkleTree.TreeRange range;
        private transient MerkleTree.TreeRangeIterator ranges;
        private transient DecoratedKey lastKey;
        public static final MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);

        Validator(TreeRequest request) {
            this(request, new MerkleTree(DatabaseDescriptor.getPartitioner(), request.range, 126, (int)Math.pow(2.0, 15.0)));
        }

        Validator(TreeRequest request, MerkleTree tree) {
            this.request = request;
            this.tree = tree;
            this.tree.fullRange = this.request.range;
            this.minrows = new ArrayList<MerkleTree.RowHash>();
            this.validated = 0L;
            this.range = null;
            this.ranges = null;
        }

        public void prepare(ColumnFamilyStore cfs) {
            ArrayList<DecoratedKey> keys = new ArrayList<DecoratedKey>();
            for (DecoratedKey sample : cfs.keySamples(this.request.range)) {
                assert (this.request.range.contains((Token)sample.token));
                keys.add(sample);
            }
            if (keys.isEmpty()) {
                this.tree.init();
            } else {
                DecoratedKey dk;
                int numkeys = keys.size();
                Random random = new Random();
                do {
                    dk = (DecoratedKey)keys.get(random.nextInt(numkeys));
                } while (this.tree.split((Token)dk.token));
            }
            logger.debug("Prepared AEService tree of size " + this.tree.size() + " for " + this.request);
            this.ranges = this.tree.invalids();
        }

        public void add(AbstractCompactedRow row) {
            assert (this.request.range.contains((Token)row.key.token)) : row.key.token + " is not contained in " + this.request.range;
            assert (this.lastKey == null || this.lastKey.compareTo(row.key) < 0) : "row " + row.key + " received out of order wrt " + this.lastKey;
            this.lastKey = row.key;
            if (this.range == null) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            while (!this.range.contains((Token)row.key.token)) {
                this.range.addHash(EMPTY_ROW);
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            this.range.addHash(this.rowHash(row));
        }

        private MerkleTree.RowHash rowHash(AbstractCompactedRow row) {
            ++this.validated;
            MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
            row.update(digest);
            return new MerkleTree.RowHash((Token)row.key.token, digest.digest());
        }

        public void complete() {
            assert (this.ranges != null) : "Validator was not prepared()";
            if (this.range != null) {
                this.range.addHash(EMPTY_ROW);
            }
            while (this.ranges.hasNext()) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
                this.range.addHash(EMPTY_ROW);
            }
            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
            logger.debug("Validated " + this.validated + " rows into AEService tree for " + this.request);
        }

        @Override
        public void run() {
            instance.respond(this, FBUtilities.getLocalAddress());
        }
    }
}

