package ai.vital.prime.service.queue;

import ai.vital.vitalservice.QueueConsumer;
import ai.vital.vitalservice.VitalStatus;
import ai.vital.vitalservice.query.ResultList;
import ai.vital.vitalsigns.java.VitalJavaSerializationUtils;
import ai.vital.vitalsigns.model.GraphObject;
import ai.vital.vitalsigns.model.VITAL_GraphContainerObject;
import ai.vital.vitalsigns.model.VitalApp;
import ai.vital.vitalsigns.model.VitalOrganization;
import ai.vital.vitalsigns.model.properties.Property_hasAppID;
import ai.vital.vitalsigns.model.properties.Property_hasOrganizationID;
import ai.vital.vitalsigns.model.property.StringProperty;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/vital/prime/service/queue/KafkaQueueInterface.class */
public class KafkaQueueInterface implements QueueInterface {
    private static final Logger log = LoggerFactory.getLogger(KafkaQueueInterface.class);
    private KafkaProducer<String, byte[]> producer;
    private Properties producerProperties = new Properties();
    private Properties consumerProperties = new Properties();
    private Map<String, List<ConsumerWrapper>> consumers = new HashMap();

    /* loaded from: input_file:ai/vital/prime/service/queue/KafkaQueueInterface$ConsumerWrapper.class */
    static class ConsumerWrapper {
        KafkaConsumer<String, byte[]> kConsumer;
        QueueConsumer vConsumer;
        boolean running = false;
        boolean threadStopped = false;
        Thread thread = null;
        private String topicName;
        private String fullTopicName;
        private Properties properties;

        public ConsumerWrapper(String str, String str2, Properties properties, QueueConsumer queueConsumer) {
            this.properties = properties;
            this.topicName = str;
            this.fullTopicName = str2;
            this.vConsumer = queueConsumer;
        }

        public boolean close(long j) {
            this.running = false;
            int i = 0;
            long j2 = j >= 50 ? j / 50 : 0L;
            while (this.thread.isAlive() && i < j2) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                i++;
            }
            if (this.thread.isAlive()) {
                KafkaQueueInterface.log.error("Couldn't stop the consumer thread within {}ms", Long.valueOf(j));
                return false;
            }
            KafkaQueueInterface.log.info("Consumer thread stopped successfully after {}ms", Long.valueOf(i * 50));
            return true;
        }

        public void start() {
            if (this.running) {
                throw new RuntimeException("Consumer already started");
            }
            this.running = true;
            this.thread = new Thread() { // from class: ai.vital.prime.service.queue.KafkaQueueInterface.ConsumerWrapper.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    List asList = Arrays.asList(ConsumerWrapper.this.fullTopicName);
                    ConsumerWrapper.this.kConsumer = new KafkaConsumer<>(ConsumerWrapper.this.properties, new StringDeserializer(), new ByteArrayDeserializer());
                    ConsumerWrapper.this.kConsumer.subscribe(asList);
                    while (ConsumerWrapper.this.running) {
                        Iterator it = ConsumerWrapper.this.kConsumer.poll(100L).iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            byte[] bArr = (byte[]) consumerRecord.value();
                            KafkaQueueInterface.log.info("kafka record offset = {}, key = {}, value = {}", Long.valueOf(consumerRecord.offset()), consumerRecord.key(), Integer.valueOf(bArr.length));
                            List list = (List) VitalJavaSerializationUtils.deserialize(bArr);
                            ResultList resultList = new ResultList();
                            Iterator it2 = list.iterator();
                            while (it2.hasNext()) {
                                resultList.addResult((GraphObject) it2.next());
                            }
                            ConsumerWrapper.this.vConsumer.messageReceived(ConsumerWrapper.this.topicName, resultList);
                        }
                    }
                    ConsumerWrapper.this.kConsumer.close();
                }
            };
            this.thread.setDaemon(true);
            this.thread.start();
        }
    }

    public KafkaQueueInterface(VITAL_GraphContainerObject vITAL_GraphContainerObject) throws Exception {
        StringProperty stringProperty = (StringProperty) vITAL_GraphContainerObject.getProperty("producer");
        StringProperty stringProperty2 = (StringProperty) vITAL_GraphContainerObject.getProperty("consumer");
        if (stringProperty == null) {
            throw new Exception("No producer config property");
        }
        if (stringProperty2 == null) {
            throw new Exception("No consumer config property");
        }
        this.producerProperties.load(new StringReader(stringProperty.asString()));
        this.consumerProperties.load(new StringReader(stringProperty2.asString()));
        log.info("Starting kafka producer...");
        this.producer = new KafkaProducer<>(this.producerProperties, new StringSerializer(), new ByteArraySerializer());
        log.info("Shared producer started successfully");
    }

    private String getFullQueueName(VitalOrganization vitalOrganization, VitalApp vitalApp, String str) {
        return vitalOrganization.get(Property_hasOrganizationID.class).toString() + "__" + vitalApp.get(Property_hasAppID.class).toString() + "__" + str;
    }

    @Override // ai.vital.prime.service.queue.QueueInterface
    public VitalStatus queueSend(VitalOrganization vitalOrganization, VitalApp vitalApp, String str, String str2, List<GraphObject> list) throws Exception {
        List<GraphObject> list2;
        if (list instanceof Serializable) {
            list2 = list;
        } else {
            list2 = new ArrayList();
            list2.addAll(list);
        }
        this.producer.send(new ProducerRecord(getFullQueueName(vitalOrganization, vitalApp, str), str2, SerializationUtils.serialize((Serializable) list2)));
        return VitalStatus.withOKMessage("Message sent");
    }

    @Override // ai.vital.prime.service.queue.QueueInterface
    public synchronized VitalStatus queueConsumer(VitalOrganization vitalOrganization, VitalApp vitalApp, String str, QueueConsumer queueConsumer) throws Exception {
        String fullQueueName = getFullQueueName(vitalOrganization, vitalApp, str);
        if (this.consumers.get(fullQueueName) != null) {
            throw new Exception("A consumer already registered to topic: " + str + " (" + fullQueueName + DefaultExpressionEngine.DEFAULT_INDEX_END);
        }
        Properties properties = new Properties();
        properties.putAll(this.consumerProperties);
        log.info("Getting partitions for topic: " + fullQueueName);
        int size = this.producer.partitionsFor(fullQueueName).size();
        log.info("Topic: {} partitions count: {}", fullQueueName, Integer.valueOf(size));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < size; i++) {
            ConsumerWrapper consumerWrapper = new ConsumerWrapper(str, fullQueueName, properties, queueConsumer);
            consumerWrapper.start();
            arrayList.add(consumerWrapper);
        }
        this.consumers.put(fullQueueName, arrayList);
        VitalStatus withOKMessage = VitalStatus.withOKMessage("Consumer registered");
        withOKMessage.setSuccesses(Integer.valueOf(size));
        return withOKMessage;
    }

    @Override // ai.vital.prime.service.queue.QueueInterface
    public void close() {
        for (Map.Entry<String, List<ConsumerWrapper>> entry : this.consumers.entrySet()) {
            Iterator<ConsumerWrapper> it = entry.getValue().iterator();
            while (it.hasNext()) {
                try {
                    it.next().close(0L);
                } catch (Exception e) {
                    log.error("Error when closing consumer, queue: " + entry.getKey() + ": " + e.getLocalizedMessage());
                }
            }
        }
        if (this.producer != null) {
            try {
                this.producer.close();
                log.info("Producer shut down successfully");
            } catch (Exception e2) {
                log.error("Error when shutting down producer: " + e2.getLocalizedMessage(), (Throwable) e2);
            }
            this.producer = null;
        }
    }

    @Override // ai.vital.prime.service.queue.QueueInterface
    public synchronized VitalStatus queueRemoveConsumer(VitalOrganization vitalOrganization, VitalApp vitalApp, String str) throws Exception {
        try {
            List<ConsumerWrapper> remove = this.consumers.remove(getFullQueueName(vitalOrganization, vitalApp, str));
            if (remove == null) {
                throw new Exception("no consumer registered for queue: " + str);
            }
            Iterator<ConsumerWrapper> it = remove.iterator();
            while (it.hasNext()) {
                it.next().close(500L);
            }
            return VitalStatus.withOKMessage("Consumer removed from queue: " + str);
        } catch (Exception e) {
            return VitalStatus.withError(e.getLocalizedMessage());
        }
    }
}
