Airflow dag processor exits with too many open files after some time #46048

Open
tirkarthi opened this issue Jan 26, 2025 · 2 comments
Labels
area:core, kind:bug

Comments

@tirkarthi (Contributor)

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

It seems the airflow dag processor opens sockets under the hood that are not closed properly, leading to too many open files after it runs for some time. To reproduce this, please follow the steps below, since it can take a while for the ulimit to be reached if it is unlimited or very high.

There is a relevant comment in the source code:

pid = os.fork()
if pid == 0:
    # Parent ends of the sockets are closed by the OS as they are set as non-inheritable
    # Python GC should delete these for us, but lets make double sure that we don't keep anything
    # around in the forked processes, especially things that might involve open files or sockets!
    del constructor_kwargs
    del logger
    # Run the child entrypoint
    _fork_main(child_stdin, child_stdout, child_stderr, child_logs.fileno(), target)

requests_fd = child_comms.fileno()

# Close the remaining parent-end of the sockets we've passed to the child via fork. We still have the
# other end of the pair open
cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs)
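
For comparison, here is a minimal, self-contained sketch (hypothetical code, not from the Airflow codebase) of how this pattern can leak descriptors when only the child-side end of each socketpair is closed and the parent-side end is kept alive without ever being close()d:

import os
import socket

def fd_count() -> int:
    # Count the file descriptors currently open in this process (Linux only).
    return len(os.listdir("/proc/self/fd"))

kept_alive = []
for _ in range(100):
    parent_end, child_end = socket.socketpair()
    child_end.close()              # analogous to _close_unused_sockets(...)
    kept_alive.append(parent_end)  # parent end is never closed

print(fd_count())  # grows by roughly one fd per iteration

Garbage collection only reclaims such sockets once no references remain, so if the supervisor keeps a reference to its end of each pair after the child exits, the fd stays open.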

What you think should happen instead?

The number of open files should not keep increasing; sockets that are opened should be closed once they are no longer needed. The ResourceWarning about unclosed sockets, which likely points at the problem, should also be fixed.

2025-01-26 09:20:05 [debug    ] Workload process exited 1      [supervisor] exit_code=0
[2025-01-26T09:20:05.502+0530] {dag.py:1841} INFO - Sync 1 DAGs
[2025-01-26T09:20:05.510+0530] {dag.py:2398} INFO - Setting next_dagrun for example_setup_teardown_taskflow to None, run_after=None
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=320, family=1, type=1, proto=0>
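
One way to find where the leaked sockets are created (a debugging suggestion, not something this report depends on) is to enable tracemalloc, which makes CPython append an allocation traceback to each ResourceWarning. A small snippet dropped into the process entrypoint is enough:

import tracemalloc
import warnings

tracemalloc.start(25)  # keep up to 25 frames per allocation traceback
warnings.simplefilter("always", ResourceWarning)

Equivalently, set PYTHONTRACEMALLOC=25 in the environment alongside PYTHONWARNINGS=always.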

How to reproduce

  1. Set min_file_process_interval = 10 in airflow.cfg to trigger frequent reparsing.
  2. Run PYTHONWARNINGS=always python -X dev -m airflow dag-processor.
  3. Use ps aux | grep -i dag-processor to get the pid.
  4. Run watch "ls -1 /proc/<pid>/fd | wc", which shows a count that keeps increasing (a small helper that confirms these fds are sockets is sketched after the stack trace below).
  5. In another tab, set the open files limit for the process using prlimit --pid <pid> --nofile=1024:1024. Once the limit is reached, the dag processor exits with the following stack trace:
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=1010, family=1, type=1, proto=0>
[2025-01-26T09:12:56.322+0530] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
    processor = self._create_process(file_path)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
    return DagFileProcessorProcess.start(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
    proc: Self = super().start(target=target, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
    child_logs, read_logs = mkpipe()
                            ^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
    rsock, wsock = socketpair()
                   ^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 657, in socketpair
    a, b = _socket.socketpair(family, type, proto)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files
[2025-01-26T09:12:56.332+0530] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 62, in <module>
    main()
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 54, in dag_processor
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 57, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
    processor = self._create_process(file_path)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
    return DagFileProcessorProcess.start(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
    proc: Self = super().start(target=target, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
    child_logs, read_logs = mkpipe()
                            ^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
    rsock, wsock = socketpair()
                   ^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 657, in socketpair
    a, b = _socket.socketpair(family, type, proto)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files
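
To confirm that the fds accumulating in step 4 are sockets rather than regular files, a small Linux-only helper like the following can be pointed at the dag-processor pid (hypothetical diagnostic code, not part of Airflow):

import os
import sys

# Pass the dag-processor pid as the first argument; defaults to this process.
pid = sys.argv[1] if len(sys.argv) > 1 else "self"
fd_dir = f"/proc/{pid}/fd"
fds = os.listdir(fd_dir)
sockets = 0
for fd in fds:
    try:
        if os.readlink(os.path.join(fd_dir, fd)).startswith("socket:"):
            sockets += 1
    except FileNotFoundError:
        pass  # fd was closed between listdir() and readlink()

print(f"{sockets} of {len(fds)} open fds are sockets")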

Operating System

Ubuntu 20.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@tirkarthi added the area:core, kind:bug, and needs-triage labels and removed the needs-triage label on Jan 26, 2025
@tirkarthi (Contributor, Author)

cc: @kaxil @ashb @jedcunningham

@jedcunningham (Member)

Thanks @tirkarthi. I'll look into it this coming week.
