From e01a42db36d88e40bcee13cb3e1847bff82afc5a Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 26 Jun 2015 01:17:32 +0200 Subject: [PATCH] Implemented flush --- src/api.cpp | 33 +++++++++++++++ src/api.h | 1 + src/core.cpp | 57 ++++++++++++++------------ src/core.h | 10 +++++ src/jni/com_barchart_udt_SocketUDT.cpp | 19 +++++++++ src/udt.h | 1 + 6 files changed, 95 insertions(+), 26 deletions(-) diff --git a/src/api.cpp b/src/api.cpp index 72e7ecc..ba21abf 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -803,6 +803,16 @@ void CUDTUnited::connect_complete(const UDTSOCKET u) s->m_Status = CONNECTED; } +int CUDTUnited::flush(const UDTSOCKET u) { + CUDTSocket* s = locate(u); + if (NULL == s) + throw CUDTException(5, 4, 0); + + s->m_pUDT->flush(); + + return 0; +} + int CUDTUnited::close(const UDTSOCKET u) { CUDTSocket* s = locate(u); @@ -1717,6 +1727,24 @@ int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen) } } +int CUDT::flush(UDTSOCKET u) +{ + try + { + return s_UDTUnited.flush(u); + } + catch (CUDTException e) + { + s_UDTUnited.setError(new CUDTException(e)); + return ERROR; + } + catch (...) + { + s_UDTUnited.setError(new CUDTException(-1, 0, 0)); + return ERROR; + } +} + int CUDT::close(UDTSOCKET u) { try @@ -2259,6 +2287,11 @@ int connect(UDTSOCKET u, const struct sockaddr* name, int namelen) return CUDT::connect(u, name, namelen); } +int flush(UDTSOCKET u) +{ + return CUDT::flush(u); +} + int close(UDTSOCKET u) { return CUDT::close(u); diff --git a/src/api.h b/src/api.h index faf2f8f..349bb6c 100644 --- a/src/api.h +++ b/src/api.h @@ -168,6 +168,7 @@ public: int listen(const UDTSOCKET u, int backlog); UDTSOCKET accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen); int connect(const UDTSOCKET u, const sockaddr* name, int namelen); + int flush(const UDTSOCKET u); int close(const UDTSOCKET u); int getpeername(const UDTSOCKET u, sockaddr* name, int* namelen); int getsockname(const UDTSOCKET u, sockaddr* name, int* namelen); diff --git a/src/core.cpp b/src/core.cpp index a5acfa7..3c120ab 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -910,6 +910,36 @@ void CUDT::connect(const sockaddr* peer, CHandShake* hs) delete [] buffer; } +void CUDT::flush() { + + uint64_t entertime = CTimer::getTime(); + + while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) + { + // linger has been checked by previous close() call and has expired + if (m_ullLingerExpiration >= entertime) + break; + + if (!m_bSynSending) + { + // if this socket enables asynchronous sending, return immediately and let GC to close it later + if (0 == m_ullLingerExpiration) + m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL; + + return; + } + + #ifndef WINDOWS + timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1000000; + nanosleep(&ts, NULL); + #else + Sleep(1); + #endif + } +} + void CUDT::close() { if (!m_bOpened) @@ -917,32 +947,7 @@ void CUDT::close() if (0 != m_Linger.l_onoff) { - uint64_t entertime = CTimer::getTime(); - - while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) - { - // linger has been checked by previous close() call and has expired - if (m_ullLingerExpiration >= entertime) - break; - - if (!m_bSynSending) - { - // if this socket enables asynchronous sending, return immediately and let GC to close it later - if (0 == m_ullLingerExpiration) - m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL; - - return; - } - - #ifndef WINDOWS - timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 1000000; - nanosleep(&ts, NULL); - #else - Sleep(1); - #endif - } + flush(); } // remove this socket from the snd queue diff --git a/src/core.h b/src/core.h index 3f1f376..e10562b 100644 --- a/src/core.h +++ b/src/core.h @@ -84,6 +84,7 @@ public: //API static int listen(UDTSOCKET u, int backlog); static UDTSOCKET accept(UDTSOCKET u, sockaddr* addr, int* addrlen); static int connect(UDTSOCKET u, const sockaddr* name, int namelen); + static int flush(UDTSOCKET u); static int close(UDTSOCKET u); static int getpeername(UDTSOCKET u, sockaddr* name, int* namelen); static int getsockname(UDTSOCKET u, sockaddr* name, int* namelen); @@ -163,6 +164,15 @@ private: void connect(const sockaddr* peer, CHandShake* hs); + // Functionality: + // Flush all buffered data. + // Parameters: + // None. + // Returned value: + // None. + + void flush(); + // Functionality: // Close the opened UDT entity. // Parameters: diff --git a/src/jni/com_barchart_udt_SocketUDT.cpp b/src/jni/com_barchart_udt_SocketUDT.cpp index 794f65d..7f56b57 100644 --- a/src/jni/com_barchart_udt_SocketUDT.cpp +++ b/src/jni/com_barchart_udt_SocketUDT.cpp @@ -706,6 +706,25 @@ JNIEXPORT void JNICALL Java_com_barchart_udt_SocketUDT_bind0( // } +JNIEXPORT void JNICALL Java_com_barchart_udt_SocketUDT_flush0( // + JNIEnv * const env, // + const jobject self // + ) { + + const jint socketID = UDT_GetSocketID(env, self); + + const int rv = UDT::flush(socketID); + + if (rv == UDT::ERROR) { + UDT::ERRORINFO errorInfo = UDT::getlasterror(); + UDT_ThrowExceptionUDT_ErrorInfo( // + env, socketID, "flush0:flush", &errorInfo); + return; + } + +} + + JNIEXPORT void JNICALL Java_com_barchart_udt_SocketUDT_close0( // JNIEnv * const env, // const jobject self // diff --git a/src/udt.h b/src/udt.h index b8075fd..9c9ae89 100644 --- a/src/udt.h +++ b/src/udt.h @@ -281,6 +281,7 @@ UDT_API int bind2(UDTSOCKET u, UDPSOCKET udpsock); UDT_API int listen(UDTSOCKET u, int backlog); UDT_API UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen); UDT_API int connect(UDTSOCKET u, const struct sockaddr* name, int namelen); +UDT_API int flush(UDTSOCKET u); UDT_API int close(UDTSOCKET u); UDT_API int getpeername(UDTSOCKET u, struct sockaddr* name, int* namelen); UDT_API int getsockname(UDTSOCKET u, struct sockaddr* name, int* namelen);