diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 1591af5f36..14d82884dc 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -5068,8 +5068,8 @@ async def jsonrpc_file_reflect(self, **kwargs): else: server, port = random.choice(self.conf.reflector_servers) reflected = await asyncio.gather(*[ - self.file_manager['stream'].reflect_stream(stream, server, port) - for stream in self.file_manager.get_filtered_streams(**kwargs) + self.file_manager.source_managers['stream'].reflect_stream(stream, server, port) + for stream in self.file_manager.get_filtered(**kwargs) ]) total = [] for reflected_for_stream in reflected: diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index cce283473c..72fd1414ad 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -228,6 +228,7 @@ async def _retriable_reflect_stream(self, stream, host, port): while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: stream.reflector_progress = 0 sent = await stream.upload_to_reflector(host, port) + return sent async def create(self, file_path: str, key: Optional[bytes] = None, iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index e1e92aead0..12b9999357 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -69,6 +69,16 @@ async def create_streams_in_range(self, *args, **kwargs): t = await self.stream_create(f'Stream_{i}', '0.00001') self.stream_claim_ids.append(t['outputs'][0]['claim_id']) + async def test_file_reflect(self): + tx = await self.stream_create('mirror', '0.01') + sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] + self.assertEqual([], await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash)) + all_except_sd = [ + blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash + ] + await self.reflector.blob_manager.delete_blobs(all_except_sd) + self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash)) + async def test_file_management(self): await self.stream_create('foo', '0.01') await self.stream_create('foo2', '0.01')