2025py重学计划
目录
Day 1
Day 1 Python 语法入门 变量、print、类型、输入输出 写一个 CLI 工具,输入姓名打印问候语
写一个 CLI 工具,输入姓名打印问候语
v1 cat echo.py name = 'evan' print( name + " have a good day") v2 ➜ py2025 cat echo.py name = input("please input your name") print( name + " have a good day") py echo.py please input your name evan evan have a good day ➜ py2025 cat echo.py v3 import sys if len(sys.argv) < 2: print("Usage: python echo.py <name>") else: name = sys.argv[1] print(f"Hi, {name} have a good day") ➜ py2025 py echo.py Usage: python echo.py <name> ➜ py2025 py echo.py evan Hi, evan have a good d v4 ➜ py2025 cat greet.py import argparse def main(): parser = argparse.ArgumentParser(description="Say Hi to someone") parser.add_argument("-n","--name",required=True,help="The name of the person to greet.") args = parser.parse_args() print(f"Hello,{args.name} hope you have a good day") if __name__ == "__main__": main() ➜ py2025 python greet.py -n evan Hello,evan hope you have a good day
day2
Day 2 流程控制 if, for, while, break, continue 判断磁盘使用率是否超过阈值
num = [1,2,3,4,5,6,] for nu in num: if nu % 2 == 0: print(f"{num} 偶数") continue elif nu == 5: print(f"找到 5 quit loop") break print(f"{num} 奇数")
day3
Day 3 数据结构 list, dict, set, tuple 常用操作 把 log 中的错误代码用 dict 统计次数
py2025 cat errors.log ERR001 ERR002 ERR001 ERR003 ERR002 ERR001 ➜ py2025 py 1.py ERR001: 3次 ERR002: 2次 ERR003: 1次 ➜ py2025 tail 1.py error_counts = {} with open("errors.log","r") as file: for line in file: error_code = line.strip() error_counts[error_code] = error_counts.get(error_code,0) + 1 for code,count in error_counts.items(): print(f"{code}: {count}次") 这ai 解说还真的有意思 for code, count in error_counts.items(): 这一行: error_counts 是一个字典,用于存储错误代码及其出现的次数。例如,error_counts 可能是 {"E001": 3, "E002": 2} 这样的形式。 items() 是字典的一个方法,它会返回一个包含多个元组的可迭代对象,每个元组由字典的键和对应的值组成。对于 {"E001": 3, "E002": 2} 这个字典,error_counts.items() 的结果就是 [("E001", 3), ("E002", 2)] 这样的形式。 for code, count in error_counts.items(): 是一个循环语句,code 和 count 是自定义的变量名。在每次循环中,code 会被赋值为字典的键(也就是错误代码,如 "E001"),count 会被赋值为对应的值(也就是该错误代码出现的次数,如 3)。也就是说,第一次循环时,code 是 "E001",count 是 3;第二次循环时,code 是 "E002",count 是 2。 print(f"{code}: {count} 次") 这一行: print() 是 Python 内置的函数,用于在控制台输出信息。 f"{code}: {count} 次" 是一个格式化字符串(也称为 f-string)。在这个字符串中,{code} 和 {count} 是占位符,会被 code 和 count 变量的值所替换。例如,当 code 是 "E001",count 是 3 时,f"{code}: {count} 次" 就会被解析为 "E001: 3 次",然后 print() 函数会将这个字符串输出到控制台。 第 7 行:error_code = line.strip() strip() 方法移除字符串首尾的空白字符(如换行符 \n、空格)。 示例: "E001\n".strip() → "E001" " E002 ".strip() → "E002" 第 9 行:error_counts[error_code] = error_counts.get(error_code, 0) + 1 核心逻辑:统计错误代码的出现次数。 get(key, default) 方法: 如果 key 存在于字典中,返回对应的值。 如果 key 不存在,返回默认值 default(这里是 0)。 执行过程: 第一次遇到 "E001": error_counts.get("E001", 0) 返回 0 0 + 1 = 1,因此 error_counts["E001"] = 1 第二次遇到 "E001": error_counts.get("E001", 0) 返回 1 1 + 1 = 2,因此 error_counts["E001"] = 2
day 4
Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数
def calculate_server_load(cpu_usage,memory_used,disk_io, total_memory=32,max_io=32): """ 計算伺服器負載。 參數: cpu_usage (float): CPU 使用率(%) memory_used (float): 記憶體使用量(GB) disk_io (float): 磁碟 I/O 速率(MB/s) total_memory (float): 總記憶體(GB,預設 32) max_io (float): 最大 I/O 速率(MB/s,預設 200) 返回: float: 伺服器負載(%) """ if not all(isinstance(x,(int,float)) for x in [cpu_usage,memory_used, disk_io, total_memory,max_io]): raise ValueError("參數必須為數字") if cpu_usage < 0 or memory_used < 0 or disk_io < 0 or total_memory <= 0 or max_io <= 0: raise ValueError("參數不能為負數,且總記憶體與最大 I/O 不能為 0 ") if cpu_usage > 100: raise ValueError("CPU 使用率不能大於 100%") if memory_used > total_memory: raise ValueError("記憶體使用量不能大於總記憶體") cpu_load = cpu_usage * 0.4 memory_load = (memory_used / total_memory * 100) * 0.4 io_load = (disk_io / max_io * 100) * 0.2 total_load = cpu_load + memory_load + io_load return round(total_load,2) try: load = calculate_server_load(80,20,100,32,200) print(f"伺服器負載為 {load}%") except ValueError as e: print(f"err{e}")
day 5
Day 5 文件操作 with open, 读写、逐行处理 写一个“读取Nginx日志并分析IP访问量”的脚本
from collections import Counter import sys if len(sys.argv) < 2: print("Usage: python nglog.py log_file") sys.exit(1) log_file = sys.argv[1] ip_counter = Counter() try: with open(log_file,'r') as file: for line in file: if line.strip(): ip = line.lstrip().split(' ',1)[0] ip_counter[ip] += 1 except FileNotFoundError: print(f"Error: file '{log_file}' not found.") sys.exit(1) print("IP 访问量统计") for ip, count in ip_counter.most_common(): print(f"IP: {ip}, Accesses: {count}") # IP 地址提取: # line.lstrip().split(' ', 1)[0]: # lstrip() 去除行首空白字符 # split(' ', 1) 按第一个空格分割字符串 # [0] 取分割后的第一个部分(即 IP 地址) # cat access.log # 192.168.1.1 - - [01/Jan/2023:00:00:00 +0000] "GET /index.html HTTP/1.1" # python nglog.py access.log # IP 访问量统计 # IP: 192.168.1.1, Accesses: 2 # IP: 192.168.1.2, Accesses: 1 for ip, count in ip_counter.most_common(): ip_counter.most_common() 的作用 ip_counter 是一个 Counter 对象,它存储了每个 IP 地址的出现次数。 例如:Counter({'192.168.1.1': 5, '192.168.1.2': 3, '10.0.0.1': 2}) most_common() 是 Counter 的方法,用于按计数从高到低排序元素。 返回值:一个包含元组的列表,每个元组格式为 (元素, 计数)。 例如:[('192.168.1.1', 5), ('192.168.1.2', 3), ('10.0.0.1', 2)] 循环变量解构 for ip, count in ...: ip:每次循环中存储当前 IP 地址(元组的第一个元素)。 count:存储该 IP 的访问次数(元组的第二个元素)。 第 3 行:print(f"IP: {ip}, Accesses: {count}") f-string 格式化输出 语法:f"字符串{变量}" 会将 {} 内的变量值插入字符串。 示例: 当 ip = '192.168.1.1' 且 count = 5 时,输出: plaintext IP: 192.168.1.1, Accesses: 5 完整执行流程示例 假设 ip_counter 的内容为: python 运行 Counter({'192.168.1.1': 3, '10.0.0.1': 2, '192.168.1.100': 1}) 第一次循环: ip = '192.168.1.1',count = 3 输出:IP: 192.168.1.1, Accesses: 3 第二次循环: ip = '10.0.0.1',count = 2 输出:IP: 10.0.0.1, Accesses: 2 第三次循环: ip = '192.168.1.100',count = 1 输出:IP: 192.168.1.100, Accesses: 1 为什么用 most_common()? 默认行为:most_common() 不指定参数时,会返回所有元素,并按计数从高到低排序。 等价写法: python 运行 # 手动排序(效果相同) for ip, count in sorted(ip_counter.items(), key=lambda x: x[1], reverse=True): print(f"IP: {ip}, Accesses: {count}") 最终输出示例 plaintext IP 访问量统计 IP: 192.168.1.1, Accesses: 3 IP: 10.0.0.1, Accesses: 2 IP: 192.168.1.100, Accesses: 1 这两行代码的核心作用是将统计结果以易读的格式展示出来,并按访问量从高到低排序,帮助用户快速定位高频访问的 IP 地址。
day 6
Day 6 错误处理 try...except...finally 模拟连接失败的错误处理
import time def connect_to_resource(url,max_retries=3): retries = 0 while retries < max_retries: try: print(f"尝试连接到 {url}, 第 {retries+1} 次尝试") should_fail = (retries < 2) if should_fail: raise Exception("Failed to connect to resource") else: print("连接成功 {url}") return True except Exception as e: print(f"连接失败 {url}, 错误信息: {e}") retries += 1 print(f"等 2 秒后重试") time.sleep(1) finally: print("无论连接成功与不,这里都会执行清理操作") print("-" * 20) print(f"尝试了 {max_retries} 次,连接失败") return False if __name__ == "__main__": resource_url = "https://wiki.linuxchina.net" connection_successful = connect_to_resource(resource_url) if connection_successful: print("连接成功") else: print("连接失败") 尝试连接到 https://wiki.linuxchina.net, 第 1 次尝试 连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource 等 2 秒后重试 无论连接成功与不,这里都会执行清理操作 -------------------- 尝试连接到 https://wiki.linuxchina.net, 第 2 次尝试 连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource 等 2 秒后重试 无论连接成功与不,这里都会执行清理操作 -------------------- 尝试连接到 https://wiki.linuxchina.net, 第 3 次尝试 连接成功 {url} 无论连接成功与不,这里都会执行清理操作 -------------------- 连接成功
day 7
Day 7 模块与包 import, sys.path, 自定义模块 把昨天的脚本封装为一个模块
day 8
Day 8 subprocess 与 os 执行 shell 命令,获取结果 写一个“批量检测服务状态”的脚本
socket version 利用python检测远程 IP和端口是否可连接并钉钉报警
nc version
import subprocess def check_port(host,port): try: result = subprocess.run( ['nc','-zv',host,str(port)], stdout = subprocess.PIPE, stderr = subprocess.STDOUT, timeout = 3, text=True ) if "succeeded" in result.stdout or "open" in result.stdout: return f"{host}:{port} is UP" else: return f"{host}:{port} is DOWN" except subprocess.TimeoutExpired: return f"{host}:{port} Timeout" except Exception as e: return f"{host}:{port} Error - {str(e)}" if __name__ =='__main__': targets = [ ('127.0.0.1',22), ('8.8.8.8',53), ('www.bing.com',80), ] for host, port in targets: print(check_port(host,port)) """ py check_port.py 127.0.0.1:22 is UP 8.8.8.8:53 is UP www.bing.com:80 is UP socket version and telnet version """
telnet version
import telnetlib import socket def check_port(host,port,timeout=3): try: with telnetlib.Telnet(host,port,timeout): return True except socket.timeout: return False except Exception: return False if __name__ == '__main__': targets = [ ("127.0.0.1",22), ("linuxsa.org",80) ] for host,port in targets: status = "UP" if check_port(host,port) else "DOWN" print(f"{host}:{port} is {status}")
telnet 报警版
import telnetlib import socket import yaml import requests import json from datetime import datetime DING_WEBHOOK ='https://oapi.dingtalk.com/robot/send?access_token=4578bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73d' def load_from_yaml(file_path): with open(file_path,'r') as f: data = yaml.safe_load(f) return data.get("targets",[]) def send_ding_alert(message): headers = {'Content-Type': 'application/json; charset=utf-8'} data = { "msgtype": "text", "text": { "content": message } } try: response = requests.post(DING_WEBHOOK, headers=headers, data=json.dumps(data),timeout=5) except Exception as e: print(f"Failed to send ding alert: {e}") def check_port(host,port,timeout=3): try: with telnetlib.Telnet(host,port,timeout): return True except socket.timeout: return False except Exception: return False def main(): ips = load_from_yaml("/home/evan/data/tmp/py2025/ips.yaml") down_list=[] for item in ips: host = item.get("host") port = item.get("port") if not check_port(host,port): down_list.append(f"{host}:{port}") if down_list: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") message = f"[{now}] Port detection anomaly端口检测异常 {now} 报警: {', '.join(down_list)} 🚨" send_ding_alert(message) else: print("All ports are up") if __name__ == '__main__': main() ips.yaml targets: - host: 127.0.0.1 port: 24 - host: 8.8.8.8 port: 53 - host: www.bing.com port: 89
day 9
Day 9 requests 库 API 请求、返回解析、header/cookie 用钉钉/企业微信接口推送消息
pre: 都是图形操作,你事先有个钉钉群,没有的话创建一个 进入钉钉群 → 添加一个「自定义机器人」
设置关键词(如:报警)
复制 Webhook 地址(例子如下)
import requests import json webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=6678bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73dc155' message_text = "报警: 服务器磁盘使用率超过阈值!🚨- test message" headers = { "Content-Type": "application/json", } payload = { "msgtype": "text", "text": { "content":message_text } } response = requests.post(url=webhook_url,headers=headers, data=json.dumps(payload)) if response.status_code == 200: result = response.json() if result.get("errcode") == 0: print("message sent successfully") else: print("message sent failed, error code: ", result) else: print("HTTP err message sent failed, status code: ", response.status_code)
Day 10
paramiko + fabric SSH 自动执行远程命令 写一个“批量部署脚本”上传+执行
from fabric import Connection from invoke import Responder servers = [ {"host": "192.168.10.5", "user": "root", "password": "xxxxxxx"}, {"host": "192.168.1.101", "user": "root", "password": "123456"}, ] local_script = "test36.sh" remote_path = "/tmp/test36.sh" remote_cmd = f"bash {remote_path}" for server in servers: print(f"uploading script to {server['host']}") conn = Connection( host=server['host'], user=server['user'], connect_kwargs={"password": server['password']} ) print(f"uploading {local_script} to {remote_path}") conn.put(local_script, remote_path) conn.run(f"chmod +x {remote_path}") print("running remote script") result = conn.run(remote_cmd,hide=True) print(result.stdout) conn.close() print(f"Done with {server['host']}") # cat /tmp/test36.sh #!/bin/bash echo "Deploying service..." # 示例命令,可改为拉 Git、重启服务等 systemctl restart nginx
trouble shooting
from .loader import FilesystemLoader # noqa ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3/dist-packages/invoke/loader.py", line 3, in <module> import imp ModuleNotFoundError: No module named 'imp' 你使用的 Fabric 版本太老,不兼容你当前的 Python 版本(Python 3.12+ 已经移除了 imp 模块)。 pip3 install --upgrade fabric invoke
Day 11
argparse 与定时任务 解析命令参数、结合 crontab 写一个“CLI磁盘告警工具”定时运行
Day 12
JSON/YAML 配置文件 json, yaml,配置与数据保存 写一个可配置的“日志清理工具”
import os import glob import time import argparse import json import yaml from datetime import datetime,timedelta def load_config(file_path): with open(file_path, 'r') as f: if file_path.endswith('.json'): return json.load(f) elif file_path.endswith('.yaml') or file_path.endswith('.yml'): return yaml.safe_load(f) else: raise ValueError('Unsupported file type') def clean_logs(path,pattern,keep_days): now = time.time() cutoff = now - (keep_days * 24 * 60 * 60) full_patern = os.path.join(path,pattern) deleted = [] for file in glob.glob(full_patern): if os.path.isfile(file): file_mtime = os.path.getmtime(file) if file_mtime < cutoff: os.remove(file) deleted.append(file) return deleted def main(): #创建一个命令行解析器对象 parser description 是给这个工具添加一个简短说明,在 --help 里会看到 parser = argparse.ArgumentParser(description="clean log files") #添加一个参数 --config(必须提供,required=True) parser.add_argument('--config',required=True,help="配置文件路径 (.json/.yaml)") args = parser.parse_args() config = load_config(args.config) log_items = config.get("log_paths",[]) for item in log_items: path = item.get("path") pattern = item.get("pattern","*.log") keep_days = item.get("keep_days",7) deleted = clean_logs(path,pattern,keep_days) for f in deleted: print(f"deleted {f}") if __name__ == "__main__": main() # sudo python log_cleaner.py --config config.yaml # deleted /var/log/nginx/access.log # deleted /var/log/nginx/error.log # ➜ py2025 """ 假设你的配置文件写的是: log_paths: - path: "/var/log/nginx" pattern: "*.log" keep_days: 5 - path: "/var/log/app" keep_days: 10 这段代码会这样处理: 第一个 item:path="/var/log/nginx",pattern="*.log",keep_days=5 第二个 item:path="/var/log/app",pattern="*.log"(默认值),keep_days=10 """
Day 13
简易项目实战 综合使用上述技能 “自动部署 + 告警 + 日志备份”工具原型
Day 14
回顾 + 面试准备 简述项目、准备英文表达 准备好一个 Python 项目介绍 + 中英文回答模板
other
import os import time log_dir= "/var/log/nginx" now = time.time() for filename in os.listdir(log_dir): if filename.endswith(".log"): filepath = os.path.join(log_dir,filename) if os.stat(filepath).st_mtime < now - 3600*24*7: # delete log files older than 7 days os.remove(filepath) print(f"Deleted {filepath}") #shell #!/bin/bash #LOG_DIR=$1 LOG_DIR="/var/log/apache2" find "$LOG_DIR" -name "*.log" -type f -mtime +7 -exec rm -f {} \;