001package com.github.sarxos.webcam;
002
003import java.util.concurrent.ExecutorService;
004import java.util.concurrent.Executors;
005import java.util.concurrent.RejectedExecutionException;
006import java.util.concurrent.SynchronousQueue;
007import java.util.concurrent.ThreadFactory;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.atomic.AtomicBoolean;
010import java.util.concurrent.atomic.AtomicInteger;
011
012import org.slf4j.Logger;
013import org.slf4j.LoggerFactory;
014
015
016public class WebcamProcessor {
017
018        private static final Logger LOG = LoggerFactory.getLogger(WebcamProcessor.class);
019
020        /**
021         * Thread factory for processor.
022         * 
023         * @author Bartosz Firyn (SarXos)
024         */
025        private static final class ProcessorThreadFactory implements ThreadFactory {
026
027                private static final AtomicInteger N = new AtomicInteger(0);
028
029                @Override
030                public Thread newThread(Runnable r) {
031                        Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet()));
032                        t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
033                        t.setDaemon(true);
034                        return t;
035                }
036        }
037
038        /**
039         * Heart of overall processing system. This class process all native calls
040         * wrapped in tasks, by doing this all tasks executions are
041         * super-synchronized.
042         * 
043         * @author Bartosz Firyn (SarXos)
044         */
045        private static final class AtomicProcessor implements Runnable {
046
047                private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
048                private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
049
050                /**
051                 * Process task.
052                 * 
053                 * @param task the task to be processed
054                 * @return Processed task
055                 * @throws InterruptedException when thread has been interrupted
056                 */
057                public void process(WebcamTask task) throws InterruptedException {
058                        inbound.put(task);
059
060                        Throwable t = outbound.take().getThrowable();
061                        if (t != null) {
062                                throw new WebcamException("Cannot execute task", t);
063                        }
064                }
065
066                @Override
067                public void run() {
068                        while (true) {
069                                WebcamTask t = null;
070                                try {
071                                        (t = inbound.take()).handle();
072                                } catch (InterruptedException e) {
073                                        break;
074                                } catch (Throwable e) {
075                                        if (t != null) {
076                                                t.setThrowable(e);
077                                        }
078                                } finally {
079                                        if (t != null) {
080                                                try {
081                                                        outbound.put(t);
082                                                } catch (InterruptedException e) {
083                                                        break;
084                                                } catch (Exception e) {
085                                                        throw new RuntimeException("Cannot put task into outbound queue", e);
086                                                }
087                                        }
088                                }
089                        }
090                }
091        }
092
093        /**
094         * Is processor started?
095         */
096        private static final AtomicBoolean started = new AtomicBoolean(false);
097
098        /**
099         * Execution service.
100         */
101        private static ExecutorService runner = null;
102
103        /**
104         * Static processor.
105         */
106        private static final AtomicProcessor processor = new AtomicProcessor();
107
108        /**
109         * Singleton instance.
110         */
111        private static final WebcamProcessor INSTANCE = new WebcamProcessor();;
112
113        private WebcamProcessor() {
114        }
115
116        /**
117         * Process single webcam task.
118         * 
119         * @param task the task to be processed
120         * @throws InterruptedException when thread has been interrupted
121         */
122        public void process(WebcamTask task) throws InterruptedException {
123
124                if (started.compareAndSet(false, true)) {
125                        runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
126                        runner.execute(processor);
127                }
128
129                if (!runner.isShutdown()) {
130                        processor.process(task);
131                } else {
132                        throw new RejectedExecutionException("Cannot process because processor runner has been already shut down");
133                }
134        }
135
136        public void shutdown() {
137                if (started.compareAndSet(true, false)) {
138
139                        LOG.debug("Shutting down webcam processor");
140
141                        runner.shutdown();
142
143                        LOG.debug("Awaiting tasks termination");
144
145                        while (runner.isTerminated()) {
146
147                                try {
148                                        runner.awaitTermination(100, TimeUnit.MILLISECONDS);
149                                } catch (InterruptedException e) {
150                                        return;
151                                }
152
153                                runner.shutdownNow();
154                        }
155
156                        LOG.debug("All tasks has been terminated");
157                }
158
159        }
160
161        public static synchronized WebcamProcessor getInstance() {
162                return INSTANCE;
163        }
164}