package org.elasticsearch.xpack.ml.inference.loadingservice;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;

/* loaded from: input_file:org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.class */
public class ModelLoadingService implements ClusterStateListener {
    public static final Setting<ByteSizeValue> INFERENCE_MODEL_CACHE_SIZE;
    public static final Setting<TimeValue> INFERENCE_MODEL_CACHE_TTL;
    private static final Logger logger;
    private final Cache<String, LocalModel> localModelCache;
    private final TrainedModelProvider provider;
    private final ThreadPool threadPool;
    private final InferenceAuditor auditor;
    private final ByteSizeValue maxCacheSize;
    private final NamedXContentRegistry namedXContentRegistry;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Set<String> referencedModels = new HashSet();
    private final Map<String, Queue<ActionListener<Model>>> loadingListeners = new HashMap();
    private final Set<String> shouldNotAudit = new HashSet();

    public ModelLoadingService(TrainedModelProvider trainedModelProvider, InferenceAuditor inferenceAuditor, ThreadPool threadPool, ClusterService clusterService, NamedXContentRegistry namedXContentRegistry, Settings settings) {
        this.provider = trainedModelProvider;
        this.threadPool = threadPool;
        this.maxCacheSize = (ByteSizeValue) INFERENCE_MODEL_CACHE_SIZE.get(settings);
        this.auditor = inferenceAuditor;
        this.namedXContentRegistry = namedXContentRegistry;
        this.localModelCache = CacheBuilder.builder().setMaximumWeight(this.maxCacheSize.getBytes()).weigher((str, localModel) -> {
            return localModel.ramBytesUsed();
        }).removalListener(this::cacheEvictionListener).setExpireAfterAccess((TimeValue) INFERENCE_MODEL_CACHE_TTL.get(settings)).build();
        clusterService.addListener(this);
    }

    public void getModel(String str, ActionListener<Model> actionListener) {
        LocalModel localModel = (LocalModel) this.localModelCache.get(str);
        if (localModel != null) {
            actionListener.onResponse(localModel);
            logger.trace("[{}] loaded from cache", str);
        } else {
            if (loadModelIfNecessary(str, actionListener)) {
                logger.trace("[{}] is loading or loaded, added new listener to queue", str);
                return;
            }
            logger.trace("[{}] not actively loading, eager loading without cache", str);
            TrainedModelProvider trainedModelProvider = this.provider;
            CheckedConsumer checkedConsumer = trainedModelConfig -> {
                actionListener.onResponse(new LocalModel(trainedModelConfig.getModelId(), trainedModelConfig.ensureParsedDefinition(this.namedXContentRegistry).getModelDefinition(), trainedModelConfig.getInput()));
            };
            Objects.requireNonNull(actionListener);
            trainedModelProvider.getTrainedModel(str, true, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
        }
    }

    private boolean loadModelIfNecessary(String str, ActionListener<Model> actionListener) {
        synchronized (this.loadingListeners) {
            Model model = (Model) this.localModelCache.get(str);
            if (model != null) {
                actionListener.onResponse(model);
                return true;
            }
            if (!this.referencedModels.contains(str)) {
                return this.loadingListeners.computeIfPresent(str, (str2, queue) -> {
                    return addFluently(queue, actionListener);
                }) != null;
            }
            if (this.loadingListeners.computeIfPresent(str, (str3, queue2) -> {
                return addFluently(queue2, actionListener);
            }) == null) {
                logger.trace("[{}] attempting to load and cache", str);
                this.loadingListeners.put(str, addFluently(new ArrayDeque(), actionListener));
                loadModel(str);
            }
            return true;
        }
    }

    private void loadModel(String str) {
        this.provider.getTrainedModel(str, true, ActionListener.wrap(trainedModelConfig -> {
            logger.debug("[{}] successfully loaded model", str);
            handleLoadSuccess(str, trainedModelConfig);
        }, exc -> {
            logger.warn(new ParameterizedMessage("[{}] failed to load model", str), exc);
            handleLoadFailure(str, exc);
        }));
    }

    private void handleLoadSuccess(String str, TrainedModelConfig trainedModelConfig) throws IOException {
        LocalModel localModel = new LocalModel(trainedModelConfig.getModelId(), trainedModelConfig.ensureParsedDefinition(this.namedXContentRegistry).getModelDefinition(), trainedModelConfig.getInput());
        synchronized (this.loadingListeners) {
            Queue<ActionListener<Model>> remove = this.loadingListeners.remove(str);
            if (remove == null) {
                return;
            }
            this.localModelCache.put(str, localModel);
            this.shouldNotAudit.remove(str);
            ActionListener<Model> poll = remove.poll();
            while (true) {
                ActionListener<Model> actionListener = poll;
                if (actionListener == null) {
                    return;
                }
                actionListener.onResponse(localModel);
                poll = remove.poll();
            }
        }
    }

    private void handleLoadFailure(String str, Exception exc) {
        synchronized (this.loadingListeners) {
            Queue<ActionListener<Model>> remove = this.loadingListeners.remove(str);
            if (remove == null) {
                return;
            }
            ActionListener<Model> poll = remove.poll();
            while (true) {
                ActionListener<Model> actionListener = poll;
                if (actionListener == null) {
                    return;
                }
                actionListener.onFailure(exc);
                poll = remove.poll();
            }
        }
    }

    private void cacheEvictionListener(RemovalNotification<String, LocalModel> removalNotification) {
        if (removalNotification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED) {
            auditIfNecessary((String) removalNotification.getKey(), new ParameterizedMessage("model cache entry evicted.current cache [{}] current max [{}] model size [{}]. If this is undesired, consider updating setting [{}] or [{}].", new Object[]{new ByteSizeValue(this.localModelCache.weight()).getStringRep(), this.maxCacheSize.getStringRep(), new ByteSizeValue(((LocalModel) removalNotification.getValue()).ramBytesUsed()).getStringRep(), INFERENCE_MODEL_CACHE_SIZE.getKey(), INFERENCE_MODEL_CACHE_TTL.getKey()}).getFormattedMessage());
        }
    }

    public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
        HashSet hashSet;
        HashSet hashSet2;
        Set difference;
        if (clusterChangedEvent.changedCustomMetaDataSet().contains("ingest") && clusterChangedEvent.state().nodes().getLocalNode().isIngestNode()) {
            Set<String> referencedModelKeys = getReferencedModelKeys(clusterChangedEvent.state().metaData().custom("ingest"));
            if (referencedModelKeys.equals(this.referencedModels)) {
                return;
            }
            ArrayList<Tuple> arrayList = new ArrayList();
            synchronized (this.loadingListeners) {
                hashSet = new HashSet(this.referencedModels);
                hashSet2 = logger.isTraceEnabled() ? new HashSet(this.loadingListeners.keySet()) : null;
                for (String str : this.loadingListeners.keySet()) {
                    if (!referencedModelKeys.contains(str)) {
                        arrayList.add(Tuple.tuple(str, new ArrayList(this.loadingListeners.remove(str))));
                    }
                }
                difference = Sets.difference(hashSet, referencedModelKeys);
                Cache<String, LocalModel> cache = this.localModelCache;
                Objects.requireNonNull(cache);
                difference.forEach((v1) -> {
                    r1.invalidate(v1);
                });
                this.referencedModels.removeAll(difference);
                this.shouldNotAudit.removeAll(difference);
                referencedModelKeys.removeAll(this.referencedModels);
                this.referencedModels.addAll(referencedModelKeys);
                Iterator<String> it = referencedModelKeys.iterator();
                while (it.hasNext()) {
                    this.loadingListeners.put(it.next(), new ArrayDeque());
                }
            }
            if (logger.isTraceEnabled()) {
                if (!this.loadingListeners.keySet().equals(hashSet2)) {
                    logger.trace("cluster state event changed loading models: before {} after {}", hashSet2, this.loadingListeners.keySet());
                }
                if (!this.referencedModels.equals(hashSet)) {
                    logger.trace("cluster state event changed referenced models: before {} after {}", hashSet, this.referencedModels);
                }
            }
            for (Tuple tuple : arrayList) {
                String format = new ParameterizedMessage("Cancelling load of model [{}] as it is no longer referenced by a pipeline", tuple.v1()).getFormat();
                Iterator it2 = ((List) tuple.v2()).iterator();
                while (it2.hasNext()) {
                    ((ActionListener) it2.next()).onFailure(new ElasticsearchException(format, new Object[0]));
                }
            }
            difference.forEach(this::auditUnreferencedModel);
            loadModels(referencedModelKeys);
        }
    }

    private void auditIfNecessary(String str, String str2) {
        if (this.shouldNotAudit.contains(str)) {
            logger.trace("[{}] {}", str, str2);
            return;
        }
        this.auditor.warning(str, str2);
        this.shouldNotAudit.add(str);
        logger.warn("[{}] {}", str, str2);
    }

    private void loadModels(Set<String> set) {
        if (set.isEmpty()) {
            return;
        }
        this.threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                auditNewReferencedModel(str);
                loadModel(str);
            }
        });
    }

    private void auditNewReferencedModel(String str) {
        this.auditor.info(str, "referenced by ingest processors. Attempting to load model into cache");
    }

    private void auditUnreferencedModel(String str) {
        this.auditor.info(str, "no longer referenced by any processors");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Queue<T> addFluently(Queue<T> queue, T t) {
        queue.add(t);
        return queue;
    }

    private static Set<String> getReferencedModelKeys(IngestMetadata ingestMetadata) {
        HashSet hashSet = new HashSet();
        if (ingestMetadata == null) {
            return hashSet;
        }
        ingestMetadata.getPipelines().forEach((str, pipelineConfiguration) -> {
            Object obj;
            Object obj2 = pipelineConfiguration.getConfigAsMap().get("processors");
            if (obj2 instanceof List) {
                for (Object obj3 : (List) obj2) {
                    if (obj3 instanceof Map) {
                        Object obj4 = ((Map) obj3).get(InferenceProcessor.TYPE);
                        if ((obj4 instanceof Map) && (obj = ((Map) obj4).get(InferenceProcessor.MODEL_ID)) != null) {
                            if (!$assertionsDisabled && !(obj instanceof String)) {
                                throw new AssertionError();
                            }
                            hashSet.add(obj.toString());
                        }
                    }
                }
            }
        });
        return hashSet;
    }

    static {
        $assertionsDisabled = !ModelLoadingService.class.desiredAssertionStatus();
        INFERENCE_MODEL_CACHE_SIZE = Setting.memorySizeSetting("xpack.ml.inference_model.cache_size", "40%", new Setting.Property[]{Setting.Property.NodeScope});
        INFERENCE_MODEL_CACHE_TTL = Setting.timeSetting("xpack.ml.inference_model.time_to_live", new TimeValue(5L, TimeUnit.MINUTES), new TimeValue(1L, TimeUnit.MILLISECONDS), new Setting.Property[]{Setting.Property.NodeScope});
        logger = LogManager.getLogger(ModelLoadingService.class);
    }
}
