Mercurial > foo_out_sdl
comparison foosdk/sdk/foobar2000/helpers/readers.cpp @ 1:20d02a178406 default tip
*: check in everything else
yay
| author | Paper <paper@tflc.us> |
|---|---|
| date | Mon, 05 Jan 2026 02:15:46 -0500 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:e9bb126753e7 | 1:20d02a178406 |
|---|---|
| 1 #include "StdAfx.h" | |
| 2 #include "readers.h" | |
| 3 #include "readers_lite.h" | |
| 4 #include "fullFileBuffer.h" | |
| 5 #include "fileReadAhead.h" | |
| 6 #include <SDK/file_info_impl.h> | |
| 7 #include <list> | |
| 8 #include <memory> | |
| 9 #include <pfc/event_std.h> | |
| 10 | |
| 11 t_size reader_membuffer_base::read(void * p_buffer, t_size p_bytes, abort_callback & p_abort) { | |
| 12 p_abort.check_e(); | |
| 13 t_size max = get_buffer_size(); | |
| 14 if (max < m_offset) uBugCheck(); | |
| 15 max -= m_offset; | |
| 16 t_size delta = p_bytes; | |
| 17 if (delta > max) delta = max; | |
| 18 memcpy(p_buffer, (char*)get_buffer() + m_offset, delta); | |
| 19 m_offset += delta; | |
| 20 return delta; | |
| 21 } | |
| 22 | |
| 23 void reader_membuffer_base::seek(t_filesize position, abort_callback & p_abort) { | |
| 24 p_abort.check_e(); | |
| 25 t_filesize max = get_buffer_size(); | |
| 26 if (position == filesize_invalid || position > max) throw exception_io_seek_out_of_range(); | |
| 27 m_offset = (t_size)position; | |
| 28 } | |
| 29 | |
| 30 | |
| 31 | |
| 32 | |
| 33 file::ptr fullFileBuffer::open(const char * path, abort_callback & abort, file::ptr hint, t_filesize sizeMax) { | |
| 34 //mutexScope scope(hMutex, abort); | |
| 35 | |
| 36 file::ptr f; | |
| 37 if (hint.is_valid()) f = hint; | |
| 38 else filesystem::g_open_read(f, path, abort); | |
| 39 | |
| 40 if (sizeMax != filesize_invalid) { | |
| 41 t_filesize fs = f->get_size(abort); | |
| 42 if (fs > sizeMax) return f; | |
| 43 } | |
| 44 try { | |
| 45 service_ptr_t<reader_bigmem_mirror> r = new service_impl_t<reader_bigmem_mirror>(); | |
| 46 r->init(f, abort); | |
| 47 f = r; | |
| 48 } | |
| 49 catch (std::bad_alloc const &) {} | |
| 50 return f; | |
| 51 } | |
| 52 | |
| 53 | |
| 54 | |
| 55 | |
| 56 | |
| 57 | |
| 58 | |
| 59 | |
| 60 | |
| 61 | |
| 62 #include <memory> | |
| 63 #include <exception> | |
| 64 #include <pfc/synchro.h> | |
| 65 #include <pfc/threads.h> | |
| 66 | |
| 67 namespace { | |
| 68 struct dynInfoEntry_t { | |
| 69 file_info_impl m_info; | |
| 70 t_filesize m_offset; | |
| 71 }; | |
| 72 struct readAheadInstance_t { | |
| 73 file::ptr m_file; | |
| 74 size_t m_readAhead, m_wakeUpThreschold; | |
| 75 | |
| 76 pfc::array_t<uint8_t> m_buffer; | |
| 77 size_t m_bufferBegin, m_bufferEnd; | |
| 78 pfc::event m_canRead; | |
| 79 pfc::event_std m_canWrite; | |
| 80 pfc::mutex m_guard; | |
| 81 std::exception_ptr m_error; | |
| 82 t_filesize m_seekto; | |
| 83 abort_callback_impl m_abort; | |
| 84 bool m_remote; | |
| 85 bool m_atEOF = false; | |
| 86 | |
| 87 bool m_haveDynamicInfo; | |
| 88 std::list<dynInfoEntry_t> m_dynamicInfo; | |
| 89 }; | |
| 90 typedef std::shared_ptr<readAheadInstance_t> readAheadInstanceRef; | |
| 91 static constexpr t_filesize seek_reopen = (filesize_invalid-1); | |
| 92 class fileReadAhead : public file_readonly_t< service_multi_inherit< service_multi_inherit<file_v2, file_dynamicinfo_v2 >, stream_receive > > { | |
| 93 service_ptr m_metadata; | |
| 94 public: | |
| 95 readAheadInstanceRef m_instance; | |
| 96 ~fileReadAhead() { | |
| 97 if ( m_instance ) { | |
| 98 auto & i = *m_instance; | |
| 99 pfc::mutexScope guard( i.m_guard ); | |
| 100 i.m_abort.set(); | |
| 101 i.m_canWrite.set_state(true); | |
| 102 } | |
| 103 } | |
| 104 service_ptr get_metadata(abort_callback&) override { return m_metadata; } | |
| 105 void initialize( file::ptr chain, size_t readAhead, abort_callback & aborter ) { | |
| 106 m_metadata = chain->get_metadata_(aborter); | |
| 107 m_stats = chain->get_stats2_( stats2_all, aborter ); | |
| 108 if (!chain->get_content_type(m_contentType)) m_contentType = ""; | |
| 109 m_canSeek = chain->can_seek(); | |
| 110 m_position = chain->get_position( aborter ); | |
| 111 | |
| 112 | |
| 113 auto i = std::make_shared<readAheadInstance_t>();; | |
| 114 i->m_file = chain; | |
| 115 i->m_remote = chain->is_remote(); | |
| 116 i->m_readAhead = readAhead; | |
| 117 i->m_wakeUpThreschold = readAhead * 3 / 4; | |
| 118 i->m_buffer.set_size_discard( readAhead * 2 ); | |
| 119 i->m_bufferBegin = 0; i->m_bufferEnd = 0; | |
| 120 i->m_canWrite.set_state(true); | |
| 121 i->m_seekto = filesize_invalid; | |
| 122 m_instance = i; | |
| 123 | |
| 124 { | |
| 125 file_dynamicinfo::ptr dyn; | |
| 126 if (dyn &= chain) { | |
| 127 m_haveStaticInfo = dyn->get_static_info(m_staticInfo); | |
| 128 i->m_haveDynamicInfo = dyn->is_dynamic_info_enabled(); | |
| 129 } | |
| 130 } | |
| 131 | |
| 132 fb2k::splitTask( [i] { | |
| 133 #ifdef PFC_SET_THREAD_DESCRIPTION | |
| 134 PFC_SET_THREAD_DESCRIPTION("Fb2k Read-Ahead Thread"); | |
| 135 #endif | |
| 136 worker(*i); | |
| 137 } ); | |
| 138 } | |
| 139 t_size receive(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { | |
| 140 return read_internal(p_buffer, p_bytes, p_abort, true); | |
| 141 } | |
| 142 t_size read(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { | |
| 143 return read_internal(p_buffer, p_bytes, p_abort, false); | |
| 144 } | |
| 145 t_size read_internal(void * p_buffer,t_size p_bytes,abort_callback & p_abort, bool bReceive) { | |
| 146 auto & i = * m_instance; | |
| 147 size_t done = 0; | |
| 148 bool initial = true; | |
| 149 while( bReceive ? done == 0 : done < p_bytes ) { | |
| 150 if ( !initial ) { | |
| 151 // Do not invoke waiting with common case read with lots of data in the buffer | |
| 152 pfc::event::g_twoEventWait( i.m_canRead.get_handle(), p_abort.get_abort_event(), -1); | |
| 153 } | |
| 154 p_abort.check(); | |
| 155 pfc::mutexScope guard ( i.m_guard ); | |
| 156 size_t got = i.m_bufferEnd - i.m_bufferBegin; | |
| 157 if (got == 0) { | |
| 158 if (i.m_error) std::rethrow_exception(i.m_error); | |
| 159 if ( initial && ! i.m_atEOF ) { | |
| 160 initial = false; continue; // proceed to wait for more data | |
| 161 } | |
| 162 break; // EOF | |
| 163 } | |
| 164 | |
| 165 size_t delta = pfc::min_t<size_t>( p_bytes - done, got ); | |
| 166 | |
| 167 const bool wakeUpBefore = got < i.m_wakeUpThreschold; | |
| 168 | |
| 169 auto bufptr = i.m_buffer.get_ptr(); | |
| 170 if ( p_buffer != nullptr ) memcpy( (uint8_t*) p_buffer + done, bufptr + i.m_bufferBegin, delta ); | |
| 171 done += delta; | |
| 172 i.m_bufferBegin += delta; | |
| 173 got -= delta; | |
| 174 m_position += delta; | |
| 175 | |
| 176 if (!i.m_error && !i.m_atEOF) { | |
| 177 if ( got == 0 ) i.m_canRead.set_state( false ); | |
| 178 const bool wakeUpNow = got < i.m_wakeUpThreschold; | |
| 179 // Only set the event when *crossing* the boundary | |
| 180 // we will get a lot of wakeUpNow when nearing EOF | |
| 181 if ( wakeUpNow && ! wakeUpBefore ) i.m_canWrite.set_state( true ); | |
| 182 } | |
| 183 initial = false; | |
| 184 if ( i.m_atEOF ) break; // go no further | |
| 185 } | |
| 186 // FB2K_console_formatter() << "ReadAhead read: " << p_bytes << " => " << done; | |
| 187 return done; | |
| 188 } | |
| 189 t_filesize get_size(abort_callback & p_abort) override { | |
| 190 p_abort.check(); | |
| 191 return m_stats.m_size; | |
| 192 } | |
| 193 t_filesize get_position(abort_callback & p_abort) override { | |
| 194 p_abort.check(); | |
| 195 return m_position; | |
| 196 } | |
| 197 | |
| 198 void seek(t_filesize p_position,abort_callback & p_abort) override { | |
| 199 p_abort.check(); | |
| 200 if (!m_canSeek) throw exception_io_object_not_seekable(); | |
| 201 if ( m_stats.m_size != filesize_invalid && p_position > m_stats.m_size ) throw exception_io_seek_out_of_range(); | |
| 202 | |
| 203 auto posNow = get_position(p_abort); | |
| 204 | |
| 205 if ( p_position >= posNow && p_position < posNow + m_instance->m_readAhead ) { | |
| 206 // FB2K_console_formatter() << "ReadAhead skip: " << posNow << " => " << p_position; | |
| 207 auto toSkip = p_position - posNow; | |
| 208 if ( toSkip > 0 ) read(nullptr, (size_t) toSkip, p_abort); | |
| 209 return; | |
| 210 } | |
| 211 // FB2K_console_formatter() << "ReadAhead seek: " << posNow << " => " << p_position; | |
| 212 | |
| 213 seekInternal( p_position ); | |
| 214 } | |
| 215 bool can_seek() override { | |
| 216 return m_canSeek; | |
| 217 } | |
| 218 bool get_content_type(pfc::string_base & p_out) override { | |
| 219 if (m_contentType.length() == 0) return false; | |
| 220 p_out = m_contentType; return true; | |
| 221 } | |
| 222 t_filestats2 get_stats2( uint32_t, abort_callback & p_abort ) override { | |
| 223 p_abort.check(); | |
| 224 return m_stats; | |
| 225 | |
| 226 } | |
| 227 bool is_remote() override { | |
| 228 return m_instance->m_remote; | |
| 229 } | |
| 230 | |
| 231 void reopen( abort_callback & p_abort ) override { | |
| 232 if ( get_position( p_abort ) == 0 ) return; | |
| 233 seekInternal( seek_reopen ); | |
| 234 } | |
| 235 | |
| 236 bool get_static_info(class file_info & p_out) override { | |
| 237 if ( ! m_haveStaticInfo ) return false; | |
| 238 mergeInfo(p_out, m_staticInfo); | |
| 239 return true; | |
| 240 } | |
| 241 bool is_dynamic_info_enabled() override { | |
| 242 return m_instance->m_haveDynamicInfo; | |
| 243 | |
| 244 } | |
| 245 static void mergeInfo( file_info & out, const file_info & in ) { | |
| 246 out.copy_meta(in); | |
| 247 out.overwrite_info(in); | |
| 248 } | |
| 249 bool get_dynamic_info_v2(class file_info & out, t_filesize & outOffset) override { | |
| 250 auto & i = * m_instance; | |
| 251 if ( ! i.m_haveDynamicInfo ) return false; | |
| 252 | |
| 253 insync( i.m_guard ); | |
| 254 auto ptr = i.m_dynamicInfo.begin(); | |
| 255 for ( ;; ) { | |
| 256 if ( ptr == i.m_dynamicInfo.end() ) break; | |
| 257 if ( ptr->m_offset > m_position ) break; | |
| 258 ++ ptr; | |
| 259 } | |
| 260 | |
| 261 if ( ptr == i.m_dynamicInfo.begin() ) return false; | |
| 262 | |
| 263 auto iter = ptr; --iter; | |
| 264 mergeInfo(out, iter->m_info); | |
| 265 outOffset = iter->m_offset; | |
| 266 i.m_dynamicInfo.erase( i.m_dynamicInfo.begin(), ptr ); | |
| 267 | |
| 268 return true; | |
| 269 } | |
| 270 private: | |
| 271 void seekInternal( t_filesize p_position ) { | |
| 272 auto & i = * m_instance; | |
| 273 insync( i.m_guard ); | |
| 274 if (i.m_error) std::rethrow_exception(i.m_error); | |
| 275 i.m_bufferBegin = i.m_bufferEnd = 0; | |
| 276 i.m_canWrite.set_state(true); | |
| 277 i.m_seekto = p_position; | |
| 278 i.m_atEOF = false; | |
| 279 i.m_canRead.set_state(false); | |
| 280 | |
| 281 m_position = ( p_position == seek_reopen ) ? 0 : p_position; | |
| 282 } | |
| 283 static void worker( readAheadInstance_t & i ) { | |
| 284 try { | |
| 285 bool atEOF = false; | |
| 286 uint8_t* bufptr = i.m_buffer.get_ptr(); | |
| 287 const size_t readAtOnceLimit = i.m_remote ? 64*1024 : 256 * 1024; | |
| 288 for ( ;; ) { | |
| 289 i.m_canWrite.wait_for(-1); | |
| 290 size_t readHowMuch = 0, readOffset = 0; | |
| 291 { | |
| 292 pfc::mutexScope guard(i.m_guard); | |
| 293 i.m_abort.check(); | |
| 294 if ( i.m_seekto != filesize_invalid ) { | |
| 295 if ( i.m_seekto == seek_reopen ) { | |
| 296 i.m_file->reopen( i.m_abort ); | |
| 297 } else { | |
| 298 i.m_file->seek( i.m_seekto, i.m_abort ); | |
| 299 } | |
| 300 | |
| 301 i.m_seekto = filesize_invalid; | |
| 302 atEOF = false; | |
| 303 } | |
| 304 size_t got = i.m_bufferEnd - i.m_bufferBegin; | |
| 305 | |
| 306 if ( i.m_bufferBegin >= i.m_readAhead ) { | |
| 307 memmove( bufptr, bufptr + i.m_bufferBegin, got ); | |
| 308 i.m_bufferBegin = 0; | |
| 309 i.m_bufferEnd = got; | |
| 310 } | |
| 311 | |
| 312 if ( got < i.m_readAhead ) { | |
| 313 readHowMuch = i.m_readAhead - got; | |
| 314 readOffset = i.m_bufferEnd; | |
| 315 } | |
| 316 } | |
| 317 | |
| 318 if ( readHowMuch > readAtOnceLimit ) { | |
| 319 readHowMuch = readAtOnceLimit; | |
| 320 } | |
| 321 | |
| 322 bool dynInfoGot = false; | |
| 323 dynInfoEntry_t dynInfo; | |
| 324 | |
| 325 if ( readHowMuch > 0 ) { | |
| 326 readHowMuch = i.m_file->receive( bufptr + readOffset, readHowMuch, i.m_abort ); | |
| 327 | |
| 328 if ( readHowMuch == 0 ) atEOF = true; | |
| 329 | |
| 330 if ( i.m_haveDynamicInfo ) { | |
| 331 file_dynamicinfo::ptr dyn; | |
| 332 if ( dyn &= i.m_file ) { | |
| 333 file_dynamicinfo_v2::ptr dyn2; | |
| 334 if ( dyn2 &= dyn ) { | |
| 335 dynInfoGot = dyn2->get_dynamic_info_v2(dynInfo.m_info, dynInfo.m_offset); | |
| 336 } else { | |
| 337 dynInfoGot = dyn->get_dynamic_info( dynInfo.m_info ); | |
| 338 if (dynInfoGot) { | |
| 339 dynInfo.m_offset = dyn->get_position( i.m_abort ); | |
| 340 } | |
| 341 } | |
| 342 } | |
| 343 } | |
| 344 } | |
| 345 | |
| 346 { | |
| 347 pfc::mutexScope guard( i.m_guard ); | |
| 348 i.m_abort.check(); | |
| 349 if ( i.m_seekto != filesize_invalid ) { | |
| 350 // Seek request happened while we were reading - discard and continue | |
| 351 continue; | |
| 352 } | |
| 353 i.m_atEOF = atEOF; | |
| 354 i.m_canRead.set_state( true ); | |
| 355 i.m_bufferEnd += readHowMuch; | |
| 356 size_t got = i.m_bufferEnd - i.m_bufferBegin; | |
| 357 if ( atEOF || got >= i.m_readAhead ) i.m_canWrite.set_state(false); | |
| 358 | |
| 359 if ( dynInfoGot ) { | |
| 360 i.m_dynamicInfo.push_back( std::move(dynInfo) ); | |
| 361 } | |
| 362 | |
| 363 } | |
| 364 } | |
| 365 } catch (...) { | |
| 366 pfc::mutexScope guard(i.m_guard); | |
| 367 i.m_error = std::current_exception(); | |
| 368 i.m_canRead.set_state(true); | |
| 369 } | |
| 370 } | |
| 371 | |
| 372 bool m_canSeek; | |
| 373 t_filestats2 m_stats; | |
| 374 pfc::string8 m_contentType; | |
| 375 t_filesize m_position; | |
| 376 | |
| 377 | |
| 378 bool m_haveStaticInfo; | |
| 379 file_info_impl m_staticInfo; | |
| 380 }; | |
| 381 | |
| 382 } | |
| 383 | |
| 384 | |
| 385 file::ptr fileCreateReadAhead(file::ptr chain, size_t readAheadBytes, abort_callback & aborter ) { | |
| 386 auto obj = fb2k::service_new<fileReadAhead>(); | |
| 387 obj->initialize( chain, readAheadBytes, aborter ); | |
| 388 | |
| 389 // Two paths to cast to file*, pick one explicitly to avoid compiler error | |
| 390 file_v2::ptr temp = std::move(obj); | |
| 391 return std::move(temp); | |
| 392 } | |
| 393 | |
| 394 | |
| 395 | |
| 396 namespace { | |
| 397 class CFileWithMemBlock : public reader_membuffer_base { | |
| 398 public: | |
| 399 CFileWithMemBlock(fb2k::memBlockRef mem, t_filestats const& stats, const char* contentType, bool remote) { | |
| 400 m_mem = mem; | |
| 401 m_stats = stats; | |
| 402 m_stats.m_size = mem->size(); | |
| 403 if (contentType != nullptr) m_contentType = contentType; | |
| 404 m_remote = remote; | |
| 405 } | |
| 406 const void* get_buffer() { | |
| 407 return m_mem->get_ptr(); | |
| 408 } | |
| 409 t_size get_buffer_size() { | |
| 410 return m_mem->get_size(); | |
| 411 } | |
| 412 t_filestats get_stats(abort_callback& p_abort) { | |
| 413 p_abort.check(); | |
| 414 return m_stats; | |
| 415 } | |
| 416 bool get_content_type(pfc::string_base& out) { | |
| 417 if (m_contentType.is_empty()) return false; | |
| 418 out = m_contentType; | |
| 419 return true; | |
| 420 } | |
| 421 bool is_remote() { | |
| 422 return m_remote; | |
| 423 } | |
| 424 private: | |
| 425 bool m_remote; | |
| 426 fb2k::memBlockRef m_mem; | |
| 427 pfc::string8 m_contentType; | |
| 428 t_filestats m_stats; | |
| 429 }; | |
| 430 } | |
| 431 | |
| 432 file::ptr createFileWithMemBlock(fb2k::memBlock::ptr mem, t_filestats stats, const char* contentType, bool remote) { | |
| 433 return new service_impl_t< CFileWithMemBlock >(mem, stats, contentType, remote); | |
| 434 } | |
| 435 | |
| 436 file::ptr createFileLimited(file::ptr base, t_filesize offset, t_filesize size, abort_callback& abort) { | |
| 437 return reader_limited::g_create(base, offset, size, abort); | |
| 438 } | |
| 439 | |
| 440 file::ptr createFileBigMemMirror(file::ptr source, abort_callback& abort) { | |
| 441 if (source->is_in_memory()) return source; | |
| 442 auto r = fb2k::service_new<reader_bigmem_mirror>(); | |
| 443 r->init(source, abort); | |
| 444 return r; | |
| 445 } | |
| 446 | |
| 447 file::ptr createFileMemMirror(file::ptr source, abort_callback& abort) { | |
| 448 file::ptr ret; | |
| 449 if (!reader_membuffer_mirror::g_create(ret, source, abort)) ret = source; | |
| 450 return ret; | |
| 451 } | |
| 452 | |
| 453 namespace { | |
| 454 class file_memMirrorAsync : public file_readonly_t< file_v2 > { | |
| 455 struct shared_t { | |
| 456 abort_callback_impl m_abort; | |
| 457 size_t m_size; | |
| 458 pfc::mutex m_sync; | |
| 459 file::ptr m_file; | |
| 460 | |
| 461 pfc::bigmem m_data; | |
| 462 size_t m_dataAvailable = 0; | |
| 463 | |
| 464 size_t m_triggerOffset = SIZE_MAX; | |
| 465 pfc::event m_trigger; | |
| 466 }; | |
| 467 shared_t m_shared; | |
| 468 | |
| 469 static void worker(shared_t& s) { | |
| 470 | |
| 471 constexpr size_t tempSize = 256 * 1024; | |
| 472 auto temp = std::make_unique<uint8_t[]>(tempSize); | |
| 473 | |
| 474 while (s.m_dataAvailable < s.m_size) { | |
| 475 size_t got = s.m_file->receive(temp.get(), tempSize, s.m_abort); | |
| 476 if (got == 0) break; | |
| 477 | |
| 478 PFC_INSYNC(s.m_sync); | |
| 479 s.m_data.write(temp.get(), got, s.m_dataAvailable); | |
| 480 | |
| 481 auto before = s.m_dataAvailable; | |
| 482 s.m_dataAvailable += got; | |
| 483 if (before < s.m_triggerOffset && s.m_dataAvailable >= s.m_triggerOffset) { | |
| 484 s.m_trigger.set_state(true); | |
| 485 } | |
| 486 } | |
| 487 } | |
| 488 t_filestats2 m_stats; | |
| 489 bool m_is_remote = false; | |
| 490 pfc::string8 m_contentType; | |
| 491 service_ptr m_metadata; | |
| 492 size_t m_position = 0; | |
| 493 fb2k::thread m_thread; | |
| 494 public: | |
| 495 ~file_memMirrorAsync() { | |
| 496 m_shared.m_abort.set(); | |
| 497 m_thread.waitTillDone(); | |
| 498 } | |
| 499 void open(file::ptr chain, completion_notify::ptr onDone, abort_callback& a) { | |
| 500 if ( chain->get_position(a) > 0 ) chain->reopen(a); | |
| 501 m_stats = chain->get_stats2_(stats2_all, a); | |
| 502 if (m_stats.m_size > SIZE_MAX) throw pfc::exception_overflow(); | |
| 503 m_is_remote = chain->is_remote(); | |
| 504 m_contentType = chain->get_content_type(); | |
| 505 m_metadata = chain->get_metadata_(a); | |
| 506 m_shared.m_size = (size_t)m_stats.m_size; | |
| 507 m_shared.m_file = chain; | |
| 508 m_shared.m_data.resize(m_shared.m_size); | |
| 509 auto work = [this, onDone] { | |
| 510 try { | |
| 511 worker(this->m_shared); | |
| 512 } catch (...) {} | |
| 513 this->m_shared.m_file.release(); | |
| 514 if (onDone.is_valid()) onDone->on_completion(this->m_shared.m_size == this->m_shared.m_dataAvailable ? 1 : 0); | |
| 515 }; | |
| 516 m_thread.startHere(work); | |
| 517 } | |
| 518 | |
| 519 | |
| 520 service_ptr get_metadata(abort_callback& a) override { | |
| 521 a.check(); | |
| 522 return m_metadata; | |
| 523 } | |
| 524 | |
| 525 t_filestats2 get_stats2(uint32_t, abort_callback& a) override { | |
| 526 a.check(); | |
| 527 return m_stats; | |
| 528 } | |
| 529 | |
| 530 t_filesize get_position(abort_callback& p_abort) override { | |
| 531 p_abort.check(); | |
| 532 return m_position; | |
| 533 } | |
| 534 | |
| 535 void seek(t_filesize p_position, abort_callback& p_abort) override { | |
| 536 if (p_position > get_size(p_abort)) throw exception_io_seek_out_of_range(); | |
| 537 m_position = (size_t)p_position; | |
| 538 } | |
| 539 | |
| 540 bool can_seek() override { return true; } | |
| 541 | |
| 542 bool get_content_type(pfc::string_base& p_out) override { | |
| 543 bool rv = m_contentType.length() > 0; | |
| 544 if (rv) p_out = m_contentType; | |
| 545 return rv; | |
| 546 } | |
| 547 | |
| 548 void reopen(abort_callback& p_abort) override { seek(0, p_abort); } | |
| 549 | |
| 550 bool is_remote() override { return m_is_remote; } | |
| 551 | |
| 552 t_size read(void* p_buffer, t_size p_bytes, abort_callback& p_abort) override { | |
| 553 auto limit = get_size(p_abort); | |
| 554 auto left = limit - m_position; | |
| 555 if (p_bytes > left) p_bytes = (size_t)left; | |
| 556 | |
| 557 const auto upper = m_position + p_bytes; | |
| 558 | |
| 559 auto& shared = m_shared; | |
| 560 | |
| 561 { | |
| 562 PFC_INSYNC(shared.m_sync); | |
| 563 if (shared.m_dataAvailable >= upper) { | |
| 564 shared.m_data.read(p_buffer, p_bytes, m_position); | |
| 565 m_position += p_bytes; | |
| 566 return p_bytes; | |
| 567 } | |
| 568 shared.m_trigger.set_state(false); | |
| 569 shared.m_triggerOffset = upper; | |
| 570 } | |
| 571 p_abort.waitForEvent(shared.m_trigger); | |
| 572 | |
| 573 PFC_INSYNC(shared.m_sync); | |
| 574 PFC_ASSERT(shared.m_dataAvailable >= upper); | |
| 575 shared.m_data.read(p_buffer, p_bytes, m_position); | |
| 576 m_position += p_bytes; | |
| 577 return p_bytes; | |
| 578 } | |
| 579 t_filesize skip(t_filesize p_bytes, abort_callback& p_abort) override { | |
| 580 auto total = get_size(p_abort); | |
| 581 PFC_ASSERT(total >= m_position ); | |
| 582 auto left = total - m_position; | |
| 583 if (p_bytes > left) p_bytes = left; | |
| 584 m_position += (size_t)p_bytes; | |
| 585 return p_bytes; | |
| 586 } | |
| 587 }; | |
| 588 } | |
| 589 | |
| 590 file::ptr createFileMemMirrorAsync(file::ptr source, completion_notify::ptr onDone, abort_callback & a) { | |
| 591 auto ret = fb2k::service_new<file_memMirrorAsync>(); | |
| 592 ret->open(source, onDone, a); | |
| 593 return ret; | |
| 594 } |
