Added UDT barchart natives + source into networking library. Easier to keep track of

This commit is contained in:
nathan 2015-06-28 13:00:54 +02:00
parent bdb633b06c
commit 9db87baf6a
48 changed files with 8784 additions and 0 deletions

View File

@ -0,0 +1,110 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class AppClient {
static boolean finished = false;
* @param args
* @throws IOException
public static void main(final String[] args) {
String host;
int port = 9000;
final int size = 10000;
final byte[] data = new byte[size];
Future<Boolean> monResult = null;
if (args.length != 2) {
System.out.println("usage: appclient server_host server_port");
host = args[0];
port = Integer.parseInt(args[1]);
try {
final NetSocketUDT socket = new NetSocketUDT();
if (System.getProperty("").contains("win"))
socket.socketUDT().setOption(OptionUDT.UDT_MSS, 1052);
socket.connect(new InetSocketAddress(host, port));
final OutputStream os = socket.getOutputStream();
// Start the monitor background task
monResult = Executors.newSingleThreadExecutor().submit(
new Callable<Boolean>() {
public Boolean call() {
return monitor(socket.socketUDT());
for (int i = 0; i < 1000000; i++) {
finished = true;
if (monResult != null)
} catch (final IOException ioe) {
} catch (final InterruptedException e) {
} catch (final ExecutionException e) {
public static boolean monitor(final SocketUDT socket) {
try {
while (!finished) {
System.out.printf("%.2f\t\t" + "%.2f\t" + "%d\t" + "%.2f\t\t\t"
+ "%d\t" + "%d\n", socket.monitor().mbpsSendRate,
return true;
} catch (final Exception e) {
return false;

View File

@ -0,0 +1,86 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AppServer {
* @param args
* @throws IOException
static Logger log = LoggerFactory.getLogger(AppServer.class);
public static void main(final String[] args) throws IOException {
int port = 9000;
if (args.length > 1) {
System.out.println("usage: appserver [server_port]");
if (args.length == 1) {
port = Integer.parseInt(args[0]);
final NetServerSocketUDT acceptorSocket = new NetServerSocketUDT();
acceptorSocket.bind(new InetSocketAddress("", port), 256);
System.out.printf("server is ready at port: %d\n", port);
while (true) {
final Socket clientSocket = acceptorSocket.accept();
// Start the read ahead background task
Executors.newSingleThreadExecutor().submit(new Callable<Boolean>() {
public Boolean call() {
return clientTask(clientSocket);
public static boolean clientTask(final Socket clientSocket) {
final byte[] data = new byte[10000];
try {
final InputStream is = clientSocket.getInputStream();
while (true) {
int remain = data.length;
while (remain > 0) {
final int ret =, data.length - remain, remain);
remain -= ret;
} catch (final IOException ioe) {
return false;

View File

@ -0,0 +1,76 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A wrapper around the base UDT congestion control class
* @see <a href="">reference</a>
* @see <a href="">tutorial</a>
* @see FactoryUDT
* @see FactoryInterfaceUDT
* @author CCob
public class CCC {
/** Force SocketUDT to load JNI if it hasn't already */
private static boolean initOk = SocketUDT.INIT_OK;
/** Used internally by the JNI layer, points to JNICCC class */
private long nativeHandle;
private int msINT;
private int pktINT;
private int usRTO;
private final Logger log = LoggerFactory.getLogger(CCC.class);
private native void initNative();
protected native void setACKTimer(final int msINT);
protected native void setACKInterval(final int pktINT);
protected native void setRTO(final int usRTO);
protected native void setPacketSndPeriod(final double sndPeriod);
protected native void setCWndSize(final double cWndSize);
protected native MonitorUDT getPerfInfo();
public CCC() {
public void init() {"CCC::init");
public void close() {"CCC::close");
public void onACK(final int ack) {
public void onLoss(final int[] lossList) {
public void onTimeout() {
// TODO: implement Java wrapper around CPacket
// public void onPktSent(const CPacket* pkt) {}
// public void onPktReceived(const CPacket* pkt) {}
// public void processCustomMsg(const CPacket& pkt) {}/
// void sendCustomMsg(CPacket& pkt) const;

View File

@ -0,0 +1,245 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* UDT Epoll Manager
* @see <a href="">Epoll</a>
* @see <a href="">UDT Epoll</a>
public class EpollUDT {
* poll interest option mask
* <p>
* see udt.h enum - EPOLLOpt
* <pre>
* UDT_EPOLL_IN = 0x1,
* UDT_EPOLL_OUT = 0x4,
* </pre>
* this is subset adapted to jdk select pattern
public static enum Opt {
* not interested
NONE(0x0), //
* UDT_EPOLL_IN : interested in read
READ(0x1), //
* UDT_EPOLL_OUT: interested in write
WRITE(0x4), //
* UDT_EPOLL_ERR: interested in exceptions
ERROR(0x8), //
BOTH(WRITE.code | READ.code), //
ERROR_READ(ERROR.code | READ.code), //
ERROR_WRITE(ERROR.code | WRITE.code), //
ALL(ERROR.code | WRITE.code | READ.code), //
private static final Opt[] ENUM_VALS = Opt.values();
public static Opt from(final int code) {
for (final Opt known : ENUM_VALS) {
if (known.code == code) {
return known;
return UNKNOWN;
* poll event mask;
* <p>
* used for both requesting interest and reporting readiness
public final int code;
Opt(final int code) {
this.code = code;
public boolean hasError() {
return (code & ERROR.code) != 0;
public boolean hasRead() {
return (code & READ.code) != 0;
public boolean hasWrite() {
return (code & WRITE.code) != 0;
* Non-empty mask of 3 parts.
public boolean isValidInterestRequest() {
switch (this) {
case NONE:
case READ:
case WRITE:
case ERROR:
case BOTH:
case ALL:
return true;
return false;
protected static final Logger log = LoggerFactory.getLogger(EpollUDT.class);
protected final int id;
protected volatile boolean isActive;
* place holder socket to work around logic in epoll.h CEPoll::wait() which
* expects at least one socket being monitored with non empty interest
private final SocketUDT socketUDT;
* allocate poll
public EpollUDT() throws ExceptionUDT {
id = SocketUDT.epollCreate0();
isActive = true;
socketUDT = new SocketUDT(TypeUDT.DATAGRAM);
SocketUDT.epollAdd0(id,, Opt.BOTH.code);
log.debug("ep {} create", id());
* deallocate poll; called on {@link #finalize()}
public void destroy() throws ExceptionUDT {
isActive = false;
log.debug("ep {} delete", id());
* poll descriptor id
public int id() {
return id;
* poll becomes active after instance creation and inactive after
* {@link #destroy()}
public boolean isActive() {
return isActive;
* deallocate poll
* <p>
* NOTE: catch all exceptions; else prevents GC
* <p>
* NOTE: do not leak "this" references; else prevents GC
protected void finalize() {
try {
} catch (final Throwable e) {
log.error("failed to destroy id=" + id(), e);
* register socket into event processing poll
public void add(final SocketUDT socket, final Opt option)
throws ExceptionUDT {
log.debug("ep {} add {} {}", id(), socket, option);
// assert option.isValidInterestRequest();
SocketUDT.epollAdd0(id(),, option.code);
* unregister socket from event processing poll
public void remove(final SocketUDT socket) throws ExceptionUDT {
log.debug("ep {} rem {}", id(), socket);
* update existing poll/socket registration with changed interest
public void update(final SocketUDT socket, final Opt option)
throws ExceptionUDT {
log.debug("ep {} mod {} {}", id(), socket, option);
assert option.isValidInterestRequest();
SocketUDT.epollUpdate0(id(),, option.code);
/** report current poll/socket readiness */
public Opt verify(final SocketUDT socket) throws ExceptionUDT {
final int code = SocketUDT.epollVerify0(id(),;
return Opt.from(code);

View File

@ -0,0 +1,139 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
* keep code values in sync with
* @see <a href="">UDT Error Codes
* List</a>
public enum ErrorUDT {
SUCCESS(0, "success operation"), //
ECONNSETUP(1000, "connection setup failure"), //
NOSERVER(1001, "server does not exist"), //
ECONNREJ(1002, "connection request was rejected by server"), //
ESOCKFAIL(1003, "could not create/configure UDP socket"), //
ESECFAIL(1004, "connection request was aborted due to security reasons"), //
ECONNFAIL(2000, "connection failure"), //
ECONNLOST(2001, "connection was broken"), //
ENOCONN(2002, "connection does not exist"), //
ERESOURCE(3000, "system resource failure"), //
ETHREAD(3001, "could not create new thread"), //
ENOBUF(3002, "no memory space"), //
EFILE(4000, "file access error"), //
EINVRDOFF(4001, "invalid read offset"), //
ERDPERM(4002, "no read permission"), //
EINVWROFF(4003, "invalid write offset"), //
EWRPERM(4004, "no write permission"), //
EINVOP(5000, "operation not supported"), //
EBOUNDSOCK(5001, "cannot execute the operation on a bound socket"), //
ECONNSOCK(5002, "cannot execute the operation on a connected socket"), //
EINVPARAM(5003, "bad parameters"), //
EINVSOCK(5004, "invalid UDT socket"), //
EUNBOUNDSOCK(5005, "cannot listen on unbound socket"), //
ENOLISTEN(5006, "(accept) socket is not in listening state"), //
"rendezvous connection process does not allow listen and accept call"), //
"rendezvous connection setup is enabled but bind has not been called before connect"), //
ESTREAMILL(5009, "operation not supported in SOCK_STREAM mode"), //
EDGRAMILL(5010, "operation not supported in SOCK_DGRAM mode"), //
EDUPLISTEN(5011, "another socket is already listening on the same UDP port"), //
ELARGEMSG(5012, "message is too large to be hold in the sending buffer"), //
EINVPOLLID(5013, "epoll ID is invalid"), //
EASYNCFAIL(6000, "non-blocking call failure"), //
EASYNCSND(6001, "no buffer available for sending"), //
EASYNCRCV(6002, "no data available for read"), //
ETIMEOUT(6003, "timeout before operation completes"), //
EPEERERR(7000, "error has happened at the peer side"), //
// non UDT values:
WRAPPER_UNKNOWN(-1, "unknown error code"), //
WRAPPER_UNIMPLEMENTED(-2, "this feature is not yet implemented"), //
WRAPPER_MESSAGE(-3, "wrapper generated error"), //
USER_DEFINED_MESSAGE(-4, "user defined message"), //
private final int code;
public int getCode() {
return code;
private final String description;
public String getDescription() {
return description;
private ErrorUDT(final int code, final String description) {
this.code = code;
this.description = description;
static final ErrorUDT[] ENUM_VALS = values();
public static ErrorUDT errorFrom(final int code) {
for (final ErrorUDT known : ENUM_VALS) {
if (known.code == code) {
return known;
public static String descriptionFrom(final int socketID,
final int errorCode, final String errorComment) {
final ErrorUDT error = ErrorUDT.errorFrom(errorCode);
return String.format("UDT Error : %d : %s : %s [id: 0x%08x]", //
errorCode, error.description, errorComment, socketID);

View File

@ -0,0 +1,81 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import com.barchart.udt.anno.Native;
* The Class ExceptionUDT. Wraps all native UDT exceptions and more.
public class ExceptionUDT extends SocketException {
* The error udt. Keeps error description for this exception. Use this enum
* in switch/case to fine tune exception processing.
private final ErrorUDT errorUDT;
public ErrorUDT getError() {
return errorUDT;
* The socket id. Keeps socketID of the socket that produced this exception.
* Can possibly contain '0' when particular method can not determine
* {@link #socketID} that produced the exception.
private final int socketID;
public int getSocketID() {
return socketID;
* Instantiates a new exception udt for native UDT::Exception. This
* exception is generated in the underlying UDT method.
* @param socketID
* the socket id
* @param errorCode
* the error code
* @param comment
* the comment
protected ExceptionUDT(final int socketID, final int errorCode,
final String comment) {
super(ErrorUDT.descriptionFrom(socketID, errorCode, comment));
errorUDT = ErrorUDT.errorFrom(errorCode);
this.socketID = socketID;
* Instantiates a new exception udt for synthetic JNI wrapper exception.
* This exception is generated in the JNI glue code itself.
* @param socketID
* the socket id
* @param error
* the error
* @param comment
* the comment
protected ExceptionUDT(final int socketID, final ErrorUDT error,
final String comment) {
super(ErrorUDT.descriptionFrom(socketID, error.getCode(), comment));
errorUDT = error;
this.socketID = socketID;

View File

@ -0,0 +1,19 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
* @author CCob
public interface FactoryInterfaceUDT {
CCC create();
FactoryInterfaceUDT cloneFactory();

View File

@ -0,0 +1,88 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Wrapper around the UDT CCCFactory class
* @see <a href="">reference</a>
* @see <a href="">tutorial</a>
* @see CCC
* @author CCob
public class FactoryUDT<C> implements FactoryInterfaceUDT {
C classType;
final Class<C> clazz;
Logger log = LoggerFactory.getLogger(FactoryUDT.class);
boolean doInit = false;
boolean doClose = false;
boolean doOnACK = false;
boolean doOnLoss = false;
boolean doOnTimeout = false;
public FactoryUDT(final Class<C> clazz) {
this.clazz = clazz;
if (!CCC.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException(
"Generic argument 'C' must be 'CCC' class or extension");
try {
if (clazz.getMethod("init").getDeclaringClass() != CCC.class)
doInit = true;
if (clazz.getMethod("close").getDeclaringClass() != CCC.class)
doClose = true;
if (clazz.getMethod("onACK", int.class).getDeclaringClass() != CCC.class)
doOnACK = true;
if (clazz.getMethod("onLoss", int[].class).getDeclaringClass() != CCC.class)
doOnLoss = true;
if (clazz.getMethod("onTimeout").getDeclaringClass() != CCC.class)
doOnTimeout = true;
} catch (final SecurityException e) {
log.error("Error setting up class factory", e);
} catch (final NoSuchMethodException e) {
log.error("Expected CCC method doesn't exsit", e);
public CCC create() {
try {
final Object cccObj = clazz.newInstance();
return (CCC) cccObj;
} catch (final InstantiationException e) {
log.error("Failed to instansiate CCC class", e);
} catch (final IllegalAccessException e) {
log.error("Failed to instansiate CCC class", e);
return null;
public FactoryInterfaceUDT cloneFactory() {
return new FactoryUDT<C>(clazz);

View File

@ -0,0 +1,101 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
public class LingerUDT extends Number implements Comparable<LingerUDT> {
public static final LingerUDT LINGER_ZERO = new LingerUDT(0);
// measured in seconds
final int timeout;
* Default constructor. NOTE: linger value is "u_short" on windows and "int"
* on linux:<br>
* Windows: <a
* href="">
* linger Structure on Windows</a><br>
* Linux: <a href=
* ""
* >GCC Socket-Level Options</a><br>
* Therefore select smallest range: 0 <= linger <= 65535 <br>
* @param lingerSeconds
* the seconds to linger; "0" means "do not linger"
* @throws IllegalArgumentException
* when lingerSeconds is out of range
public LingerUDT(int lingerSeconds) throws IllegalArgumentException {
if (65535 < lingerSeconds) {
throw new IllegalArgumentException(
"lingerSeconds is out of range: 0 <= linger <= 65535");
this.timeout = lingerSeconds > 0 ? lingerSeconds : 0;
private static final long serialVersionUID = 3414455799823407217L;
public double doubleValue() {
return timeout;
* (non-Javadoc)
* @see java.lang.Number#floatValue()
public float floatValue() {
return timeout;
public int intValue() {
return timeout;
public long longValue() {
return timeout;
boolean isLingerOn() {
return timeout > 0;
int timeout() {
return timeout;
public boolean equals(Object otherLinger) {
if (otherLinger instanceof LingerUDT) {
LingerUDT other = (LingerUDT) otherLinger;
return other.timeout == this.timeout;
return false;
public int hashCode() {
return timeout;
public int compareTo(LingerUDT other) {
return other.timeout - this.timeout;
public String toString() {
return String.valueOf(timeout);

View File

@ -0,0 +1,379 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import java.lang.reflect.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* note: do not change field names; used by JNI
public class MonitorUDT {
private static final Logger log = LoggerFactory.getLogger(MonitorUDT.class);
protected final SocketUDT socketUDT;
protected MonitorUDT(final SocketUDT socketUDT) {
this.socketUDT = socketUDT;
// ### global measurements
/** time since the UDT entity is started, in milliseconds. */
protected volatile long msTimeStamp;
public long millisSinceStart() {
return msTimeStamp;
* total number of sent data packets, including retransmissions
protected volatile long pktSentTotal;
public long globalSentTotal() {
return pktSentTotal;
* total number of received packets
protected volatile long pktRecvTotal;
public long globalReceivedTotal() {
return pktRecvTotal;
* total number of lost packets (sender side)
protected volatile int pktSndLossTotal;
public int globalSenderLost() {
return pktSndLossTotal;
* total number of lost packets (receiver side)
protected volatile int pktRcvLossTotal;
public int globalReceiverLost() {
return pktRcvLossTotal;
* total number of retransmitted packets
protected volatile int pktRetransTotal;
public int globalRetransmittedTotal() {
return pktRetransTotal;
* total number of sent ACK packets
protected volatile int pktSentACKTotal;
public int globalSentAckTotal() {
return pktSentACKTotal;
* total number of received ACK packets
protected volatile int pktRecvACKTotal;
public int globalReceivedAckTotal() {
return pktRecvACKTotal;
* total number of sent NAK packets
protected volatile int pktSentNAKTotal;
public int globalSentNakTotal() {
return pktSentNAKTotal;
* total number of received NAK packets
protected volatile int pktRecvNAKTotal;
public int globalReceivedNakTotal() {
return pktRecvNAKTotal;
* total time duration when UDT is sending data (idle time exclusive)
protected volatile long usSndDurationTotal;
public long globalMicrosSendDurationTotal() {
return usSndDurationTotal;
// ### local measurements
* number of sent data packets, including retransmissions
protected volatile long pktSent;
public long localPacketsSent() {
return pktSent;
* number of received packets
protected volatile long pktRecv;
public long localPacketsReceived() {
return pktRecv;
* number of lost packets (sender side)
protected volatile int pktSndLoss;
public int localSenderLost() {
return pktSndLoss;
* number of lost packets (receiverer side)
protected volatile int pktRcvLoss;
public int localReceiverLost() {
return pktRcvLoss;
* number of retransmitted packets
protected volatile int pktRetrans;
public int localRetransmitted() {
return pktRetrans;
* number of sent ACK packets
protected volatile int pktSentACK;
public int localSentAck() {
return pktSentACK;
* number of received ACK packets
protected volatile int pktRecvACK;
public int localReceivedAck() {
return pktRecvACK;
* number of sent NAK packets
protected volatile int pktSentNAK;
public int localSentNak() {
return pktSentNAK;
* number of received NAK packets
protected volatile int pktRecvNAK;
public int localReceivedNak() {
return pktRecvNAK;
* sending rate in Mb/s
protected volatile double mbpsSendRate;
public double mbpsSendRate() {
return mbpsSendRate;
* receiving rate in Mb/s
protected volatile double mbpsRecvRate;
public double mbpsReceiveRate() {
return mbpsRecvRate;
* busy sending time (i.e., idle time exclusive)
protected volatile long usSndDuration;
public long microsSendTime() {
return usSndDuration;
// ### instant measurements
* packet sending period, in microseconds
protected volatile double usPktSndPeriod;
public double currentSendPeriod() {
return usPktSndPeriod;
* flow window size, in number of packets
protected volatile int pktFlowWindow;
public int currentFlowWindow() {
return pktFlowWindow;
* congestion window size, in number of packets
protected volatile int pktCongestionWindow;
public int currentCongestionWindow() {
return pktCongestionWindow;
* number of packets on flight
protected volatile int pktFlightSize;
public int currentFlightSize() {
return pktFlightSize;
* RTT, in milliseconds
protected volatile double msRTT;
public double currentMillisRTT() {
return msRTT;
* estimated bandwidth, in Mb/s
protected volatile double mbpsBandwidth;
public double currentMbpsBandwidth() {
return mbpsBandwidth;
* available UDT sender buffer size
protected volatile int byteAvailSndBuf;
public int currentAvailableInSender() {
return byteAvailSndBuf;
* available UDT receiver buffer size
protected volatile int byteAvailRcvBuf;
public int currentAvailableInReceiver() {
return byteAvailRcvBuf;
* current monitor status snapshot for all parameters
public void appendSnapshot(final StringBuilder text) {
text.append(String.format("[id: 0x%08x]",;
final Field fieldArray[] = MonitorUDT.class.getDeclaredFields();
for (final Field field : fieldArray) {
if (!isNumeric(field)) {
try {
final String fieldName = field.getName();
final String fieldValue = field.get(this).toString();
text.append(" = ");
} catch (final Exception e) {
log.error("unexpected", e);
final double localSendLoss = 100.0 * pktSndLoss / pktSent;
text.append("\n\t% localSendLoss = ");
final double localReceiveLoss = 100.0 * pktRcvLoss / pktRecv;
text.append("\n\t% localReceiveLoss = ");
protected boolean isNumeric(final Field field) {
final Class<?> fieledType = field.getType();
return fieledType == int.class || fieledType == long.class
|| fieledType == double.class;
public String toString() {
final StringBuilder text = new StringBuilder(1024);
return text.toString();

View File

@ -0,0 +1,334 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import static com.barchart.udt.OptionUDT.Format.*;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.util.HelpUDT;
* The Enum OptionUDT.
* <p>
* provide 2 names: 1) UDT original and 2) human-readble
* <p>
* keep code values in sync with udt.h - UDT::UDTOpt; enum starts with index 0
* @see <a href="">udt options</a>
* <pre>
* UDT_MSS, // the Maximum Transfer Unit
* UDT_SNDSYN, // if sending is blocking
* UDT_RCVSYN, // if receiving is blocking
* UDT_CC, // custom congestion control algorithm
* UDT_FC, // Flight flag size (window size)
* UDT_SNDBUF, // maximum buffer in sending queue
* UDT_RCVBUF, // UDT receiving buffer size
* UDT_LINGER, // waiting for unsent data when closing
* UDP_SNDBUF, // UDP sending buffer size
* UDP_RCVBUF, // UDP receiving buffer size
* UDT_MAXMSG, // maximum datagram message size
* UDT_MSGTTL, // time-to-live of a datagram message
* UDT_RENDEZVOUS, // rendezvous connection mode
* UDT_SNDTIMEO, // send() timeout
* UDT_RCVTIMEO, // recv() timeout
* UDT_REUSEADDR, // reuse an existing port or create a new one
* UDT_MAXBW, // maximum bandwidth (bytes per second) that the connection can use
* UDT_STATE, // current socket state, see UDTSTATUS, read only
* UDT_EVENT, // current avalable events associated with the socket
* UDT_SNDDATA, // size of data in the sending buffer
* UDT_RCVDATA // size of data available for recv
* </pre>
public class OptionUDT<T> {
static {
log = LoggerFactory.getLogger(OptionUDT.class);
values = new CopyOnWriteArrayList<OptionUDT<?>>();
/** the Maximum Transfer Unit. */
public static final OptionUDT<Integer> UDT_MSS = //
NEW(0, Integer.class, DECIMAL);
/** the Maximum Transfer Unit., bytes */
public static final OptionUDT<Integer> Maximum_Transfer_Unit = //
NEW(0, Integer.class, DECIMAL);
/** if sending is blocking. */
public static final OptionUDT<Boolean> UDT_SNDSYN = //
NEW(1, Boolean.class, BOOLEAN);
/** if sending is blocking., true/false */
public static final OptionUDT<Boolean> Is_Send_Synchronous = //
NEW(1, Boolean.class, BOOLEAN);
/** if receiving is blocking. */
public static final OptionUDT<Boolean> UDT_RCVSYN = //
NEW(2, Boolean.class, BOOLEAN);
/** if receiving is blocking, true/false */
public static final OptionUDT<Boolean> Is_Receive_Synchronous = //
NEW(2, Boolean.class, BOOLEAN);
/** custom congestion control algorithm */
public static final OptionUDT<FactoryUDT> UDT_CC = //
NEW(3, FactoryUDT.class, DEFAULT);
/** custom congestion control algorithm, class factory */
public static final OptionUDT<FactoryUDT> Custom_Congestion_Control = //
NEW(3, FactoryUDT.class, DEFAULT);
/** Flight flag size (window size). */
public static final OptionUDT<Integer> UDT_FC = //
NEW(4, Integer.class, BINARY);
/** Flight flag size (window size), bytes */
public static final OptionUDT<Integer> Flight_Window_Size = //
NEW(4, Integer.class, BINARY);
/** maximum buffer in sending queue. */
public static final OptionUDT<Integer> UDT_SNDBUF = //
NEW(5, Integer.class, DECIMAL);
/** maximum buffer in sending queue. */
public static final OptionUDT<Integer> Protocol_Send_Buffer_Size = //
NEW(5, Integer.class, DECIMAL);
/** UDT receiving buffer size. */
public static final OptionUDT<Integer> UDT_RCVBUF = //
NEW(6, Integer.class, DECIMAL);
/** UDT receiving buffer size limit, bytes */
public static final OptionUDT<Integer> Protocol_Receive_Buffer_Size = //
NEW(6, Integer.class, DECIMAL);
/** waiting for unsent data when closing. */
public static final OptionUDT<LingerUDT> UDT_LINGER = //
NEW(7, LingerUDT.class, DECIMAL);
/** waiting for unsent data when closing. true/false and timeout, seconds */
public static final OptionUDT<LingerUDT> Time_To_Linger_On_Close = //
NEW(7, LingerUDT.class, DECIMAL);
/** UDP sending buffer size. */
public static final OptionUDT<Integer> UDP_SNDBUF = //
NEW(8, Integer.class, DECIMAL);
/** UDP sending buffer size limit, bytes */
public static final OptionUDT<Integer> System_Send_Buffer_Size = //
NEW(8, Integer.class, DECIMAL);
/** UDP receiving buffer size. */
public static final OptionUDT<Integer> UDP_RCVBUF = //
NEW(9, Integer.class, DECIMAL);
/** UDP receiving buffer size limit, bytes */
public static final OptionUDT<Integer> System_Receive_Buffer_Size = //
NEW(9, Integer.class, DECIMAL);
/* maximum datagram message size */
// UDT_MAXMSG(10, Integer.class, DECIMAL); no support in udt core
/* time-to-live of a datagram message */
// UDT_MSGTTL(11, Integer.class, DECIMAL); no support in udt core
/** rendezvous connection mode. */
public static final OptionUDT<Boolean> UDT_RENDEZVOUS = //
NEW(12, Boolean.class, BOOLEAN);
/** rendezvous connection mode, enabled/disabled */
public static final OptionUDT<Boolean> Is_Randezvous_Connect_Enabled = //
NEW(12, Boolean.class, BOOLEAN);
/** send() timeout. */
public static final OptionUDT<Integer> UDT_SNDTIMEO = //
NEW(13, Integer.class, DECIMAL);
/** send() timeout. milliseconds */
public static final OptionUDT<Integer> Send_Timeout = //
NEW(13, Integer.class, DECIMAL);
/** recv() timeout. */
public static final OptionUDT<Integer> UDT_RCVTIMEO = //
NEW(14, Integer.class, DECIMAL);
/** recv() timeout. milliseconds */
public static final OptionUDT<Integer> Receive_Timeout = //
NEW(14, Integer.class, DECIMAL);
/** reuse an existing port or create a one. */
public static final OptionUDT<Boolean> UDT_REUSEADDR = //
NEW(15, Boolean.class, BOOLEAN);
/** reuse an existing port or create a one. true/false */
public static final OptionUDT<Boolean> Is_Address_Reuse_Enabled = //
NEW(15, Boolean.class, BOOLEAN);
/** maximum bandwidth (bytes per second) that the connection can use. */
public static final OptionUDT<Long> UDT_MAXBW = //
NEW(16, Long.class, DECIMAL);
/** maximum bandwidth (bytes per second) that the connection can use. */
public static final OptionUDT<Long> Maximum_Bandwidth = //
NEW(16, Long.class, DECIMAL);
/** current socket state, see UDTSTATUS, read only */
public static final OptionUDT<Integer> UDT_STATE = //
NEW(17, Integer.class, DECIMAL);
/** current socket status code, see {@link StatusUDT#getCode()}, read only */
public static final OptionUDT<Integer> Status_Code = //
NEW(17, Integer.class, DECIMAL);
/** current available events associated with the socket */
public static final OptionUDT<Integer> UDT_EVENT = //
NEW(18, Integer.class, DECIMAL);
/** current available epoll events, see {@link EpollUDT.Opt#code} */
public static final OptionUDT<Integer> Epoll_Event_Mask = //
NEW(18, Integer.class, DECIMAL);
/** size of data in the sending buffer */
public static final OptionUDT<Integer> UDT_SNDDATA = //
NEW(19, Integer.class, DECIMAL);
/** current consumed sending buffer utilization, read only, bytes */
public static final OptionUDT<Integer> Send_Buffer_Consumed = //
NEW(19, Integer.class, DECIMAL);
/** size of data available for recv */
public static final OptionUDT<Integer> UDT_RCVDATA = //
NEW(20, Integer.class, DECIMAL);
/** current available receiving buffer capacity, read only, bytes */
public static final OptionUDT<Integer> Receive_Buffer_Available = //
NEW(20, Integer.class, DECIMAL);
protected OptionUDT(final int code, final Class<T> klaz, final Format format) {
this.code = code;
this.type = klaz;
this.format = format;
protected static <T> OptionUDT<T> NEW(final int code, final Class<T> klaz,
final Format format) {
return new OptionUDT<T>(code, klaz, format);
public static void appendSnapshot( //
final SocketUDT socketUDT, //
final StringBuilder text //
) {
text.append(String.format("[id: 0x%08x]",;
for (final OptionUDT<?> option : values) {
int optionCode = 0;
String optionName = null;
String optionValue = null;
try {
optionCode = option.code;
optionName =;
optionValue = option.format.convert(//
if (optionName.startsWith("UD")) {
text.append(") ");
text.append(" = ");
} catch (final Exception e) {
log.error("unexpected; " + optionName, e);
protected static final Logger log;
protected static final List<OptionUDT<?>> values;
private final int code;
private final Class<?> type;
private final Format format;
private String name;
public int code() {
return code;
public Class<?> type() {
return type;
public Format format() {
return format;
public String name() {
if (name == null) {
name = HelpUDT.constantFieldName(getClass(), this);
return name;
* render options in human format
public enum Format {
public String convert(final Object value) {
if (value instanceof Number) {
final long number = ((Number) value).longValue();
return String.format("%,d", number);
return "invalid format";
}, //
public String convert(final Object value) {
if (value instanceof Number) {
final long number = ((Number) value).longValue();
return String.format("%,d (%,d K)", number, number / 1024);
return "invalid format";
}, //
public String convert(final Object value) {
if (value instanceof Boolean) {
final boolean bool = ((Boolean) value).booleanValue();
return String.format("%b", bool);
return "invalid format";
}, //
public String convert(final Object value) {
return "" + value;
}, //
public abstract String convert(Object value);

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,100 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* status of underlying UDT native socket as reported by
* {@link SocketUDT#getStatus0()}
* <p>
* keep in sync with udt.h UDTSTATUS enum; see:
* <p>
public enum StatusUDT {
/** note: keep the order */
/** newly created socket; both connector and acceptor */
INIT(1), //
/** bound socket; both connector and acceptor */
OPENED(2), //
/** bound and listening acceptor socket */
/** bound connector socket trying to connect */
/** bound and connected connector socket */
/** acceptor socket after close(), connector socket after remote unreachable */
BROKEN(6), //
/** connector socket while close() is in progress */
CLOSING(7), //
/** connector socket after close() is done */
CLOSED(8), //
/** trying to check status on socket that was closed and removed */
/** non udt constant, catch-all value */
UNKNOWN(100), //
protected static final Logger log = LoggerFactory
private final int code;
private StatusUDT(final int code) {
this.code = code;
public int getCode() {
return code;
public static final StatusUDT from(final int code) {
switch (code) {
case 1:
return INIT;
case 2:
return OPENED;
case 3:
case 4:
case 5:
case 6:
return BROKEN;
case 7:
return CLOSING;
case 8:
return CLOSED;
case 9:
return NONEXIST;
log.error("unexpected code={}", code);
return UNKNOWN;

View File

@ -0,0 +1,51 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt;
* UDT socket mode type.
* NOTE: {@link #TypeUDT} means stream vs datagram;
* {@link com.barchart.udt.nio.KindUDT} means server vs client.
* <p>
* maps to socket.h constants<br>
* SOCK_STREAM = 1<br>
* SOCK_DGRAM = 2<br>
public enum TypeUDT {
* The STREAM type. Defines "byte stream" UDT mode.
STREAM(1), //
* The DATAGRAM. Defines "datagram or message" UDT mode.
protected final int code;
* native UDT constant
public int code() {
return code;
* @param code
* native UDT constant
private TypeUDT(final int code) {
this.code = code;

View File

@ -0,0 +1,24 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.anno;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
* annotation marks a native JNI entity, please not change it in any way
@Target( { ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR })
public @interface Native {

View File

@ -0,0 +1,26 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.anno;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
* annotation documents thread safe access contract
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR })
public @interface ThreadSafe {
String value();

View File

@ -0,0 +1,30 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.ccc;
import com.barchart.udt.CCC;
* Wrapper around the CUDPBlast class that demos the use of a custom congestion
* control algorithm
* @author CCob
public class UDPBlast extends CCC {
static final int iSMSS = 1500;
public UDPBlast() {
public void setRate(final int mbps) {
setPacketSndPeriod((iSMSS * 8.0) / mbps);

View File

@ -0,0 +1,24 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import com.barchart.udt.ErrorUDT;
import com.barchart.udt.ExceptionUDT;
public class ExceptionReceiveUDT extends ExceptionUDT {
protected ExceptionReceiveUDT(final int socketID, final ErrorUDT error,
final String comment) {
super(socketID, error, comment);

View File

@ -0,0 +1,24 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import com.barchart.udt.ErrorUDT;
import com.barchart.udt.ExceptionUDT;
public class ExceptionSendUDT extends ExceptionUDT {
protected ExceptionSendUDT(final int socketID, final ErrorUDT error,
final String comment) {
super(socketID, error, comment);

View File

@ -0,0 +1,22 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import com.barchart.udt.SocketUDT;
* custom/common acceptor/connector socket features
public interface IceCommon {
* expose underlying socket
SocketUDT socketUDT();

View File

@ -0,0 +1,560 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.DatagramChannel;
* compatibility verification interface
public interface IceDatagramSocket {
* Binds this DatagramSocket to a specific address & port.
* <p>
* If the address is <code>null</code>, then the system will pick up an
* ephemeral port and a valid local address to bind the socket.
* <p>
* @param addr
* The address & port to bind to.
* @throws SocketException
* if any error happens during the bind, or if the socket is
* already bound.
* @throws SecurityException
* if a security manager exists and its <code>checkListen</code>
* method doesn't allow the operation.
* @throws IllegalArgumentException
* if addr is a SocketAddress subclass not supported by this
* socket.
* @since 1.4
void bind(SocketAddress addr) throws SocketException;
* Connects the socket to a remote address for this socket. When a socket is
* connected to a remote address, packets may only be sent to or received
* from that address. By default a datagram socket is not connected.
* <p>
* If the remote destination to which the socket is connected does not
* exist, or is otherwise unreachable, and if an ICMP destination
* unreachable packet has been received for that address, then a subsequent
* call to send or receive may throw a PortUnreachableException. Note, there
* is no guarantee that the exception will be thrown.
* <p>
* A caller's permission to send and receive datagrams to a given host and
* port are checked at connect time. When a socket is connected, receive and
* send <b>will not perform any security checks</b> on incoming and outgoing
* packets, other than matching the packet's and the socket's address and
* port. On a send operation, if the packet's address is set and the
* packet's address and the socket's address do not match, an
* IllegalArgumentException will be thrown. A socket connected to a
* multicast address may only be used to send packets.
* @param address
* the remote address for the socket
* @param port
* the remote port for the socket.
* @exception IllegalArgumentException
* if the address is null, or the port is out of range.
* @exception SecurityException
* if the caller is not allowed to send datagrams to and
* receive datagrams from the address and port.
* @see #disconnect
* @see #send
* @see #receive
void connect(InetAddress address, int port);
* Connects this socket to a remote socket address (IP address + port
* number).
* <p>
* @param addr
* The remote address.
* @throws SocketException
* if the connect fails
* @throws IllegalArgumentException
* if addr is null or addr is a SocketAddress subclass not
* supported by this socket
* @since 1.4
* @see #connect
void connect(SocketAddress addr) throws SocketException;
* Disconnects the socket. This does nothing if the socket is not connected.
* @see #connect
void disconnect();
* Returns the binding state of the socket.
* @return true if the socket succesfuly bound to an address
* @since 1.4
boolean isBound();
* Returns the connection state of the socket.
* @return true if the socket succesfuly connected to a server
* @since 1.4
boolean isConnected();
* Returns the address to which this socket is connected. Returns null if
* the socket is not connected.
* @return the address to which this socket is connected.
InetAddress getInetAddress();
* Returns the port for this socket. Returns -1 if the socket is not
* connected.
* @return the port to which this socket is connected.
int getPort();
* Returns the address of the endpoint this socket is connected to, or
* <code>null</code> if it is unconnected.
* @return a <code>SocketAddress</code> representing the remote endpoint of
* this socket, or <code>null</code> if it is not connected yet.
* @see #getInetAddress()
* @see #getPort()
* @see #connect(SocketAddress)
* @since 1.4
SocketAddress getRemoteSocketAddress();
* Returns the address of the endpoint this socket is bound to, or
* <code>null</code> if it is not bound yet.
* @return a <code>SocketAddress</code> representing the local endpoint of
* this socket, or <code>null</code> if it is not bound yet.
* @see #getLocalAddress()
* @see #getLocalPort()
* @see #bind(SocketAddress)
* @since 1.4
SocketAddress getLocalSocketAddress();
* Sends a datagram packet from this socket. The <code>DatagramPacket</code>
* includes information indicating the data to be sent, its length, the IP
* address of the remote host, and the port number on the remote host.
* <p>
* If there is a security manager, and the socket is not currently connected
* to a remote address, this method first performs some security checks.
* First, if <code>p.getAddress().isMulticastAddress()</code> is true, this
* method calls the security manager's <code>checkMulticast</code> method
* with <code>p.getAddress()</code> as its argument. If the evaluation of
* that expression is false, this method instead calls the security
* manager's <code>checkConnect</code> method with arguments
* <code>p.getAddress().getHostAddress()</code> and <code>p.getPort()</code>
* . Each call to a security manager method could result in a
* SecurityException if the operation is not allowed.
* @param p
* the <code>DatagramPacket</code> to be sent.
* @exception IOException
* if an I/O error occurs.
* @exception SecurityException
* if a security manager exists and its
* <code>checkMulticast</code> or <code>checkConnect</code>
* method doesn't allow the send.
* @exception PortUnreachableException
* may be thrown if the socket is connected to a currently
* unreachable destination. Note, there is no guarantee that
* the exception will be thrown.
* @exception java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, and the channel
* is in non-blocking mode.
* @see DatagramPacket
* @see SecurityManager#checkMulticast(InetAddress)
* @see SecurityManager#checkConnect revised 1.4 spec JSR-51
void send(DatagramPacket p) throws IOException;
* Receives a datagram packet from this socket. When this method returns,
* the <code>DatagramPacket</code>'s buffer is filled with the data
* received. The datagram packet also contains the sender's IP address, and
* the port number on the sender's machine.
* <p>
* This method blocks until a datagram is received. The <code>length</code>
* field of the datagram packet object contains the length of the received
* message. If the message is longer than the packet's length, the message
* is truncated.
* <p>
* If there is a security manager, a packet cannot be received if the
* security manager's <code>checkAccept</code> method does not allow it.
* @param p
* the <code>DatagramPacket</code> into which to place the
* incoming data.
* @exception IOException
* if an I/O error occurs.
* @exception SocketTimeoutException
* if setSoTimeout was previously called and the timeout has
* expired.
* @exception PortUnreachableException
* may be thrown if the socket is connected to a currently
* unreachable destination. Note, there is no guarantee that
* the exception will be thrown.
* @exception java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, and the channel
* is in non-blocking mode.
* @see DatagramPacket
* @see revised 1.4 spec JSR-51
void receive(DatagramPacket p) throws IOException;
* Gets the local address to which the socket is bound.
* <p>
* If there is a security manager, its <code>checkConnect</code> method is
* first called with the host address and <code>-1</code> as its arguments
* to see if the operation is allowed.
* @see SecurityManager#checkConnect
* @return the local address to which the socket is bound, or an
* <code>InetAddress</code> representing any local address if either
* the socket is not bound, or the security manager
* <code>checkConnect</code> method does not allow the operation
* @since 1.1
InetAddress getLocalAddress();
* Returns the port number on the local host to which this socket is bound.
* @return the port number on the local host to which this socket is bound.
int getLocalPort();
* Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds.
* With this option set to a non-zero timeout, a call to receive() for this
* DatagramSocket will block for only this amount of time. If the timeout
* expires, a <B></B> is raised, though the
* DatagramSocket is still valid. The option <B>must</B> be enabled prior to
* entering the blocking operation to have effect. The timeout must be > 0.
* A timeout of zero is interpreted as an infinite timeout.
* @param timeout
* the specified timeout in milliseconds.
* @throws SocketException
* if there is an error in the underlying protocol, such as an
* UDP error.
* @since JDK1.1
* @see #getSoTimeout()
void setSoTimeout(int timeout) throws SocketException;
* Retrieve setting for SO_TIMEOUT. 0 returns implies that the option is
* disabled (i.e., timeout of infinity).
* @return the setting for SO_TIMEOUT
* @throws SocketException
* if there is an error in the underlying protocol, such as an
* UDP error.
* @since JDK1.1
* @see #setSoTimeout(int)
int getSoTimeout() throws SocketException;
* Sets the SO_SNDBUF option to the specified value for this
* <tt>DatagramSocket</tt>. The SO_SNDBUF option is used by the network
* implementation as a hint to size the underlying network I/O buffers. The
* SO_SNDBUF setting may also be used by the network implementation to
* determine the maximum size of the packet that can be sent on this socket.
* <p>
* As SO_SNDBUF is a hint, applications that want to verify what size the
* buffer is should call {@link #getSendBufferSize()}.
* <p>
* Increasing the buffer size may allow multiple outgoing packets to be
* queued by the network implementation when the send rate is high.
* <p>
* Note: If {@link #send(DatagramPacket)} is used to send a
* <code>DatagramPacket</code> that is larger than the setting of SO_SNDBUF
* then it is implementation specific if the packet is sent or discarded.
* @param size
* the size to which to set the send buffer size. This value must
* be greater than 0.
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @exception IllegalArgumentException
* if the value is 0 or is negative.
* @see #getSendBufferSize()
void setSendBufferSize(int size) throws SocketException;
* Get value of the SO_SNDBUF option for this <tt>DatagramSocket</tt>, that
* is the buffer size used by the platform for output on this
* <tt>DatagramSocket</tt>.
* @return the value of the SO_SNDBUF option for this
* <tt>DatagramSocket</tt>
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @see #setSendBufferSize
int getSendBufferSize() throws SocketException;
* Sets the SO_RCVBUF option to the specified value for this
* <tt>DatagramSocket</tt>. The SO_RCVBUF option is used by the the network
* implementation as a hint to size the underlying network I/O buffers. The
* SO_RCVBUF setting may also be used by the network implementation to
* determine the maximum size of the packet that can be received on this
* socket.
* <p>
* Because SO_RCVBUF is a hint, applications that want to verify what size
* the buffers were set to should call {@link #getReceiveBufferSize()}.
* <p>
* Increasing SO_RCVBUF may allow the network implementation to buffer
* multiple packets when packets arrive faster than are being received using
* {@link #receive(DatagramPacket)}.
* <p>
* Note: It is implementation specific if a packet larger than SO_RCVBUF can
* be received.
* @param size
* the size to which to set the receive buffer size. This value
* must be greater than 0.
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @exception IllegalArgumentException
* if the value is 0 or is negative.
* @see #getReceiveBufferSize()
void setReceiveBufferSize(int size) throws SocketException;
* Get value of the SO_RCVBUF option for this <tt>DatagramSocket</tt>, that
* is the buffer size used by the platform for input on this
* <tt>DatagramSocket</tt>.
* @return the value of the SO_RCVBUF option for this
* <tt>DatagramSocket</tt>
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @see #setReceiveBufferSize(int)
int getReceiveBufferSize() throws SocketException;
* Enable/disable the SO_REUSEADDR socket option.
* <p>
* For UDP sockets it may be necessary to bind more than one socket to the
* same socket address. This is typically for the purpose of receiving
* multicast packets (See {@link}). The
* <tt>SO_REUSEADDR</tt> socket option allows multiple sockets to be bound
* to the same socket address if the <tt>SO_REUSEADDR</tt> socket option is
* enabled prior to binding the socket using {@link #bind(SocketAddress)}.
* <p>
* Note: This functionality is not supported by all existing platforms, so
* it is implementation specific whether this option will be ignored or not.
* However, if it is not supported then {@link #getReuseAddress()} will
* always return <code>false</code>.
* <p>
* When a <tt>DatagramSocket</tt> is created the initial setting of
* <tt>SO_REUSEADDR</tt> is disabled.
* <p>
* The behaviour when <tt>SO_REUSEADDR</tt> is enabled or disabled after a
* socket is bound (See {@link #isBound()}) is not defined.
* @param on
* whether to enable or disable the
* @exception SocketException
* if an error occurs enabling or disabling the
* <tt>SO_RESUEADDR</tt> socket option, or the socket is
* closed.
* @since 1.4
* @see #getReuseAddress()
* @see #bind(SocketAddress)
* @see #isBound()
* @see #isClosed()
void setReuseAddress(boolean on) throws SocketException;
* Tests if SO_REUSEADDR is enabled.
* @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @since 1.4
* @see #setReuseAddress(boolean)
boolean getReuseAddress() throws SocketException;
* Enable/disable SO_BROADCAST.
* @param on
* whether or not to have broadcast turned on.
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @since 1.4
* @see #getBroadcast()
void setBroadcast(boolean on) throws SocketException;
* Tests if SO_BROADCAST is enabled.
* @return a <code>boolean</code> indicating whether or not SO_BROADCAST is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as
* an UDP error.
* @since 1.4
* @see #setBroadcast(boolean)
boolean getBroadcast() throws SocketException;
* Sets traffic class or type-of-service octet in the IP datagram header for
* datagrams sent from this DatagramSocket. As the underlying network
* implementation may ignore this value applications should consider it a
* hint.
* <P>
* The tc <B>must</B> be in the range <code> 0 <= tc <=
* 255</code> or an IllegalArgumentException will be thrown.
* <p>
* Notes:
* <p>
* for Internet Protocol v4 the value consists of an octet with precedence
* and TOS fields as detailed in RFC 1349. The TOS field is bitset created
* by bitwise-or'ing values such the following :-
* <p>
* <UL>
* </UL>
* The last low order bit is always ignored as this corresponds to the MBZ
* (must be zero) bit.
* <p>
* Setting bits in the precedence field may result in a SocketException
* indicating that the operation is not permitted.
* <p>
* for Internet Protocol v6 <code>tc</code> is the value that would be
* placed into the sin6_flowinfo field of the IP header.
* @param tc
* an <code>int</code> value for the bitset.
* @throws SocketException
* if there is an error setting the traffic class or
* type-of-service
* @since 1.4
* @see #getTrafficClass
void setTrafficClass(int tc) throws SocketException;
* Gets traffic class or type-of-service in the IP datagram header for
* packets sent from this DatagramSocket.
* <p>
* As the underlying network implementation may ignore the traffic class or
* type-of-service set using {@link #setTrafficClass(int)} this method may
* return a different value than was previously set using the
* {@link #setTrafficClass(int)} method on this DatagramSocket.
* @return the traffic class or type-of-service already set
* @throws SocketException
* if there is an error obtaining the traffic class or
* type-of-service value.
* @since 1.4
* @see #setTrafficClass(int)
int getTrafficClass() throws SocketException;
* Closes this datagram socket.
* <p>
* Any thread currently blocked in {@link #receive} upon this socket will
* throw a {@link SocketException}.
* <p>
* If this socket has an associated channel then the channel is closed as
* well.
* revised 1.4 spec JSR-51
void close();
* Returns whether the socket is closed or not.
* @return true if the socket has been closed
* @since 1.4
boolean isClosed();
* Returns the unique {@link DatagramChannel} object
* associated with this datagram socket, if any.
* <p>
* A datagram socket will have a channel if, and only if, the channel itself
* was created via the {@link DatagramChannel#open
*} method.
* @return the datagram channel associated with this datagram socket, or
* <tt>null</tt> if this socket was not created for a channel
* @since 1.4 spec JSR-51
DatagramChannel getChannel();

View File

@ -0,0 +1,375 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.ServerSocketChannel;
* compatibility verification interface
public interface IceServerSocket {
* Binds the <code>ServerSocket</code> to a specific address (IP address and
* port number).
* <p>
* If the address is <code>null</code>, then the system will pick up an
* ephemeral port and a valid local address to bind the socket.
* <p>
* @param endpoint
* The IP address & port number to bind to.
* @throws IOException
* if the bind operation fails, or if the socket is already
* bound.
* @throws SecurityException
* if a <code>SecurityManager</code> is present and its
* <code>checkListen</code> method doesn't allow the operation.
* @throws IllegalArgumentException
* if endpoint is a SocketAddress subclass not supported by this
* socket
* @since 1.4
void bind(SocketAddress endpoint) throws IOException;
* Binds the <code>ServerSocket</code> to a specific address (IP address and
* port number).
* <p>
* If the address is <code>null</code>, then the system will pick up an
* ephemeral port and a valid local address to bind the socket.
* <P>
* The <code>backlog</code> argument must be a positive value greater than
* 0. If the value passed if equal or less than 0, then the default value
* will be assumed.
* @param endpoint
* The IP address & port number to bind to.
* @param backlog
* The listen backlog length.
* @throws IOException
* if the bind operation fails, or if the socket is already
* bound.
* @throws SecurityException
* if a <code>SecurityManager</code> is present and its
* <code>checkListen</code> method doesn't allow the operation.
* @throws IllegalArgumentException
* if endpoint is a SocketAddress subclass not supported by this
* socket
* @since 1.4
void bind(SocketAddress endpoint, int backlog) throws IOException;
* Returns the local address of this server socket.
* @return the address to which this socket is bound, or <code>null</code>
* if the socket is unbound.
InetAddress getInetAddress();
* Returns the port on which this socket is listening.
* @return the port number to which this socket is listening or -1 if the
* socket is not bound yet.
int getLocalPort();
* Returns the address of the endpoint this socket is bound to, or
* <code>null</code> if it is not bound yet.
* @return a <code>SocketAddress</code> representing the local endpoint of
* this socket, or <code>null</code> if it is not bound yet.
* @see #getInetAddress()
* @see #getLocalPort()
* @see #bind(SocketAddress)
* @since 1.4
SocketAddress getLocalSocketAddress();
* Listens for a connection to be made to this socket and accepts it. The
* method blocks until a connection is made.
* <p>
* A new Socket <code>s</code> is created and, if there is a security
* manager, the security manager's <code>checkAccept</code> method is called
* with <code>s.getInetAddress().getHostAddress()</code> and
* <code>s.getPort()</code> as its arguments to ensure the operation is
* allowed. This could result in a SecurityException.
* @exception IOException
* if an I/O error occurs when waiting for a connection.
* @exception SecurityException
* if a security manager exists and its
* <code>checkAccept</code> method doesn't allow the
* operation.
* @exception SocketTimeoutException
* if a timeout was previously set with setSoTimeout and the
* timeout has been reached.
* @exception java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, the channel is
* in non-blocking mode, and there is no connection ready to
* be accepted
* @return the new Socket
* @see SecurityManager#checkAccept revised 1.4 spec JSR-51
Socket accept() throws IOException;
* Closes this socket.
* Any thread currently blocked in {@link #accept()} will throw a
* {@link SocketException}.
* <p>
* If this socket has an associated channel then the channel is closed as
* well.
* @exception IOException
* if an I/O error occurs when closing the socket. revised
* 1.4 spec JSR-51
void close() throws IOException;
* Returns the unique {@link ServerSocketChannel} object
* associated with this socket, if any.
* <p>
* A server socket will have a channel if, and only if, the channel itself
* was created via the {@link ServerSocketChannel#open
*} method.
* @return the server-socket channel associated with this socket, or
* <tt>null</tt> if this socket was not created for a channel
* @since 1.4 spec JSR-51
ServerSocketChannel getChannel();
* Returns the binding state of the ServerSocket.
* @return true if the ServerSocket succesfuly bound to an address
* @since 1.4
boolean isBound();
* Returns the closed state of the ServerSocket.
* @return true if the socket has been closed
* @since 1.4
boolean isClosed();
* Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds.
* With this option set to a non-zero timeout, a call to accept() for this
* ServerSocket will block for only this amount of time. If the timeout
* expires, a <B></B> is raised, though the
* ServerSocket is still valid. The option <B>must</B> be enabled prior to
* entering the blocking operation to have effect. The timeout must be > 0.
* A timeout of zero is interpreted as an infinite timeout.
* @param timeout
* the specified timeout, in milliseconds
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK1.1
* @see #getSoTimeout()
void setSoTimeout(int timeout) throws SocketException;
* Retrieve setting for SO_TIMEOUT. 0 returns implies that the option is
* disabled (i.e., timeout of infinity).
* @return the SO_TIMEOUT value
* @exception IOException
* if an I/O error occurs
* @since JDK1.1
* @see #setSoTimeout(int)
int getSoTimeout() throws IOException;
* Enable/disable the SO_REUSEADDR socket option.
* <p>
* When a TCP connection is closed the connection may remain in a timeout
* state for a period of time after the connection is closed (typically
* known as the <tt>TIME_WAIT</tt> state or <tt>2MSL</tt> wait state). For
* applications using a well known socket address or port it may not be
* possible to bind a socket to the required <tt>SocketAddress</tt> if there
* is a connection in the timeout state involving the socket address or
* port.
* <p>
* Enabling <tt>SO_REUSEADDR</tt> prior to binding the socket using
* {@link #bind(SocketAddress)} allows the socket to be bound even though a
* previous connection is in a timeout state.
* <p>
* When a <tt>ServerSocket</tt> is created the initial setting of
* <tt>SO_REUSEADDR</tt> is not defined. Applications can use
* {@link #getReuseAddress()} to determine the initial setting of
* <tt>SO_REUSEADDR</tt>.
* <p>
* The behaviour when <tt>SO_REUSEADDR</tt> is enabled or disabled after a
* socket is bound (See {@link #isBound()}) is not defined.
* @param on
* whether to enable or disable the socket option
* @exception SocketException
* if an error occurs enabling or disabling the
* <tt>SO_RESUEADDR</tt> socket option, or the socket is
* closed.
* @since 1.4
* @see #getReuseAddress()
* @see #bind(SocketAddress)
* @see #isBound()
* @see #isClosed()
void setReuseAddress(boolean on) throws SocketException;
* Tests if SO_REUSEADDR is enabled.
* @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.4
* @see #setReuseAddress(boolean)
boolean getReuseAddress() throws SocketException;
* Returns the implementation address and implementation port of this socket
* as a <code>String</code>.
* @return a string representation of this socket.
String toString();
* Sets a default proposed value for the SO_RCVBUF option for sockets
* accepted from this <tt>ServerSocket</tt>. The value actually set in the
* accepted socket must be determined by calling
* {@link Socket#getReceiveBufferSize()} after the socket is returned by
* {@link #accept()}.
* <p>
* The value of SO_RCVBUF is used both to set the size of the internal
* socket receive buffer, and to set the size of the TCP receive window that
* is advertized to the remote peer.
* <p>
* It is possible to change the value subsequently, by calling
* {@link Socket#setReceiveBufferSize(int)}. However, if the application
* wishes to allow a receive window larger than 64K bytes, as defined by
* RFC1323 then the proposed value must be set in the ServerSocket
* <B>before</B> it is bound to a local address. This implies, that the
* ServerSocket must be created with the no-argument constructor, then
* setReceiveBufferSize() must be called and lastly the ServerSocket is
* bound to an address by calling bind().
* <p>
* Failure to do this will not cause an error, and the buffer size may be
* set to the requested value but the TCP receive window in sockets accepted
* from this ServerSocket will be no larger than 64K bytes.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @param size
* the size to which to set the receive buffer size. This value
* must be greater than 0.
* @exception IllegalArgumentException
* if the value is 0 or is negative.
* @since 1.4
* @see #getReceiveBufferSize
void setReceiveBufferSize(int size) throws SocketException;
* Gets the value of the SO_RCVBUF option for this <tt>ServerSocket</tt>,
* that is the proposed buffer size that will be used for Sockets accepted
* from this <tt>ServerSocket</tt>.
* <p>
* Note, the value actually set in the accepted socket is determined by
* calling {@link Socket#getReceiveBufferSize()}.
* @return the value of the SO_RCVBUF option for this <tt>Socket</tt>.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @see #setReceiveBufferSize(int)
* @since 1.4
int getReceiveBufferSize() throws SocketException;
* Sets performance preferences for this ServerSocket.
* <p>
* Sockets use the TCP/IP protocol by default. Some implementations may
* offer alternative protocols which have different performance
* characteristics than TCP/IP. This method allows the application to
* express its own preferences as to how these tradeoffs should be made when
* the implementation chooses from the available protocols.
* <p>
* Performance preferences are described by three integers whose values
* indicate the relative importance of short connection time, low latency,
* and high bandwidth. The absolute values of the integers are irrelevant;
* in order to choose a protocol the values are simply compared, with larger
* values indicating stronger preferences. If the application prefers short
* connection time over both low latency and high bandwidth, for example,
* then it could invoke this method with the values <tt>(1, 0, 0)</tt>. If
* the application prefers high bandwidth above low latency, and low latency
* above short connection time, then it could invoke this method with the
* values <tt>(0, 1, 2)</tt>.
* <p>
* Invoking this method after this socket has been bound will have no
* effect. This implies that in order to use this capability requires the
* socket to be created with the no-argument constructor.
* @param connectionTime
* An <tt>int</tt> expressing the relative importance of a short
* connection time
* @param latency
* An <tt>int</tt> expressing the relative importance of low
* latency
* @param bandwidth
* An <tt>int</tt> expressing the relative importance of high
* bandwidth
* @since 1.5
void setPerformancePreferences(int connectionTime, int latency, int bandwidth);

View File

@ -0,0 +1,791 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.SocketChannel;
* compatibility verification interface
public interface IceSocket {
* Connects this socket to the server.
* @param endpoint
* the <code>SocketAddress</code>
* @throws IOException
* if an error occurs during the connection
* @throws java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, and the channel is
* in non-blocking mode
* @throws IllegalArgumentException
* if endpoint is null or is a SocketAddress subclass not
* supported by this socket
* @since 1.4 spec JSR-51
void connect(SocketAddress endpoint) throws IOException;
* Connects this socket to the server with a specified timeout value. A
* timeout of zero is interpreted as an infinite timeout. The connection
* will then block until established or an error occurs.
* @param endpoint
* the <code>SocketAddress</code>
* @param timeout
* the timeout value to be used in milliseconds.
* @throws IOException
* if an error occurs during the connection
* @throws SocketTimeoutException
* if timeout expires before connecting
* @throws java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, and the channel is
* in non-blocking mode
* @throws IllegalArgumentException
* if endpoint is null or is a SocketAddress subclass not
* supported by this socket
* @since 1.4 spec JSR-51
void connect(SocketAddress endpoint, int timeout) throws IOException;
* Binds the socket to a local address.
* <P>
* If the address is <code>null</code>, then the system will pick up an
* ephemeral port and a valid local address to bind the socket.
* @param bindpoint
* the <code>SocketAddress</code> to bind to
* @throws IOException
* if the bind operation fails, or if the socket is already
* bound.
* @throws IllegalArgumentException
* if bindpoint is a SocketAddress subclass not supported by
* this socket
* @since 1.4
* @see #isBound
void bind(SocketAddress bindpoint) throws IOException;
* Returns the address to which the socket is connected.
* @return the remote IP address to which this socket is connected, or
* <code>null</code> if the socket is not connected.
InetAddress getInetAddress();
* Gets the local address to which the socket is bound.
* @return the local address to which the socket is bound or
* <code>InetAddress.anyLocalAddress()</code> if the socket is not
* bound yet.
* @since JDK1.1
InetAddress getLocalAddress();
* Returns the remote port to which this socket is connected.
* @return the remote port number to which this socket is connected, or 0 if
* the socket is not connected yet.
int getPort();
* Returns the local port to which this socket is bound.
* @return the local port number to which this socket is bound or -1 if the
* socket is not bound yet.
int getLocalPort();
* Returns the address of the endpoint this socket is connected to, or
* <code>null</code> if it is unconnected.
* @return a <code>SocketAddress</code> reprensenting the remote endpoint of
* this socket, or <code>null</code> if it is not connected yet.
* @see #getInetAddress()
* @see #getPort()
* @see #connect(SocketAddress, int)
* @see #connect(SocketAddress)
* @since 1.4
SocketAddress getRemoteSocketAddress();
* Returns the address of the endpoint this socket is bound to, or
* <code>null</code> if it is not bound yet.
* @return a <code>SocketAddress</code> representing the local endpoint of
* this socket, or <code>null</code> if it is not bound yet.
* @see #getLocalAddress()
* @see #getLocalPort()
* @see #bind(SocketAddress)
* @since 1.4
SocketAddress getLocalSocketAddress();
* Returns the unique {@link SocketChannel SocketChannel}
* object associated with this socket, if any.
* <p>
* A socket will have a channel if, and only if, the channel itself was
* created via the {@link SocketChannel#open
*} or
* {@link java.nio.channels.ServerSocketChannel#accept
* ServerSocketChannel.accept} methods.
* @return the socket channel associated with this socket, or <tt>null</tt>
* if this socket was not created for a channel
* @since 1.4 spec JSR-51
SocketChannel getChannel();
* Returns an input stream for this socket.
* <p>
* If this socket has an associated channel then the resulting input stream
* delegates all of its operations to the channel. If the channel is in
* non-blocking mode then the input stream's <tt>read</tt> operations will
* throw an {@link java.nio.channels.IllegalBlockingModeException}.
* <p>
* Under abnormal conditions the underlying connection may be broken by the
* remote host or the network software (for example a connection reset in
* the case of TCP connections). When a broken connection is detected by the
* network software the following applies to the returned input stream :-
* <ul>
* <li>
* <p>
* The network software may discard bytes that are buffered by the socket.
* Bytes that aren't discarded by the network software can be read using
* {@link InputStream#read read}.
* <li>
* <p>
* If there are no bytes buffered on the socket, or all buffered bytes have
* been consumed by {@link InputStream#read read}, then all
* subsequent calls to {@link InputStream#read read} will throw an
* {@link IOException IOException}.
* <li>
* <p>
* If there are no bytes buffered on the socket, and the socket has not been
* closed using {@link #close close}, then
* {@link InputStream#available available} will return
* <code>0</code>.
* </ul>
* <p>
* Closing the returned {@link InputStream InputStream} will close
* the associated socket.
* @return an input stream for reading bytes from this socket.
* @exception IOException
* if an I/O error occurs when creating the input stream, the
* socket is closed, the socket is not connected, or the
* socket input has been shutdown using
* {@link #shutdownInput()}
* revised 1.4 spec JSR-51
InputStream getInputStream() throws IOException;
* Returns an output stream for this socket.
* <p>
* If this socket has an associated channel then the resulting output stream
* delegates all of its operations to the channel. If the channel is in
* non-blocking mode then the output stream's <tt>write</tt> operations will
* throw an {@link java.nio.channels.IllegalBlockingModeException}.
* <p>
* Closing the returned {@link OutputStream OutputStream} will close
* the associated socket.
* @return an output stream for writing bytes to this socket.
* @exception IOException
* if an I/O error occurs when creating the output stream or
* if the socket is not connected. revised 1.4 spec JSR-51
OutputStream getOutputStream() throws IOException;
* Enable/disable TCP_NODELAY (disable/enable Nagle's algorithm).
* @param on
* <code>true</code> to enable TCP_NODELAY, <code>false</code> to
* disable.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK1.1
* @see #getTcpNoDelay()
void setTcpNoDelay(boolean on) throws SocketException;
* Tests if TCP_NODELAY is enabled.
* @return a <code>boolean</code> indicating whether or not TCP_NODELAY is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK1.1
* @see #setTcpNoDelay(boolean)
boolean getTcpNoDelay() throws SocketException;
* Enable/disable SO_LINGER with the specified linger time in seconds. The
* maximum timeout value is platform specific.
* The setting only affects socket close.
* @param on
* whether or not to linger on.
* @param linger
* how long to linger for, if on is true.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @exception IllegalArgumentException
* if the linger value is negative.
* @since JDK1.1
* @see #getSoLinger()
void setSoLinger(boolean on, int linger) throws SocketException;
* Returns setting for SO_LINGER. -1 returns implies that the option is
* disabled.
* The setting only affects socket close.
* @return the setting for SO_LINGER.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK1.1
* @see #setSoLinger(boolean, int)
int getSoLinger() throws SocketException;
* Send one byte of urgent data on the socket. The byte to be sent is the
* lowest eight bits of the data parameter. The urgent byte is sent after
* any preceding writes to the socket OutputStream and before any future
* writes to the OutputStream.
* @param data
* The byte of data to send
* @exception IOException
* if there is an error sending the data.
* @since 1.4
void sendUrgentData(int data) throws IOException;
* Enable/disable OOBINLINE (receipt of TCP urgent data)
* By default, this option is disabled and TCP urgent data received on a
* socket is silently discarded. If the user wishes to receive urgent data,
* then this option must be enabled. When enabled, urgent data is received
* inline with normal data.
* <p>
* Note, only limited support is provided for handling incoming urgent data.
* In particular, no notification of incoming urgent data is provided and
* there is no capability to distinguish between normal data and urgent data
* unless provided by a higher level protocol.
* @param on
* <code>true</code> to enable OOBINLINE, <code>false</code> to
* disable.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.4
* @see #getOOBInline()
void setOOBInline(boolean on) throws SocketException;
* Tests if OOBINLINE is enabled.
* @return a <code>boolean</code> indicating whether or not OOBINLINE is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.4
* @see #setOOBInline(boolean)
boolean getOOBInline() throws SocketException;
* Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds.
* With this option set to a non-zero timeout, a read() call on the
* InputStream associated with this Socket will block for only this amount
* of time. If the timeout expires, a <B></B>
* is raised, though the Socket is still valid. The option <B>must</B> be
* enabled prior to entering the blocking operation to have effect. The
* timeout must be > 0. A timeout of zero is interpreted as an infinite
* timeout.
* @param timeout
* the specified timeout, in milliseconds.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK 1.1
* @see #getSoTimeout()
void setSoTimeout(int timeout) throws SocketException;
* Returns setting for SO_TIMEOUT. 0 returns implies that the option is
* disabled (i.e., timeout of infinity).
* @return the setting for SO_TIMEOUT
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since JDK1.1
* @see #setSoTimeout(int)
int getSoTimeout() throws SocketException;
* Sets the SO_SNDBUF option to the specified value for this <tt>Socket</tt>
* . The SO_SNDBUF option is used by the platform's networking code as a
* hint for the size to set the underlying network I/O buffers.
* <p>
* Because SO_SNDBUF is a hint, applications that want to verify what size
* the buffers were set to should call {@link #getSendBufferSize()}.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @param size
* the size to which to set the send buffer size. This value must
* be greater than 0.
* @exception IllegalArgumentException
* if the value is 0 or is negative.
* @see #getSendBufferSize()
* @since 1.2
void setSendBufferSize(int size) throws SocketException;
* Get value of the SO_SNDBUF option for this <tt>Socket</tt>, that is the
* buffer size used by the platform for output on this <tt>Socket</tt>.
* @return the value of the SO_SNDBUF option for this <tt>Socket</tt>.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @see #setSendBufferSize(int)
* @since 1.2
int getSendBufferSize() throws SocketException;
* Sets the SO_RCVBUF option to the specified value for this <tt>Socket</tt>
* . The SO_RCVBUF option is used by the platform's networking code as a
* hint for the size to set the underlying network I/O buffers.
* <p>
* Increasing the receive buffer size can increase the performance of
* network I/O for high-volume connection, while decreasing it can help
* reduce the backlog of incoming data.
* <p>
* Because SO_RCVBUF is a hint, applications that want to verify what size
* the buffers were set to should call {@link #getReceiveBufferSize()}.
* <p>
* The value of SO_RCVBUF is also used to set the TCP receive window that is
* advertized to the remote peer. Generally, the window size can be modified
* at any time when a socket is connected. However, if a receive window
* larger than 64K is required then this must be requested <B>before</B> the
* socket is connected to the remote peer. There are two cases to be aware
* of:
* <p>
* <ol>
* <li>For sockets accepted from a ServerSocket, this must be done by
* calling {@link ServerSocket#setReceiveBufferSize(int)} before the
* ServerSocket is bound to a local address.
* <p></li>
* <li>For client sockets, setReceiveBufferSize() must be called before
* connecting the socket to its remote peer.
* <p></li>
* </ol>
* @param size
* the size to which to set the receive buffer size. This value
* must be greater than 0.
* @exception IllegalArgumentException
* if the value is 0 or is negative.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @see #getReceiveBufferSize()
* @see ServerSocket#setReceiveBufferSize(int)
* @since 1.2
void setReceiveBufferSize(int size) throws SocketException;
* Gets the value of the SO_RCVBUF option for this <tt>Socket</tt>, that is
* the buffer size used by the platform for input on this <tt>Socket</tt>.
* @return the value of the SO_RCVBUF option for this <tt>Socket</tt>.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @see #setReceiveBufferSize(int)
* @since 1.2
int getReceiveBufferSize() throws SocketException;
* Enable/disable SO_KEEPALIVE.
* @param on
* whether or not to have socket keep alive turned on.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.3
* @see #getKeepAlive()
void setKeepAlive(boolean on) throws SocketException;
* Tests if SO_KEEPALIVE is enabled.
* @return a <code>boolean</code> indicating whether or not SO_KEEPALIVE is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.3
* @see #setKeepAlive(boolean)
boolean getKeepAlive() throws SocketException;
* Sets traffic class or type-of-service octet in the IP header for packets
* sent from this Socket. As the underlying network implementation may
* ignore this value applications should consider it a hint.
* <P>
* The tc <B>must</B> be in the range <code> 0 <= tc <=
* 255</code> or an IllegalArgumentException will be thrown.
* <p>
* Notes:
* <p>
* For Internet Protocol v4 the value consists of an octet with precedence
* and TOS fields as detailed in RFC 1349. The TOS field is bitset created
* by bitwise-or'ing values such the following :-
* <p>
* <UL>
* </UL>
* The last low order bit is always ignored as this corresponds to the MBZ
* (must be zero) bit.
* <p>
* Setting bits in the precedence field may result in a SocketException
* indicating that the operation is not permitted.
* <p>
* As RFC 1122 section indicates, a compliant TCP implementation
* should, but is not required to, let application change the TOS field
* during the lifetime of a connection. So whether the type-of-service field
* can be changed after the TCP connection has been established depends on
* the implementation in the underlying platform. Applications should not
* assume that they can change the TOS field after the connection.
* <p>
* For Internet Protocol v6 <code>tc</code> is the value that would be
* placed into the sin6_flowinfo field of the IP header.
* @param tc
* an <code>int</code> value for the bitset.
* @throws SocketException
* if there is an error setting the traffic class or
* type-of-service
* @since 1.4
* @see #getTrafficClass
void setTrafficClass(int tc) throws SocketException;
* Gets traffic class or type-of-service in the IP header for packets sent
* from this Socket
* <p>
* As the underlying network implementation may ignore the traffic class or
* type-of-service set using {@link #setTrafficClass(int)} this method may
* return a different value than was previously set using the
* {@link #setTrafficClass(int)} method on this Socket.
* @return the traffic class or type-of-service already set
* @throws SocketException
* if there is an error obtaining the traffic class or
* type-of-service value.
* @since 1.4
* @see #setTrafficClass(int)
int getTrafficClass() throws SocketException;
* Enable/disable the SO_REUSEADDR socket option.
* <p>
* When a TCP connection is closed the connection may remain in a timeout
* state for a period of time after the connection is closed (typically
* known as the <tt>TIME_WAIT</tt> state or <tt>2MSL</tt> wait state). For
* applications using a well known socket address or port it may not be
* possible to bind a socket to the required <tt>SocketAddress</tt> if there
* is a connection in the timeout state involving the socket address or
* port.
* <p>
* Enabling <tt>SO_REUSEADDR</tt> prior to binding the socket using
* {@link #bind(SocketAddress)} allows the socket to be bound even though a
* previous connection is in a timeout state.
* <p>
* When a <tt>Socket</tt> is created the initial setting of
* <tt>SO_REUSEADDR</tt> is disabled.
* <p>
* The behaviour when <tt>SO_REUSEADDR</tt> is enabled or disabled after a
* socket is bound (See {@link #isBound()}) is not defined.
* @param on
* whether to enable or disable the socket option
* @exception SocketException
* if an error occurs enabling or disabling the
* <tt>SO_RESUEADDR</tt> socket option, or the socket is
* closed.
* @since 1.4
* @see #getReuseAddress()
* @see #bind(SocketAddress)
* @see #isClosed()
* @see #isBound()
void setReuseAddress(boolean on) throws SocketException;
* Tests if SO_REUSEADDR is enabled.
* @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is
* enabled.
* @exception SocketException
* if there is an error in the underlying protocol, such as a
* TCP error.
* @since 1.4
* @see #setReuseAddress(boolean)
boolean getReuseAddress() throws SocketException;
* Closes this socket.
* <p>
* Any thread currently blocked in an I/O operation upon this socket will
* throw a {@link SocketException}.
* <p>
* Once a socket has been closed, it is not available for further networking
* use (i.e. can't be reconnected or rebound). A new socket needs to be
* created.
* <p>
* Closing this socket will also close the socket's
* {@link InputStream InputStream} and {@link OutputStream
* OutputStream}.
* <p>
* If this socket has an associated channel then the channel is closed as
* well.
* @exception IOException
* if an I/O error occurs when closing this socket. revised
* 1.4 spec JSR-51
* @see #isClosed
void close() throws IOException;
* Places the input stream for this socket at "end of stream". Any data sent
* to the input stream side of the socket is acknowledged and then silently
* discarded.
* <p>
* If you read from a socket input stream after invoking shutdownInput() on
* the socket, the stream will return EOF.
* @exception IOException
* if an I/O error occurs when shutting down this socket.
* @since 1.3
* @see
* @see
* @see, int)
* @see #isInputShutdown
void shutdownInput() throws IOException;
* Disables the output stream for this socket. For a TCP socket, any
* previously written data will be sent followed by TCP's normal connection
* termination sequence.
* If you write to a socket output stream after invoking shutdownOutput() on
* the socket, the stream will throw an IOException.
* @exception IOException
* if an I/O error occurs when shutting down this socket.
* @since 1.3
* @see
* @see
* @see, int)
* @see #isOutputShutdown
void shutdownOutput() throws IOException;
* Converts this socket to a <code>String</code>.
* @return a string representation of this socket.
String toString();
* Returns the connection state of the socket.
* @return true if the socket successfuly connected to a server
* @since 1.4
boolean isConnected();
* Returns the binding state of the socket.
* @return true if the socket successfuly bound to an address
* @since 1.4
* @see #bind
boolean isBound();
* Returns the closed state of the socket.
* @return true if the socket has been closed
* @since 1.4
* @see #close
boolean isClosed();
* Returns whether the read-half of the socket connection is closed.
* @return true if the input of the socket has been shutdown
* @since 1.4
* @see #shutdownInput
boolean isInputShutdown();
* Returns whether the write-half of the socket connection is closed.
* @return true if the output of the socket has been shutdown
* @since 1.4
* @see #shutdownOutput
boolean isOutputShutdown();
* Sets performance preferences for this socket.
* <p>
* Sockets use the TCP/IP protocol by default. Some implementations may
* offer alternative protocols which have different performance
* characteristics than TCP/IP. This method allows the application to
* express its own preferences as to how these tradeoffs should be made when
* the implementation chooses from the available protocols.
* <p>
* Performance preferences are described by three integers whose values
* indicate the relative importance of short connection time, low latency,
* and high bandwidth. The absolute values of the integers are irrelevant;
* in order to choose a protocol the values are simply compared, with larger
* values indicating stronger preferences. Negative values represent a lower
* priority than positive values. If the application prefers short
* connection time over both low latency and high bandwidth, for example,
* then it could invoke this method with the values <tt>(1, 0, 0)</tt>. If
* the application prefers high bandwidth above low latency, and low latency
* above short connection time, then it could invoke this method with the
* values <tt>(0, 1, 2)</tt>.
* <p>
* Invoking this method after this socket has been connected will have no
* effect.
* @param connectionTime
* An <tt>int</tt> expressing the relative importance of a short
* connection time
* @param latency
* An <tt>int</tt> expressing the relative importance of low
* latency
* @param bandwidth
* An <tt>int</tt> expressing the relative importance of high
* bandwidth
* @since 1.5
void setPerformancePreferences(int connectionTime, int latency, int bandwidth);

View File

@ -0,0 +1,146 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.IllegalBlockingModeException;
import com.barchart.udt.ErrorUDT;
import com.barchart.udt.SocketUDT;
* {@link InputStream} implementation for UDT sockets.
public class NetInputStreamUDT extends InputStream {
protected final SocketUDT socketUDT;
* @param socketUDT
* The UDT socket.
public NetInputStreamUDT(final SocketUDT socketUDT) {
if (!socketUDT.isBlocking()) {
throw new IllegalBlockingModeException();
this.socketUDT = socketUDT;
public int read() throws IOException {
* Here's the contract from the JavaDoc on this for SocketChannel:
* A read operation might not fill the buffer, and in fact it might not
* read any bytes at all. Whether or not it does so depends upon the
* nature and state of the channel. A socket channel in non-blocking
* mode, for example, cannot read any more bytes than are immediately
* available from the socket's input buffer; similarly, a file channel
* cannot read any more bytes than remain in the file. It is guaranteed,
* however, that if a channel is in blocking mode and there is at least
* one byte remaining in the buffer then this method will block until at
* least one byte is read.
* Long story short: This UDT InputStream should only ever be created
* when the SocketChannel's in blocking mode, and when it's in blocking
* mode the SocketChannel read call below will block just like we need
* it too.
final byte[] data = new byte[1];
final int count = read(data);
assert count == 1;
return data[0] & 0xFF;
public int read(final byte[] bytes) throws IOException {
return read(bytes, 0, bytes.length);
public int read(final byte[] bytes, final int off, final int len)
throws IOException {
final int count = socketUDT.receive(bytes, off, off + len);
if (count > 0) {
assert count <= len;
return count;
if (count == 0) {
throw new ExceptionReceiveUDT(,
ErrorUDT.USER_DEFINED_MESSAGE, "UDT receive time out") {
throw new IllegalStateException("should not happen");
public void close() throws IOException {
public int available() throws IOException {
// This is the default InputStream return value.
// The java/net/ implementation delegates to
// the native implementation, which returns 0 on at least some OSes.
return 0;
public long skip(final long numbytes) throws IOException {
if (numbytes <= 0) {
return 0;
long n = numbytes;
final int buflen = (int) Math.min(1024, n);
final byte data[] = new byte[buflen];
while (n > 0) {
final int r = read(data, 0, (int) Math.min(buflen, n));
if (r < 0) {
n -= r;
return numbytes - n;
public void mark(final int readlimit) {
throw new UnsupportedOperationException("mark not supported");
public void reset() throws IOException {
throw new UnsupportedOperationException("reset not supported");
public boolean markSupported() {
return false;

View File

@ -0,0 +1,92 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.IllegalBlockingModeException;
import com.barchart.udt.ErrorUDT;
import com.barchart.udt.SocketUDT;
* {@link OutputStream} for UDT sockets.
public class NetOutputStreamUDT extends OutputStream {
protected final SocketUDT socketUDT;
* @param socketUDT
* The UDT socket.
public NetOutputStreamUDT(final SocketUDT socketUDT) {
if (!socketUDT.isBlocking()) {
throw new IllegalBlockingModeException();
this.socketUDT = socketUDT;
public void write(final int b) throws IOException {
// Just cast it -- this is the same thing SocketOutputStream does.
final byte[] bytes = { (byte) b };
public void write(final byte[] bytes) throws IOException {
write(bytes, 0, bytes.length);
public void write(final byte[] bytes, final int off, final int len)
throws IOException {
int bytesRemaining = len;
while (bytesRemaining > 0) {
final int count = socketUDT.send(bytes, off + len - bytesRemaining,
off + len);
if (count > 0) {
bytesRemaining -= count;
if (count == 0) {
throw new ExceptionSendUDT(,
ErrorUDT.USER_DEFINED_MESSAGE, "UDT send time out");
throw new IllegalStateException(
"Socket has been chaged to non-blocking");
public void close() throws IOException {
public void flush() throws IOException {

View File

@ -0,0 +1,151 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.ServerSocketChannel;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
* {@link ServerSocket} - like wrapper for {@link SocketUDT}
public class NetServerSocketUDT extends ServerSocket implements
IceServerSocket, IceCommon {
protected final SocketUDT socketUDT;
/** uses {@link TypeUDT#STREAM} socket in blocking mode */
public NetServerSocketUDT() throws IOException {
this(new SocketUDT(TypeUDT.STREAM));
/** uses provided socket keeping blocking mode */
protected NetServerSocketUDT(final SocketUDT socketUDT) throws IOException {
this.socketUDT = socketUDT;
public Socket accept() throws IOException {
final SocketUDT clientUDT = socketUDT.accept();
return new NetSocketUDT(clientUDT);
public void bind(final SocketAddress endpoint) throws IOException {
final int backlog = SocketUDT.DEFAULT_ACCEPT_QUEUE_SIZE;
bind(endpoint, backlog);
* NOTE: bind() means listen() for UDT server socket
public void bind(SocketAddress bindpoint, int backlog) throws IOException {
if (bindpoint == null) {
bindpoint = new InetSocketAddress(0);
if (backlog <= 0) {
socketUDT.bind((InetSocketAddress) bindpoint);
public void close() throws IOException {
public ServerSocketChannel getChannel() {
throw new UnsupportedOperationException("feature not available");
public InetAddress getInetAddress() {
return socketUDT.getLocalInetAddress();
public int getLocalPort() {
return socketUDT.getLocalInetPort();
public SocketAddress getLocalSocketAddress() {
try {
return socketUDT.getLocalSocketAddress();
} catch (final ExceptionUDT e) {
return null;
public int getReceiveBufferSize() throws SocketException {
return socketUDT.getReceiveBufferSize();
public boolean getReuseAddress() throws SocketException {
return socketUDT.getReuseAddress();
public int getSoTimeout() throws IOException {
return socketUDT.getSoTimeout();
public boolean isBound() {
return socketUDT.isBound();
public boolean isClosed() {
return socketUDT.isClosed();
public void setPerformancePreferences(final int connectionTime,
final int latency, final int bandwidth) {
throw new UnsupportedOperationException("feature not available");
// NOTE: set both send and receive, since they are inherited on accept()
public void setReceiveBufferSize(final int size) throws SocketException {
public void setReuseAddress(final boolean on) throws SocketException {
public void setSoTimeout(final int timeout) throws SocketException {
public SocketUDT socketUDT() {
return socketUDT;

View File

@ -0,0 +1,282 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.anno.ThreadSafe;
* {@link Socket} - like wrapper for {@link SocketUDT}
public class NetSocketUDT extends Socket implements IceSocket, IceCommon {
private final Logger log = LoggerFactory.getLogger(getClass());
protected InputStream inputStream;
protected OutputStream outputStream;
protected final SocketUDT socketUDT;
/** uses {@link TypeUDT#STREAM} socket in blocking mode */
public NetSocketUDT() throws ExceptionUDT {
this(new SocketUDT(TypeUDT.STREAM));
/** uses provided socket keeping blocking mode */
protected NetSocketUDT(final SocketUDT socketUDT) {
this.socketUDT = socketUDT;
public void bind(SocketAddress bindpoint) throws IOException {
if (bindpoint == null) {
bindpoint = new InetSocketAddress(0);
socketUDT.bind((InetSocketAddress) bindpoint);
public synchronized void close() throws IOException {
public void connect(final SocketAddress endpoint) throws IOException {
socketUDT.connect((InetSocketAddress) endpoint);
public void connect(final SocketAddress endpoint, final int timeout)
throws IOException {
throw new UnsupportedOperationException("feature not available");
public SocketChannel getChannel() {
throw new UnsupportedOperationException("feature not available");
public InetAddress getInetAddress() {
return socketUDT.getRemoteInetAddress();
public synchronized InputStream getInputStream() throws IOException {
if (inputStream == null) {
inputStream = new NetInputStreamUDT(socketUDT);
return inputStream;
public boolean getKeepAlive() throws SocketException {
// UDT has keep alive automatically under the
// hood which I believe you cannot turn off
return true;
public InetAddress getLocalAddress() {
return socketUDT.getLocalInetAddress();
public int getLocalPort() {
return socketUDT.getLocalInetPort();
public SocketAddress getLocalSocketAddress() {
try {
return socketUDT.getLocalSocketAddress();
} catch (final ExceptionUDT e) {
return null;
public boolean getOOBInline() throws SocketException {
return false;
public synchronized OutputStream getOutputStream() throws IOException {
if (outputStream == null) {
outputStream = new NetOutputStreamUDT(socketUDT);
return outputStream;
public int getPort() {
return socketUDT.getRemoteInetPort();
public synchronized int getReceiveBufferSize() throws SocketException {
return socketUDT.getReceiveBufferSize();
public SocketAddress getRemoteSocketAddress() {
try {
return socketUDT.getRemoteSocketAddress();
} catch (final ExceptionUDT e) {
return null;
public boolean getReuseAddress() throws SocketException {
return socketUDT.getReuseAddress();
public synchronized int getSendBufferSize() throws SocketException {
return socketUDT.getSendBufferSize();
public int getSoLinger() throws SocketException {
return socketUDT.getSoLinger();
public synchronized int getSoTimeout() throws SocketException {
return socketUDT.getSoTimeout();
public boolean getTcpNoDelay() throws SocketException {
return false;
public int getTrafficClass() throws SocketException {
return 0;
public boolean isBound() {
return socketUDT.isBound();
public boolean isClosed() {
return socketUDT.isClosed();
public boolean isConnected() {
return socketUDT.isConnected();
public boolean isInputShutdown() {
return socketUDT.isClosed();
public boolean isOutputShutdown() {
return socketUDT.isClosed();
public void sendUrgentData(final int data) throws IOException {
log.debug("Sending urgent data not supported in Barchart UDT...");
public void setKeepAlive(final boolean on) throws SocketException {
log.debug("Keep alive not supported in Barchart UDT...");
public void setOOBInline(final boolean on) throws SocketException {
log.debug("OOB inline not supported in Barchart UDT...");
public void setPerformancePreferences(final int connectionTime,
final int latency, final int bandwidth) {
public synchronized void setReceiveBufferSize(final int size)
throws SocketException {
public void setReuseAddress(final boolean on) throws SocketException {
public synchronized void setSendBufferSize(final int size)
throws SocketException {
public void setSoLinger(final boolean on, final int linger)
throws SocketException {
socketUDT.setSoLinger(on, linger);
public synchronized void setSoTimeout(final int timeout)
throws SocketException {
public void setTcpNoDelay(final boolean on) throws SocketException {
log.debug("TCP no delay not supported in Barchart UDT...");
public void setTrafficClass(final int tc) throws SocketException {
log.debug("Traffic class not supported in Barchart UDT...");
public void shutdownInput() throws IOException {
public void shutdownOutput() throws IOException {
public SocketUDT socketUDT() {
return socketUDT;

View File

@ -0,0 +1,55 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
* Interface shared by all {@link KindUDT} kinds.
public interface ChannelUDT extends Channel {
* Was connection request
* {@link SocketChannelUDT#connect(} acknowledged by
* {@link SocketChannelUDT#finishConnect()}?
boolean isConnectFinished();
* The kind of UDT channel.
KindUDT kindUDT();
* UDT specific provider which produced this channel.
SelectorProviderUDT providerUDT();
* Underlying UDT socket.
SocketUDT socketUDT();
* The type of UDT socket.
TypeUDT typeUDT();
* Mask of all interest options which are permitted for this channel.
* @see SelectionKey
int validOps();

View File

@ -0,0 +1,48 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
* UDT channel role type, or kind.
* <p>
* {@link TypeUDT} means stream (byte oriented) vs datagram (message oriented).
* <p>
* {@link KindUDT} gives distinction between server vs client vs peer.
public enum KindUDT {
* Server mode: listens and accepts connections; generates
* {@link #CONNECTOR} as a result of {@link SocketUDT#accept()}
* @see ServerSocketChannelUDT
* Client mode: channel which initiates connections to servers; options are
* user-provided.
* <p>
* Server mode: channel which is a result of accept(); inherits options from
* parent {@link #ACCEPTOR}.
* @see SocketChannelUDT
* Rendezvous mode: symmetric peer channel on each side of the connection.
* @see RendezvousChannelUDT

View File

@ -0,0 +1,135 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.ByteBuffer;
import java.nio.channels.IllegalBlockingModeException;
* {@link InputStream} implementation for UDT sockets.
public class NioInputStreamUDT extends InputStream {
protected final SocketChannelUDT channel;
* Creates a new input stream for the specified channel.
* @param channel
* The UDT socket channel.
protected NioInputStreamUDT(final SocketChannelUDT channel) {
if (channel == null) {
throw new NullPointerException("channel == null");
if (!channel.isBlocking()) {
throw new IllegalBlockingModeException();
} = channel;
public int read() throws IOException {
* Here's the contract from the JavaDoc on this for SocketChannel:
* A read operation might not fill the buffer, and in fact it might not
* read any bytes at all. Whether or not it does so depends upon the
* nature and state of the channel. A socket channel in non-blocking
* mode, for example, cannot read any more bytes than are immediately
* available from the socket's input buffer; similarly, a file channel
* cannot read any more bytes than remain in the file. It is guaranteed,
* however, that if a channel is in blocking mode and there is at least
* one byte remaining in the buffer then this method will block until at
* least one byte is read.
* Long story short: This UDT InputStream should only ever be created
* when the SocketChannel's in blocking mode, and when it's in blocking
* mode the SocketChannel read call below will block just like we need
* it too.
final byte[] data = new byte[1];
return data[0];
public int read(final byte[] bytes) throws IOException {
return read(bytes, 0, bytes.length);
public int read(final byte[] bytes, final int off, final int len)
throws IOException {
if (len > bytes.length - off) {
throw new IndexOutOfBoundsException("len > bytes.length - off");
final ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.limit(off + len);
final int read =;
return read;
public long skip(final long n) throws IOException {
final ByteBuffer buffer = ByteBuffer.allocateDirect(32768);
long remaining = n;
while (remaining > 0) {
buffer.limit((int) Math.min(remaining, buffer.capacity()));
final int ret =;
if (ret <= 0)
remaining -= ret;
return n - remaining;
public int available() throws IOException {
// This is the default InputStream return value.
// The java/net/ implementation delegates to
// the native implementation, which returns 0 on at least some OSes.
return 0;
public void close() throws IOException {
public void mark(final int readlimit) {
throw new UnsupportedOperationException("mark not supported");
public void reset() throws IOException {
throw new UnsupportedOperationException("reset not supported");
public boolean markSupported() {
return false;

View File

@ -0,0 +1,54 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.ByteBuffer;
* {@link OutputStream} for UDT sockets.
public class NioOutputStreamUDT extends OutputStream {
protected final SocketChannelUDT channel;
* Creates a new UDT output stream.
* @param channel
* The UDT socket channel.
protected NioOutputStreamUDT(final SocketChannelUDT channel) { = channel;
public void write(final byte[] bytes, final int off, final int len)
throws IOException {
channel.write(ByteBuffer.wrap(bytes, off, len));
public void write(final byte[] bytes) throws IOException {
public void write(final int b) throws IOException {
// Just cast it -- this is the same thing SocketOutputStream does.
final byte[] bytes = { (byte) b };
public void close() throws IOException {

View File

@ -0,0 +1,44 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
public class NioServerSocketUDT extends NetServerSocketUDT {
protected final ServerSocketChannelUDT channelUDT;
protected NioServerSocketUDT(final ServerSocketChannelUDT channelUDT)
throws IOException {
this.channelUDT = channelUDT;
public Socket accept() throws IOException {
throw new RuntimeException("feature not available");
public void bind(final SocketAddress endpoint) throws IOException {
final SelectorProviderUDT provider = //
(SelectorProviderUDT) channelUDT.provider();
final int backlog = provider.getAcceptQueueSize();
bind(endpoint, backlog);
public ServerSocketChannelUDT getChannel() {
return channelUDT;

View File

@ -0,0 +1,48 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import com.barchart.udt.ExceptionUDT;
public class NioSocketUDT extends NetSocketUDT {
protected final SocketChannelUDT channelUDT;
protected NioSocketUDT(final SocketChannelUDT channelUDT)
throws ExceptionUDT {
this.channelUDT = channelUDT;
public SocketChannelUDT getChannel() {
return channelUDT;
public synchronized InputStream getInputStream() throws IOException {
if (inputStream == null) {
inputStream = new NioInputStreamUDT(channelUDT);
return inputStream;
public synchronized OutputStream getOutputStream() throws IOException {
if (outputStream == null) {
outputStream = new NioOutputStreamUDT(channelUDT);
return outputStream;

View File

@ -0,0 +1,46 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.channels.SocketChannel;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
* {@link SocketChannel}-like wrapper for {@link SocketUDT}, can be either
* stream or message oriented, depending on {@link TypeUDT}
* <p>
* See <a href="">Firewall
* Traversing with UDT</a>
public class RendezvousChannelUDT extends SocketChannelUDT implements
ChannelUDT {
* Ensure rendezvous mode.
protected RendezvousChannelUDT( //
final SelectorProviderUDT provider, //
final SocketUDT socketUDT //
) throws ExceptionUDT {
super(provider, socketUDT);
public KindUDT kindUDT() {

View File

@ -0,0 +1,506 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.EpollUDT;
import com.barchart.udt.EpollUDT.Opt;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.OptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.StatusUDT;
* UDT selection key implementation.
public class SelectionKeyUDT extends SelectionKey implements
Comparable<SelectionKeyUDT> {
* JDK interest to Epoll READ mapping.
protected static final int HAS_READ = OP_ACCEPT | OP_READ;
* JDK interest to Epoll WRITE mapping.
protected static final int HAS_WRITE = OP_CONNECT | OP_WRITE;
protected static final Logger log = LoggerFactory
* Convert select options : from jdk into epoll.
protected static Opt from(final int interestOps) {
final boolean hasRead = (interestOps & HAS_READ) != 0;
final boolean hasWrite = (interestOps & HAS_WRITE) != 0;
if (hasRead && hasWrite) {
return Opt.ALL;
if (hasRead) {
return Opt.ERROR_READ;
if (hasWrite) {
return Opt.ERROR_WRITE;
return Opt.ERROR;
* Render select options.
public static final String toStringOps(final int selectOps) {
final char A = (OP_ACCEPT & selectOps) != 0 ? 'A' : '-';
final char C = (OP_CONNECT & selectOps) != 0 ? 'C' : '-';
final char R = (OP_READ & selectOps) != 0 ? 'R' : '-';
final char W = (OP_WRITE & selectOps) != 0 ? 'W' : '-';
return String.format("%c%c%c%c", A, C, R, W);
* Channel bound to the key.
private final ChannelUDT channelUDT;
* Requested interest in epoll format.
private volatile Opt epollOpt;
* Requested interest in JDK format.
private volatile int interestOps;
* Key validity state. Key is valid when created, and invalid when canceled.
private volatile boolean isValid;
* Reported ready interest.
private volatile int readyOps;
* Correlation index for {@link #doRead(int)} vs {@link #doWrite(int)}
private volatile int resultIndex;
* Selector bound to the key.
private final SelectorUDT selectorUDT;
protected SelectionKeyUDT( //
final SelectorUDT selectorUDT, //
final ChannelUDT channelUDT, //
final Object attachment //
) {
this.selectorUDT = selectorUDT;
this.channelUDT = channelUDT;
* Ensure key is NOT canceled.
protected void assertValidKey() throws CancelledKeyException {
if (isValid()) {
throw new CancelledKeyException();
* Ensure only permitted interest mask bits are present.
protected void assertValidOps(final int interestOps) {
if ((interestOps & ~(channel().validOps())) != 0) {
throw new IllegalArgumentException("invalid interestOps="
+ interestOps);
public void cancel() {
if (isValid()) {
public SelectableChannel channel() {
return (SelectableChannel) channelUDT;
* Underlying UDT channel.
protected ChannelUDT channelUDT() {
return channelUDT;
public int compareTo(final SelectionKeyUDT that) {
final int thisId = this.socketId();
final int thatId = that.socketId();
if (thisId > thatId) {
return +1;
if (thisId < thatId) {
return -1;
return 0;
* Apply READ readiness according to {@link KindUDT} channel role.
* <p>
* Note: {@link #doRead(int)} is invoked before {@link #doWrite(int)}
* <p>
* Sockets with exceptions are returned to both read and write sets.
* @return Should report ready-state change?
protected boolean doRead(final int resultIndex) {
int readyOps = 0;
final int interestOps = this.interestOps;
/** Store read/write verifier. */
this.resultIndex = resultIndex;
try {
/** Check error report. */
if (!epollOpt.hasRead()) {
if (isSocketBroken()) {
readyOps = channel().validOps();
return true;
} else {
logError("Unexpected error report.");
return false;
switch (kindUDT()) {
if ((interestOps & OP_ACCEPT) != 0) {
readyOps = OP_ACCEPT;
return true;
} else {
logError("Ready to ACCEPT while not interested.");
return false;
if ((interestOps & OP_READ) != 0) {
readyOps = OP_READ;
return true;
} else {
logError("Ready to READ while not interested.");
return false;
logError("Wrong kind.");
return false;
} finally {
this.readyOps = readyOps;
* Apply WRITE readiness according to {@link KindUDT} channel role.
* <p>
* Note: {@link #doRead(int)} is invoked before {@link #doWrite(int)}
* <p>
* Sockets with exceptions are returned to both read and write sets.
* @return Should report ready-state change?
protected boolean doWrite(final int resultIndex) {
int readyOps = 0;
final int interestOps = this.interestOps;
/** Verify read/write relationship. */
final boolean hadReadBeforeWrite = this.resultIndex == resultIndex;
try {
/** Check error report. */
if (!epollOpt.hasWrite()) {
if (isSocketBroken()) {
readyOps = channel().validOps();
return true;
} else {
logError("Unexpected error report.");
return false;
switch (kindUDT()) {
logError("Ready to WRITE for acceptor.");
return false;
if (channelUDT().isConnectFinished()) {
if ((interestOps & OP_WRITE) != 0) {
readyOps = OP_WRITE;
return true;
} else {
logError("Ready to WRITE when not insterested.");
return false;
} else {
if ((interestOps & OP_CONNECT) != 0) {
readyOps = OP_CONNECT;
return true;
} else {
logError("Ready to CONNECT when not interested.");
return false;
logError("Wrong kind.");
return false;
} finally {
if (hadReadBeforeWrite) {
this.readyOps |= readyOps;
} else {
this.readyOps = readyOps;
* Requested interest in epoll format.
protected Opt epollOpt() {
return epollOpt;
* Epoll bound to this key.
protected EpollUDT epollUDT() {
return selector().epollUDT();
* Key equality based on socket-id.
public boolean equals(final Object otherKey) {
if (otherKey instanceof SelectionKeyUDT) {
final SelectionKeyUDT other = (SelectionKeyUDT) otherKey;
return other.socketId() == this.socketId();
return false;
* Check socket error condition.
boolean hasError() throws ExceptionUDT {
final int code = socketUDT().getOption(OptionUDT.Epoll_Event_Mask);
return Opt.from(code).hasError();
* Key hach code based on socket-id.
public int hashCode() {
return socketId();
public int interestOps() {
return interestOps;
public SelectionKey interestOps(final int interestOps) {
try {
final Opt epollNew = from(interestOps);
if (epollNew != epollOpt) {
if (Opt.ERROR == epollNew) {
} else {
epollUDT().add(socketUDT(), epollNew);
epollOpt = epollNew;
} catch (final Exception e) {
log.error("epoll udpate failure", e);
} finally {
this.interestOps = interestOps;
return this;
* Check socket termination status.
* @return true if status is {@link StatusUDT#BROKEN} or worse
protected boolean isSocketBroken() {
switch (socketUDT().status()) {
case INIT:
case OPENED:
return false;
case BROKEN:
case CLOSED:
return true;
logError("Unknown socket status.");
return true;
public boolean isValid() {
return isValid;
* Channel role.
protected KindUDT kindUDT() {
return channelUDT.kindUDT();
* Key processing logic error logger.
protected void logError(final String comment) {
final String message = "logic error : \n\t" + this;
log.warn(message, new Exception("" + comment));
* Change socket registration with epoll, and change key validity status.
protected void makeValid(final boolean isValid) {
try {
if (isValid) {
epollOpt = Opt.ERROR;
epollUDT().add(socketUDT(), epollOpt);
} else {
} catch (final Throwable e) {
log.error("Epoll failure.", e);
} finally {
this.isValid = isValid;
public int readyOps() {
return readyOps;
protected void readyOps(final int ops) {
readyOps = ops;
public SelectorUDT selector() {
return selectorUDT;
* Id of a socket bound to this key.
protected int socketId() {
return socketUDT().id();
* Socket bound to this key.
protected SocketUDT socketUDT() {
return channelUDT.socketUDT();
public String toString() {
return String
.format("[id: 0x%08x] poll=%s ready=%s inter=%s %s %s %s bind=%s:%s peer=%s:%s", //
socketUDT().id(), //
epollOpt, //
toStringOps(readyOps), //
toStringOps(interestOps), //
channelUDT.typeUDT(), //
channelUDT.kindUDT(), //
socketUDT().status(), //
socketUDT().getLocalInetAddress(), //
socketUDT().getLocalInetPort(), //
socketUDT().getRemoteInetAddress(), //
socketUDT().getRemoteInetPort() //

View File

@ -0,0 +1,146 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.channels.DatagramChannel;
import java.nio.channels.Pipe;
import java.nio.channels.spi.SelectorProvider;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
* selection provider for UDT
* <p>
* note: you must use the same system-wide provider instance for the same
* {@link TypeUDT} of UDT channels and UDT selectors;
public class SelectorProviderUDT extends SelectorProvider {
* system-wide provider instance, for {@link TypeUDT#DATAGRAM} UDT sockets
public static final SelectorProviderUDT DATAGRAM = //
new SelectorProviderUDT(TypeUDT.DATAGRAM);
* system-wide provider instance, for {@link TypeUDT#STREAM} UDT sockets
public static final SelectorProviderUDT STREAM = //
new SelectorProviderUDT(TypeUDT.STREAM);
public static SelectorProviderUDT from(final TypeUDT type) {
switch (type) {
return DATAGRAM;
case STREAM:
return STREAM;
throw new IllegalStateException("wrong type=" + type);
private volatile int acceptQueueSize = SocketUDT.DEFAULT_ACCEPT_QUEUE_SIZE;
private volatile int maxSelectorSize = SocketUDT.DEFAULT_MAX_SELECTOR_SIZE;
private final TypeUDT type;
* {@link TypeUDT} of UDT sockets generated by this provider
public final TypeUDT type() {
return type;
protected SelectorProviderUDT(final TypeUDT type) {
this.type = type;
public int getAcceptQueueSize() {
return acceptQueueSize;
public int getMaxSelectorSize() {
return maxSelectorSize;
* Not supported.
public DatagramChannel openDatagramChannel() throws IOException {
throw new UnsupportedOperationException("feature not available");
public DatagramChannel openDatagramChannel(final ProtocolFamily family) throws IOException {
throw new UnsupportedOperationException("feature not available");
* Not supported.
public Pipe openPipe() throws IOException {
throw new UnsupportedOperationException("feature not available");
* Open UDT {@link KindUDT#RENDEZVOUS} socket channel.
* @see RendezvousChannelUDT
public RendezvousChannelUDT openRendezvousChannel() throws IOException {
final SocketUDT socketUDT = new SocketUDT(type);
return new RendezvousChannelUDT(this, socketUDT);
* Open UDT specific selector.
* @see SelectorUDT
public SelectorUDT openSelector() throws IOException {
return new SelectorUDT(this, maxSelectorSize);
* Open UDT {@link KindUDT#ACCEPTOR} socket channel.
* @see ServerSocketChannelUDT
public ServerSocketChannelUDT openServerSocketChannel() throws IOException {
final SocketUDT serverSocketUDT = new SocketUDT(type);
return new ServerSocketChannelUDT(this, serverSocketUDT);
* Open UDT {@link KindUDT#CONNECTOR} socket channel.
* @see SocketChannelUDT
public SocketChannelUDT openSocketChannel() throws IOException {
final SocketUDT socketUDT = new SocketUDT(type);
return new SocketChannelUDT(this, socketUDT);
public void setAcceptQueueSize(final int queueSize) {
acceptQueueSize = queueSize;
public void setMaxSelectorSize(final int selectorSize) {
maxSelectorSize = selectorSize;

View File

@ -0,0 +1,489 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import static com.barchart.udt.SocketUDT.*;
import java.nio.IntBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.EpollUDT;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.util.HelpUDT;
* selector
* <p>
* design guidelines:
* <p>
* 1) follow general contracts of jdk 6 nio; see <a href=
* ""
* >barchart-udt-reference-jdk6</a>
* <p>
* 2) adapt to how netty is doing select; see <a href=
* ""
* >NioEventLoop</a>
* <p>
* note: you must use {@link SelectorProviderUDT#openSelector()} to obtain
* instance of this class; do not use JDK
* {@link Selector#open()}
public class SelectorUDT extends AbstractSelector {
protected static final Logger log = LoggerFactory
* use this call to instantiate a selector for UDT
protected static Selector open(final TypeUDT type) throws IOException {
final SelectorProviderUDT provider;
switch (type) {
provider = SelectorProviderUDT.DATAGRAM;
case STREAM:
provider = SelectorProviderUDT.STREAM;
log.error("unsupported type={}", type);
throw new IOException("unsupported type");
return provider.openSelector();
private final EpollUDT epollUDT = new EpollUDT();
public final int maximimSelectorSize;
* list of epoll sockets with read interest
private final IntBuffer readBuffer;
* [ socket-id : selection-key ]
private final ConcurrentMap<Integer, SelectionKeyUDT> //
registeredKeyMap = new ConcurrentHashMap<Integer, SelectionKeyUDT>();
* public view : immutable
private final Set<? extends SelectionKey> //
registeredKeySet = HelpUDT.unmodifiableSet(registeredKeyMap.values());
* tracks correlation read with write for the same key
private volatile int resultIndex;
* set of keys with data ready for an operation
private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> //
selectedKeyMap = new ConcurrentHashMap<SelectionKeyUDT, SelectionKeyUDT>();
* public view : removal allowed, but not addition
private final Set<? extends SelectionKey> //
selectedKeySet = HelpUDT.ungrowableSet(selectedKeyMap.keySet());
/** select is exclusive */
private final Lock selectLock = new ReentrantLock();
/** reported epoll socket list sizes */
private final IntBuffer sizeBuffer;
* Canceled keys.
private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> //
terminatedKeyMap = new ConcurrentHashMap<SelectionKeyUDT, SelectionKeyUDT>();
/** guarded by {@link #doSelectLocked} */
private volatile int wakeupBaseCount;
private volatile int wakeupStepCount;
/** list of epoll sockets with write interest */
private final IntBuffer writeBuffer;
protected SelectorUDT( //
final SelectorProvider provider, //
final int maximumSelectorSize //
) throws ExceptionUDT {
this.maximimSelectorSize = maximumSelectorSize;
readBuffer = HelpUDT.newDirectIntBufer(maximumSelectorSize);
writeBuffer = HelpUDT.newDirectIntBufer(maximumSelectorSize);
sizeBuffer = HelpUDT.newDirectIntBufer(UDT_SIZE_COUNT);
* Enqueue cancel request.
protected void cancel(final SelectionKeyUDT keyUDT) {
terminatedKeyMap.putIfAbsent(keyUDT, keyUDT);
* Process pending cancel requests.
protected void doCancel() {
if (terminatedKeyMap.isEmpty()) {
final Iterator<SelectionKeyUDT> iterator = terminatedKeyMap.values()
while (iterator.hasNext()) {
final SelectionKeyUDT keyUDT =;
if (keyUDT.isValid()) {
* @param millisTimeout
* <0 : invinite; =0 : immediate; >0 : finite;
protected int doEpollEnter(final long millisTimeout) throws IOException {
if (!isOpen()) {
log.error("slector is closed");
throw new ClosedSelectorException();
try {
return doEpollExclusive(millisTimeout);
} finally {
* @param millisTimeout
* <0 : invinite;
* =0 : immediate;
* >0 : finite;
* @return
* <0 : should not happen
* =0 : means nothing was selected/timeout
* >0 : number of selected keys
protected int doEpollExclusive(final long millisTimeout) throws IOException {
try {
/** java.nio.Selector contract for wakeup() */
// begin();
/** pre select */
/** select proper */
/** post select */
} finally {
/** java.nio.Selector contract for wakeup() */
// end();
return selectedKeyMap.size();
* @param millisTimeout
* <0 : infinite
* =0 : immediate
* >0 : finite
protected int doEpollSelect(long millisTimeout) throws ExceptionUDT {
int readyCount = 0;
if (millisTimeout < 0) {
/** infinite: do select in slices; check for wakeup; */
do {
if (readyCount > 0 || wakeupIsPending()) {
} while (true);
} else if (millisTimeout > 0) {
/** finite: do select in slices; check for wakeup; count down */
do {
if (readyCount > 0 || wakeupIsPending()) {
} while (millisTimeout > 0);
} else {
/** immediate */
readyCount = doEpollSelectUDT(0);
return readyCount;
protected int doEpollSelectUDT(final long timeout) throws ExceptionUDT {
return SocketUDT.selectEpoll(//, //
readBuffer, //
writeBuffer, //
sizeBuffer, //
timeout //
protected void doResults() {
final int resultIndex = this.resultIndex++;
protected void doResultsRead(final int resultIndex) {
final int readSize = sizeBuffer.get(UDT_READ_INDEX);
for (int index = 0; index < readSize; index++) {
final int socketId = readBuffer.get(index);
final SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId);
* Epoll will report closed socket once in both read and write sets.
* But selector consumer may cancel the key before close.
if (keyUDT == null) {
logSocketId("missing from read ", socketId);
if (keyUDT.doRead(resultIndex)) {
selectedKeyMap.putIfAbsent(keyUDT, keyUDT);
protected void doResultsWrite(final int resultIndex) {
final int writeSize = sizeBuffer.get(UDT_WRITE_INDEX);
for (int index = 0; index < writeSize; index++) {
final int socketId = writeBuffer.get(index);
final SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId);
* Epoll will report closed socket once in both read and write sets.
* But selector consumer may cancel the key before close.
if (keyUDT == null) {
logSocketId("missing from write", socketId);
if (keyUDT.doWrite(resultIndex)) {
selectedKeyMap.putIfAbsent(keyUDT, keyUDT);
protected EpollUDT epollUDT() {
return epollUDT;
protected void implCloseSelector() throws IOException {
try {
for (final SelectionKeyUDT keyUDT : registeredKeyMap.values()) {
} finally {
public Set<SelectionKey> keys() {
if (!isOpen()) {
throw new ClosedSelectorException();
return (Set<SelectionKey>) registeredKeySet;
protected void logSocketId(final String title, final int socketId) {
if (log.isDebugEnabled()) {
log.debug("{} {}", title, String.format("[id: 0x%08x]", socketId));
protected SelectionKey register( //
final AbstractSelectableChannel channel, //
final int interestOps, //
final Object attachment //
) {
if (registeredKeyMap.size() >= maximimSelectorSize) {
log.error("reached maximimSelectorSize");
throw new IllegalSelectorException();
if (!(channel instanceof ChannelUDT)) {
log.error("!(channel instanceof ChannelUDT)");
throw new IllegalSelectorException();
final ChannelUDT channelUDT = (ChannelUDT) channel;
final Integer socketId = channelUDT.socketUDT().id();
SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId);
if (keyUDT == null) {
keyUDT = new SelectionKeyUDT(this, channelUDT, attachment);
registeredKeyMap.putIfAbsent(socketId, keyUDT);
keyUDT = registeredKeyMap.get(socketId);
return keyUDT;
public int select() throws IOException {
return select(0);
public int select(final long timeout) throws IOException {
if (timeout < 0) {
throw new IllegalArgumentException("negative timeout");
} else if (timeout > 0) {
return doEpollEnter(timeout);
} else {
return doEpollEnter(SocketUDT.TIMEOUT_INFINITE);
public Set<SelectionKey> selectedKeys() {
if (!isOpen()) {
throw new ClosedSelectorException();
return (Set<SelectionKey>) selectedKeySet;
public int selectNow() throws IOException {
return doEpollEnter(SocketUDT.TIMEOUT_NONE);
public Selector wakeup() {
return this;
protected boolean wakeupIsPending() {
return wakeupBaseCount != wakeupStepCount;
protected void wakeupMarkBase() {
wakeupBaseCount = wakeupStepCount;

View File

@ -0,0 +1,168 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.anno.ThreadSafe;
* {@link ServerSocketChannel}-like wrapper for {@link SocketUDT} can be either
* stream or message oriented, depending on {@link TypeUDT}
* <p>
* you must use {@link SelectorProviderUDT#openServerSocketChannel()} to obtain
* instance of this class; do not use JDK
* {@link ServerSocketChannel#open()};
* <p>
* example:
* <pre>
* SelectorProvider provider = SelectorProviderUDT.DATAGRAM;
* ServerSocketChannel acceptChannel = provider.openServerSocketChannel();
* ServerSocket acceptSocket = acceptChannel.socket();
* InetSocketAddress acceptAddress = new InetSocketAddress(&quot;localhost&quot;, 12345);
* acceptorSocket.bind(acceptAddress);
* assert acceptSocket.isBound();
* SocketChannel connectChannel = acceptChannel.accept();
* assert connectChannel.isConnected();
* </pre>
public class ServerSocketChannelUDT extends ServerSocketChannel implements ChannelUDT {
protected static final Logger log = LoggerFactory
protected NioServerSocketUDT socketAdapter;
protected final SocketUDT socketUDT;
protected ServerSocketChannelUDT( //
final SelectorProviderUDT provider, //
final SocketUDT socketUDT //
) {
this.socketUDT = socketUDT;
public SocketChannelUDT accept() throws IOException {
try {
final SocketUDT clientUDT = socketUDT.accept();
if (clientUDT == null) {
return null;
} else {
return new SocketChannelUDT( //
providerUDT(), //
clientUDT, //
clientUDT.isConnected() //
} finally {
protected void implCloseSelectableChannel() throws IOException {
protected void implConfigureBlocking(final boolean block)
throws IOException {
public boolean isConnectFinished() {
return true;
public KindUDT kindUDT() {
return KindUDT.ACCEPTOR;
public SelectorProviderUDT providerUDT() {
return (SelectorProviderUDT) super.provider();
public ServerSocketChannel bind(final SocketAddress local, final int backlog) throws IOException {
throw new UnsupportedOperationException();
public <T> ServerSocketChannel setOption(final SocketOption<T> name, final T value) throws IOException {
throw new UnsupportedOperationException();
public <T> T getOption(final SocketOption<T> name) throws IOException {
throw new UnsupportedOperationException();
public Set<SocketOption<?>> supportedOptions() {
throw new UnsupportedOperationException();
public synchronized NioServerSocketUDT socket() {
if (socketAdapter == null) {
try {
socketAdapter = new NioServerSocketUDT(this);
} catch (final Exception e) {
log.error("failed to make socket", e);
return null;
return socketAdapter;
public SocketUDT socketUDT() {
return socketUDT;
public String toString() {
return socketUDT.toString();
public TypeUDT typeUDT() {
return providerUDT().type();
public SocketAddress getLocalAddress() throws IOException {
throw new UnsupportedOperationException();

View File

@ -0,0 +1,572 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.nio;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.ExceptionUDT;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.anno.ThreadSafe;
* {@link SocketChannel}-like wrapper for {@link SocketUDT}, can be either
* stream or message oriented, depending on {@link TypeUDT}
* <p>
* The UDT socket that this SocketChannel wraps will be switched to blocking
* mode since this is the default for all SocketChannels on construction. If you
* require non-blocking functionality, you will need to call configureBlocking
* on the constructed SocketChannel class.
* <p>
* you must use {@link SelectorProviderUDT#openSocketChannel()} to obtain
* instance of this class; do not use JDK
* {@link SocketChannel#open()};
* <p>
* example:
* <pre>
* SelectorProvider provider = SelectorProviderUDT.DATAGRAM;
* SocketChannel clientChannel = provider.openSocketChannel();
* clientChannel.configureBlocking(true);
* Socket clientSocket = clientChannel.socket();
* InetSocketAddress clientAddress = new InetSocketAddress(&quot;localhost&quot;, 10000);
* clientSocket.bind(clientAddress);
* assert clientSocket.isBound();
* InetSocketAddress serverAddress = new InetSocketAddress(&quot;localhost&quot;, 12345);
* clientChannel.connect(serverAddress);
* assert clientSocket.isConnected();
* </pre>
public class SocketChannelUDT extends SocketChannel implements ChannelUDT {
protected static final Logger log = LoggerFactory
protected final Object connectLock = new Object();
* local volatile variable, which mirrors super.blocking, to avoid the cost
* of synchronized call inside isBlocking()
protected volatile boolean isBlockingMode = isBlocking();
protected volatile boolean isConnectFinished;
protected volatile boolean isConnectionPending;
protected NioSocketUDT socketAdapter;
protected final SocketUDT socketUDT;
private SocketAddress localAddress;
private InetSocketAddress remoteSocket;
protected SocketChannelUDT( //
final SelectorProviderUDT provider, //
final SocketUDT socketUDT //
) throws ExceptionUDT {
this.socketUDT = socketUDT;
protected SocketChannelUDT( //
final SelectorProviderUDT provider, //
final SocketUDT socketUDT, //
final boolean isConnected //
) throws ExceptionUDT {
this(provider, socketUDT);
if (isConnected) {
isConnectFinished = true;
isConnectionPending = false;
} else {
isConnectFinished = false;
isConnectionPending = true;
public boolean connect(final SocketAddress remote) throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
if (isConnected()) {
log.warn("already connected; ignoring remote={}", remote);
return true;
if (remote == null) {
log.error("remote == null");
throw new NullPointerException();
remoteSocket = (InetSocketAddress) remote;
if (remoteSocket.isUnresolved()) {
log.error("can not use unresolved address: remote={}", remote);
throw new UnresolvedAddressException();
if (isBlocking()) {
synchronized (connectLock) {
try {
if (isConnectionPending) {
throw new ConnectionPendingException();
isConnectionPending = true;
} finally {
isConnectionPending = false;
return socketUDT.isConnected();
} else {
/** non Blocking */
if (!isRegistered()) {
/** this channel is independent of any selector */
log.error("UDT channel is in NON blocking mode; "
+ "must register with a selector " //
+ "before trying to connect(); " //
+ "socketId=" +;
throw new IllegalBlockingModeException();
/** this channel is registered with a selector */
synchronized (connectLock) {
if (isConnectionPending) {
log.error("connection already in progress");
throw new ConnectionPendingException();
isConnectFinished = false;
isConnectionPending = true;
* connection operation must later be completed by invoking the
* #finishConnect() method.
return false;
public boolean finishConnect() throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
if (isBlocking()) {
synchronized (connectLock) {
while (isConnectionPending) {
try {
} catch (final InterruptedException e) {
throw new IOException(e);
if (isConnected()) {
isConnectFinished = true;
isConnectionPending = false;
return true;
} else {
log.error("connect failure : {}", socketUDT);
throw new IOException();
public SocketAddress getRemoteAddress() throws IOException {
if(this.isConnectFinished()) {
return remoteSocket;
return null;
protected void implCloseSelectableChannel() throws IOException {
protected void implConfigureBlocking(final boolean block)
throws IOException {
isBlockingMode = block;
public boolean isConnected() {
return socketUDT.isConnected();
public boolean isConnectFinished() {
return isConnectFinished;
public boolean isConnectionPending() {
return isConnectionPending;
public KindUDT kindUDT() {
public SelectorProviderUDT providerUDT() {
return (SelectorProviderUDT) super.provider();
* See {@link SocketChannel#read(ByteBuffer)} contract;
* note: this method does not return (-1) as EOS (end of stream flag)
* @return <code><0</code> should not happen<br>
* <code>=0</code> blocking mode: timeout occurred on receive<br>
* <code>=0</code> non-blocking mode: nothing is received by the
* underlying UDT socket<br>
* <code>>0</code> actual bytes received count<br>
* @see SocketUDT#receive(ByteBuffer)
* @see SocketUDT#receive(byte[], int, int)
public int read(final ByteBuffer buffer) throws IOException {
final int remaining = buffer.remaining();
if (remaining <= 0) {
return 0;
final SocketUDT socket = socketUDT;
final boolean isBlocking = isBlockingMode;
final int sizeReceived;
try {
if (isBlocking) {
begin(); // JDK contract for NIO blocking calls
if (buffer.isDirect()) {
sizeReceived = socket.receive(buffer);
} else {
final byte[] array = buffer.array();
final int position = buffer.position();
final int limit = buffer.limit();
sizeReceived = socket.receive(array, position, limit);
if (0 < sizeReceived && sizeReceived <= remaining) {
buffer.position(position + sizeReceived);
} finally {
if (isBlocking) {
end(true); // JDK contract for NIO blocking calls
// see contract for receive()
if (sizeReceived < 0) {
// log.trace("nothing was received; socket={}", socket);
return 0;
if (sizeReceived == 0) {
// log.trace("receive timeout; socket={}", socket);
return 0;
if (sizeReceived <= remaining) {
return sizeReceived;
} else {
log.error("should not happen: socket={}", socket);
return 0;
public long read(final ByteBuffer[] dsts, final int offset, final int length)
throws IOException {
throw new RuntimeException("feature not available");
public synchronized NioSocketUDT socket() {
if (socketAdapter == null) {
try {
socketAdapter = new NioSocketUDT(this);
} catch (final ExceptionUDT e) {
log.error("failed to make socket", e);
return socketAdapter;
public SocketUDT socketUDT() {
return socketUDT;
public String toString() {
return socketUDT.toString();
* See {@link SocketChannel#write(ByteBuffer)} contract;
* @return <code><0</code> should not happen<br>
* <code>=0</code> blocking mode: timeout occurred on send<br>
* <code>=0</code> non-blocking mode: buffer is full in the
* underlying UDT socket; nothing is sent<br>
* <code>>0</code> actual bytes sent count<br>
* @see SocketUDT#send(ByteBuffer)
* @see SocketUDT#send(byte[], int, int)
public int write(final ByteBuffer buffer) throws IOException {
// writeCount.incrementAndGet();
if (buffer == null) {
throw new NullPointerException("buffer == null");
final int remaining = buffer.remaining();
if (remaining <= 0) {
return 0;
final SocketUDT socket = socketUDT;
final boolean isBlocking = isBlockingMode;
int sizeSent = 0;
int ret = 0;
try {
if (isBlocking) {
begin(); // JDK contract for NIO blocking calls
if (buffer.isDirect()) {
do {
ret = socket.send(buffer);
if (ret > 0)
sizeSent += ret;
} while (buffer.hasRemaining() && isBlocking);
} else {
final byte[] array = buffer.array();
int position = buffer.position();
final int limit = buffer.limit();
do {
ret = socket.send(array, position, limit);
if (0 < ret && ret <= remaining) {
sizeSent += ret;
position += ret;
} while (buffer.hasRemaining() && isBlocking);
} finally {
if (isBlocking) {
end(true); // JDK contract for NIO blocking calls
// see contract for send()
if (ret < 0) {
// log.trace("no buffer space; socket={}", socket);
return 0;
if (ret == 0) {
// log.trace("send timeout; socket={}", socket);
return 0;
if (sizeSent <= remaining) {
return sizeSent;
} else {
log.error("should not happen; socket={}", socket);
return 0;
public long write(final ByteBuffer[] bufferArray, final int offset,
final int length) throws IOException {
try {
long total = 0;
for (int index = offset; index < offset + length; index++) {
final ByteBuffer buffer = bufferArray[index];
final int remaining = buffer.remaining();
final int processed = write(buffer);
if (remaining == processed) {
total += processed;
} else {
throw new IllegalStateException(
"failed to write buffer in array");
return total;
} catch (final Throwable e) {
throw new IOException("failed to write buffer array", e);
public TypeUDT typeUDT() {
return providerUDT().type();
/** java 7 */
public SocketChannelUDT bind(final SocketAddress localAddress)
throws IOException {
socketUDT.bind((InetSocketAddress) localAddress);
return this;
public <T> SocketChannel setOption(final SocketOption<T> name, final T value) throws IOException {
throw new UnsupportedOperationException();
public <T> T getOption(final SocketOption<T> name) throws IOException {
throw new UnsupportedOperationException();
public Set<SocketOption<?>> supportedOptions() {
throw new UnsupportedOperationException();
public SocketChannel shutdownInput() throws IOException {
throw new UnsupportedOperationException();
public SocketChannel shutdownOutput() throws IOException {
throw new UnsupportedOperationException();
public SocketAddress getLocalAddress() throws IOException {
if(this.isConnected()) {
return localAddress;
return null;

View File

@ -0,0 +1,140 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.util;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.Collection;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.barchart.udt.EpollUDT;
* miscellaneous utilities
public class HelpUDT {
protected static final Logger log = LoggerFactory.getLogger(EpollUDT.class);
public static long md5sum(final String text) {
final byte[] defaultBytes = text.getBytes();
try {
final MessageDigest algorithm = MessageDigest.getInstance("MD5");
final byte digest[] = algorithm.digest();
final ByteBuffer buffer = ByteBuffer.wrap(digest);
return buffer.getLong();
} catch (final NoSuchAlgorithmException e) {
log.error("md5 failed", e);
return 0;
* direct integer buffer with proper native byte order
public static final IntBuffer newDirectIntBufer(final int capacity) {
/** java int is 4 bytes */
return ByteBuffer. //
allocateDirect(capacity * 4). //
order(ByteOrder.nativeOrder()). //
public static <E> Set<E> ungrowableSet(final Set<E> set) {
return new UngrowableSet<E>(set);
public static <E> Set<E> unmodifiableSet(final Collection<E> values) {
return new UnmodifiableSet<E>(values);
private HelpUDT() {
public static final void checkBuffer(final ByteBuffer buffer) {
if (buffer == null) {
throw new IllegalArgumentException("buffer == null");
if (!buffer.isDirect()) {
throw new IllegalArgumentException("must use DirectByteBuffer");
public static final void checkArray(final byte[] array) {
if (array == null) {
throw new IllegalArgumentException("array == null");
public static String constantFieldName(final Class<?> klaz,
final Object instance) {
final Field[] filedArray = klaz.getDeclaredFields();
for (final Field field : filedArray) {
final int modifiers = field.getModifiers();
final boolean isConstant = true && //
Modifier.isPublic(modifiers) && //
Modifier.isStatic(modifiers) && //
Modifier.isFinal(modifiers) //
if (isConstant) {
try {
if (instance == field.get(null)) {
return field.getName();
} catch (final Throwable e) {
log.debug("", e);
return "unknown";
public static void checkSocketAddress(final InetSocketAddress socketAddress) {
if (socketAddress == null) {
throw new IllegalArgumentException("socketAddress can't be null");
/** can not use in JNI ; internal InetAddress field is null */
if (socketAddress.isUnresolved()) {
throw new IllegalArgumentException("socketAddress is unresolved : "
+ socketAddress + " : check your DNS settings");

View File

@ -0,0 +1,102 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.util;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
class UngrowableSet<E> implements Set<E> {
private final Set<E> set;
UngrowableSet(final Set<E> set) {
this.set = set;
public boolean add(final E o) {
throw new UnsupportedOperationException();
public boolean addAll(final Collection<? extends E> coll) {
throw new UnsupportedOperationException();
public void clear() {
public boolean contains(final Object o) {
return set.contains(o);
public boolean containsAll(final Collection<?> coll) {
return set.containsAll(coll);
public boolean equals(final Object o) {
return set.equals(o);
public int hashCode() {
return set.hashCode();
public boolean isEmpty() {
return set.isEmpty();
public Iterator<E> iterator() {
return set.iterator();
public boolean remove(final Object o) {
return set.remove(o);
public boolean removeAll(final Collection<?> coll) {
return set.removeAll(coll);
public boolean retainAll(final Collection<?> coll) {
return set.retainAll(coll);
public int size() {
return set.size();
public Object[] toArray() {
return set.toArray();
public <T> T[] toArray(final T[] a) {
return set.toArray(a);
public String toString() {
return set.toString();

View File

@ -0,0 +1,87 @@
* Copyright (C) 2009-2013 Barchart, Inc. <>
* All rights reserved. Licensed under the OSI BSD License.
package com.barchart.udt.util;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
class UnmodifiableSet<E> implements Set<E> {
private final Collection<E> values;
UnmodifiableSet(final Collection<E> values) {
this.values = values;
public boolean add(final E e) {
throw new UnsupportedOperationException();
public boolean addAll(final Collection<? extends E> c) {
throw new UnsupportedOperationException();
public void clear() {
throw new UnsupportedOperationException();
public boolean contains(final Object o) {
return values.contains(o);
public boolean containsAll(final Collection<?> c) {
return values.containsAll(c);
public boolean isEmpty() {
return values.isEmpty();
public Iterator<E> iterator() {
return values.iterator();
public boolean remove(final Object o) {
throw new UnsupportedOperationException();
public boolean removeAll(final Collection<?> c) {
throw new UnsupportedOperationException();
public boolean retainAll(final Collection<?> c) {
throw new UnsupportedOperationException();
public int size() {
return values.size();
public Object[] toArray() {
return values.toArray();
public <T> T[] toArray(final T[] a) {
return values.toArray(a);