[haiku-commits] BRANCH hamishm-github.eventqueue [769322864f19] headers/os/net src/kits/network/libnetapi

  • From: hamishm-github.eventqueue <community@xxxxxxxxxxxx>
  • To: haiku-commits@xxxxxxxxxxxxx
  • Date: Sun, 18 Oct 2015 14:47:00 +0200 (CEST)

added 2 changesets to branch 'refs/remotes/hamishm-github/eventqueue'
old head: 21051029a4213e139aab70e9f1507b85cb072759
new head: 769322864f195a9bf40c5b9f9324566cc48db70f
overview: https://github.com/hamishm/haiku/compare/21051029a421...769322864f19

----------------------------------------------------------------------------

667af3ac4928: libnetapi: add EventLoop class for waiting on I/O events

* EventLoop wraps the event_queue system calls to provide a
callback-based interface for monitoring file descriptors, ports, etc.
* Users register interest in events using the WaitFor methods, and
run the event loop using the RunOnce method.
* One can also schedule arbitrary functions to run on the event loop
thread using the ExecuteLater and ExecuteAt methods.

769322864f19: libnetapi: add set of asynchronous socket classes

* BaseSocket provides a base set of functionality for all sockets.
* ServerSocket provides an asynchronous listening socket, and
StreamSocket provides an asynchronous stream socket.
* Instantiations are provided for TCP and UNIX sockets.

[ Hamish Morrison <hamishm53@xxxxxxxxx> ]

----------------------------------------------------------------------------

10 files changed, 1129 insertions(+)
headers/os/net/BaseSocket.h | 205 +++++++++++++++
headers/os/net/EventLoop.h | 133 ++++++++++
headers/os/net/Protocols.h | 37 +++
headers/os/net/ServerProtocols.h | 40 +++
headers/os/net/ServerSocket.h | 204 +++++++++++++++
headers/os/net/StreamSocket.h | 333 ++++++++++++++++++++++++
src/kits/network/libnetapi/EventLoop.cpp | 129 +++++++++
src/kits/network/libnetapi/Jamfile | 13 +
src/kits/network/libnetapi/ServerSocket.cpp | 17 ++
src/kits/network/libnetapi/StreamSocket.cpp | 18 ++

############################################################################

Commit: 667af3ac49284460fb2ee8f076d97791eada8805
Author: Hamish Morrison <hamishm53@xxxxxxxxx>
Date: Sat Oct 17 22:11:36 2015 UTC

libnetapi: add EventLoop class for waiting on I/O events

* EventLoop wraps the event_queue system calls to provide a
callback-based interface for monitoring file descriptors, ports, etc.
* Users register interest in events using the WaitFor methods, and
run the event loop using the RunOnce method.
* One can also schedule arbitrary functions to run on the event loop
thread using the ExecuteLater and ExecuteAt methods.

----------------------------------------------------------------------------

diff --git a/headers/os/net/EventLoop.h b/headers/os/net/EventLoop.h
new file mode 100644
index 0000000..992283a
--- /dev/null
+++ b/headers/os/net/EventLoop.h
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2015, Haiku, Inc. All rights reserved.
+ * Distributed under the terms of the MIT License.
+ */
+#ifndef _EVENT_LOOP_H
+#define _EVENT_LOOP_H
+
+#include <OS.h>
+
+#include <deque>
+#include <functional>
+#include <utility>
+#include <vector>
+
+
+namespace io {
+
+
+typedef std::function<void (int)> EventCallback;
+
+
+/*!
+	\brief Callback-based event loop built on the event_queue system calls.
+
+	Interest in an object is registered with one of the WaitFor*() methods;
+	RunOnce() runs queued work and due timers, waits for events and invokes
+	the registered callbacks.
+
+	The WaitFor*() methods hand the address of the EventCallback to the event
+	queue rather than a copy, so the callback object must stay alive, at the
+	same address, for as long as the registration can still fire.
+*/
+class EventLoop {
+public:
+	EventLoop();
+	virtual ~EventLoop();
+
+	/*!
+		Run queued work and due timers, then wait for events and dispatch
+		them to their callbacks.
+	*/
+	status_t RunOnce();
+
+	/*!
+		Register \a callback for \a events on the given object. With
+		\a oneShot set, B_EVENT_ONE_SHOT is added to the requested events.
+	*/
+	status_t WaitForFD(int fd, int events, EventCallback& callback,
+		bool oneShot = true);
+
+	status_t WaitForPort(port_id port, int events, EventCallback& callback,
+		bool oneShot = true);
+
+	status_t WaitForSemaphore(sem_id semaphore, int events,
+		EventCallback& callback, bool oneShot = true);
+
+	status_t WaitForThread(thread_id thread, int events,
+		EventCallback& callback, bool oneShot = true);
+
+	/*!
+		Queue \a function to be run by the next RunOnce() call.
+	*/
+	template<typename F>
+	void ExecuteLater(F&& function);
+
+	/*!
+		Schedule \a function to run once \a time has been reached. \a time
+		is compared against real_time_clock_usecs().
+	*/
+	template<typename F>
+	void ExecuteAt(F&& function, bigtime_t time);
+
+private:
+	typedef std::function<void ()> Function;
+
+	status_t _WaitForObject(int32 object, uint16 type, uint16 events,
+		EventCallback& callback, bool oneShot = true);
+
+	void _ExecuteAt(Function&& fn, bigtime_t time);
+
+	bigtime_t _DetermineTimeout();
+	void _DispatchTimers();
+	void _DispatchWork();
+
+private:
+	struct timer {
+		bigtime_t expiration;
+		Function function;
+
+		// Deliberately inverted: std::push_heap()/std::pop_heap() maintain
+		// a max-heap with respect to operator<, so ordering the later
+		// expiration as "smaller" keeps the earliest timer at fTimers[0],
+		// which is the element _DispatchTimers() and _DetermineTimeout()
+		// inspect.
+		bool operator<(const timer& other) const
+		{
+			return expiration > other.expiration;
+		}
+	};
+
+	struct timer_comparator;
+
+	std::deque<Function> fWorkQueue;
+		// functions queued by ExecuteLater()
+	std::vector<timer> fTimers;
+		// binary heap, earliest expiration first
+	int fEventQueue;
+		// descriptor returned by event_queue_create()
+};
+
+
+// Append \a function to the work queue. _DispatchWork(), which RunOnce()
+// calls first, runs and then clears the queue. No locking is done here.
+template<typename F>
+inline void
+EventLoop::ExecuteLater(F&& function)
+{
+ // Build the stored std::function in place from the forwarded callable.
+ fWorkQueue.emplace_back(std::forward<F>(function));
+
+ // TODO: should wake up event loop
+}
+
+
+// Wrap \a function in a Function object and hand it to _ExecuteAt(), which
+// stores it in the timer heap together with \a time. _DispatchTimers()
+// compares \a time against real_time_clock_usecs().
+template<typename F>
+inline void
+EventLoop::ExecuteAt(F&& function, bigtime_t time)
+{
+ Function fn{std::forward<F>(function)};
+ _ExecuteAt(std::move(fn), time);
+}
+
+
+// Register \a callback for \a events on file descriptor \a fd by forwarding
+// to _WaitForObject() with B_OBJECT_TYPE_FD. _WaitForObject() keeps the
+// address of \a callback, so the object must outlive the registration.
+inline status_t
+EventLoop::WaitForFD(int fd, int events, EventCallback& callback, bool oneShot)
+{
+ return _WaitForObject(fd, B_OBJECT_TYPE_FD, events, callback, oneShot);
+
+}
+
+
+// Register \a callback for \a events on \a port by forwarding to
+// _WaitForObject() with B_OBJECT_TYPE_PORT. _WaitForObject() keeps the
+// address of \a callback, so the object must outlive the registration.
+inline status_t
+EventLoop::WaitForPort(port_id port, int events, EventCallback& callback,
+ bool oneShot)
+{
+ return _WaitForObject(port, B_OBJECT_TYPE_PORT, events, callback,
oneShot);
+}
+
+
+// Register \a callback for \a events on \a semaphore by forwarding to
+// _WaitForObject() with B_OBJECT_TYPE_SEMAPHORE. _WaitForObject() keeps the
+// address of \a callback, so the object must outlive the registration.
+inline status_t
+EventLoop::WaitForSemaphore(sem_id semaphore, int events,
+ EventCallback& callback, bool oneShot)
+{
+ return _WaitForObject(semaphore, B_OBJECT_TYPE_SEMAPHORE, events,
callback,
+ oneShot);
+}
+
+
+// Register \a callback for \a events on \a thread by forwarding to
+// _WaitForObject() with B_OBJECT_TYPE_THREAD. _WaitForObject() keeps the
+// address of \a callback, so the object must outlive the registration.
+inline status_t
+EventLoop::WaitForThread(thread_id thread, int events, EventCallback& callback,
+ bool oneShot)
+{
+ return _WaitForObject(thread, B_OBJECT_TYPE_THREAD, events, callback,
+ oneShot);
+}
+
+
+}
+
+
+#endif // _EVENT_LOOP_H
diff --git a/src/kits/network/libnetapi/EventLoop.cpp
b/src/kits/network/libnetapi/EventLoop.cpp
new file mode 100644
index 0000000..67310ea
--- /dev/null
+++ b/src/kits/network/libnetapi/EventLoop.cpp
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+
+#include <EventLoop.h>
+
+#include <OS.h>
+
+#include <algorithm>
+#include <system_error>
+
+
+namespace io {
+
+
+// Create the event queue backing this loop. The queue is created with
+// O_CLOEXEC; a negative result is thrown as std::system_error using that
+// value as the error code.
+EventLoop::EventLoop()
+	:
+	fEventQueue(event_queue_create(O_CLOEXEC))
+{
+	if (fEventQueue >= 0)
+		return;
+
+	throw std::system_error(fEventQueue, std::system_category());
+}
+
+
+// Close the event queue descriptor obtained in the constructor. Queued work
+// items and timers are dropped without being run.
+EventLoop::~EventLoop()
+{
+ close(fEventQueue);
+}
+
+
+// Run every function that was queued with ExecuteLater() before this call.
+//
+// The queue is detached into a local container first: a work item may call
+// ExecuteLater() itself, and std::deque::emplace_back() invalidates the
+// iterators a loop over fWorkQueue would be using. Items queued while
+// dispatching stay in fWorkQueue and run on the next call.
+void
+EventLoop::_DispatchWork()
+{
+	std::deque<Function> pending;
+	pending.swap(fWorkQueue);
+
+	for (auto const& work : pending)
+		work();
+}
+
+
+// Run all timers at the front of the heap whose expiration is not later than
+// the current real-time clock, which is sampled once on entry.
+//
+// Each timer is removed from the heap before its function is invoked. The
+// function may call ExecuteAt(), which pushes into fTimers; running it while
+// it is still stored there could reallocate the vector underneath it, and
+// the subsequent pop_heap() could then remove a different entry.
+void
+EventLoop::_DispatchTimers()
+{
+	bigtime_t current = real_time_clock_usecs();
+
+	while (!fTimers.empty()) {
+		if (fTimers[0].expiration > current)
+			break;
+
+		// pop_heap() moves the front element to the back of the vector.
+		std::pop_heap(fTimers.begin(), fTimers.end());
+		Function function = std::move(fTimers.back().function);
+		fTimers.pop_back();
+
+		function();
+	}
+}
+
+
+// Timeout for the next event_queue_wait() call: the expiration stored at the
+// front of the timer heap, or B_INFINITE_TIMEOUT when no timer is pending.
+bigtime_t
+EventLoop::_DetermineTimeout()
+{
+	if (!fTimers.empty())
+		return fTimers[0].expiration;
+
+	return B_INFINITE_TIMEOUT;
+}
+
+
+// Perform one iteration of the loop: run queued work, run due timers, then
+// wait on the event queue (until the next timer expiration, if any) and
+// invoke the callback attached to each reported event.
+//
+// Returns the negative result of event_queue_wait() on failure, otherwise
+// the number of events it reported.
+status_t
+EventLoop::RunOnce()
+{
+	static const int EVENTS_TO_READ = 50;
+
+	_DispatchWork();
+	_DispatchTimers();
+
+	const bigtime_t deadline = _DetermineTimeout();
+	event_wait_info infos[EVENTS_TO_READ];
+
+	const ssize_t count = event_queue_wait(fEventQueue, infos, EVENTS_TO_READ,
+		B_ABSOLUTE_REAL_TIME_TIMEOUT, deadline);
+	if (count < B_OK)
+		return count;
+
+	for (event_wait_info* info = infos; info != infos + count; info++) {
+		// user_data is the EventCallback address stored by _WaitForObject().
+		int32 events = info->events;
+		EventCallback& callback = *(EventCallback*)info->user_data;
+		callback(events);
+	}
+
+	return count;
+}
+
+
+// Select \a events on \a object (of kind \a type) on the event queue.
+//
+// The address of \a callback is stored as the registration's user data and
+// dereferenced by RunOnce(), so the callback object must remain valid at
+// that address while the registration is active.
+status_t
+EventLoop::_WaitForObject(int32 object, uint16 type, uint16 events,
+	EventCallback& callback, bool oneShot)
+{
+	event_wait_info info;
+	info.object = object;
+	info.type = type;
+	info.user_data = (void*)&callback;
+	info.events = events | B_EVENT_SELECT;
+	if (oneShot)
+		info.events |= B_EVENT_ONE_SHOT;
+
+	const status_t result = event_queue_select(fEventQueue, &info, 1);
+	if (result != B_ERROR)
+		return result;
+
+	// Somewhat ugly: if the error is 'B_ERROR' then the error is stored in
+	// the events field of the event_wait_info.
+	return info.events;
+}
+
+
+// Insert a timer that runs \a function once \a time has been reached and
+// restore the heap property of fTimers.
+void
+EventLoop::_ExecuteAt(Function&& function, bigtime_t time)
+{
+	// A named rvalue reference is an lvalue; without std::move() the
+	// std::function (and whatever it captured) would be copied.
+	fTimers.push_back({ time, std::move(function) });
+	std::push_heap(fTimers.begin(), fTimers.end());
+}
+
+
+}
diff --git a/src/kits/network/libnetapi/Jamfile
b/src/kits/network/libnetapi/Jamfile
index a0d606c..aa6826d 100644
--- a/src/kits/network/libnetapi/Jamfile
+++ b/src/kits/network/libnetapi/Jamfile
@@ -36,6 +36,15 @@ for architectureObject in [ MultiArchSubDirSetup ] {
SetupFeatureObjectsDir no-ssl ;
}

+ if $(TARGET_PACKAGING_ARCH) != x86_gcc2 {
+ SubDirC++Flags -std=c++1y ;
+
+ asyncSources =
+ EventLoop.cpp ;
+ } else {
+ asyncSources = ;
+ }
+
# BUrl uses ICU to perform IDNA conversions (unicode domain
names)
UseBuildFeatureHeaders icu ;
Includes [ FGristFiles Url.cpp ]
@@ -67,6 +76,8 @@ for architectureObject in [ MultiArchSubDirSetup ] {
Socket.cpp
SecureSocket.cpp

+ $(asyncSources)
+
# TODO: another add-on for file:// (a much simpler one)
FileRequest.cpp


############################################################################

Commit: 769322864f195a9bf40c5b9f9324566cc48db70f
Author: Hamish Morrison <hamishm53@xxxxxxxxx>
Date: Sat Oct 17 20:55:52 2015 UTC

libnetapi: add set of asynchronous socket classes

* BaseSocket provides a base set of functionality for all sockets.
* ServerSocket provides an asynchronous listening socket, and
StreamSocket provides an asynchronous stream socket.
* Instantiations are provided for TCP and UNIX sockets.

----------------------------------------------------------------------------

diff --git a/headers/os/net/BaseSocket.h b/headers/os/net/BaseSocket.h
new file mode 100644
index 0000000..fefac22
--- /dev/null
+++ b/headers/os/net/BaseSocket.h
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2015, Haiku, Inc. All Rights Reserved.
+ * Distributed under the terms of the MIT License.
+ */
+#ifndef _BASE_SOCKET_H
+#define _BASE_SOCKET_H
+
+#include <fcntl.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+
+#include <system_error>
+
+
+namespace io {
+
+
+/*!
+ \brief Base class for sockets.
+
+ This provides the socket operations common to all socket types. The
+ semantics of the operations are similar to their POSIX counterparts.
+
+ All functions return 0 on success, or a POSIX error number otherwise.
+
+ The Protocol template parameter must provide an AddressType typedef
+ suitable for the concrete socket type (ServerSocket additionally
+ requires a SocketType typedef), along with methods Family(), Type() and
+ Protocol() to provide parameters for the socket() call.
+*/
+template<typename Protocol>
+class BaseSocket {
+public:
+ typedef typename Protocol::AddressType AddressType;
+
+ /*!
+ Create an unopened socket.
+ */
+ BaseSocket();
+
+ /*!
+ Create and open a socket using the provided protocol. Throws
+ std::system_error if the socket cannot be opened.
+ */
+ explicit BaseSocket(Protocol protocol);
+
+ /*!
+ Create a socket, adopting the provided socket descriptor.
+ */
+ explicit BaseSocket(int socket);
+
+ /*!
+ Destroy the socket, closing the underlying descriptor if one is
+ open.
+ */
+ virtual ~BaseSocket();
+
+ /*!
+ Adopt the provided socket descriptor. The behaviour is undefined if
+ an invalid or non-socket file descriptor is passed. A descriptor
+ already held by this object is overwritten without being closed.
+ */
+ void Adopt(int socket);
+
+ /*!
+ Open a socket descriptor for the given protocol.
+ */
+ int Open(Protocol protocol);
+
+ /*!
+ Close the underlying socket descriptor.
+ */
+ int Close();
+
+ /*!
+ Bind the socket to the given address.
+ */
+ int Bind(const AddressType& address);
+
+ /*!
+ Set or unset non-blocking mode on the socket.
+ */
+ int SetNonBlocking(bool nonBlocking);
+
+ /*!
+ Get the pending error of the socket (SO_ERROR). The error code
+ will be reset after this call.
+ */
+ int Error();
+
+protected:
+ // Underlying socket descriptor, or -1 when no socket is open.
+ int fSocket;
+};
+
+
+// Default constructor: starts without a descriptor (fSocket == -1).
+template<typename Protocol>
+inline
+BaseSocket<Protocol>::BaseSocket()
+ :
+ fSocket(-1)
+{
+}
+
+
+// Opening constructor: fSocket is assigned by Open(). A non-zero result from
+// Open() is thrown as std::system_error.
+template<typename Protocol>
+inline
+BaseSocket<Protocol>::BaseSocket(Protocol protocol)
+{
+ int result = Open(protocol);
+ if (result != 0) {
+ throw std::system_error(result, std::system_category());
+ }
+}
+
+
+// Adopting constructor: takes over \a socket as is, without validating it.
+template<typename Protocol>
+inline
+BaseSocket<Protocol>::BaseSocket(int socket)
+ :
+ fSocket(socket)
+{
+}
+
+
+// Close the descriptor if one is held. The result of Close() is ignored.
+template<typename Protocol>
+BaseSocket<Protocol>::~BaseSocket()
+{
+ if (fSocket != -1) {
+ Close();
+ }
+}
+
+
+// Replace the held descriptor with \a socket. The previous value of fSocket
+// is overwritten without being closed.
+// NOTE(review): callers presumably only adopt into unopened sockets; an
+// already open descriptor would be leaked here - confirm.
+template<typename Protocol>
+inline void
+BaseSocket<Protocol>::Adopt(int socket)
+{
+ fSocket = socket;
+}
+
+
+// Create a socket from the family, type and protocol number supplied by
+// \a protocol and store its descriptor in fSocket.
+// Returns 0 on success, errno if socket() fails.
+template<typename Protocol>
+inline int
+BaseSocket<Protocol>::Open(Protocol protocol)
+{
+	fSocket = socket(protocol.Family(), protocol.Type(), protocol.Protocol());
+	return fSocket == -1 ? errno : 0;
+}
+
+
+// Close the underlying descriptor and mark the socket as unopened.
+// Returns 0 on success, errno if close() fails.
+template<typename Protocol>
+inline int
+BaseSocket<Protocol>::Close()
+{
+	int result = close(fSocket);
+
+	// Forget the descriptor whatever close() returned. Otherwise the
+	// destructor would close the same number a second time, by which point
+	// it may already refer to an unrelated descriptor. A plain assignment
+	// does not touch errno.
+	fSocket = -1;
+
+	if (result != 0) {
+		return errno;
+	}
+	return 0;
+}
+
+
+// Switch non-blocking mode on or off through the FIONBIO ioctl.
+// Returns 0 on success, errno if ioctl() fails.
+template<typename Protocol>
+inline int
+BaseSocket<Protocol>::SetNonBlocking(bool nonBlocking)
+{
+	int enable = nonBlocking;
+	if (ioctl(fSocket, FIONBIO, &enable) == 0)
+		return 0;
+
+	return errno;
+}
+
+
+// Bind the socket to \a address, using the sockaddr and length it exposes.
+// Returns 0 on success, errno if bind() fails.
+template<typename Protocol>
+inline int
+BaseSocket<Protocol>::Bind(const AddressType& address)
+{
+	const int status = bind(fSocket, &address.SockAddr(), address.Length());
+	return status == -1 ? errno : 0;
+}
+
+
+// Read the socket's SO_ERROR option.
+// Returns the option value when it could be read, errno if getsockopt()
+// itself fails.
+template<typename Protocol>
+inline int
+BaseSocket<Protocol>::Error()
+{
+	int pending;
+	socklen_t size = sizeof(pending);
+
+	if (getsockopt(fSocket, SOL_SOCKET, SO_ERROR, &pending, &size) != 0)
+		return errno;
+
+	return pending;
+}
+
+
+}
+
+
+#endif // _BASE_SOCKET_H
diff --git a/headers/os/net/Protocols.h b/headers/os/net/Protocols.h
new file mode 100644
index 0000000..be846e8
--- /dev/null
+++ b/headers/os/net/Protocols.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+#ifndef _PROTOCOLS_H
+#define _PROTOCOLS_H
+
+#include <NetworkAddress.h>
+
+
+namespace io {
+
+
+// Protocol description for stream sockets in the AF_INET family.
+// BaseSocket::Open() passes Family(), Type() and Protocol() to socket(),
+// i.e. socket(AF_INET, SOCK_STREAM, 0).
+class TCPProtocol {
+public:
+ typedef BNetworkAddress AddressType;
+
+ int Family() const { return AF_INET; }
+ int Type() const { return SOCK_STREAM; }
+ int Protocol() const { return 0; }
+};
+
+
+// Protocol description for stream sockets in the AF_UNIX family.
+// BaseSocket::Open() passes Family(), Type() and Protocol() to socket(),
+// i.e. socket(AF_UNIX, SOCK_STREAM, 0).
+// NOTE(review): AddressType is BNetworkAddress here as well - confirm that
+// it can represent AF_UNIX addresses.
+class UNIXProtocol {
+public:
+ typedef BNetworkAddress AddressType;
+
+ int Family() const { return AF_UNIX; }
+ int Type() const { return SOCK_STREAM; }
+ int Protocol() const { return 0; }
+};
+
+
+}
+
+
+#endif // _PROTOCOLS_H
diff --git a/headers/os/net/ServerProtocols.h b/headers/os/net/ServerProtocols.h
new file mode 100644
index 0000000..3afbfe5
--- /dev/null
+++ b/headers/os/net/ServerProtocols.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+#ifndef _SERVER_PROTOCOLS_H
+#define _SERVER_PROTOCOLS_H
+
+#include <NetworkAddress.h>
+#include <StreamSocket.h>
+
+
+namespace io {
+
+
+// Protocol description for listening AF_INET stream sockets. SocketType is
+// the socket class ServerSocket::AsyncAccept() hands accepted connections to.
+class TCPServerProtocol {
+public:
+ typedef BNetworkAddress AddressType;
+ typedef TCPSocket SocketType;
+
+ int Family() const { return AF_INET; }
+ int Type() const { return SOCK_STREAM; }
+ int Protocol() const { return 0; }
+};
+
+
+// Protocol description for listening AF_UNIX stream sockets. SocketType is
+// the socket class ServerSocket::AsyncAccept() hands accepted connections to.
+class UNIXServerProtocol {
+public:
+ typedef BNetworkAddress AddressType;
+ typedef UNIXSocket SocketType;
+
+ int Family() const { return AF_UNIX; }
+ int Type() const { return SOCK_STREAM; }
+ int Protocol() const { return 0; }
+};
+
+
+}
+
+
+#endif // _SERVER_PROTOCOLS_H
diff --git a/headers/os/net/ServerSocket.h b/headers/os/net/ServerSocket.h
new file mode 100644
index 0000000..e51a71d
--- /dev/null
+++ b/headers/os/net/ServerSocket.h
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+#ifndef _SERVER_SOCKET_H
+#define _SERVER_SOCKET_H
+
+#include <BaseSocket.h>
+#include <EventLoop.h>
+#include <ServerProtocols.h>
+
+#include <functional>
+
+#include <assert.h>
+
+
+namespace io {
+
+
+class EventLoop;
+
+
+typedef std::function<void (ssize_t)> IOCallback;
+
+
+/*!
+ \brief A socket for listening on an address and accepting connections.
+
+ ServerSocket provides an interface for asynchronously accepting sockets.
+ Each ServerSocket is associated with an EventLoop, which provides the
+ mechanism for waiting on events.
+
+ The semantics of the operations are the same as their BSD socket
+ counterparts, except a callback is also provided, which will be called
+ on completion.
+
+ The callbacks are type-erased for storage, which may incur heap
+ allocation if the callback object is sufficiently large. To avoid this,
+ you can pass a std::ref or std::cref to your callback. If you do this,
+ you must ensure the callback remains valid for the duration of the
+ asynchronous operation.
+*/
+template<typename Protocol>
+class ServerSocket : public BaseSocket<Protocol> {
+public:
+ typedef typename Protocol::SocketType SocketType;
+
+ /*!
+ Create a ServerSocket associated with the given event loop using
+ the given protocol.
+ This will open an underlying socket handle. A std::system_error
+ will be thrown if this fails.
+ */
+ ServerSocket(EventLoop&
eventLoop,
+ Protocol
protocol);
+
+ /*!
+ Create a ServerSocket associated with the given event loop, and
+ adopt the existing socket handle.
+ */
+ ServerSocket(EventLoop&
eventLoop, int socket);
+
+ /*!
+ Close the socket (done by ~BaseSocket).
+ NOTE: the destructor body is currently empty, so a registration
+ made with the event loop is not withdrawn here.
+ */
+ virtual ~ServerSocket();
+
+ /*!
+ Start listening for connections with the given backlog. Returns 0
+ on success, or a POSIX error number otherwise.
+ */
+ int Listen(int backlog);
+
+ /*!
+ Asynchronously accept a connection. The callback will be invoked
+ with the result of the operation: 0 on success, a non-zero value
+ otherwise. If successful, the provided socket will refer to the
+ accepted connection. Otherwise, it will not be modified. The
+ callback may be invoked before the method returns.
+
+ The provided socket must remain valid until the callback is
+ called.
+ */
+ template<typename Callback>
+ void AsyncAccept(SocketType& socket,
+ Callback&&
callback);
+private:
+ void _HandleEvents(int events);
+ void _WaitForRead();
+
+private:
+ EventLoop& fEventLoop;
+ // Bound to _HandleEvents(); its address is what gets registered with
+ // the event loop.
+ EventCallback fEventCallback;
+
+ // Destination socket and completion callback of the pending accept.
+ SocketType* fAcceptSocket;
+ EventCallback fAcceptCallback;
+};
+
+
+extern template class ServerSocket<TCPServerProtocol>;
+extern template class ServerSocket<UNIXServerProtocol>;
+
+typedef ServerSocket<TCPServerProtocol> TCPServerSocket;
+typedef ServerSocket<UNIXServerProtocol> UNIXServerSocket;
+
+
+// Opening constructor: the base class opens the socket for \a protocol (and
+// throws std::system_error on failure); fEventCallback is bound to
+// _HandleEvents() of this object.
+template<typename Protocol>
+inline
+ServerSocket<Protocol>::ServerSocket(EventLoop& eventLoop,
+	Protocol protocol)
+	:
+	BaseSocket<Protocol>(protocol),
+	fEventLoop(eventLoop),
+	fEventCallback(std::bind(&ServerSocket<Protocol>::_HandleEvents, this,
+		std::placeholders::_1)),
+	// No accept is pending yet; do not leave the pointer indeterminate.
+	fAcceptSocket(nullptr)
+{
+}
+
+
+// Adopting constructor: takes over the descriptor \a socket; fEventCallback
+// is bound to _HandleEvents() of this object.
+template<typename Protocol>
+inline
+ServerSocket<Protocol>::ServerSocket(EventLoop& eventLoop, int socket)
+	:
+	BaseSocket<Protocol>(socket),
+	fEventLoop(eventLoop),
+	fEventCallback(std::bind(&ServerSocket<Protocol>::_HandleEvents, this,
+		std::placeholders::_1)),
+	// No accept is pending yet; do not leave the pointer indeterminate.
+	fAcceptSocket(nullptr)
+{
+}
+
+
+// Nothing to do here; the descriptor is closed by ~BaseSocket.
+// NOTE(review): a registration made through _WaitForRead() is not withdrawn,
+// so the event loop may still hold the address of fEventCallback - confirm
+// how pending operations are meant to be cancelled.
+template<typename Protocol>
+inline
+ServerSocket<Protocol>::~ServerSocket()
+{
+}
+
+
+// Put the socket into listening state with the given backlog.
+// Returns 0 on success, errno if listen() fails.
+template<typename Protocol>
+inline int
+ServerSocket<Protocol>::Listen(int backlog)
+{
+	if (listen(this->fSocket, backlog) == 0)
+		return 0;
+
+	return errno;
+}
+
+
+// Try to accept a connection right away. If one is available it is adopted
+// by \a socket and \a callback is invoked with 0. If accept() fails for a
+// reason other than "would block", \a callback is invoked with errno.
+// Otherwise the request is stored and completed from _HandleEvents() once
+// the listening socket becomes readable.
+template<typename Protocol>
+template<typename Callback>
+inline void
+ServerSocket<Protocol>::AsyncAccept(SocketType& socket, Callback&& callback)
+{
+	int result = accept(this->fSocket, nullptr, nullptr);
+	if (result >= 0) {
+		socket.Adopt(result);
+		callback(0);
+		return;
+	}
+
+	// Report the actual error number rather than accept()'s -1 return
+	// value, like the other socket operations do.
+	const int error = errno;
+	if (error != EAGAIN && error != EWOULDBLOCK) {
+		callback(error);
+		return;
+	}
+
+	fAcceptSocket = &socket;
+	fAcceptCallback = std::forward<Callback>(callback);
+	_WaitForRead();
+}
+
+
+// Ask the event loop to invoke fEventCallback when the listening socket
+// reports B_EVENT_READ. Throws std::system_error if registration fails.
+template<typename Protocol>
+inline void
+ServerSocket<Protocol>::_WaitForRead()
+{
+	const status_t status = fEventLoop.WaitForFD(this->fSocket, B_EVENT_READ,
+		fEventCallback);
+	if (status == B_OK)
+		return;
+
+	throw std::system_error(status, std::system_category());
+}
+
+
+// Event loop callback for the listening socket: completes the pending
+// AsyncAccept() request.
+//
+// The accept callback receives 0 after the connection has been adopted by
+// the destination socket, or an error number otherwise. If accept() would
+// still block, the wait is re-armed and the request stays pending.
+template<typename Protocol>
+inline void
+ServerSocket<Protocol>::_HandleEvents(int events)
+{
+	int status;
+	int accepted = -1;
+
+	if ((events & B_EVENT_ERROR) != 0) {
+		status = this->Error();
+	} else {
+		assert((events & B_EVENT_READ) != 0);
+
+		accepted = accept(this->fSocket, nullptr, nullptr);
+		if (accepted >= 0) {
+			status = 0;
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			_WaitForRead();
+			return;
+		} else {
+			// Report the error number, not accept()'s -1 return value.
+			status = errno;
+		}
+	}
+
+	// Detach the request before running the callback: the callback may call
+	// AsyncAccept() again, and clearing the members afterwards would wipe
+	// out the request it has just stored.
+	SocketType* socket = fAcceptSocket;
+	EventCallback callback = std::move(fAcceptCallback);
+	fAcceptSocket = nullptr;
+	fAcceptCallback = nullptr;
+
+	if (accepted >= 0)
+		socket->Adopt(accepted);
+
+	callback(status);
+}
+
+
+}
+
+
+#endif // _SERVER_SOCKET_H
diff --git a/headers/os/net/StreamSocket.h b/headers/os/net/StreamSocket.h
new file mode 100644
index 0000000..0dddfaf
--- /dev/null
+++ b/headers/os/net/StreamSocket.h
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+#ifndef _STREAM_SOCKET_H
+#define _STREAM_SOCKET_H
+
+#include <BaseSocket.h>
+#include <EventLoop.h>
+#include <Protocols.h>
+
+#include <sys/socket.h>
+
+#include <functional>
+
+
+namespace io {
+
+
+class EventLoop;
+
+
+typedef std::function<void (ssize_t)> IOCallback;
+
+
+/*!
+ \brief Interface for executing asynchronous socket operations.
+
+ StreamSocket provides an interface for asynchronous socket I/O on
+ stream sockets. Each StreamSocket is associated with an EventLoop,
+ which provides the mechanism for waiting on events.
+
+ The semantics of the operations are the same as their BSD socket
+ counterparts, except a callback is also provided, which will be called
+ on completion.
+
+ The callbacks are type-erased for storage, which may incur heap
+ allocation if the callback object is sufficiently large. To avoid this,
+ you can pass a std::ref or std::cref to your callback. If you do this,
+ you must ensure the callback remains valid for the duration of the
+ asynchronous operation.
+*/
+template<typename Protocol>
+class StreamSocket : public BaseSocket<Protocol> {
+public:
+ typedef typename Protocol::AddressType AddressType;
+
+ /*!
+ Create an unopened StreamSocket associated with the given event
+ loop.
+ */
+ StreamSocket(EventLoop&
eventLoop);
+
+ /*!
+ Create a StreamSocket associated with the given event loop. This
+ will open an underlying socket handle. A std::system_error will be
+ thrown if this fails.
+ */
+ StreamSocket(EventLoop&
eventLoop,
+
Protocol protocol);
+
+ /*!
+ Create a StreamSocket associated with the given event loop, and
+ adopt the existing socket handle.
+ */
+ StreamSocket(EventLoop&
eventLoop, int socket);
+
+ /*!
+ Close the socket (done by ~BaseSocket).
+ NOTE: the destructor body is currently empty, so a registration
+ made with the event loop is not withdrawn here.
+ */
+ virtual ~StreamSocket();
+
+ /*!
+ Connect the socket to the given peer. The callback will be invoked
+ with the result of the operation. The callback may be invoked
+ before the method returns.
+ */
+ template<typename Callback>
+ void AsyncConnect(const AddressType&
peer,
+ Callback&&
callback);
+
+ /*!
+ Receive into the provided buffer. The callback will be invoked
+ with the result of the receive: the number of bytes received, or
+ errno if recv() failed. The callback may be invoked before the
+ method returns.
+ */
+ template<typename Callback>
+ void AsyncRecv(void* buffer, size_t
size, int flags,
+ Callback&&
callback);
+
+ /*!
+ Send up to size bytes from the buffer. The callback will be
+ invoked with the result of the send: the number of bytes sent, or
+ errno if send() failed. The callback may be invoked before the
+ method returns.
+ */
+ template<typename Callback>
+ void AsyncSend(const void* buffer,
size_t size,
+ int flags,
Callback&& callback);
+
+private:
+ void _HandleEvents(int events);
+ void _HandleConnect();
+ void _HandleSend();
+ void _HandleRecv();
+
+ void _WaitForRead();
+ void _WaitForWrite();
+
+private:
+ EventLoop& fEventLoop;
+ // Bound to _HandleEvents(); its address is what gets registered with
+ // the event loop.
+ EventCallback fEventCallback;
+
+ // Parameters of a send or receive that could not complete immediately.
+ struct IORequest {
+ void* buffer;
+ size_t size;
+ int flags;
+ IOCallback callback;
+ };
+
+ IORequest fSendRequest;
+ IORequest fRecvRequest;
+ IOCallback fConnectCallback;
+
+ // Set while the corresponding operation is waiting for an event.
+ bool fWaitingRead : 1;
+ bool fWaitingWrite : 1;
+ bool fWaitingConnect : 1;
+};
+
+extern template class StreamSocket<TCPProtocol>;
+extern template class StreamSocket<UNIXProtocol>;
+
+typedef StreamSocket<TCPProtocol> TCPSocket;
+typedef StreamSocket<UNIXProtocol> UNIXSocket;
+
+
+// Unopened socket: the base class is default-constructed (no descriptor);
+// fEventCallback is bound to _HandleEvents() and no operation is pending.
+template<typename Protocol>
+inline
+StreamSocket<Protocol>::StreamSocket(EventLoop& eventLoop)
+ :
+ fEventLoop(eventLoop),
+ fEventCallback(std::bind(&StreamSocket<Protocol>::_HandleEvents, this,
+ std::placeholders::_1)),
+ fWaitingRead(false),
+ fWaitingWrite(false),
+ fWaitingConnect(false)
+{
+}
+
+
+// Opening constructor: the base class opens the socket for \a protocol and
+// throws std::system_error on failure.
+template<typename Protocol>
+inline
+StreamSocket<Protocol>::StreamSocket(EventLoop& eventLoop,
+ Protocol protocol)
+ :
+ BaseSocket<Protocol>(protocol),
+ fEventLoop(eventLoop),
+ fEventCallback(std::bind(&StreamSocket<Protocol>::_HandleEvents, this,
+ std::placeholders::_1)),
+ fWaitingRead(false),
+ fWaitingWrite(false),
+ fWaitingConnect(false)
+{
+}
+
+
+// Adopting constructor: takes over the descriptor \a socket.
+template<typename Protocol>
+inline
+StreamSocket<Protocol>::StreamSocket(EventLoop& eventLoop, int socket)
+ :
+ BaseSocket<Protocol>(socket),
+ fEventLoop(eventLoop),
+ fEventCallback(std::bind(&StreamSocket<Protocol>::_HandleEvents, this,
+ std::placeholders::_1)),
+ fWaitingRead(false),
+ fWaitingWrite(false),
+ fWaitingConnect(false)
+{
+}
+
+
+// Nothing to do here; the descriptor is closed by ~BaseSocket.
+// NOTE(review): registrations made through _WaitForRead()/_WaitForWrite()
+// are not withdrawn, so the event loop may still hold the address of
+// fEventCallback - confirm how pending operations are meant to be cancelled.
+template<typename Protocol>
+inline
+StreamSocket<Protocol>::~StreamSocket()
+{
+}
+
+
+// Start connecting to \a address. If connect() finishes immediately, or
+// fails with anything other than EINPROGRESS, \a callback is invoked right
+// away with 0 or errno. Otherwise the callback is stored and the socket
+// waits for writability.
+template<typename Protocol>
+template<typename Callback>
+inline void
+StreamSocket<Protocol>::AsyncConnect(const AddressType& address,
+	Callback&& callback)
+{
+	const int status = connect(this->fSocket, &address.SockAddr(),
+		address.Length());
+
+	const bool inProgress = status != 0 && errno == EINPROGRESS;
+	if (inProgress) {
+		fWaitingConnect = true;
+		fConnectCallback = callback;
+		_WaitForWrite();
+		return;
+	}
+
+	callback(status == 0 ? 0 : errno);
+}
+
+
+// Try to receive immediately. On completion (data, end of stream or a hard
+// error) \a callback is invoked right away with the byte count or errno.
+// If recv() would block, the request is stored in fRecvRequest and the
+// socket waits for readability.
+template<typename Protocol>
+template<typename Callback>
+inline void
+StreamSocket<Protocol>::AsyncRecv(void* buffer, size_t size, int flags,
+	Callback&& callback)
+{
+	const ssize_t count = recv(this->fSocket, buffer, size, flags);
+
+	const bool wouldBlock
+		= count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+	if (!wouldBlock) {
+		callback(count >= 0 ? count : errno);
+		return;
+	}
+
+	fRecvRequest.buffer = buffer;
+	fRecvRequest.size = size;
+	fRecvRequest.flags = flags;
+	fRecvRequest.callback = callback;
+	fWaitingRead = true;
+	_WaitForRead();
+}
+
+
+// Try to send immediately. On completion \a callback is invoked right away
+// with the byte count or errno. If send() would block, the request is
+// stored in fSendRequest and the socket waits for writability.
+template<typename Protocol>
+template<typename Callback>
+inline void
+StreamSocket<Protocol>::AsyncSend(const void* buffer, size_t size, int flags,
+	Callback&& callback)
+{
+	const ssize_t count = send(this->fSocket, buffer, size, flags);
+
+	const bool wouldBlock
+		= count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+	if (!wouldBlock) {
+		callback(count >= 0 ? count : errno);
+		return;
+	}
+
+	// IORequest stores a mutable pointer; the buffer is only read from.
+	fSendRequest.buffer = const_cast<void*>(buffer);
+	fSendRequest.size = size;
+	fSendRequest.flags = flags;
+	fSendRequest.callback = callback;
+	fWaitingWrite = true;
+	_WaitForWrite();
+}
+
+
+// Ask the event loop to invoke fEventCallback when the socket reports
+// B_EVENT_READ. Throws std::system_error if registration fails.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_WaitForRead()
+{
+	const status_t status = fEventLoop.WaitForFD(this->fSocket, B_EVENT_READ,
+		fEventCallback);
+	if (status == B_OK)
+		return;
+
+	throw std::system_error(status, std::system_category());
+}
+
+
+// Ask the event loop to invoke fEventCallback when the socket reports
+// B_EVENT_WRITE. Throws std::system_error if registration fails.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_WaitForWrite()
+{
+	const status_t status = fEventLoop.WaitForFD(this->fSocket, B_EVENT_WRITE,
+		fEventCallback);
+	if (status == B_OK)
+		return;
+
+	throw std::system_error(status, std::system_category());
+}
+
+
+// Event loop callback: route readiness events to the operations that are
+// currently waiting. A read event completes a pending receive; a write
+// event completes a pending connect and then a pending send.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_HandleEvents(int events)
+{
+	const bool readable = (events & B_EVENT_READ) != 0;
+	const bool writable = (events & B_EVENT_WRITE) != 0;
+
+	if (readable && fWaitingRead)
+		_HandleRecv();
+
+	if (!writable)
+		return;
+
+	// The flags are tested one after the other on purpose: a handler's
+	// callback may start a new operation and change them.
+	if (fWaitingConnect)
+		_HandleConnect();
+	if (fWaitingWrite)
+		_HandleSend();
+}
+
+
+// Retry the receive stored in fRecvRequest after the socket became readable.
+// The request's callback gets the byte count, or errno on a hard error; if
+// recv() would still block, the wait is re-armed instead.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_HandleRecv()
+{
+	ssize_t received = recv(this->fSocket, fRecvRequest.buffer,
+		fRecvRequest.size, fRecvRequest.flags);
+
+	if (received < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+		_WaitForRead();
+		return;
+	}
+
+	const ssize_t result = received >= 0 ? received : errno;
+
+	// Detach the callback before invoking it: it may call AsyncRecv() again,
+	// and resetting fRecvRequest.callback afterwards would destroy the
+	// callback of the request it has just queued.
+	IOCallback callback = std::move(fRecvRequest.callback);
+	fRecvRequest.callback = nullptr;
+	fWaitingRead = false;
+
+	callback(result);
+}
+
+
+// Retry the send stored in fSendRequest after the socket became writable.
+// The request's callback gets the byte count, or errno on a hard error; if
+// send() would still block, the wait is re-armed instead.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_HandleSend()
+{
+	// The parameters must come from the send request; using fRecvRequest
+	// here would transmit from the receive buffer.
+	ssize_t sent = send(this->fSocket, fSendRequest.buffer, fSendRequest.size,
+		fSendRequest.flags);
+
+	if (sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+		_WaitForWrite();
+		return;
+	}
+
+	const ssize_t result = sent >= 0 ? sent : errno;
+
+	// Detach the callback before invoking it: it may call AsyncSend() again,
+	// and resetting fSendRequest.callback afterwards would destroy the
+	// callback of the request it has just queued.
+	IOCallback callback = std::move(fSendRequest.callback);
+	fSendRequest.callback = nullptr;
+	fWaitingWrite = false;
+
+	callback(result);
+}
+
+
+// Complete a pending AsyncConnect() after the socket became writable.
+//
+// Writability only means the connection attempt has finished, not that it
+// succeeded, so the outcome is read from SO_ERROR (via Error()) and passed
+// to the callback: 0 on success, an error number otherwise.
+template<typename Protocol>
+void
+StreamSocket<Protocol>::_HandleConnect()
+{
+	const int error = this->Error();
+
+	// Detach the callback before invoking it: it may start another
+	// connect, and resetting fConnectCallback afterwards would destroy the
+	// callback it has just stored.
+	IOCallback callback = std::move(fConnectCallback);
+	fConnectCallback = nullptr;
+	fWaitingConnect = false;
+
+	callback(error);
+}
+
+
+}
+
+
+#endif // _STREAM_SOCKET_H
diff --git a/src/kits/network/libnetapi/Jamfile
b/src/kits/network/libnetapi/Jamfile
index aa6826d..345e190 100644
--- a/src/kits/network/libnetapi/Jamfile
+++ b/src/kits/network/libnetapi/Jamfile
@@ -40,7 +40,9 @@ for architectureObject in [ MultiArchSubDirSetup ] {
SubDirC++Flags -std=c++1y ;

asyncSources =
- EventLoop.cpp ;
+ EventLoop.cpp
+ ServerSocket.cpp
+ StreamSocket.cpp ;
} else {
asyncSources = ;
}
diff --git a/src/kits/network/libnetapi/ServerSocket.cpp
b/src/kits/network/libnetapi/ServerSocket.cpp
new file mode 100644
index 0000000..2af2046
--- /dev/null
+++ b/src/kits/network/libnetapi/ServerSocket.cpp
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All Rights Reserved. Distributed under the terms of the MIT License.
+ */
+
+#include <ServerProtocols.h>
+#include <ServerSocket.h>
+
+
+namespace io {
+
+
+// Explicit instantiations matching the "extern template" declarations in
+// ServerSocket.h.
+template class ServerSocket<TCPServerProtocol>;
+template class ServerSocket<UNIXServerProtocol>;
+
+
+}
diff --git a/src/kits/network/libnetapi/StreamSocket.cpp
b/src/kits/network/libnetapi/StreamSocket.cpp
new file mode 100644
index 0000000..def6d08
--- /dev/null
+++ b/src/kits/network/libnetapi/StreamSocket.cpp
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2015, Hamish Morrison, hamishm53@xxxxxxxxx.
+ * All rights reserved. Distributed under the terms of the MIT License.
+ */
+
+#include <Protocols.h>
+#include <StreamSocket.h>
+
+
+namespace io {
+
+
+// Explicit instantiations matching the "extern template" declarations in
+// StreamSocket.h.
+template class StreamSocket<TCPProtocol>;
+template class StreamSocket<UNIXProtocol>;
+
+
+}
+


Other related posts: