package com.github.sarxos.hbrs.hb;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/sarxos/hbrs/hb/Worker.class */
public abstract class Worker<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    private final LinkedBlockingQueue<T> items;
    private final String name;
    private final Worker<T>.Runner runner;
    private final boolean stateless;
    private final int capacity;
    private AtomicBoolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sarxos/hbrs/hb/Worker$Runner.class */
    public final class Runner extends Thread {
        public Runner(Runnable runnable, String str) {
            super(runnable, str);
            setDaemon(true);
        }
    }

    public Worker(String str) {
        this(str, false);
    }

    public Worker(String str, boolean z) {
        this(str, z, false);
    }

    public Worker(String str, boolean z, boolean z2) {
        this(str, 0, z, z2);
    }

    public Worker(String str, int i, boolean z, boolean z2) {
        this.running = new AtomicBoolean(false);
        this.name = str;
        this.runner = new Runner(this, str);
        this.stateless = z2;
        this.capacity = i;
        this.items = new LinkedBlockingQueue<>(i > 0 ? i : Integer.MAX_VALUE);
        if (z) {
            start();
        }
    }

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.runner.start();
        }
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            LOG.info("Message persister has been stopped");
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public void process(Collection<T> collection) {
        Iterator<T> it = collection.iterator();
        while (it.hasNext()) {
            process((Worker<T>) it.next());
        }
    }

    public void process(T t) {
        if (!isRunning()) {
            throw new RuntimeException("Worker is not running");
        }
        int size = this.items.size();
        if (this.capacity > 0 && size > this.capacity * 0.95d) {
            int i = (int) (this.capacity * 0.25d);
            LOG.error("Worker {} capacity problem detected!", this.name);
            LOG.error("Worker queue is almost completely excited ({}/{}), draining {} items", new Object[]{Integer.valueOf(size), Integer.valueOf(this.capacity), Integer.valueOf(i)});
            for (int i2 = 0; i2 < i && !this.items.isEmpty(); i2++) {
                try {
                    this.items.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        try {
            this.items.put(t);
        } catch (InterruptedException e2) {
            LOG.debug("Interrupted: " + e2.getMessage(), e2);
        }
    }

    public abstract void work(Session session, T t);

    @Override // java.lang.Runnable
    public void run() {
        Session session;
        int batchSize = PersistenceKeeper.getBatchSize();
        int i = 0;
        Transaction transaction = null;
        Session session2 = null;
        while (isRunning()) {
            if (this.items.isEmpty()) {
                if (!this.stateless) {
                    if (transaction != null && transaction.isActive()) {
                        try {
                            transaction.commit();
                        } catch (HibernateException e) {
                            transaction.rollback();
                            LOG.error(e.getMessage(), e);
                        }
                    }
                    if (session2 != null && session2.isOpen()) {
                        try {
                            session2.flush();
                            session2.clear();
                            session2.close();
                        } catch (HibernateException e2) {
                            session2.clear();
                            LOG.error(e2.getMessage(), e2);
                        } finally {
                            session2.close();
                        }
                    }
                    i = 0;
                }
                LOG.debug("All awaiting items has been worked out");
            }
            try {
                T take = this.items.take();
                if (!this.stateless && (session2 == null || !session2.isOpen())) {
                    session2 = createSession();
                    transaction = session2.beginTransaction();
                }
                try {
                    work(session2, take);
                } catch (Exception e3) {
                    LOG.error(e3.getMessage(), e3);
                } catch (HibernateException e4) {
                    LOG.error("Hibernate error", e4);
                    if (!this.stateless) {
                        try {
                            try {
                                transaction.rollback();
                                session2.clear();
                            } catch (Exception e5) {
                                LOG.error("Cannot rollback", e5);
                                session2.clear();
                            }
                        } catch (Throwable th) {
                            session2.clear();
                            throw th;
                        }
                    }
                }
                if (!this.stateless && i > 0) {
                    int i2 = i;
                    i++;
                    if (i2 % batchSize == 0) {
                        try {
                            transaction.commit();
                        } catch (HibernateException e6) {
                            transaction.rollback();
                            LOG.error(e6.getMessage(), e6);
                        }
                        try {
                            session2.flush();
                            session2.clear();
                        } catch (HibernateException e7) {
                            LOG.error(e7.getMessage(), e7);
                        } finally {
                            session2.clear();
                        }
                        transaction = session2.beginTransaction();
                    }
                }
            } catch (InterruptedException e8) {
                LOG.debug("Message persister interrupted: " + e8.getMessage(), e8);
                return;
            }
        }
    }

    protected Session createSession() {
        return PersistenceKeeper.getSessionFactory().openSession();
    }
}
