Mercurial > foo_out_sdl
diff foosdk/sdk/foobar2000/helpers/readers.cpp @ 1:20d02a178406 default tip
*: check in everything else
yay
| author | Paper <paper@tflc.us> |
|---|---|
| date | Mon, 05 Jan 2026 02:15:46 -0500 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/foosdk/sdk/foobar2000/helpers/readers.cpp Mon Jan 05 02:15:46 2026 -0500 @@ -0,0 +1,594 @@ +#include "StdAfx.h" +#include "readers.h" +#include "readers_lite.h" +#include "fullFileBuffer.h" +#include "fileReadAhead.h" +#include <SDK/file_info_impl.h> +#include <list> +#include <memory> +#include <pfc/event_std.h> + +t_size reader_membuffer_base::read(void * p_buffer, t_size p_bytes, abort_callback & p_abort) { + p_abort.check_e(); + t_size max = get_buffer_size(); + if (max < m_offset) uBugCheck(); + max -= m_offset; + t_size delta = p_bytes; + if (delta > max) delta = max; + memcpy(p_buffer, (char*)get_buffer() + m_offset, delta); + m_offset += delta; + return delta; +} + +void reader_membuffer_base::seek(t_filesize position, abort_callback & p_abort) { + p_abort.check_e(); + t_filesize max = get_buffer_size(); + if (position == filesize_invalid || position > max) throw exception_io_seek_out_of_range(); + m_offset = (t_size)position; +} + + + + +file::ptr fullFileBuffer::open(const char * path, abort_callback & abort, file::ptr hint, t_filesize sizeMax) { + //mutexScope scope(hMutex, abort); + + file::ptr f; + if (hint.is_valid()) f = hint; + else filesystem::g_open_read(f, path, abort); + + if (sizeMax != filesize_invalid) { + t_filesize fs = f->get_size(abort); + if (fs > sizeMax) return f; + } + try { + service_ptr_t<reader_bigmem_mirror> r = new service_impl_t<reader_bigmem_mirror>(); + r->init(f, abort); + f = r; + } + catch (std::bad_alloc const &) {} + return f; +} + + + + + + + + + + +#include <memory> +#include <exception> +#include <pfc/synchro.h> +#include <pfc/threads.h> + +namespace { + struct dynInfoEntry_t { + file_info_impl m_info; + t_filesize m_offset; + }; + struct readAheadInstance_t { + file::ptr m_file; + size_t m_readAhead, m_wakeUpThreschold; + + pfc::array_t<uint8_t> m_buffer; + size_t m_bufferBegin, m_bufferEnd; + pfc::event m_canRead; + pfc::event_std m_canWrite; + pfc::mutex m_guard; + std::exception_ptr m_error; + t_filesize m_seekto; + abort_callback_impl m_abort; + bool m_remote; + bool m_atEOF = false; + + bool m_haveDynamicInfo; + std::list<dynInfoEntry_t> m_dynamicInfo; + }; + typedef std::shared_ptr<readAheadInstance_t> readAheadInstanceRef; + static constexpr t_filesize seek_reopen = (filesize_invalid-1); + class fileReadAhead : public file_readonly_t< service_multi_inherit< service_multi_inherit<file_v2, file_dynamicinfo_v2 >, stream_receive > > { + service_ptr m_metadata; + public: + readAheadInstanceRef m_instance; + ~fileReadAhead() { + if ( m_instance ) { + auto & i = *m_instance; + pfc::mutexScope guard( i.m_guard ); + i.m_abort.set(); + i.m_canWrite.set_state(true); + } + } + service_ptr get_metadata(abort_callback&) override { return m_metadata; } + void initialize( file::ptr chain, size_t readAhead, abort_callback & aborter ) { + m_metadata = chain->get_metadata_(aborter); + m_stats = chain->get_stats2_( stats2_all, aborter ); + if (!chain->get_content_type(m_contentType)) m_contentType = ""; + m_canSeek = chain->can_seek(); + m_position = chain->get_position( aborter ); + + + auto i = std::make_shared<readAheadInstance_t>();; + i->m_file = chain; + i->m_remote = chain->is_remote(); + i->m_readAhead = readAhead; + i->m_wakeUpThreschold = readAhead * 3 / 4; + i->m_buffer.set_size_discard( readAhead * 2 ); + i->m_bufferBegin = 0; i->m_bufferEnd = 0; + i->m_canWrite.set_state(true); + i->m_seekto = filesize_invalid; + m_instance = i; + + { + file_dynamicinfo::ptr dyn; + if (dyn &= chain) { + m_haveStaticInfo = dyn->get_static_info(m_staticInfo); + i->m_haveDynamicInfo = dyn->is_dynamic_info_enabled(); + } + } + + fb2k::splitTask( [i] { +#ifdef PFC_SET_THREAD_DESCRIPTION + PFC_SET_THREAD_DESCRIPTION("Fb2k Read-Ahead Thread"); +#endif + worker(*i); + } ); + } + t_size receive(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { + return read_internal(p_buffer, p_bytes, p_abort, true); + } + t_size read(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { + return read_internal(p_buffer, p_bytes, p_abort, false); + } + t_size read_internal(void * p_buffer,t_size p_bytes,abort_callback & p_abort, bool bReceive) { + auto & i = * m_instance; + size_t done = 0; + bool initial = true; + while( bReceive ? done == 0 : done < p_bytes ) { + if ( !initial ) { + // Do not invoke waiting with common case read with lots of data in the buffer + pfc::event::g_twoEventWait( i.m_canRead.get_handle(), p_abort.get_abort_event(), -1); + } + p_abort.check(); + pfc::mutexScope guard ( i.m_guard ); + size_t got = i.m_bufferEnd - i.m_bufferBegin; + if (got == 0) { + if (i.m_error) std::rethrow_exception(i.m_error); + if ( initial && ! i.m_atEOF ) { + initial = false; continue; // proceed to wait for more data + } + break; // EOF + } + + size_t delta = pfc::min_t<size_t>( p_bytes - done, got ); + + const bool wakeUpBefore = got < i.m_wakeUpThreschold; + + auto bufptr = i.m_buffer.get_ptr(); + if ( p_buffer != nullptr ) memcpy( (uint8_t*) p_buffer + done, bufptr + i.m_bufferBegin, delta ); + done += delta; + i.m_bufferBegin += delta; + got -= delta; + m_position += delta; + + if (!i.m_error && !i.m_atEOF) { + if ( got == 0 ) i.m_canRead.set_state( false ); + const bool wakeUpNow = got < i.m_wakeUpThreschold; + // Only set the event when *crossing* the boundary + // we will get a lot of wakeUpNow when nearing EOF + if ( wakeUpNow && ! wakeUpBefore ) i.m_canWrite.set_state( true ); + } + initial = false; + if ( i.m_atEOF ) break; // go no further + } + // FB2K_console_formatter() << "ReadAhead read: " << p_bytes << " => " << done; + return done; + } + t_filesize get_size(abort_callback & p_abort) override { + p_abort.check(); + return m_stats.m_size; + } + t_filesize get_position(abort_callback & p_abort) override { + p_abort.check(); + return m_position; + } + + void seek(t_filesize p_position,abort_callback & p_abort) override { + p_abort.check(); + if (!m_canSeek) throw exception_io_object_not_seekable(); + if ( m_stats.m_size != filesize_invalid && p_position > m_stats.m_size ) throw exception_io_seek_out_of_range(); + + auto posNow = get_position(p_abort); + + if ( p_position >= posNow && p_position < posNow + m_instance->m_readAhead ) { + // FB2K_console_formatter() << "ReadAhead skip: " << posNow << " => " << p_position; + auto toSkip = p_position - posNow; + if ( toSkip > 0 ) read(nullptr, (size_t) toSkip, p_abort); + return; + } + // FB2K_console_formatter() << "ReadAhead seek: " << posNow << " => " << p_position; + + seekInternal( p_position ); + } + bool can_seek() override { + return m_canSeek; + } + bool get_content_type(pfc::string_base & p_out) override { + if (m_contentType.length() == 0) return false; + p_out = m_contentType; return true; + } + t_filestats2 get_stats2( uint32_t, abort_callback & p_abort ) override { + p_abort.check(); + return m_stats; + + } + bool is_remote() override { + return m_instance->m_remote; + } + + void reopen( abort_callback & p_abort ) override { + if ( get_position( p_abort ) == 0 ) return; + seekInternal( seek_reopen ); + } + + bool get_static_info(class file_info & p_out) override { + if ( ! m_haveStaticInfo ) return false; + mergeInfo(p_out, m_staticInfo); + return true; + } + bool is_dynamic_info_enabled() override { + return m_instance->m_haveDynamicInfo; + + } + static void mergeInfo( file_info & out, const file_info & in ) { + out.copy_meta(in); + out.overwrite_info(in); + } + bool get_dynamic_info_v2(class file_info & out, t_filesize & outOffset) override { + auto & i = * m_instance; + if ( ! i.m_haveDynamicInfo ) return false; + + insync( i.m_guard ); + auto ptr = i.m_dynamicInfo.begin(); + for ( ;; ) { + if ( ptr == i.m_dynamicInfo.end() ) break; + if ( ptr->m_offset > m_position ) break; + ++ ptr; + } + + if ( ptr == i.m_dynamicInfo.begin() ) return false; + + auto iter = ptr; --iter; + mergeInfo(out, iter->m_info); + outOffset = iter->m_offset; + i.m_dynamicInfo.erase( i.m_dynamicInfo.begin(), ptr ); + + return true; + } + private: + void seekInternal( t_filesize p_position ) { + auto & i = * m_instance; + insync( i.m_guard ); + if (i.m_error) std::rethrow_exception(i.m_error); + i.m_bufferBegin = i.m_bufferEnd = 0; + i.m_canWrite.set_state(true); + i.m_seekto = p_position; + i.m_atEOF = false; + i.m_canRead.set_state(false); + + m_position = ( p_position == seek_reopen ) ? 0 : p_position; + } + static void worker( readAheadInstance_t & i ) { + try { + bool atEOF = false; + uint8_t* bufptr = i.m_buffer.get_ptr(); + const size_t readAtOnceLimit = i.m_remote ? 64*1024 : 256 * 1024; + for ( ;; ) { + i.m_canWrite.wait_for(-1); + size_t readHowMuch = 0, readOffset = 0; + { + pfc::mutexScope guard(i.m_guard); + i.m_abort.check(); + if ( i.m_seekto != filesize_invalid ) { + if ( i.m_seekto == seek_reopen ) { + i.m_file->reopen( i.m_abort ); + } else { + i.m_file->seek( i.m_seekto, i.m_abort ); + } + + i.m_seekto = filesize_invalid; + atEOF = false; + } + size_t got = i.m_bufferEnd - i.m_bufferBegin; + + if ( i.m_bufferBegin >= i.m_readAhead ) { + memmove( bufptr, bufptr + i.m_bufferBegin, got ); + i.m_bufferBegin = 0; + i.m_bufferEnd = got; + } + + if ( got < i.m_readAhead ) { + readHowMuch = i.m_readAhead - got; + readOffset = i.m_bufferEnd; + } + } + + if ( readHowMuch > readAtOnceLimit ) { + readHowMuch = readAtOnceLimit; + } + + bool dynInfoGot = false; + dynInfoEntry_t dynInfo; + + if ( readHowMuch > 0 ) { + readHowMuch = i.m_file->receive( bufptr + readOffset, readHowMuch, i.m_abort ); + + if ( readHowMuch == 0 ) atEOF = true; + + if ( i.m_haveDynamicInfo ) { + file_dynamicinfo::ptr dyn; + if ( dyn &= i.m_file ) { + file_dynamicinfo_v2::ptr dyn2; + if ( dyn2 &= dyn ) { + dynInfoGot = dyn2->get_dynamic_info_v2(dynInfo.m_info, dynInfo.m_offset); + } else { + dynInfoGot = dyn->get_dynamic_info( dynInfo.m_info ); + if (dynInfoGot) { + dynInfo.m_offset = dyn->get_position( i.m_abort ); + } + } + } + } + } + + { + pfc::mutexScope guard( i.m_guard ); + i.m_abort.check(); + if ( i.m_seekto != filesize_invalid ) { + // Seek request happened while we were reading - discard and continue + continue; + } + i.m_atEOF = atEOF; + i.m_canRead.set_state( true ); + i.m_bufferEnd += readHowMuch; + size_t got = i.m_bufferEnd - i.m_bufferBegin; + if ( atEOF || got >= i.m_readAhead ) i.m_canWrite.set_state(false); + + if ( dynInfoGot ) { + i.m_dynamicInfo.push_back( std::move(dynInfo) ); + } + + } + } + } catch (...) { + pfc::mutexScope guard(i.m_guard); + i.m_error = std::current_exception(); + i.m_canRead.set_state(true); + } + } + + bool m_canSeek; + t_filestats2 m_stats; + pfc::string8 m_contentType; + t_filesize m_position; + + + bool m_haveStaticInfo; + file_info_impl m_staticInfo; + }; + +} + + +file::ptr fileCreateReadAhead(file::ptr chain, size_t readAheadBytes, abort_callback & aborter ) { + auto obj = fb2k::service_new<fileReadAhead>(); + obj->initialize( chain, readAheadBytes, aborter ); + + // Two paths to cast to file*, pick one explicitly to avoid compiler error + file_v2::ptr temp = std::move(obj); + return std::move(temp); +} + + + +namespace { + class CFileWithMemBlock : public reader_membuffer_base { + public: + CFileWithMemBlock(fb2k::memBlockRef mem, t_filestats const& stats, const char* contentType, bool remote) { + m_mem = mem; + m_stats = stats; + m_stats.m_size = mem->size(); + if (contentType != nullptr) m_contentType = contentType; + m_remote = remote; + } + const void* get_buffer() { + return m_mem->get_ptr(); + } + t_size get_buffer_size() { + return m_mem->get_size(); + } + t_filestats get_stats(abort_callback& p_abort) { + p_abort.check(); + return m_stats; + } + bool get_content_type(pfc::string_base& out) { + if (m_contentType.is_empty()) return false; + out = m_contentType; + return true; + } + bool is_remote() { + return m_remote; + } + private: + bool m_remote; + fb2k::memBlockRef m_mem; + pfc::string8 m_contentType; + t_filestats m_stats; + }; +} + +file::ptr createFileWithMemBlock(fb2k::memBlock::ptr mem, t_filestats stats, const char* contentType, bool remote) { + return new service_impl_t< CFileWithMemBlock >(mem, stats, contentType, remote); +} + +file::ptr createFileLimited(file::ptr base, t_filesize offset, t_filesize size, abort_callback& abort) { + return reader_limited::g_create(base, offset, size, abort); +} + +file::ptr createFileBigMemMirror(file::ptr source, abort_callback& abort) { + if (source->is_in_memory()) return source; + auto r = fb2k::service_new<reader_bigmem_mirror>(); + r->init(source, abort); + return r; +} + +file::ptr createFileMemMirror(file::ptr source, abort_callback& abort) { + file::ptr ret; + if (!reader_membuffer_mirror::g_create(ret, source, abort)) ret = source; + return ret; +} + +namespace { + class file_memMirrorAsync : public file_readonly_t< file_v2 > { + struct shared_t { + abort_callback_impl m_abort; + size_t m_size; + pfc::mutex m_sync; + file::ptr m_file; + + pfc::bigmem m_data; + size_t m_dataAvailable = 0; + + size_t m_triggerOffset = SIZE_MAX; + pfc::event m_trigger; + }; + shared_t m_shared; + + static void worker(shared_t& s) { + + constexpr size_t tempSize = 256 * 1024; + auto temp = std::make_unique<uint8_t[]>(tempSize); + + while (s.m_dataAvailable < s.m_size) { + size_t got = s.m_file->receive(temp.get(), tempSize, s.m_abort); + if (got == 0) break; + + PFC_INSYNC(s.m_sync); + s.m_data.write(temp.get(), got, s.m_dataAvailable); + + auto before = s.m_dataAvailable; + s.m_dataAvailable += got; + if (before < s.m_triggerOffset && s.m_dataAvailable >= s.m_triggerOffset) { + s.m_trigger.set_state(true); + } + } + } + t_filestats2 m_stats; + bool m_is_remote = false; + pfc::string8 m_contentType; + service_ptr m_metadata; + size_t m_position = 0; + fb2k::thread m_thread; + public: + ~file_memMirrorAsync() { + m_shared.m_abort.set(); + m_thread.waitTillDone(); + } + void open(file::ptr chain, completion_notify::ptr onDone, abort_callback& a) { + if ( chain->get_position(a) > 0 ) chain->reopen(a); + m_stats = chain->get_stats2_(stats2_all, a); + if (m_stats.m_size > SIZE_MAX) throw pfc::exception_overflow(); + m_is_remote = chain->is_remote(); + m_contentType = chain->get_content_type(); + m_metadata = chain->get_metadata_(a); + m_shared.m_size = (size_t)m_stats.m_size; + m_shared.m_file = chain; + m_shared.m_data.resize(m_shared.m_size); + auto work = [this, onDone] { + try { + worker(this->m_shared); + } catch (...) {} + this->m_shared.m_file.release(); + if (onDone.is_valid()) onDone->on_completion(this->m_shared.m_size == this->m_shared.m_dataAvailable ? 1 : 0); + }; + m_thread.startHere(work); + } + + + service_ptr get_metadata(abort_callback& a) override { + a.check(); + return m_metadata; + } + + t_filestats2 get_stats2(uint32_t, abort_callback& a) override { + a.check(); + return m_stats; + } + + t_filesize get_position(abort_callback& p_abort) override { + p_abort.check(); + return m_position; + } + + void seek(t_filesize p_position, abort_callback& p_abort) override { + if (p_position > get_size(p_abort)) throw exception_io_seek_out_of_range(); + m_position = (size_t)p_position; + } + + bool can_seek() override { return true; } + + bool get_content_type(pfc::string_base& p_out) override { + bool rv = m_contentType.length() > 0; + if (rv) p_out = m_contentType; + return rv; + } + + void reopen(abort_callback& p_abort) override { seek(0, p_abort); } + + bool is_remote() override { return m_is_remote; } + + t_size read(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { + auto limit = get_size(p_abort); + auto left = limit - m_position; + if (p_bytes > left) p_bytes = (size_t)left; + + const auto upper = m_position + p_bytes; + + auto& shared = m_shared; + + { + PFC_INSYNC(shared.m_sync); + if (shared.m_dataAvailable >= upper) { + shared.m_data.read(p_buffer, p_bytes, m_position); + m_position += p_bytes; + return p_bytes; + } + shared.m_trigger.set_state(false); + shared.m_triggerOffset = upper; + } + p_abort.waitForEvent(shared.m_trigger); + + PFC_INSYNC(shared.m_sync); + PFC_ASSERT(shared.m_dataAvailable >= upper); + shared.m_data.read(p_buffer, p_bytes, m_position); + m_position += p_bytes; + return p_bytes; + } + t_filesize skip(t_filesize p_bytes, abort_callback& p_abort) override { + auto total = get_size(p_abort); + PFC_ASSERT(total >= m_position ); + auto left = total - m_position; + if (p_bytes > left) p_bytes = left; + m_position += (size_t)p_bytes; + return p_bytes; + } + }; +} + +file::ptr createFileMemMirrorAsync(file::ptr source, completion_notify::ptr onDone, abort_callback & a) { + auto ret = fb2k::service_new<file_memMirrorAsync>(); + ret->open(source, onDone, a); + return ret; +}
