package org.elasticsearch.xpack.ml.process;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;

/* loaded from: input_file:org/elasticsearch/xpack/ml/process/AbstractNativeProcess.class */
public abstract class AbstractNativeProcess implements NativeProcess {
    private static final Logger LOGGER = LogManager.getLogger(AbstractNativeProcess.class);
    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
    private final String jobId;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream processInStream;
    private final AtomicBoolean processInStreamClosed = new AtomicBoolean();
    private final InputStream processOutStream;
    private final OutputStream processRestoreStream;
    private final LengthEncodedWriter recordWriter;
    private final ZonedDateTime startTime;
    private final int numberOfFields;
    private final List<Path> filesToDelete;
    private final Consumer<String> onProcessCrash;
    private final Duration processConnectTimeout;
    private volatile Future<?> logTailFuture;
    private volatile Future<?> stateProcessorFuture;
    private volatile boolean processCloseInitiated;
    private volatile boolean processKilled;
    private volatile boolean isReady;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractNativeProcess(String str, InputStream inputStream, OutputStream outputStream, InputStream inputStream2, OutputStream outputStream2, int i, List<Path> list, Consumer<String> consumer, Duration duration) {
        this.jobId = str;
        this.cppLogHandler = new CppLogMessageHandler(str, inputStream);
        this.processInStream = outputStream != null ? new BufferedOutputStream(outputStream) : null;
        this.processOutStream = inputStream2;
        this.processRestoreStream = outputStream2;
        this.recordWriter = new LengthEncodedWriter(this.processInStream);
        this.startTime = ZonedDateTime.now();
        this.numberOfFields = i;
        this.filesToDelete = list;
        this.onProcessCrash = (Consumer) Objects.requireNonNull(consumer);
        this.processConnectTimeout = (Duration) Objects.requireNonNull(duration);
    }

    public abstract String getName();

    public void start(ExecutorService executorService) {
        this.logTailFuture = executorService.submit(() -> {
            try {
                try {
                    CppLogMessageHandler cppLogMessageHandler = this.cppLogHandler;
                    try {
                        cppLogMessageHandler.tailStream();
                        if (cppLogMessageHandler != null) {
                            cppLogMessageHandler.close();
                        }
                        detectCrash();
                    } catch (Throwable th) {
                        if (cppLogMessageHandler != null) {
                            try {
                                cppLogMessageHandler.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (IOException e) {
                    if (!this.processKilled) {
                        LOGGER.error(new ParameterizedMessage("[{}] Error tailing {} process logs", this.jobId, getName()), e);
                    }
                    detectCrash();
                }
            } catch (Throwable th3) {
                detectCrash();
                throw th3;
            }
        });
    }

    private void detectCrash() {
        if (this.processCloseInitiated || this.processKilled || this.processInStream == null) {
            return;
        }
        String format = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", this.jobId, getName(), this.cppLogHandler.getErrors());
        LOGGER.error(format);
        this.onProcessCrash.accept(format);
    }

    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream inputStream) {
        start(executorService);
        this.stateProcessorFuture = executorService.submit(() -> {
            try {
                try {
                    stateProcessor.process(inputStream);
                    if (!this.processKilled) {
                        LOGGER.info("[{}] State output finished", this.jobId);
                    }
                    if (inputStream != null) {
                        inputStream.close();
                    }
                } finally {
                }
            } catch (IOException e) {
                if (this.processKilled) {
                    return;
                }
                LOGGER.error(new ParameterizedMessage("[{}] Error reading {} state output", this.jobId, getName()), e);
            }
        });
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public boolean isReady() {
        return this.isReady;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setReady() {
        this.isReady = true;
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public void writeRecord(String[] strArr) throws IOException {
        this.recordWriter.writeRecord(strArr);
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public void flushStream() throws IOException {
        this.recordWriter.flush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.processCloseInitiated = true;
            if (this.processInStream != null && this.processInStreamClosed.compareAndSet(false, true)) {
                this.processInStream.close();
            }
            if (this.stateProcessorFuture != null) {
                this.stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
            }
            if (this.logTailFuture != null) {
                this.logTailFuture.get(5L, TimeUnit.SECONDS);
            }
            if (this.cppLogHandler.seenFatalError()) {
                throw ExceptionsHelper.serverError(this.cppLogHandler.getErrors());
            }
            LOGGER.debug("[{}] {} process exited", this.jobId, getName());
        } catch (InterruptedException e) {
            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", this.jobId, getName()), e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", this.jobId, getName()), e2);
        } finally {
            deleteAssociatedFiles();
        }
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public void kill() throws IOException {
        LOGGER.debug("[{}] Killing {} process", this.jobId, getName());
        this.processKilled = true;
        try {
            try {
                NativeControllerHolder.getNativeController().killProcess(this.cppLogHandler.getPid(this.processConnectTimeout));
                this.cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
                try {
                    if (this.processInStream != null && this.processInStreamClosed.compareAndSet(false, true)) {
                        this.processInStream.close();
                    }
                } catch (IOException e) {
                }
                try {
                    deleteAssociatedFiles();
                } catch (IOException e2) {
                }
            } catch (Throwable th) {
                try {
                    if (this.processInStream != null && this.processInStreamClosed.compareAndSet(false, true)) {
                        this.processInStream.close();
                    }
                } catch (IOException e3) {
                }
                try {
                    deleteAssociatedFiles();
                } catch (IOException e4) {
                }
                throw th;
            }
        } catch (TimeoutException e5) {
            LOGGER.warn("[{}] Failed to get PID of {} process to kill", this.jobId, getName());
            try {
                if (this.processInStream != null && this.processInStreamClosed.compareAndSet(false, true)) {
                    this.processInStream.close();
                }
            } catch (IOException e6) {
            }
            try {
                deleteAssociatedFiles();
            } catch (IOException e7) {
            }
        }
    }

    private synchronized void deleteAssociatedFiles() throws IOException {
        if (this.filesToDelete == null) {
            return;
        }
        for (Path path : this.filesToDelete) {
            if (Files.deleteIfExists(path)) {
                LOGGER.debug("[{}] Deleted file {}", this.jobId, path.toString());
            } else {
                LOGGER.warn("[{}] Failed to delete file {}", this.jobId, path.toString());
            }
        }
        this.filesToDelete.clear();
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public ZonedDateTime getProcessStartTime() {
        return this.startTime;
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public boolean isProcessAlive() {
        return !this.cppLogHandler.hasLogStreamEnded();
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public boolean isProcessAliveAfterWaiting() {
        this.cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45L));
        return isProcessAlive();
    }

    @Override // org.elasticsearch.xpack.ml.process.NativeProcess
    public String readError() {
        return this.cppLogHandler.getErrors();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String jobId() {
        return this.jobId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InputStream processOutStream() {
        return this.processOutStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public OutputStream processRestoreStream() {
        return this.processRestoreStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int numberOfFields() {
        return this.numberOfFields;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LengthEncodedWriter recordWriter() {
        return this.recordWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isProcessKilled() {
        return this.processKilled;
    }

    public void consumeAndCloseOutputStream() {
        try {
            do {
            } while (processOutStream().read(new byte[512]) >= 0);
            processOutStream().close();
        } catch (IOException e) {
        }
    }
}
