FermiLink
Source code for fermilink.exploop.pid
from __future__ import annotations
import csv
import io
import os
import subprocess
[docs]
def is_pid_alive ( pid : int ) -> bool :
"""Return whether a PID is alive on the current platform."""
try :
normalized = int ( pid )
except ( TypeError , ValueError ):
return False
if normalized <= 0 :
return False
if os . name == "nt" :
return _windows_pid_alive ( normalized )
return _posix_pid_alive ( normalized )
def _posix_pid_alive ( pid : int ) -> bool :
try :
os . kill ( pid , 0 )
except ProcessLookupError :
return False
except PermissionError :
return True
except OSError :
return False
return True
def _windows_pid_alive ( pid : int , * , runner = subprocess . run ) -> bool :
try :
result = runner (
[ "tasklist" , "/FI" , f "PID eq { pid } " , "/FO" , "CSV" , "/NH" ],
check = False ,
capture_output = True ,
text = True ,
timeout = 8 ,
)
except ( OSError , subprocess . TimeoutExpired , ValueError ):
return False
if getattr ( result , "returncode" , 1 ) != 0 :
return False
stdout = str ( getattr ( result , "stdout" , "" ) or "" ) . strip ()
if not stdout or stdout . upper () . startswith ( "INFO:" ):
return False
try :
rows = csv . reader ( io . StringIO ( stdout ))
for row in rows :
if len ( row ) >= 2 and row [ 1 ] . strip () == str ( pid ):
return True
except csv . Error :
return False
return False