It seems the airflow dag processor opens sockets under the hood that are not closed properly, leading to "too many open files" errors after it runs for some time. Please follow the steps below to reproduce, since it can take a while for the ulimit to be reached if it is unlimited or very high.
The number of open files should not keep increasing, and sockets that are opened should be closed. The ResourceWarning messages about unclosed sockets point to the problem and should also be fixed.
2025-01-26 09:20:05 [debug ] Workload process exited 1 [supervisor] exit_code=0
[2025-01-26T09:20:05.502+0530] {dag.py:1841} INFO - Sync 1 DAGs
[2025-01-26T09:20:05.510+0530] {dag.py:2398} INFO - Setting next_dagrun for example_setup_teardown_taskflow to None, run_after=None
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=320, family=1, type=1, proto=0>
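The warning above is Python's standard unclosed-socket ResourceWarning, which is only visible when warnings are enabled (hence `PYTHONWARNINGS=always` / `-X dev` in the steps below). A minimal, Airflow-independent snippet that triggers the same warning:

```python
import gc
import socket
import warnings

# Capture warnings so the unclosed-socket ResourceWarning is visible;
# by default Python ignores ResourceWarning unless dev mode or
# PYTHONWARNINGS enables it.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    rsock, wsock = socket.socketpair()  # AF_UNIX pair: two new fds
    del rsock, wsock                    # dropped without .close()
    gc.collect()                        # finalizers emit the warning

msgs = [str(w.message) for w in caught
        if issubclass(w.category, ResourceWarning)]
assert msgs and "unclosed" in msgs[0]
```

This mirrors what happens inside the dag processor: the sockets are garbage-collected without ever being closed, so the fds are released late (or, if references linger, not at all).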
How to reproduce
Set min_file_process_interval = 10 in airflow.cfg to trigger frequent reparsing.
Run PYTHONWARNINGS=always python -X dev -m airflow dag-processor.
Use ps aux | grep -i dag-processor to get the pid.
Run watch "ls -1 /proc/<pid>/fd | wc", which shows the open fd count steadily increasing.
In another tab, set the open files limit for the process using prlimit --pid <pid> --nofile=1024:1024.
Once the limit is reached, the dag processor exits with the following stack trace:
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=1010, family=1, type=1, proto=0>
[2025-01-26T09:12:56.322+0530] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
return self._run_parsing_loop()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
self._start_new_processes()
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
processor = self._create_process(file_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
return DagFileProcessorProcess.start(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
proc: Self = super().start(target=target, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
child_logs, read_logs = mkpipe()
^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
rsock, wsock = socketpair()
^^^^^^^^^^^^
File "/usr/lib/python3.11/socket.py", line 657, in socketpair
a, b = _socket.socketpair(family, type, proto)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files
[2025-01-26T09:12:56.332+0530] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 62, in <module>
main()
File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
args.func(args)
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 54, in dag_processor
run_command_with_daemon_option(
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
callback()
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 57, in <lambda>
callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
return self._run_parsing_loop()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
self._start_new_processes()
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
processor = self._create_process(file_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
return DagFileProcessorProcess.start(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
proc: Self = super().start(target=target, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
child_logs, read_logs = mkpipe()
^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
rsock, wsock = socketpair()
^^^^^^^^^^^^
File "/usr/lib/python3.11/socket.py", line 657, in socketpair
a, b = _socket.socketpair(family, type, proto)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files
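The fd growth can also be confirmed programmatically rather than with watch; an equivalent check (Linux only, via /proc) in Python:

```python
import os


def count_open_fds(pid: int) -> int:
    """Return the number of open file descriptors for a process.

    Equivalent to `ls -1 /proc/<pid>/fd | wc -l` (Linux only).
    """
    return len(os.listdir(f"/proc/{pid}/fd"))


# Sampling this for the dag-processor pid over time shows monotonic
# growth while the leak is present.
print(count_open_fds(os.getpid()))
```

Each socketpair() call adds two entries under /proc/&lt;pid&gt;/fd, so the counter climbs by two per leaked pipe.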
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
It seems the airflow dag processor opens sockets under the hood that are not closed properly, leading to "too many open files" errors after it runs for some time. Please follow the steps below to reproduce, since it can take a while for the ulimit to be reached if it is unlimited or very high.
There is a relevant comment in the source code:
airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py
Lines 345 to 361 in 29b9e8e
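For context, the mkpipe() helper seen in the stack trace builds a one-way channel from an AF_UNIX socketpair. A hypothetical sketch (the name comes from the traceback; the shutdown details are assumed, not the actual supervisor code) showing that each call allocates two fds which must both be closed:

```python
import socket


def mkpipe_sketch() -> tuple[socket.socket, socket.socket]:
    """Hypothetical stand-in for supervisor.mkpipe(): a one-way pipe
    made from a socketpair. Each call allocates TWO file descriptors;
    if the supervisor never closes both ends after the child exits,
    every DAG-file parse leaks fds.
    """
    rsock, wsock = socket.socketpair()
    rsock.shutdown(socket.SHUT_WR)  # reader end: read-only
    wsock.shutdown(socket.SHUT_RD)  # writer end: write-only
    return rsock, wsock


reader, writer = mkpipe_sketch()
writer.sendall(b"log line")
writer.close()                      # closing the writer gives the reader EOF
assert reader.recv(64) == b"log line"
assert reader.recv(64) == b""       # EOF
reader.close()                      # without these closes, 2 fds leak per call
```

If the close calls at the end are skipped, the pair is only reclaimed when the garbage collector finalizes the socket objects, which is exactly when the ResourceWarning fires.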
What you think should happen instead?
The number of open files should not keep increasing, and sockets that are opened should be closed. The ResourceWarning messages about unclosed sockets point to the problem and should also be fixed.
How to reproduce
Set min_file_process_interval = 10 in airflow.cfg to trigger frequent reparsing.
Run PYTHONWARNINGS=always python -X dev -m airflow dag-processor.
Use ps aux | grep -i dag-processor to get the pid.
Run watch "ls -1 /proc/<pid>/fd | wc", which shows the open fd count steadily increasing.
In another tab, set the open files limit for the process using prlimit --pid <pid> --nofile=1024:1024.
Once the limit is reached, the dag processor exits with the stack trace shown above.
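The prlimit step can also be simulated in-process with the resource module; a sketch that lowers RLIMIT_NOFILE and exhausts it with socketpairs, reproducing the same OSError:

```python
import errno
import resource
import socket

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
# Lower only the soft limit, like `prlimit --nofile` does for the process.
resource.setrlimit(resource.RLIMIT_NOFILE, (64, hard))

pairs = []
caught = None
try:
    while True:
        pairs.append(socket.socketpair())  # each call consumes two fds
except OSError as exc:
    caught = exc
finally:
    for r, w in pairs:
        r.close()
        w.close()
    resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))

assert caught is not None
assert caught.errno == errno.EMFILE  # [Errno 24] Too many open files
```

This fails the same way socketpair() does in the traceback above once the process hits its nofile limit.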
Operating System
Ubuntu 20.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
Code of Conduct