Networking

Networking code for copy/paste

SSL Client Socket

A minimal SSL client (also a TCP client). Demonstrates TCP and SSL client socket creation, initialization and usage by means of pure POSIX API.
 #include <openssl/ssl.h>
 #include <openssl/bio.h>
 #include <openssl/err.h>

 #include <cstdio>
 #include <cstring>

 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <errno.h>
 #include <assert.h>

 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
 #include <sys/time.h>
 #include <string.h>

 #include <iostream>
 #include <iomanip>


 namespace {

 enum {
   FATAL = 0,
   ERROR,
   WARNING,
   INFO,
   DETAIL,
   DEBUG,
   TRACE
 };

 int _logLevel = DETAIL;

 #define LOG(level,msg) do { if (level <= _logLevel) {std::cout << msg; } } while(0)
 #define LOG_ERROR(msg) do { LOG(ERROR, "-E- " << __func__ << ' ' << msg << std::endl); } while(0)
 #define LOG_DEBUG(msg) do { LOG(DEBUG, "-D- " << __func__ << ' ' << msg << '\n'); } while(0)
 #define LOG_INFO(msg)  do { LOG(INFO,  "-I- " << __func__ << ' ' << msg << '\n'); } while(0)

 #define PERROR_AND_RETURN(rc) do {                                           \
   LOG_ERROR(strerror(errno) << '(' << errno << ')'); \
   return rc;                                                                        \
 } while (0)


 struct addrinfo* get_addrinfo(const char* node, const char* service, bool tcp) {
   struct addrinfo  hints  = addrinfo();
   struct addrinfo* result = NULL;

   memset(&hints, 0, sizeof(struct addrinfo));
   hints.ai_family    = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
   hints.ai_socktype  = tcp ? SOCK_STREAM : SOCK_DGRAM; /* Stream/Datagram socket */
   hints.ai_flags     = 0;
   hints.ai_protocol  = 0;          /* Any protocol */

   int rc = 0;
   if (getaddrinfo(node, service, &hints, &result) != 0) { LOG_ERROR(gai_strerror(rc)); return NULL; }

   return result;
 }

 /**
  * 
  */
 int configure_socket(int fd, const struct addrinfo& ai, unsigned sndbuf, unsigned rcvbuf, bool blocking, bool nolinger, bool nodelay) {
   if (nolinger) {
     /* Set the socket for a non lingering, graceful close.
      * This will cause a final close of this socket not to wait until all
      * of the data sent on it has been received by the remote host.
      * The result is that the socket will be immediately released instead
      * of blocking in a TIME_WAIT state */
     linger linger = {1, 0};
     if (::setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)) == -1) { LOG_ERROR(strerror(errno)); return -1; }
   }

   if (nodelay) {
     int flag = 1;
     if (::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == -1) { LOG_ERROR(strerror(errno)); return -1; }
   }

   if (sndbuf > 0 && setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) == -1) { LOG_ERROR(strerror(errno)); return -1; }
   if (rcvbuf > 0 && setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == -1) { LOG_ERROR(strerror(errno)); return -1; }

   // put socket to blocking/non-blocking mode
   int flags = -1;
   if ((flags = fcntl(fd, F_GETFL, 0)) == -1) { LOG_ERROR(strerror(errno)); return -1; }
   if (blocking) flags &= ~O_NONBLOCK;
   else          flags |= O_NONBLOCK;
   if (fcntl(fd, F_SETFL, flags) == -1) { LOG_ERROR(strerror(errno)); return -1; }

   return 0;
 }


 /**
  * returns tcp-connected socket
  */
 int get_tcp_connection(const char* node, const char* service) {
   struct addrinfo* ai = get_addrinfo(node, service, true /* tcp */);
   if (ai == NULL) return -1;

   const unsigned sndbuf = 1024*128;
   const unsigned rcvbuf = 1024*128;

   int fd = -1;
   for (struct addrinfo* rp = ai; rp != NULL; rp = rp->ai_next) {
     if ((fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) == -1) { LOG_ERROR(strerror(errno)); fd = -1; break; }
     if (configure_socket(fd, *rp, sndbuf, rcvbuf, true /* blocking */, true /* nolinger */, true /* nodelay */) != 0) { ::close(fd); fd = -1; break; }

     if (::connect(fd, rp->ai_addr, rp->ai_addrlen) != -1) break;
     ::close(fd); fd = -1;
   }

   ::freeaddrinfo(ai);
   return fd;
 }

 }

 int main()
 {
     int p;

     const char* request = "GET /\n\n";
     char r[1024];

     /* Set up the library */
     SSL_library_init();
     ERR_load_BIO_strings();
     SSL_load_error_strings();
     OpenSSL_add_all_algorithms();

     /* Set up the SSL context */

     SSL_CTX* ctx = SSL_CTX_new(SSLv3_method());
     if (!ctx) {
       ERR_print_errors_fp(stderr);
       return 1;
     }

     /* Load the trust store */

     if(! SSL_CTX_load_verify_locations(ctx, NULL, "/etc/ssl/certs"))
     {
         fprintf(stderr, "Error loading trust store\n");
         ERR_print_errors_fp(stderr);
         SSL_CTX_free(ctx);
         return 0;
     }

     /* Setup the connection */

     int fd = get_tcp_connection("127.0.0.1", "12345");
 //    int fd = my_connect("127.0.0.1", 80);
     if (fd == -1) {
       LOG_ERROR(strerror(errno));
       return 1;
     }


     SSL* ssl = SSL_new(ctx); assert(ssl);
     if (!SSL_set_fd(ssl, fd)) {
       LOG_ERROR("SSL_set_fd(ssl, fd)");
       return 1;
     }

     SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);

     int rc = SSL_get_error(ssl, SSL_connect(ssl));
     if (rc != SSL_ERROR_NONE) {
       LOG_ERROR(ERR_error_string(rc, NULL));
       while ((rc = ERR_get_error()) != 0) {
         LOG_ERROR(ERR_error_string(rc, NULL));
       }
       return 1;
     }

     /* Check the certificate */

     rc = SSL_get_verify_result(ssl);
     if(rc != X509_V_OK)
     {
         if (rc == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT || rc == X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN) {
           fprintf(stderr, "self signed certificate\n");
         }
         else {
           fprintf(stderr, "Certificate verification error: %ld\n", SSL_get_verify_result(ssl));
           SSL_CTX_free(ctx);
           return 0;
         }
     }

     /* Send the request */
     if (SSL_write(ssl, request, strlen(request)) == -1) {
       PERROR_AND_RETURN(1);
     }
     LOG_INFO("Data sent [" << request << "]");

     /* Read in the response */

     for(;;)
     {
         p = SSL_read(ssl, r, 1023);
         if(p <= 0) break;
         r[p] = 0;
         printf("%s", r);
     }


     rc = SSL_shutdown(ssl);
     if(!rc){
       /* If we called SSL_shutdown() first then
          we always get return value of '0'. In
          this case, try again, but first send a
          TCP FIN to trigger the other side's
          close_notify*/
       shutdown(fd,1);
       rc = SSL_shutdown(ssl);
     }
     switch(rc){
       case 1:
         break; /* Success */
       case 0:
       case -1:
       default:
         LOG_ERROR("Shutdown failed");
     }

     /* Close the connection and free the context */

     SSL_CTX_free(ctx);
 //     if (write(fd, request, strlen(request)) == -1) {
 //       PERROR_AND_RETURN("write(fd, request, strlen(request))", 1);
 //     }
 // 
 //     /* Read in the response */
 // 
 //     for(;;)
 //     {
 //         p = read(fd, r, 1023);
 //         if(p <= 0) break;
 //         r[p] = 0;
 //         printf("%s", r);
 //     }
     return 0;
 }

SSL Server Socket

A minimum TCP/SSL server, demonstrates the server socket creation, initialization, and usage, as well as and server side SSL handshake part.
 #include <openssl/ssl.h>
 #include <openssl/bio.h>
 #include <openssl/err.h>

 #include <errno.h>
 #include <assert.h>

 #include <sys/types.h>
 #include <cstdio>
 #include <cstdlib>
 #include <unistd.h>
 #include <cstring>
 #include <sys/socket.h>
 #include <netdb.h>

 #define BUF_SIZE 500

 #define LISTEN_BACKLOG 50

 #include 
 #include 

 namespace {

 enum {
   FATAL = 0,
   ERROR,
   WARNING,
   INFO,
   DETAIL,
   DEBUG,
   TRACE
 };

 int _logLevel = DETAIL;

 #define LOG(level,msg) do { if (level <= _logLevel) {std::cout << msg; } } while(0)
 #define LOG_ERROR(msg) do { LOG(ERROR, "-E- " << __func__ << ' ' << msg << std::endl); } while(0)
 #define LOG_DEBUG(msg) do { LOG(DEBUG, "-D- " << __func__ << ' ' << msg << '\n'); } while(0)
 #define LOG_INFO(msg)  do { LOG(INFO,  "-I- " << __func__ << ' ' << msg << '\n'); } while(0)

 #define PERROR_AND_RETURN(rc) do {                                           \
   LOG_ERROR(strerror(errno) << '(' << errno << ')'); \
   return rc;                                                                        \
 } while (0)
 }

 int
 main(int argc, char *argv[])
 {
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int sfd, s;
    struct sockaddr_storage peer_addr;
    socklen_t peer_addr_len;
    ssize_t nread;
    char buf[BUF_SIZE];

    if (argc != 2) {
        fprintf(stderr, "Usage: %s port\n", argv[0]);
        exit(EXIT_FAILURE);
    }

     /* Set up the library */
     SSL_library_init();
     ERR_load_BIO_strings();
     SSL_load_error_strings();
     OpenSSL_add_all_algorithms();

     /* Set up the SSL context */

     SSL_CTX* ctx = SSL_CTX_new(SSLv3_method());
     if (!ctx) {
       ERR_print_errors_fp(stderr);
       return 1;
     }

     /* Load the trust store */

     if(! SSL_CTX_load_verify_locations(ctx, NULL, "/etc/ssl/certs"))
     {
         fprintf(stderr, "Error loading trust store\n");
         ERR_print_errors_fp(stderr);
         SSL_CTX_free(ctx);
         return 0;
     }

     if (!SSL_CTX_use_PrivateKey_file(ctx, "cert/server.key", SSL_FILETYPE_PEM)) {
       int rc;
       while ((rc = ERR_get_error()) != 0) {
         LOG_ERROR(ERR_error_string(rc, NULL));
       }
       return 1; // TODO: add SSL_ERROR_WANT_CONNECT loop around
     }
     LOG_INFO("server key ready");

     if (!SSL_CTX_use_certificate_file(ctx, "cert/server.crt", SSL_FILETYPE_PEM)) {
       int rc;
       while ((rc = ERR_get_error()) != 0) {
         LOG_ERROR(ERR_error_string(rc, NULL));
       }
       return 1; // TODO: add SSL_ERROR_WANT_CONNECT loop around
     }
     LOG_INFO("server cert ready");


    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM; /* Datagram socket */
    hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */
    hints.ai_protocol = 0;          /* Any protocol */
    hints.ai_canonname = NULL;
    hints.ai_addr = NULL;
    hints.ai_next = NULL;

    s = getaddrinfo(NULL, argv[1], &hints, &result);
    if (s != 0) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
        exit(EXIT_FAILURE);
    }

    /* getaddrinfo() returns a list of address structures.
       Try each address until we successfully bind(2).
       If socket(2) (or bind(2)) fails, we (close the socket
       and) try the next address. */

    for (rp = result; rp != NULL; rp = rp->ai_next) {
        sfd = socket(rp->ai_family, rp->ai_socktype,
                rp->ai_protocol);
        if (sfd == -1)
            continue;

        if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0)
            break;                  /* Success */

        close(sfd);
    }

    if (rp == NULL) {               /* No address succeeded */
        fprintf(stderr, "Could not bind\n");
        exit(EXIT_FAILURE);
    }

    freeaddrinfo(result);           /* No longer needed */

    if (listen(sfd, LISTEN_BACKLOG) == -1) {
      fprintf(stderr, "%s\n", strerror(errno));
      exit(EXIT_FAILURE);
    }

    peer_addr_len = sizeof(struct sockaddr_storage);
    int cfd = accept(sfd, (struct sockaddr *) &peer_addr, &peer_addr_len);
    if (cfd == -1) {
      fprintf(stderr, "%s\n", strerror(errno));
      exit(EXIT_FAILURE);
    }

    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    s = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV);
    if (s == 0) printf("Connection from %s:%s\n", hbuf, sbuf);
    else        fprintf(stderr, "getnameinfo: %s\n", gai_strerror(s));

    // **
    // * TCP socket ready
    // **

     SSL* ssl = SSL_new(ctx); assert(ssl);
     if (!SSL_set_fd(ssl, cfd)) {
       LOG_ERROR("SSL_set_fd(ssl, fd)");
       return 1;
     }

     SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);

     int rc = SSL_get_error(ssl, SSL_accept(ssl)); // TODO: is it thread-safe?
     if (rc != SSL_ERROR_NONE) {
       LOG_ERROR(ERR_error_string(rc, NULL));
       while ((rc = ERR_get_error()) != 0) {
         LOG_ERROR(ERR_error_string(rc, NULL));
       }
       return 1; // TODO: add SSL_ERROR_WANT_CONNECT loop around
     }

     rc = SSL_read(ssl, buf, BUF_SIZE);
     if (rc > 0)  {
       printf("Received %ld bytes from %s:%s\n", (long) rc, hbuf, sbuf);
     }

     const char* msg = "HELLO CLIENT, I AM SERVER\n";
     rc = SSL_write(ssl, msg, strlen(msg));
     if (rc > 0)  {
       printf("Received %ld bytes from %s:%s\n", (long) rc, hbuf, sbuf);
     }

     rc = SSL_shutdown(ssl);
     if(!rc){
       /* If we called SSL_shutdown() first then
          we always get return value of '0'. In
          this case, try again, but first send a
          TCP FIN to trigger the other side's
          close_notify*/
       shutdown(cfd,1);
       rc = SSL_shutdown(ssl);
     }
     switch(rc){
       case 1:
         break; /* Success */
       case 0:
       case -1:
       default:
         LOG_ERROR("Shutdown failed");
     }

     /* Close the connection and free the context */

     SSL_CTX_free(ctx);
   return 0;
 }

TCP Client (Java NIO)

A minimalistic Java NIO TCP client. Stays always connected. Disconnects and reconnects on any exception in the event handlers (onConnect, onDisconnect, onRead).
package net.bobah.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PostConstruct;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

/**
 * A simple NIO TCP client
 * Assumptions:
 * - the client should always be connected,
 *   once it gets disconnected it reconnects
 * - the exception thrown by onRead means protocol error
 *   so client disconnects and reconnects
 * - the incoming flow is higher than outgoing, so
 *   direct channel write method is not implemented
 * 
 * @author Vladimir Lysyy (mail@bobah.net)
 *
 */
public abstract class TcpClient implements Runnable {
  protected static final Logger LOG = Logger.getLogger(TcpClient.class);
  private static final long INITIAL_RECONNECT_INTERVAL = 500; // 500 ms.
  private static final long MAXIMUM_RECONNECT_INTERVAL = 30000; // 30 sec.
  private static final int READ_BUFFER_SIZE = 0x100000;
  private static final int WRITE_BUFFER_SIZE = 0x100000;

  private long reconnectInterval = INITIAL_RECONNECT_INTERVAL;

  private ByteBuffer readBuf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE); // 1Mb
  private ByteBuffer writeBuf = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE); // 1Mb

  private final Thread thread = new Thread(this);
  private SocketAddress address;

  private Selector selector;
  private SocketChannel channel;

  private final AtomicBoolean connected = new AtomicBoolean(false);

  private AtomicLong bytesOut = new AtomicLong(0L);
  private AtomicLong bytesIn = new AtomicLong(0L);

  public TcpClient() {
    
  }

  @PostConstruct
  public void init() {
    assert address != null: "server address missing";
  }

  public void start() throws IOException {
    LOG.info("starting event loop");
    thread.start();
  }

  public void join() throws InterruptedException {
    if (Thread.currentThread().getId() != thread.getId()) thread.join();
  }

  public void stop() throws IOException, InterruptedException {
    LOG.info("stopping event loop");
    thread.interrupt();
    selector.wakeup();
  }

  public boolean isConnected() {
    return connected.get();
  }

  /**
   * @param buffer data to send, the buffer should be flipped (ready for read)
   * @throws InterruptedException
   * @throws IOException
   */
  public void send(ByteBuffer buffer) throws InterruptedException, IOException {
    if (!connected.get()) throw new IOException("not connected");
    synchronized (writeBuf) {
      // try direct write of what's in the buffer to free up space
      if (writeBuf.remaining() < buffer.remaining()) {
        writeBuf.flip();
        int bytesOp = 0, bytesTotal = 0;
        while (writeBuf.hasRemaining() && (bytesOp = channel.write(writeBuf)) > 0) bytesTotal += bytesOp;
        writeBuf.compact();
      }

      // if didn't help, wait till some space appears
      if (Thread.currentThread().getId() != thread.getId()) {
        while (writeBuf.remaining() < buffer.remaining()) writeBuf.wait();
      }
      else {
        if (writeBuf.remaining() < buffer.remaining()) throw new IOException("send buffer full"); // TODO: add reallocation or buffers chain
      }
      writeBuf.put(buffer);

      // try direct write to decrease the latency
      writeBuf.flip();
      int bytesOp = 0, bytesTotal = 0;
      while (writeBuf.hasRemaining() && (bytesOp = channel.write(writeBuf)) > 0) bytesTotal += bytesOp;
      writeBuf.compact();

      if (writeBuf.hasRemaining()) {
        SelectionKey key = channel.keyFor(selector);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        selector.wakeup();
      }
    }
  }

  /**
   * Override with something meaningful
   * @param buf
   */
  protected abstract void onRead(ByteBuffer buf) throws Exception;
  
  /**
   * Override with something meaningful
   * @param buf
   */
  protected abstract void onConnected() throws Exception;

  /**
   * Override with something meaningful
   * @param buf
   */
  protected abstract void onDisconnected();

  private void configureChannel(SocketChannel channel) throws IOException {
    channel.configureBlocking(false);
    channel.socket().setSendBufferSize(0x100000); // 1Mb
    channel.socket().setReceiveBufferSize(0x100000); // 1Mb
    channel.socket().setKeepAlive(true);
    channel.socket().setReuseAddress(true);
    channel.socket().setSoLinger(false, 0);
    channel.socket().setSoTimeout(0);
    channel.socket().setTcpNoDelay(true);
  }

  @Override
  public void run() {
    LOG.info("event loop running");
    try {
      while(! Thread.interrupted()) { // reconnection loop
        try {
          selector = Selector.open();
          channel = SocketChannel.open();
          configureChannel(channel);

          channel.connect(address);
          channel.register(selector, SelectionKey.OP_CONNECT);

          while(!thread.isInterrupted() && channel.isOpen()) { // events multiplexing loop
            if (selector.select() > 0) processSelectedKeys(selector.selectedKeys());
          }
        } catch (Exception e) {
          LOG.error("exception", e);
        } finally {
          connected.set(false);
          onDisconnected();
          writeBuf.clear();
          readBuf.clear();
          if (channel != null) channel.close();
          if (selector != null) selector.close();
          LOG.info("connection closed");
        }

        try {
          Thread.sleep(reconnectInterval);
          if (reconnectInterval < MAXIMUM_RECONNECT_INTERVAL) reconnectInterval *= 2;
          LOG.info("reconnecting to " + address);
        } catch (InterruptedException e) {
          break;
        }
      }
    } catch (Exception e) {
      LOG.error("unrecoverable error", e);
    }
   
    LOG.info("event loop terminated");
  }

  private void processSelectedKeys(Set keys) throws Exception {
    Iterator itr = keys.iterator();
    while (itr.hasNext()) {
      SelectionKey key = itr.next();
      if (key.isReadable()) processRead(key);
      if (key.isWritable()) processWrite(key);
      if (key.isConnectable()) processConnect(key);
      if (key.isAcceptable()) ;
      itr.remove();
    }
  }

  private void processConnect(SelectionKey key) throws Exception {
    SocketChannel ch = (SocketChannel) key.channel();
    if (ch.finishConnect()) {
      LOG.info("connected to " + address);
      key.interestOps(key.interestOps() ^ SelectionKey.OP_CONNECT);
      key.interestOps(key.interestOps() | SelectionKey.OP_READ);
      reconnectInterval = INITIAL_RECONNECT_INTERVAL;
      connected.set(true);
      onConnected();
    }
  }

  private void processRead(SelectionKey key) throws Exception {
    ReadableByteChannel ch = (ReadableByteChannel)key.channel();

    int bytesOp = 0, bytesTotal = 0;
    while (readBuf.hasRemaining() && (bytesOp = ch.read(readBuf)) > 0) bytesTotal += bytesOp;

    if (bytesTotal > 0) {
      readBuf.flip();
      onRead(readBuf);
      readBuf.compact();
    }
    else if (bytesOp == -1) {
      LOG.info("peer closed read channel");
      ch.close();
    }

    bytesIn.addAndGet(bytesTotal);
  }

  private void processWrite(SelectionKey key) throws IOException {
    WritableByteChannel ch = (WritableByteChannel)key.channel();
    synchronized (writeBuf) {
      writeBuf.flip();

      int bytesOp = 0, bytesTotal = 0;
      while (writeBuf.hasRemaining() && (bytesOp = ch.write(writeBuf)) > 0) bytesTotal += bytesOp;

      bytesOut.addAndGet(bytesTotal);

      if (writeBuf.remaining() == 0) {
        key.interestOps(key.interestOps() ^ SelectionKey.OP_WRITE);
      }

      if (bytesTotal > 0) writeBuf.notify();
      else if (bytesOp == -1) {
        LOG.info("peer closed write channel");
        ch.close();
      }

      writeBuf.compact();
    }
  }

  public SocketAddress getAddress() {
    return address;
  }

  public void setAddress(SocketAddress address) {
    this.address = address;
  }

  public long getBytesOut() {
    return bytesOut.get();
  }

  public long getBytesIn() {
    return bytesIn.get();
  }


  /**
   * can be used for testing
   */
  public static void main(String[] args) throws Exception {
    BasicConfigurator.configure(new ConsoleAppender(new PatternLayout("%d{yyyyMMdd-HH:mm:ss} %-10t %-5p %-20C{1} - %m%n")));
    Logger.getRootLogger().setLevel(Level.INFO);
    final TcpClient client = new TcpClient() {
      @Override protected void onRead(ByteBuffer buf) throws Exception { buf.position(buf.limit()); }
      @Override protected void onDisconnected() { }
      @Override protected void onConnected() throws Exception { }
    };

    client.setAddress(new InetSocketAddress("127.0.0.1", 20001));
    try {
      client.start();
    } catch (IOException e) {
      e.printStackTrace();
    }

    Timer timer = new Timer();
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        LOG.info("out bytes: " + client.bytesOut.get());
        LOG.info("in bytes:  " + client.bytesIn.get());
      }
    }, 5000, 5000);

    while(!client.isConnected()) Thread.sleep(500);

    LOG.info("starting server flood");
    ByteBuffer buf = ByteBuffer.allocate(65535);
    Random rnd = new Random();
    while (true) {
      short len = (short) rnd.nextInt(Short.MAX_VALUE - 2);
      byte[] bytes = new byte[len];
      rnd.nextBytes(bytes);
      buf.putShort((short)len);
      buf.put(bytes);
      buf.flip();
      try {
        client.send(buf);
      } catch (Exception e) {
        LOG.error("exception: " + e.getMessage());
        while (!client.isConnected()) Thread.sleep(1000);
      }
      buf.clear();
      Thread.sleep(10);
    }
  }
}