diff --git a/CMakeLists.txt b/CMakeLists.txt index 76b143953a4..ab7d1493d7f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -429,6 +429,21 @@ list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) check_symbol_exists(pthread_getname_np pthread.h HAVE_PTHREAD_GETNAME_NP) check_symbol_exists(pthread_get_name_np pthread.h HAVE_PTHREAD_GET_NAME_NP) +option(USE_SPLICE "Enable the use of splice(2) for zero copy (default OFF) (linux only)" OFF) + +# Check for the splice function +include(CheckFunctionExists) +check_function_exists(splice HAVE_SPLICE) + +# Configure USE_SPLICE based on both availability and user option +if(HAVE_SPLICE AND USE_SPLICE) + message(STATUS "splice is available and enabled.") + set(TS_USE_LINUX_SPLICE 1) # Use ON for true +else() + message(STATUS "splice is either not available or disabled by the user.") + set(TS_USE_LINUX_SPLICE 0) # Use OFF for false +endif() + check_source_compiles( C "#include void main() { pthread_setname_np(\"name\"); }" HAVE_PTHREAD_SETNAME_NP_1 diff --git a/CMakePresets.json b/CMakePresets.json index 218a4654246..4d5f472aa50 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -17,6 +17,19 @@ "CMAKE_COMPILE_WARNING_AS_ERROR": "ON" } }, + { + "name": "default-splice", + "displayName": "Default build with splice", + "description": "Default build using Ninja generator with splice", + "inherits": ["default"], + "binaryDir": "${sourceDir}/build-${presetName}", + "generator": "Ninja", + "cacheVariables": { + "CMAKE_COLOR_DIAGNOSTICS": "ON", + "CMAKE_EXPORT_COMPILE_COMMANDS": "ON", + "USE_SPLICE": "ON" + } + }, { "name": "layout-defaults", "displayName": "Default install layout paths template", diff --git a/include/iocore/eventsystem/EventSystem.h b/include/iocore/eventsystem/EventSystem.h index dc80b025a10..836714a606b 100644 --- a/include/iocore/eventsystem/EventSystem.h +++ b/include/iocore/eventsystem/EventSystem.h @@ -30,6 +30,7 @@ #include "ts/apidefs.h" #include "iocore/eventsystem/IOBuffer.h" +#include "iocore/eventsystem/PipeIOBuffer.h" #include "iocore/eventsystem/Action.h" #include "iocore/eventsystem/Continuation.h" #include "iocore/eventsystem/EThread.h" diff --git a/include/iocore/eventsystem/IOBuffer.h b/include/iocore/eventsystem/IOBuffer.h index 9b2ffc1dd8f..34d95d8fa9e 100644 --- a/include/iocore/eventsystem/IOBuffer.h +++ b/include/iocore/eventsystem/IOBuffer.h @@ -45,6 +45,8 @@ #include "tscore/Ptr.h" #include "tscore/ink_assert.h" #include "tscore/ink_resource.h" +#include "Thread.h" + struct MIOBufferAccessor; @@ -52,6 +54,8 @@ class MIOBuffer; class IOBufferReader; class VIO; +extern ClassAllocator ioAllocator; + enum AllocType { NO_ALLOC, MEMALIGNED, @@ -629,7 +633,7 @@ class IOBufferReader @return pointer to the start of the unconsumed data. */ - char *start(); + virtual char *start(); /** End of inuse area of the first block with unconsumed data. Returns a @@ -640,7 +644,7 @@ class IOBufferReader @return pointer to the end of the first block with unconsumed data. */ - char *end(); + virtual char *end(); /** Amount of data available across all of the IOBufferBlocks. Returns the @@ -651,12 +655,12 @@ class IOBufferReader @return bytes of data available across all the buffers. */ - int64_t read_avail(); + virtual int64_t read_avail(); /** Check if there is more than @a size bytes available to read. @return @c true if more than @a size byte are available. */ - bool is_read_avail_more_than(int64_t size); + virtual bool is_read_avail_more_than(int64_t size); /** Number of IOBufferBlocks with data in the block list. Returns the @@ -666,7 +670,7 @@ class IOBufferReader @return number of blocks with data for this reader. */ - int block_count(); + virtual int block_count(); /** Amount of data available in the first buffer with data for this @@ -677,15 +681,15 @@ class IOBufferReader buffer. */ - int64_t block_read_avail(); + virtual int64_t block_read_avail(); /** Get a view of the data available to read. * * @return A view encompassing currently available readable data. */ - std::string_view block_read_view(); + virtual std::string_view block_read_view(); - void skip_empty_blocks(); + virtual void skip_empty_blocks(); /** Clears all fields in this IOBuffeReader, rendering it unusable. Drops @@ -694,7 +698,7 @@ class IOBufferReader to use this object again. */ - void clear(); + virtual void clear(); /** Instruct the reader to reset the IOBufferBlock list. Resets the @@ -703,7 +707,7 @@ class IOBufferReader and the list of IOBufferBlocks is set using the associated MIOBuffer. */ - void reset(); + virtual void reset(); /** Consume a number of bytes from this reader's IOBufferBlock @@ -714,7 +718,7 @@ class IOBufferReader to read_avail(). */ - void consume(int64_t n); + virtual void consume(int64_t n); /** Create another reader with access to the same data as this @@ -725,7 +729,7 @@ class IOBufferReader @return new reader with the same state as this. */ - IOBufferReader *clone(); + virtual IOBufferReader *clone(); /** Deallocate this reader. Removes and deallocates this reader from @@ -733,7 +737,7 @@ class IOBufferReader used after this call. */ - void dealloc(); + virtual void dealloc(); /** Get a pointer to the first block with data. Returns a pointer to @@ -744,7 +748,7 @@ class IOBufferReader available for this reader. */ - IOBufferBlock *get_current_block(); + virtual IOBufferBlock *get_current_block(); /** Consult this reader's MIOBuffer writable space. Queries the MIOBuffer @@ -756,7 +760,7 @@ class IOBufferReader returns true in MIOBuffer::current_low_water(). */ - bool current_low_water(); + virtual bool current_low_water(); /** Queries the underlying MIOBuffer about. Returns true if the amount @@ -768,7 +772,7 @@ class IOBufferReader this reader. */ - bool low_water(); + virtual bool low_water(); /** To see if the amount of data available to the reader is greater than @@ -779,7 +783,7 @@ class IOBufferReader @return true if the amount of data exceeds the MIOBuffer's water mark. */ - bool high_water(); + virtual bool high_water(); /** Perform a memchr() across the list of IOBufferBlocks. Returns the @@ -797,7 +801,7 @@ class IOBufferReader occurrence. */ - int64_t memchr(char c, int64_t len = INT64_MAX, int64_t offset = 0); + virtual int64_t memchr(char c, int64_t len = INT64_MAX, int64_t offset = 0); /** Copies and consumes data. Copies len bytes of data from the buffer @@ -813,7 +817,7 @@ class IOBufferReader @return number of bytes copied and consumed. */ - int64_t read(void *buf, int64_t len); + virtual int64_t read(void *buf, int64_t len); /** Copy data but do not consume it. Copies 'len' bytes of data from @@ -832,7 +836,7 @@ class IOBufferReader parameter buf is set to this value also. */ - char *memcpy(void *buf, int64_t len = INT64_MAX, int64_t offset = 0); + virtual char *memcpy(void *buf, int64_t len = INT64_MAX, int64_t offset = 0); /** Subscript operator. Returns a reference to the character at the @@ -845,7 +849,7 @@ class IOBufferReader @return reference to the character in that position. */ - char &operator[](int64_t i); + virtual char &operator[](int64_t i); MIOBuffer * writer() const @@ -907,7 +911,7 @@ class MIOBuffer @param len number of bytes to add to the inuse area of the block. */ - void fill(int64_t len); + virtual void fill(int64_t len); /** Adds a block to the end of the block list. The block added to list @@ -915,7 +919,7 @@ class MIOBuffer other buffer. */ - void append_block(IOBufferBlock *b); + virtual void append_block(IOBufferBlock *b); /** Adds a new block to the end of the block list. The size is determined @@ -923,13 +927,13 @@ class MIOBuffer buffer block sizes. */ - void append_block(int64_t asize_index); + virtual void append_block(int64_t asize_index); /** Adds a new block to the end of the block list. Note that this does nothing when the next block of the current writer exists. The block size is the same as specified size when the buffer was allocated. */ - void add_block(); + virtual void add_block(); /** Deprecated @@ -942,7 +946,7 @@ class MIOBuffer by the buffer once all readers on the buffer have consumed it. */ - void append_xmalloced(void *b, int64_t len); + virtual void append_xmalloced(void *b, int64_t len); /** Adds by reference len bytes of data pointed to by b to the end of the @@ -952,7 +956,7 @@ class MIOBuffer have consumed it. */ - void append_fast_allocated(void *b, int64_t len, int64_t fast_size_index); + virtual void append_fast_allocated(void *b, int64_t len, int64_t fast_size_index); /** Adds the nbytes worth of data pointed by rbuf to the buffer. The @@ -961,7 +965,7 @@ class MIOBuffer control. Returns the number of bytes added. */ - int64_t write(const void *rbuf, int64_t nbytes); + virtual int64_t write(const void *rbuf, int64_t nbytes); /** Add by data from IOBufferReader r to the this buffer by reference. If @@ -988,7 +992,7 @@ class MIOBuffer rather than sharing blocks to prevent a build of blocks on the buffer. */ - int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 0); + virtual int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 0); /** Copy data from the @a chain to this buffer. New IOBufferBlocks are allocated so this gets a copy of the data that is independent of the source. @@ -999,14 +1003,14 @@ class MIOBuffer @internal I do not like counting @a offset against @a bytes but that's how @c write works... */ - int64_t write(IOBufferChain const *chain, int64_t len = INT64_MAX, int64_t offset = 0); + virtual int64_t write(IOBufferChain const *chain, int64_t len = INT64_MAX, int64_t offset = 0); /** Returns a pointer to the first writable block on the block chain. Returns nullptr if there are not currently any writable blocks on the block list. */ - IOBufferBlock * + virtual IOBufferBlock * first_write_block() { if (_writer) { @@ -1020,26 +1024,26 @@ class MIOBuffer return nullptr; } - char * + virtual char * buf() { IOBufferBlock *b = first_write_block(); return b ? b->buf() : nullptr; } - char * + virtual char * buf_end() { return first_write_block()->buf_end(); } - char * + virtual char * start() { return first_write_block()->start(); } - char * + virtual char * end() { return first_write_block()->end(); @@ -1051,7 +1055,7 @@ class MIOBuffer by first_write_block()). */ - int64_t block_write_avail(); + virtual int64_t block_write_avail(); /** Returns the amount of space of available for writing on all writable @@ -1059,7 +1063,7 @@ class MIOBuffer block chain. */ - int64_t current_write_avail(); + virtual int64_t current_write_avail(); /** Adds blocks for writing if the watermark criteria are met. Returns @@ -1067,20 +1071,20 @@ class MIOBuffer on the block chain after a block due to the watermark criteria. */ - int64_t write_avail(); + virtual int64_t write_avail(); /** Returns the default data block size for this buffer. */ - int64_t block_size(); + virtual int64_t block_size(); /** Returns true if amount of the data outstanding on the buffer exceeds the watermark. */ - bool + virtual bool high_water() { return is_max_read_avail_more_than(this->water_mark); @@ -1092,7 +1096,7 @@ class MIOBuffer on write_avail() it may add blocks. */ - bool + virtual bool low_water() { return write_avail() <= water_mark; @@ -1103,7 +1107,7 @@ class MIOBuffer blocks on the buffer is less than the water mark. */ - bool + virtual bool current_low_water() { return current_write_avail() <= water_mark; @@ -1114,7 +1118,7 @@ class MIOBuffer to point to 'anAccessor'. */ - IOBufferReader *alloc_accessor(MIOBufferAccessor *anAccessor); + virtual IOBufferReader *alloc_accessor(MIOBufferAccessor *anAccessor); /** Allocates an IOBufferReader for this buffer. IOBufferReaders hold @@ -1125,7 +1129,7 @@ class MIOBuffer place on the buffer. */ - IOBufferReader *alloc_reader(); + virtual IOBufferReader *alloc_reader(); /** Allocates a new reader on this buffer and places it's starting @@ -1133,7 +1137,7 @@ class MIOBuffer previous allocated from this buffer. */ - IOBufferReader *clone_reader(IOBufferReader *r); + virtual IOBufferReader *clone_reader(IOBufferReader *r); /** Deallocates reader e from this buffer. e MUST be a pointer to a reader @@ -1143,7 +1147,7 @@ class MIOBuffer being freed as all outstanding readers are automatically deallocated. */ - void dealloc_reader(IOBufferReader *e); + virtual void dealloc_reader(IOBufferReader *e); /** Deallocates all outstanding readers on the buffer. @@ -1151,10 +1155,10 @@ class MIOBuffer */ void dealloc_all_readers(); - void set(void *b, int64_t len); - void alloc(int64_t i); - void append_block_internal(IOBufferBlock *b); - int64_t write(IOBufferBlock const *b, int64_t len, int64_t offset); + virtual void set(void *b, int64_t len); + virtual void alloc(int64_t i); + virtual void append_block_internal(IOBufferBlock *b); + virtual int64_t write(IOBufferBlock const *b, int64_t len, int64_t offset); // internal interface @@ -1167,21 +1171,21 @@ class MIOBuffer @return maximum amount of available data */ - int64_t max_read_avail(); + virtual int64_t max_read_avail(); /** Check if there is more than @a size bytes available to read. @return @c true if more than @a size byte are available. */ - bool is_max_read_avail_more_than(int64_t size); + virtual bool is_max_read_avail_more_than(int64_t size); - int max_block_count(); - void check_add_block(); + virtual int max_block_count(); + virtual void check_add_block(); - IOBufferBlock *get_current_block(); + virtual IOBufferBlock *get_current_block(); - void + virtual void reset() { if (_writer) { @@ -1194,7 +1198,7 @@ class MIOBuffer } } - void + virtual void init_readers() { for (auto &reader : readers) { @@ -1204,13 +1208,21 @@ class MIOBuffer } } - void + virtual void dealloc() { _writer = nullptr; dealloc_all_readers(); } + virtual void + free() + { + _writer = nullptr; + dealloc_all_readers(); + THREAD_FREE(this, ioAllocator, this_thread()); + } + void clear() { @@ -1238,7 +1250,7 @@ class MIOBuffer explicit MIOBuffer(int64_t default_size_index); MIOBuffer(); - ~MIOBuffer(); + virtual ~MIOBuffer(); }; /** diff --git a/include/iocore/eventsystem/PipeIOBuffer.h b/include/iocore/eventsystem/PipeIOBuffer.h new file mode 100644 index 00000000000..82486a48daa --- /dev/null +++ b/include/iocore/eventsystem/PipeIOBuffer.h @@ -0,0 +1,125 @@ +// +// Created by Yihong Jin on 1/26/25. +// + +#pragma once + + +#if TS_USE_LINUX_SPLICE + +#include "IOBuffer.h" + +class PipeIOBuffer; +class PipeIOBufferReader; + +class PipeIOBufferReader : public IOBufferReader +{ + public: + PipeIOBufferReader(){}; + // Overridden methods from IOBufferReader + char *start() override; + char *end() override; + int64_t read_avail() override; + bool is_read_avail_more_than(int64_t size) override; + int block_count() override; + int64_t block_read_avail() override; + std::string_view block_read_view() override; + void skip_empty_blocks() override; + void clear() override; + void reset() override; + void consume(int64_t n) override; + IOBufferReader *clone() override; + void dealloc() override; + IOBufferBlock *get_current_block() override; + bool current_low_water() override; + bool low_water() override; + bool high_water() override; + int64_t memchr(char c, int64_t len = INT64_MAX, int64_t offset = 0) override; + int64_t read(void *buf, int64_t len) override; + char *memcpy(void *buf, int64_t len = INT64_MAX, int64_t offset = 0) override; + char &operator[](int64_t i) override; +}; + +class PipeIOBuffer : public MIOBuffer +{ + public: + PipeIOBuffer(); + ~PipeIOBuffer() override; + + // MIOBuffer methods + void fill(int64_t len) override; + void consume(int64_t len); + void append_block(IOBufferBlock *b) override; + void append_block(int64_t asize_index) override; + void add_block() override; + void append_xmalloced(void *b, int64_t len) override; + void append_fast_allocated(void *b, int64_t len, int64_t fast_size_index) override; + int64_t write(const void *buf, int64_t nbytes) override; + int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 0) override; + int64_t write(IOBufferChain const *chain, int64_t len = INT64_MAX, int64_t offset = 0) override; + + IOBufferBlock *first_write_block() override; + char *buf() override; + char *buf_end() override; + char *start() override; + char *end() override; + + int64_t block_write_avail() override; + int64_t current_write_avail() override; + int64_t write_avail() override; + int64_t block_size() override; + bool high_water() override; + bool low_water() override; + bool current_low_water() override; + + IOBufferReader *alloc_accessor(MIOBufferAccessor *anAccessor) override; + IOBufferReader *alloc_reader() override; + IOBufferReader *clone_reader(IOBufferReader *r) override; + void dealloc_reader(IOBufferReader *e) override; + void set(void *b, int64_t len) override; + void alloc(int64_t i) override; + void append_block_internal(IOBufferBlock *b) override; + int64_t write(IOBufferBlock const *b, int64_t len, int64_t offset) override; + + int64_t max_read_avail() override; + bool is_max_read_avail_more_than(int64_t size) override; + int max_block_count() override; + void check_add_block() override; + + void reset() override; + void init_readers() override; + void dealloc() override; + void free() override; + + // clear() is called by the constructor and destructor of MIOBuffer + // so it could not be virtual and override in derived class. + void clear(); + + // Public data members + int fd[2]; // Pipe file descriptors + PipeIOBufferReader pipe_reader; // Single reader instance for the pipe + bool reader_allocated; // Tracks if the reader is currently allocated + int64_t data_in_pipe; // Amount of data currently in the pipe and not consumed + int64_t pipe_capacity; // Total capacity of the pipe +}; + +extern PipeIOBuffer *new_PipeIOBuffer_internal(const char *loc, int64_t pipe_capacity); + +class PipeIOBuffer_tracker +{ + const char *loc; + + public: + explicit PipeIOBuffer_tracker(const char *_loc) : loc(_loc) {} + PipeIOBuffer * + operator()(int64_t size_index) + { + return new_PipeIOBuffer_internal(loc, BUFFER_SIZE_FOR_INDEX(size_index)); + } +}; + +/// MIOBuffer allocator/deallocator +#define new_PipeIOBuffer PipeIOBuffer_tracker(RES_PATH("memory/IOBuffer/")) +extern void free_PipeIOBuffer(PipeIOBuffer *mio); + +#endif // TS_USE_LINUX_SPLICE \ No newline at end of file diff --git a/include/iocore/eventsystem/Thread.h b/include/iocore/eventsystem/Thread.h index 22b006a496f..dc82604ba61 100644 --- a/include/iocore/eventsystem/Thread.h +++ b/include/iocore/eventsystem/Thread.h @@ -134,6 +134,9 @@ class Thread ProxyAllocator ioDataAllocator; ProxyAllocator ioAllocator; ProxyAllocator ioBlockAllocator; +#if TS_USE_LINUX_SPLICE + ProxyAllocator pipeIOAllocator; +#endif ProxyAllocator preWarmSMAllocator; // From InkAPI (plugins wrappers) ProxyAllocator apiHookAllocator; diff --git a/include/iocore/eventsystem/UnixSocket.h b/include/iocore/eventsystem/UnixSocket.h index 08cbdb7bd13..7ab1ed802d6 100644 --- a/include/iocore/eventsystem/UnixSocket.h +++ b/include/iocore/eventsystem/UnixSocket.h @@ -90,6 +90,11 @@ class UnixSocket int sendto(void const *buf, int size, int flags, struct sockaddr const *to, int tolen) const; int sendmsg(struct msghdr const *m, int flags) const; +#if TS_USE_LINUX_SPLICE + int splice_from(int pipe_fd, size_t len, int flags = 0) const; + int splice_to(int pipe_fd, size_t len, int flags = 0) const; +#endif + static int poll(struct pollfd *fds, unsigned long nfds, int timeout); int getsockname(struct sockaddr *sa, socklen_t *sz) const; @@ -251,6 +256,41 @@ UnixSocket::sendmsg(struct msghdr const *m, int flags) const return r; } +#if TS_USE_LINUX_SPLICE +// In non-blocking mode, there are following possible return values for raw splice(2) calls: +// 1. return value > 0: number of bytes transferred +// 2. return value == 0: End of input. +// If the input file descriptor refers to a pipe, the write end of the pipe is closed but the read end is not. +// If the input file descriptor refers to a socket, the socket is shut down. +// 3. return value == -1 and errno == EAGAIN: No data available +// 4. return value == -1 and errno == EINTR: Interrupted by signal +// 5. return value == -1 and other errno: Error + +inline int +UnixSocket::splice_from(int pipe_fd, size_t len, int flags) const +{ + int r; + do { + if (unlikely((r = ::splice(pipe_fd, nullptr, this->fd, nullptr, len, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::splice_to(int pipe_fd, size_t len, int flags) const +{ + int r; + do { + if (unlikely((r = ::splice(this->fd, nullptr, pipe_fd, nullptr, len, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} +#endif + inline int UnixSocket::poll(struct pollfd *fds, unsigned long nfds, int timeout) { diff --git a/include/tscore/ink_config.h.cmake.in b/include/tscore/ink_config.h.cmake.in index 99036c6ec6f..03826c8ea07 100644 --- a/include/tscore/ink_config.h.cmake.in +++ b/include/tscore/ink_config.h.cmake.in @@ -162,6 +162,7 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@; #cmakedefine01 TS_USE_TLS13 #cmakedefine01 TS_USE_TLS_ASYNC #cmakedefine01 TS_USE_TPROXY +#cmakedefine01 TS_USE_LINUX_SPLICE #cmakedefine01 TS_HAS_VERIFY_CERT_STORE #cmakedefine01 TS_HAS_TLS_SESSION_TICKET diff --git a/src/iocore/eventsystem/IOBuffer.cc b/src/iocore/eventsystem/IOBuffer.cc index 66decc4bac6..3a3c0a72fa1 100644 --- a/src/iocore/eventsystem/IOBuffer.cc +++ b/src/iocore/eventsystem/IOBuffer.cc @@ -43,9 +43,12 @@ FreelistAllocator ioBufAllocator[DEFAULT_BUFFER_SIZES]; ClassAllocator ioAllocator("ioAllocator", DEFAULT_BUFFER_NUMBER); ClassAllocator ioDataAllocator("ioDataAllocator", DEFAULT_BUFFER_NUMBER); ClassAllocator ioBlockAllocator("ioBlockAllocator", DEFAULT_BUFFER_NUMBER); -int64_t default_large_iobuffer_size = DEFAULT_LARGE_BUFFER_SIZE; -int64_t default_small_iobuffer_size = DEFAULT_SMALL_BUFFER_SIZE; -int64_t max_iobuffer_size = DEFAULT_BUFFER_SIZES - 1; +#if TS_USE_LINUX_SPLICE +ClassAllocator pipeIOAllocator("pipeIOAllocator", DEFAULT_BUFFER_NUMBER); +#endif +int64_t default_large_iobuffer_size = DEFAULT_LARGE_BUFFER_SIZE; +int64_t default_small_iobuffer_size = DEFAULT_SMALL_BUFFER_SIZE; +int64_t max_iobuffer_size = DEFAULT_BUFFER_SIZES - 1; // // Initialization diff --git a/src/iocore/eventsystem/P_EventSystem.h b/src/iocore/eventsystem/P_EventSystem.h index 53b4b463f75..8e9c56594ae 100644 --- a/src/iocore/eventsystem/P_EventSystem.h +++ b/src/iocore/eventsystem/P_EventSystem.h @@ -39,6 +39,7 @@ #include "P_Thread.h" #include "P_VIO.h" #include "P_IOBuffer.h" +#include "P_PipeIOBuffer.h" #include "P_VConnection.h" #include "P_UnixEvent.h" #include "P_UnixEThread.h" diff --git a/src/iocore/eventsystem/P_IOBuffer.h b/src/iocore/eventsystem/P_IOBuffer.h index 4613d17af84..559ad74c385 100644 --- a/src/iocore/eventsystem/P_IOBuffer.h +++ b/src/iocore/eventsystem/P_IOBuffer.h @@ -656,9 +656,7 @@ new_MIOBuffer_internal(const char *location, int64_t size_index) TS_INLINE void free_MIOBuffer(MIOBuffer *mio) { - mio->_writer = nullptr; - mio->dealloc_all_readers(); - THREAD_FREE(mio, ioAllocator, this_thread()); + mio->free(); } TS_INLINE MIOBuffer * diff --git a/src/iocore/eventsystem/P_PipeIOBuffer.h b/src/iocore/eventsystem/P_PipeIOBuffer.h new file mode 100644 index 00000000000..4322ff5942e --- /dev/null +++ b/src/iocore/eventsystem/P_PipeIOBuffer.h @@ -0,0 +1,502 @@ +// +// Created by Yihong Jin on 1/26/25. +// +#pragma once + +#include "tscore/ink_platform.h" +#include "tscore/ink_resource.h" + +#if TS_USE_LINUX_SPLICE +extern ClassAllocator pipeIOAllocator; + +TS_INLINE PipeIOBuffer * +new_PipeIOBuffer_internal(const char *location, int64_t pipe_capacity) +{ + PipeIOBuffer *b = THREAD_ALLOC(pipeIOAllocator, this_thread()); + b->_location = location; + b->alloc(pipe_capacity); + b->water_mark = 0; + return b; +} + + +////////////////////////////////////////////////////////////////// +// +// class PipeIOBufferReader -- +// inline functions definitions +// +////////////////////////////////////////////////////////////////// +TS_INLINE char * +PipeIOBufferReader::start() +{ + throw std::runtime_error("Not applicable for PipeIOBufferReader"); +} + +TS_INLINE char * +PipeIOBufferReader::end() +{ + throw std::runtime_error("Not applicable for PipeIOBufferReader"); +} + +TS_INLINE int64_t +PipeIOBufferReader::read_avail() +{ + return static_cast(mbuf)->data_in_pipe; +} + +TS_INLINE bool +PipeIOBufferReader::is_read_avail_more_than(int64_t size) +{ + return read_avail() > size; +} + +TS_INLINE int +PipeIOBufferReader::block_count() +{ + return 1; +} + +TS_INLINE int64_t +PipeIOBufferReader::block_read_avail() +{ + return read_avail(); +} + +TS_INLINE std::string_view + PipeIOBufferReader::block_read_view() +{ + throw std::runtime_error("Not applicable for PipeIOBufferReader"); +} + +TS_INLINE void +PipeIOBufferReader::skip_empty_blocks() +{ + // No-op for PipeIOBufferReader +} + +TS_INLINE void +PipeIOBufferReader::clear() +{ + // call base class clear + IOBufferReader::clear(); +} + +TS_INLINE void +PipeIOBufferReader::reset() +{ + // call base class reset + IOBufferReader::reset(); +} + +TS_INLINE void +PipeIOBufferReader::consume(int64_t n) +{ + auto *pipe_buf = static_cast(mbuf); + pipe_buf->consume(n); + + char buffer[n]; + ssize_t bytes_read = ::read(pipe_buf->fd[0], buffer, n); + if (bytes_read < 0) { + throw std::runtime_error("Pipe read failed during consume"); + } +} + +TS_INLINE IOBufferReader * +PipeIOBufferReader::clone() +{ + throw std::runtime_error("Cloning not supported for PipeIOBufferReader"); +} + +TS_INLINE void +PipeIOBufferReader::dealloc() +{ + static_cast(mbuf)->dealloc_reader(this); +} + +TS_INLINE IOBufferBlock * +PipeIOBufferReader::get_current_block() +{ + throw std::runtime_error("Not applicable for PipeIOBufferReader"); +} + +TS_INLINE bool +PipeIOBufferReader::current_low_water() +{ + return static_cast(mbuf)->current_low_water(); +} + +TS_INLINE bool +PipeIOBufferReader::low_water() +{ + return static_cast(mbuf)->low_water(); +} + +TS_INLINE bool +PipeIOBufferReader::high_water() +{ + return static_cast(mbuf)->high_water(); +} + +TS_INLINE int64_t +PipeIOBufferReader::memchr([[maybe_unused]] char c, [[maybe_unused]] int64_t len, [[maybe_unused]] int64_t offset) +{ + throw std::runtime_error("Not supported in PipeIOBufferReader"); +} + +// Read data from the pipe into the buffer. +TS_INLINE int64_t +PipeIOBufferReader::read(void *buf, int64_t len) +{ + auto *pipe_buf = static_cast(mbuf); + int64_t bytes_to_read = std::min(len, pipe_buf->data_in_pipe); + ssize_t bytes_read = ::read(pipe_buf->fd[0], buf, bytes_to_read); + if (bytes_read < 0) { + throw std::runtime_error("Pipe read failed"); + } + pipe_buf->consume(bytes_read); + return bytes_read; +} + +TS_INLINE char * +PipeIOBufferReader::memcpy([[maybe_unused]] void *buf, [[maybe_unused]] int64_t len, [[maybe_unused]] int64_t offset) +{ + throw std::runtime_error("Not supported in PipeIOBufferReader"); +} + +TS_INLINE char & +PipeIOBufferReader::operator[]([[maybe_unused]] int64_t i) +{ + throw std::runtime_error("Not supported in PipeIOBufferReader"); +} + +////////////////////////////////////////////////////////////////// +// +// class PipeIOBuffer -- +// inline functions definitions +// +////////////////////////////////////////////////////////////////// +TS_INLINE +PipeIOBuffer::PipeIOBuffer() : fd{-1, -1}, reader_allocated(false), data_in_pipe(0), pipe_capacity(0) +{ + // no ops due to ClassAllocator design +} + +TS_INLINE PipeIOBuffer::~PipeIOBuffer() +{ + // no ops due to ClassAllocator design +} + +TS_INLINE void +PipeIOBuffer::fill(int64_t len) +{ + if (len + data_in_pipe > pipe_capacity) { + throw std::runtime_error("Not enough space in pipe to fill"); + } + data_in_pipe += len; +} + +TS_INLINE void +PipeIOBuffer::consume(int64_t len) +{ + if (len > data_in_pipe) { + throw std::runtime_error("Attempt to consume more data than available in pipe"); + } + data_in_pipe -= len; +} + +TS_INLINE void +PipeIOBuffer::append_block([[maybe_unused]] IOBufferBlock *b) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::append_block([[maybe_unused]] int64_t asize_index) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::add_block() +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::append_xmalloced([[maybe_unused]] void *b, [[maybe_unused]] int64_t len) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::append_fast_allocated([[maybe_unused]] void *b, [[maybe_unused]] int64_t len, + [[maybe_unused]] int64_t fast_size_index) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE int64_t +PipeIOBuffer::write(const void *buf, int64_t nbytes) +{ + if (nbytes > write_avail()) { + throw std::runtime_error("Not enough space in pipe to write"); + } + ssize_t written = ::write(fd[1], buf, nbytes); + if (written < 0) { + throw std::runtime_error("Pipe write failed"); + } + fill(written); // Update data_in_pipe to reflect the amount written + return written; +} + +TS_INLINE int64_t +PipeIOBuffer::write([[maybe_unused]] IOBufferReader *r, [[maybe_unused]] int64_t len, [[maybe_unused]] int64_t offset) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE int64_t +PipeIOBuffer::write([[maybe_unused]] IOBufferChain const *chain, [[maybe_unused]] int64_t len, [[maybe_unused]] int64_t offset) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE IOBufferBlock * +PipeIOBuffer::first_write_block() +{ + throw std::runtime_error("Not applicable for PipeIOBuffer"); +} + +TS_INLINE char * +PipeIOBuffer::buf() +{ + throw std::runtime_error("Not applicable for PipeIOBuffer"); +} + +TS_INLINE char * +PipeIOBuffer::buf_end() +{ + throw std::runtime_error("Not applicable for PipeIOBuffer"); +} + +TS_INLINE char * +PipeIOBuffer::start() +{ + throw std::runtime_error("Not applicable for PipeIOBuffer"); +} + +TS_INLINE char * +PipeIOBuffer::end() +{ + throw std::runtime_error("Not applicable for PipeIOBuffer"); +} + +TS_INLINE int64_t +PipeIOBuffer::block_write_avail() +{ + return write_avail(); +} + +TS_INLINE int64_t +PipeIOBuffer::current_write_avail() +{ + return write_avail(); +} + +TS_INLINE int64_t +PipeIOBuffer::write_avail() +{ + return pipe_capacity - data_in_pipe; +} + +TS_INLINE int64_t +PipeIOBuffer::block_size() +{ + return pipe_capacity; +} + +TS_INLINE bool +PipeIOBuffer::high_water() +{ + return is_max_read_avail_more_than(this->water_mark); +} + +TS_INLINE bool +PipeIOBuffer::low_water() +{ + return write_avail() <= water_mark; +} + +TS_INLINE bool +PipeIOBuffer::current_low_water() +{ + return low_water(); +} + +// Allocate a reader for the PipeIOBuffer with the given MIOBufferAccessor +TS_INLINE IOBufferReader * +PipeIOBuffer::alloc_accessor(MIOBufferAccessor *anAccessor) +{ + if (reader_allocated) { + throw std::runtime_error("PipeIOBuffer supports only a single reader"); + } + pipe_reader.mbuf = this; + pipe_reader.accessor = anAccessor; + reader_allocated = true; + return &pipe_reader; +} + +// Allocate a reader for the PipeIOBuffer but without an MIOBufferAccessor +TS_INLINE IOBufferReader * +PipeIOBuffer::alloc_reader() +{ + if (reader_allocated) { + throw std::runtime_error("PipeIOBuffer supports only a single reader"); + } + Dbg(DbgCtl("http_tunnel"), "PipeIOBuffer::alloc_reader() called"); + pipe_reader.mbuf = this; + pipe_reader.accessor = nullptr; + reader_allocated = true; + return &pipe_reader; +} + +TS_INLINE IOBufferReader * +PipeIOBuffer::clone_reader([[maybe_unused]] IOBufferReader *r) +{ + // PipeIOBuffer supports only a single reader, so cloning is not supported, but return the existing reader + return &pipe_reader; +} + +TS_INLINE void +PipeIOBuffer::dealloc_reader(IOBufferReader *e) +{ + if (&pipe_reader == e) { + // clear the accessor if it is set but keep the accessor pointer + if (pipe_reader.accessor) { + ink_assert(pipe_reader.accessor->writer() == this); + ink_assert(pipe_reader.accessor->reader() == e); + pipe_reader.accessor->clear(); + } + pipe_reader.clear(); + reader_allocated = false; + } else { + throw std::runtime_error("Attempt to deallocate a non-existing reader"); + } +} + +TS_INLINE void +PipeIOBuffer::set([[maybe_unused]] void *b, [[maybe_unused]] int64_t len) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::alloc(int64_t pipe_capacity) +{ + // The default pipe capacity is 64KB equivalent to 16 pages on x86_64 + if (pipe2(fd, O_NONBLOCK) < 0) { + throw std::runtime_error("Pipe creation failed"); + } + this->pipe_capacity = pipe_capacity; + + // Set the pipe capacity to the requested value if it is not the default value which is 16 pages + if (pipe_capacity != 16 * getpagesize()) { + // https://man7.org/linux/man-pages/man2/fcntl.2.html#:~:text=Changing%20the%20capacity%20of%20a%20pipe + // When allocating the buffer for the pipe, the kernel may use a capacity larger than arg, if that is convenient for + // the implementation. (In the current implementation, the allocation is the next higher power-of-two page-size + // multiple of the requested size.) + if (fcntl(fd[1], F_SETPIPE_SZ, pipe_capacity) < 0) { + close(fd[0]); + close(fd[1]); + throw std::runtime_error("Pipe capacity setting failed"); + } + } +} + +TS_INLINE void +PipeIOBuffer::append_block_internal([[maybe_unused]] IOBufferBlock *b) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE int64_t +PipeIOBuffer::write([[maybe_unused]] IOBufferBlock const *b, [[maybe_unused]] int64_t len, [[maybe_unused]] int64_t offset) +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE int64_t +PipeIOBuffer::max_read_avail() +{ + return data_in_pipe; +} + +TS_INLINE bool +PipeIOBuffer::is_max_read_avail_more_than(int64_t size) +{ + return data_in_pipe > size; +} + +// PipeIOBuffer has only one block, which is the pipe itself, so block count is always 1 +TS_INLINE int +PipeIOBuffer::max_block_count() +{ + return 1; +} + +TS_INLINE void +PipeIOBuffer::check_add_block() +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +TS_INLINE void +PipeIOBuffer::reset() +{ + // clear internal state and release external resources + clear(); + + // call alloc to reinitialize the pipe + alloc(pipe_capacity); +} + +TS_INLINE void +PipeIOBuffer::init_readers() +{ + throw std::runtime_error("Not supported in PipeIOBuffer"); +} + +// Release the external resources associated with the PipeIOBuffer +TS_INLINE void +PipeIOBuffer::dealloc() +{ + // close the pipe if it is open + if (fd[0] != -1) { + close(fd[0]); + } + if (fd[1] != -1) { + close(fd[1]); + } + // set the file descriptors to -1 + fd[0] = -1; + fd[1] = -1; + dealloc_reader(&pipe_reader); +} + +TS_INLINE void +PipeIOBuffer::free() +{ + clear(); + THREAD_FREE(this, pipeIOAllocator, this_thread()); +} + +// Clear internal state and call dealloc to release external resources +TS_INLINE void +PipeIOBuffer::clear() +{ + data_in_pipe = 0; + pipe_capacity = 0; + dealloc(); +} + +#endif // TS_USE_LINUX_SPLICE \ No newline at end of file diff --git a/src/iocore/net/P_UnixNetVConnection.h b/src/iocore/net/P_UnixNetVConnection.h index 5ff97aee16b..1ffb31f38f3 100644 --- a/src/iocore/net/P_UnixNetVConnection.h +++ b/src/iocore/net/P_UnixNetVConnection.h @@ -188,6 +188,9 @@ class UnixNetVConnection : public NetVConnection, public NetEvent return this->control_flags; } + void handle_linux_splice_for_net_read_io(NetHandler *nh, NetState *s, MIOBufferAccessor &buf, + int64_t ntodo, MutexTryLock& lock, bool &handled); + virtual int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs); void readDisable(NetHandler *nh); void readSignalError(NetHandler *nh, int err); diff --git a/src/iocore/net/UnixNetVConnection.cc b/src/iocore/net/UnixNetVConnection.cc index 6bd0331d544..7686de5c057 100644 --- a/src/iocore/net/UnixNetVConnection.cc +++ b/src/iocore/net/UnixNetVConnection.cc @@ -508,6 +508,13 @@ UnixNetVConnection::net_read_io(NetHandler *nh) read_disable(nh, this); return; } +#if TS_USE_LINUX_SPLICE + bool handled = false; + this->handle_linux_splice_for_net_read_io(nh, s, buf, ntodo, lock, handled); + if (handled) { + return; + } +#endif int64_t toread = buf.writer()->write_avail(); if (toread > ntodo) { toread = ntodo; @@ -629,6 +636,85 @@ UnixNetVConnection::net_read_io(NetHandler *nh) read_reschedule(nh, this); } +void +UnixNetVConnection::handle_linux_splice_for_net_read_io(NetHandler *nh, NetState *s, MIOBufferAccessor &buf, + int64_t ntodo, MutexTryLock& lock, bool &handled) +{ + int64_t r = 0; + PipeIOBuffer *pipe_buffer = dynamic_cast(buf.writer()); + if (pipe_buffer) { + handled = true; + // Use splice_to to transfer data from socket directly to pipe with SPLICE_F_MORE hint + int64_t to_splice = std::min(ntodo, pipe_buffer->write_avail()); + if (to_splice > 0) { + r = con.sock.splice_to(pipe_buffer->fd[1], to_splice, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); + + if (r <= 0) { + // Temporary Unavailable, Non-Blocking I/O + if (r == -EAGAIN || r == -ENOTCONN) { + Metrics::Counter::increment(net_rsb.calls_to_read_nodata); + this->read.triggered = 0; + nh->read_ready_list.remove(this); + return; + } + // End of stream detected if splice returns 0 + if (!r || r == -ECONNRESET) { + this->read.triggered = 0; + nh->read_ready_list.remove(this); + read_signal_done(VC_EVENT_EOS, nh, this); + return; + } + // For other errors, signal error + this->read.triggered = 0; + read_signal_error(nh, this, static_cast(-r)); // pass the negative errno to signal error + return; + } + Metrics::Counter::increment(net_rsb.read_bytes, r); + Metrics::Counter::increment(net_rsb.read_bytes_count); + + // Successfully spliced data from socket to pipe + pipe_buffer->fill(r); + s->vio.ndone += r; + this->netActivity(); + } else { + r = 0; + } + + // Signal read ready, check if user is not done + if (r) { + // If there are no more bytes to read, signal read complete + ink_assert(ntodo >= 0); + if (s->vio.ntodo() <= 0) { + read_signal_done(VC_EVENT_READ_COMPLETE, nh, this); + return; + } else { + if (read_signal_and_update(VC_EVENT_READ_READY, this) != EVENT_CONT) { + return; + } + + // change of lock... don't look at shared variables! + if (lock.get_mutex() != s->vio.mutex.get()) { + read_reschedule(nh, this); + return; + } + } + } + + // If here are is no more room, or nothing to do, disable the connection + if (s->vio.ntodo() <= 0 || !s->enabled || !pipe_buffer->write_avail()) { + read_disable(nh, this); + return; + } + + // We should only splice once to the pipe until the pipe is cleared by consumer + // Linux splice() does not merge pipe buffer page so there is no guarantee that + // we can splice to pipe again without receiving EAGAIN from pipe even if data_in_pipe < pipe_capacity + // Disable read until consumer finish a successful splice from pipe to socket operation and the pipe is empty + read_disable(nh, this); + return; + } +} + // // Write the data for a UnixNetVConnection. // Rescheduling the UnixNetVConnection when necessary. @@ -810,6 +896,12 @@ UnixNetVConnection::net_write_io(NetHandler *nh) int e = 0; if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) { e = VC_EVENT_WRITE_READY; +#if TS_USE_LINUX_SPLICE + PipeIOBufferReader *pipe_reader = dynamic_cast(buf.reader()); + if (pipe_reader && pipe_reader->read_avail() > 0) { + e = 0; + } +#endif } else if (wbe_event != this->write_buffer_empty_event) { // @a signalled means we won't send an event, and the event values differing means we // had a write buffer trap and cleared it, so we need to send it now. @@ -852,7 +944,29 @@ UnixNetVConnection::net_write_io(NetHandler *nh) int64_t UnixNetVConnection::load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs) { - int64_t r = 0; + int64_t r = 0; +#if TS_USE_LINUX_SPLICE + // Check if buf.reader() is a PipeIOBufferReader + PipeIOBufferReader *pipe_reader = dynamic_cast(buf.reader()); + if (pipe_reader) { + // Use splice_from to write directly from the pipe to the socket + // if the reader is a PipeIOBufferReader, then the buffer must be a PipeIOBuffer (derived from MIOBuffer) + PipeIOBuffer *pipe_buffer = static_cast(pipe_reader->mbuf); + int64_t to_splice = std::min(towrite, pipe_reader->read_avail()); + if (to_splice > 0) { + r = con.sock.splice_from(pipe_buffer->fd[0], to_splice, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); + if (r > 0) { + pipe_buffer->consume(r); + total_written += r; + } + } + + needs |= EVENTIO_WRITE; + + return r; + } +#endif + // Fallback to sendmsg for non-PipeIOBufferReaders int64_t try_to_write = 0; IOBufferReader *tmp_reader = buf.reader()->clone(); diff --git a/src/proxy/http/HttpSM.cc b/src/proxy/http/HttpSM.cc index 32ae5ab0cb7..92f7a0804f6 100644 --- a/src/proxy/http/HttpSM.cc +++ b/src/proxy/http/HttpSM.cc @@ -7274,6 +7274,32 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr, IOBufferReader *initial) // header buffer into new buffer client_request_body_bytes += from_ua_buf->write(_ua.get_txn()->get_remote_reader()); +#if TS_USE_LINUX_SPLICE + MIOBuffer *from_ua_pipe_buf = new_PipeIOBuffer(BUFFER_SIZE_INDEX_32K); + MIOBuffer *to_ua_pipe_buf = new_PipeIOBuffer(BUFFER_SIZE_INDEX_32K); + // copy the data from from_ua_buf to from_ua_pipe_buf + int64_t avail; + if (r_from && (avail = r_from->read_avail()) > 0) { + char buffer[avail]; + r_from->read(buffer, avail); + from_ua_pipe_buf->write(buffer, avail); + } + // copy the data from to_ua_buf to to_ua_pipe_buf + if (r_to && (avail = r_to->read_avail()) > 0) { + char buffer[avail]; + r_to->read(buffer, avail); + to_ua_pipe_buf->write(buffer, avail); + } + // release old buffers and readers + free_MIOBuffer(from_ua_buf); + free_MIOBuffer(to_ua_buf); + // reassign the buffers and readers + from_ua_buf = from_ua_pipe_buf; + to_ua_buf = to_ua_pipe_buf; + r_from = from_ua_pipe_buf->alloc_reader(); + r_to = to_ua_buf->alloc_reader(); +#endif + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler); this->do_transform_open(); diff --git a/src/proxy/http/HttpTunnel.cc b/src/proxy/http/HttpTunnel.cc index 631ded94fd1..fc4f54b14e6 100644 --- a/src/proxy/http/HttpTunnel.cc +++ b/src/proxy/http/HttpTunnel.cc @@ -1105,6 +1105,15 @@ HttpTunnel::producer_run(HttpTunnelProducer *p) } } +#if TS_USE_LINUX_SPLICE + Dbg(dbg_ctl_http_tunnel, "[%" PRId64 "] [tunnel_run] producer_run done", sm->sm_id); + // If we are using splice, we need to make sure that we don't deallocate the buffer reader since we only have one + // Check whether read_buffer is PipeIOBuffer through dynamic_cast + // if so, don't deallocate it by returning + if (p->read_buffer && dynamic_cast(p->read_buffer)) { + return; + } +#endif // Now that the tunnel has started, we must remove producer's reader so // that it doesn't act like a buffer guard if (p->read_buffer && p->buffer_start) {