001package com.github.sarxos.webcam; 002 003import java.util.concurrent.ExecutorService; 004import java.util.concurrent.Executors; 005import java.util.concurrent.RejectedExecutionException; 006import java.util.concurrent.SynchronousQueue; 007import java.util.concurrent.ThreadFactory; 008import java.util.concurrent.TimeUnit; 009import java.util.concurrent.atomic.AtomicBoolean; 010import java.util.concurrent.atomic.AtomicInteger; 011 012import org.slf4j.Logger; 013import org.slf4j.LoggerFactory; 014 015 016public class WebcamProcessor { 017 018 private static final Logger LOG = LoggerFactory.getLogger(WebcamProcessor.class); 019 020 /** 021 * Thread factory for processor. 022 * 023 * @author Bartosz Firyn (SarXos) 024 */ 025 private static final class ProcessorThreadFactory implements ThreadFactory { 026 027 private static final AtomicInteger N = new AtomicInteger(0); 028 029 @Override 030 public Thread newThread(Runnable r) { 031 Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet())); 032 t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance()); 033 t.setDaemon(true); 034 return t; 035 } 036 } 037 038 /** 039 * Heart of overall processing system. This class process all native calls 040 * wrapped in tasks, by doing this all tasks executions are 041 * super-synchronized. 042 * 043 * @author Bartosz Firyn (SarXos) 044 */ 045 private static final class AtomicProcessor implements Runnable { 046 047 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true); 048 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true); 049 050 /** 051 * Process task. 052 * 053 * @param task the task to be processed 054 * @return Processed task 055 * @throws InterruptedException when thread has been interrupted 056 */ 057 public void process(WebcamTask task) throws InterruptedException { 058 inbound.put(task); 059 060 Throwable t = outbound.take().getThrowable(); 061 if (t != null) { 062 throw new WebcamException("Cannot execute task", t); 063 } 064 } 065 066 @Override 067 public void run() { 068 while (true) { 069 WebcamTask t = null; 070 try { 071 (t = inbound.take()).handle(); 072 } catch (InterruptedException e) { 073 break; 074 } catch (Throwable e) { 075 if (t != null) { 076 t.setThrowable(e); 077 } 078 } finally { 079 if (t != null) { 080 try { 081 outbound.put(t); 082 } catch (InterruptedException e) { 083 break; 084 } catch (Exception e) { 085 throw new RuntimeException("Cannot put task into outbound queue", e); 086 } 087 } 088 } 089 } 090 } 091 } 092 093 /** 094 * Is processor started? 095 */ 096 private static final AtomicBoolean started = new AtomicBoolean(false); 097 098 /** 099 * Execution service. 100 */ 101 private static ExecutorService runner = null; 102 103 /** 104 * Static processor. 105 */ 106 private static final AtomicProcessor processor = new AtomicProcessor(); 107 108 /** 109 * Singleton instance. 110 */ 111 private static final WebcamProcessor INSTANCE = new WebcamProcessor();; 112 113 private WebcamProcessor() { 114 } 115 116 /** 117 * Process single webcam task. 118 * 119 * @param task the task to be processed 120 * @throws InterruptedException when thread has been interrupted 121 */ 122 public void process(WebcamTask task) throws InterruptedException { 123 124 if (started.compareAndSet(false, true)) { 125 runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory()); 126 runner.execute(processor); 127 } 128 129 if (!runner.isShutdown()) { 130 processor.process(task); 131 } else { 132 throw new RejectedExecutionException("Cannot process because processor runner has been already shut down"); 133 } 134 } 135 136 public void shutdown() { 137 if (started.compareAndSet(true, false)) { 138 139 LOG.debug("Shutting down webcam processor"); 140 141 runner.shutdown(); 142 143 LOG.debug("Awaiting tasks termination"); 144 145 while (runner.isTerminated()) { 146 147 try { 148 runner.awaitTermination(100, TimeUnit.MILLISECONDS); 149 } catch (InterruptedException e) { 150 return; 151 } 152 153 runner.shutdownNow(); 154 } 155 156 LOG.debug("All tasks has been terminated"); 157 } 158 159 } 160 161 public static synchronized WebcamProcessor getInstance() { 162 return INSTANCE; 163 } 164}