Message Queue Event Loop Example (J2SE)

This code demonstrates a convenient, minimal implementation of a message-queue event loop that can be stopped and restarted.

/**
 * @author Vladimir Lysyy
 */

package net.bobah.concurrent.demo;

import java.util.concurrent.SynchronousQueue;

/**
 * Minimal message-queue event loop that can be started, stopped and restarted.
 *
 * <p>A single worker thread repeatedly takes messages from a fair
 * {@link SynchronousQueue} and logs each one. Because a synchronous queue has
 * no capacity, {@link #submit(Object)} hands a message directly to the worker
 * and blocks until the worker receives it.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized on this instance,
 * so the worker thread reference is only read and written under that lock.
 */
public class Runner implements Runnable {
  /** Worker thread most recently created by {@link #start()}; {@code null} before the first start and after {@link #stop()}. */
  Thread thread = null;

  /** Rendezvous point between producers and the worker; {@code true} selects the fair (FIFO) ordering policy. */
  final SynchronousQueue<Object> queue = new SynchronousQueue<Object>(true);

  /**
   * Prints a message prefixed with the name of the calling thread.
   *
   * @param msg text to print
   */
  private static void log(final String msg) {
    System.out.println(Thread.currentThread().getName() + " - " + msg);
  }

  /**
   * Event loop body: takes messages from the queue and logs them until the
   * executing thread is interrupted.
   *
   * <p>The interrupt status of the executing thread is set again before this
   * method returns, so code that invokes {@code run()} directly on its own
   * thread can still observe the interruption.
   */
  @Override
  public void run() {
    log("thread function enter");
    try {
      while (!Thread.interrupted()) {
        // Blocks until a producer hands over a message or the thread is interrupted.
        Object o = queue.take();
        log("processing [" + o + "]");
        // Let other threads do something useful
        Thread.yield();
      }
    }
    catch (InterruptedException e) {
      log("thread function interrupted");
    }
    log("thread function exit");
    // Both ways of getting here (Thread.interrupted() returning true, or
    // take() throwing InterruptedException) cleared the interrupt status.
    // Restore it only after the last log call so the output is unaffected.
    Thread.currentThread().interrupt();
  }

  /**
   * Starts the worker thread unless one is already running.
   *
   * <p>A worker that has terminated without {@link #stop()} being called
   * (for example because processing a message threw an unchecked exception)
   * is replaced by a fresh thread; otherwise the loop could never be
   * restarted and producers would block forever in {@link #submit(Object)}.
   */
  public synchronized void start() {
    if (thread == null || !thread.isAlive()) {
      log("starting thread");
      thread = new Thread(this);
      thread.start();
      log("thread started");
    }
  }

  /**
   * Stops the worker thread, if any, and waits for it to terminate.
   *
   * <p>The worker is interrupted, which also wakes it up when it is blocked
   * waiting for a message. After this method returns normally the instance
   * can be started again with {@link #start()}.
   *
   * @throws InterruptedException if the calling thread is interrupted while
   *     waiting for the worker to terminate; in that case the worker has
   *     already been interrupted but the thread reference is not cleared
   */
  public synchronized void stop() throws InterruptedException {
    if (thread != null) {
      if (thread.isAlive()) {
        log("stopping thread");
        thread.interrupt();
        thread.join();
        log("thread stopped");
      }
      thread = null;
    }
  }

  /**
   * Hands a message to the worker thread.
   *
   * <p>Blocks until the worker takes the message. If no worker is running,
   * the call blocks until one is started and takes the message, or until the
   * calling thread is interrupted.
   *
   * @param o message to process; must not be {@code null}
   * @throws InterruptedException if the calling thread is interrupted while
   *     waiting for the worker
   */
  public void submit(Object o) throws InterruptedException {
    queue.put(o);
  }

  /**
   * Demo: runs the event loop twice, sending two messages per run.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    try {
      Runner inst = new Runner();

      // start the worker
      inst.start();
      // send a couple of messages
      inst.submit("message 1");
      inst.submit("message 2");
      Thread.sleep(500);
      // stop the worker while it is blocked waiting on the queue
      inst.stop();

      // restart the worker
      inst.start();
      // send another couple of messages
      inst.submit("message 3");
      inst.submit("message 4");
      Thread.sleep(500);
      // stop the worker while it is blocked waiting on the queue
      inst.stop();
    } catch (InterruptedException e) {
      // Keep the interruption visible to whoever runs this thread.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
  }
}