diff --git a/src/executorlib/task_scheduler/interactive/spawner_pysqa.py b/src/executorlib/task_scheduler/interactive/spawner_pysqa.py index 6b3b4463..3cdfc709 100644 --- a/src/executorlib/task_scheduler/interactive/spawner_pysqa.py +++ b/src/executorlib/task_scheduler/interactive/spawner_pysqa.py @@ -238,6 +238,8 @@ def create_pysqa_block_allocation_scheduler( cores_per_worker = executor_kwargs.get("cores", 1) if "cwd" in executor_kwargs and executor_kwargs["cwd"] is not None: executor_kwargs["cwd"] = os.path.abspath(executor_kwargs["cwd"]) + elif cache_directory is not None: + executor_kwargs["cwd"] = os.path.abspath(cache_directory) if cache_directory is not None: executor_kwargs["cache_directory"] = os.path.abspath(cache_directory) else: diff --git a/tests/unit/executor/test_flux_cluster.py b/tests/unit/executor/test_flux_cluster.py index 37661b13..a86937d0 100644 --- a/tests/unit/executor/test_flux_cluster.py +++ b/tests/unit/executor/test_flux_cluster.py @@ -71,6 +71,20 @@ def test_executor(self): self.assertEqual(len(os.listdir("executorlib_cache")), 4) self.assertTrue(fs1.done()) + def test_executor_no_cwd(self): + with FluxClusterExecutor( + resource_dict={"cores": 2}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertTrue(fs1.done()) + def test_executor_blockallocation(self): with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "executorlib_cache"}, @@ -86,6 +100,21 @@ def test_executor_blockallocation(self): self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) + def test_executor_blockallocation_no_cwd(self): + with FluxClusterExecutor( + resource_dict={"cores": 2}, + block_allocation=True, + cache_directory="executorlib_cache", + pmi_mode=pmi, + max_workers=1, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) + self.assertTrue(fs1.done()) + def test_executor_dependencies(self): with FluxClusterExecutor( resource_dict={"cores": 1, "cwd": "executorlib_cache"},