自动化脚本
2024/10/3大约 5 分钟
自动化脚本
执行命令行操作
- 通过脚本执行不需要中途输入的命令行操作
subprocess是python自带的线程模块,从python2.4引入concurrent.futures是python3.2加入的模块,用于创建和管理线程池,之前的python版本使用需要安装futures包pexpect是一个社区包
subprocess.run
# 使用 subprocess.run 可以模拟在命令行运行命令
def execute_command(command, cwd=None, capture_output=True):
result = subprocess.run(
command, shell=True, cwd=cwd, capture_output=capture_output, text=True
)
if result.returncode != 0:
print(f"Error executing command: {command}\n{result.stderr}")
elif capture_output:
print(result.stdout)
return result.returncodesubprocess.Popen
# 使用 subprocess.Popen 或开启新的线程可以异步在命令行运行命令
def async_execute_command(command, cwd=None, capture_output=True):
return subprocess.Popen(
command, shell=True, cwd=cwd, capture_output=capture_output, text=True
)
p = async_execute_command('python test.py')
# 等待异步线程执行完成
p.wait()
# 创建子线程执行任务
def thread():
os.system("python test.py")
thread = threading.Thread(target=thread)
# 开始执行
monitor_thread.start()
# 等待异步线程执行完成
monitor_thread.join()
# -------------------------------------------------
# 使用 subprocess.Popen 开启新线程同时捕获输出并实时在命令行重新输出
# 对部分程序可能会有问题
def execute_command_with_realtime_capture(command, cwd=None):
# 启动子进程并打开管道, capture_output无法做到实时输出
process = subprocess.Popen(command, cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1) # 行缓冲
last_line = None # 用于存储最后一行输出
# 实时读取并打印输出,同时记录最后一行
while True:
output = process.stdout.readline()
if output == "" and process.poll() is not None: # 当输出为空且进程结束时,退出循环
break
if output:
sys.stdout.write(output) # 实时输出到命令行
last_line = output # 更新最后一行
# 捕获标准错误输出(如果有)
stderr_output = process.stderr.read()
if stderr_output:
sys.stderr.write(stderr_output)
# 等待子进程完成
process.wait()
# 打印并处理最后一行结果
if last_line:
print("\n最后一行输出:", last_line.strip())
return process.returncodeconcurrent.futures
# 使用 concurrent.futures 模块开启异步线程并获取返回值
import concurrent.futures # 引入子包,而不是concurrent
def run_async(func, *args, **kwargs):
"""
异步执行给定的函数并返回Future对象。
:param func: 要异步执行的函数
:param args: 函数的位置参数
:param kwargs: 函数的关键字参数
:return: Future对象
"""
with concurrent.futures.ThreadPoolExecutor() as executor:
# submit函数会立即返回一个Future对象,不会阻塞
future = executor.submit(func, *args, **kwargs)
# 返回Future对象
return futurepexpect.spawn
# pip install pexpect
def run_darknet(command, cwd=HOST_DIR, capture_output=False, log_file=None):
"""
执行命令并捕获输出
:param command: 需要执行的命令
:param cwd: 执行目录,默认为 host 目录
:param capture_output: 是否捕获输出
:param log_file: 日志文件路径,若提供则将输出写入该文件
:return: 执行状态码,最后一行
"""
# 使用 pexpect.spawn 启动子进程
process = pexpect.spawn(command, cwd=cwd, env=run_env, encoding="utf-8", logfile=open(log_file, 'w') if log_file else None, timeout=300)
# 实时读取并打印输出,同时记录最后一行
while True:
try:
output = process.readline().strip() # 读取一行输出并去掉末尾的换行符
if output == "":
break
if not capture_output:
sys.stdout.write(output + "\n") # 实时输出到命令行
except pexpect.EOF:
break # 子进程结束
except pexpect.TIMEOUT:
# 超时后终止 pexpect 子程序
process.terminate()
raise('pexpect timeout')
# 获取退出状态码
process.wait() # 等待子进程退出
return process.exitstatus- 中途输入:
Popen.communicate(input=None, timeout=None)这个方法在Popen对象上调用,它将阻塞主线程直到子进程完成。如果指定了input,则将其发送给子进程的stdin。stdout和stderr作为元组返回
获取正在执行的程序的相关信息
- 根据程序名称获取程序句柄:获取程序通过
psutil.process_iter得到所有正在执行程序的名字和句柄,遍历获取一个需要关注的程序
def get_pid_by_name(process_name):
pid = None
flag = True
while flag:
time.sleep(0.2)
for proc in psutil.process_iter(['pid', 'name']):
print("pid:", proc.info['pid'], "name:", proc.info['name'])
if proc.info['name'] == process_name:
pid = proc.info['pid']
flag = False
break
print("get pid")
return pid
# 通过pid获取程序其他信息
process = psutil.Process(pid)
# 内存信息
memory_info = process.memory_info()
# cpu这段时间占用信息
cpu_usage = process.cpu_percent(interval=interval)
# 进程的创建时间(单位是时间戳,秒)
start_time = process.create_time()
# 程序是否还在运行
process.is_running()优化脚本的输出
- 覆盖命令行输出
- 通过使用
\r可以使输出光标回到本行开头 - 通过使用
\b可以使输出光标后退一格,并不删除上一个字符,会导致命令行此行卡住,等待此行输出完全结束才会在命令行显示
- 通过使用
# 使用end="\r"使光标移动到本行开始,进行覆盖操作(仅适合覆盖的文本长度大于等于原长度的情况)
print("some data", end="\r")
print("another different data")
# 通过对sys.stdout进行操作,输入"\r",达到覆盖的效果(仅适合覆盖的文本长度大于等于原长度的情况)
def restart_line():
sys.stdout.write('\r')
sys.stdout.flush()- 实现转圈效果
lst = ["\\", "|", "/", "—"]
for i in range(20):
j = i % 4
print("hello world", lst[j] * 4, end="\r")
time.sleep(0.2)- 实现英文输出靠右,如果希望其他语言的输出靠右需要得到如
GTK这样的命令行格式化输出工具的支持,否则无法获取其他语言字符串在命令行的占位数
# shutil库的说明中似乎表示其中的部分函数对Mac无效
def print_right(text:str):
# 获取控制台宽度
console_width = shutil.get_terminal_size().columns
# 计算左边需要填充的空格数
num_spaces = console_width - len(text) - 1 # 减1是因为print会在字符串末尾添加换行符
# 打印左边的空格
print(' ' * max(num_spaces, 0), end='')
# 打印文本
print(text)- 格式化输出建表部分见python笔记
