“2025py重学计划”的版本间的差异

来自linux中国网wiki
跳到导航 跳到搜索
 
(未显示同一用户的21个中间版本)
第130行: 第130行:
  
 
</pre>
 
</pre>
 +
=day 4=
 +
Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数
 +
 +
<pre>
 +
def  calculate_server_load(cpu_usage,memory_used,disk_io, total_memory=32,max_io=32):
 +
    """
 +
    計算伺服器負載。
 +
    參數:
 +
        cpu_usage (float): CPU 使用率(%)
 +
        memory_used (float): 記憶體使用量(GB)
 +
        disk_io (float): 磁碟 I/O 速率(MB/s)
 +
        total_memory (float): 總記憶體(GB,預設 32)
 +
        max_io (float): 最大 I/O 速率(MB/s,預設 200)
 +
    返回:
 +
        float: 伺服器負載(%)
 +
    """
 +
 +
    if not all(isinstance(x,(int,float)) for x in  [cpu_usage,memory_used, disk_io, total_memory,max_io]):
 +
        raise ValueError("參數必須為數字")
 +
    if cpu_usage < 0 or  memory_used < 0  or disk_io < 0  or total_memory <=  0 or max_io <= 0:
 +
        raise  ValueError("參數不能為負數,且總記憶體與最大 I/O 不能為 0 ")
 +
    if  cpu_usage > 100:
 +
        raise ValueError("CPU 使用率不能大於 100%")
 +
    if  memory_used > total_memory:
 +
        raise ValueError("記憶體使用量不能大於總記憶體")
 +
 +
    cpu_load = cpu_usage * 0.4
 +
    memory_load = (memory_used / total_memory * 100) * 0.4
 +
    io_load = (disk_io / max_io * 100) * 0.2
 +
    total_load = cpu_load + memory_load + io_load
 +
 +
    return round(total_load,2)
 +
 +
try:
 +
    load = calculate_server_load(80,20,100,32,200)
 +
    print(f"伺服器負載為 {load}%")
 +
except ValueError as e:
 +
    print(f"err{e}")
 +
</pre>
 +
 
=day 5=
 
=day 5=
Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数
+
Day 5 文件操作 with open, 读写、逐行处理 写一个“读取Nginx日志并分析IP访问量”的脚本
 +
<pre>
 +
from collections  import Counter
 +
import sys 
 +
 
 +
if len(sys.argv) < 2:
 +
    print("Usage: python nglog.py log_file")
 +
    sys.exit(1)
 +
 
 +
log_file = sys.argv[1]
 +
ip_counter = Counter()
 +
 
 +
try:
 +
    with open(log_file,'r') as file:
 +
        for line in file:
 +
            if line.strip():
 +
                ip = line.lstrip().split(' ',1)[0]
 +
                ip_counter[ip] += 1
 +
except FileNotFoundError:
 +
    print(f"Error: file '{log_file}' not found.")
 +
    sys.exit(1)
 +
 
 +
print("IP 访问量统计")
 +
for ip, count in ip_counter.most_common():
 +
    print(f"IP: {ip}, Accesses: {count}")
 +
 
 +
 
 +
# IP 地址提取:
 +
 
 +
#    line.lstrip().split(' ', 1)[0]:
 +
#        lstrip() 去除行首空白字符
 +
#        split(' ', 1) 按第一个空格分割字符串
 +
#        [0] 取分割后的第一个部分(即 IP 地址)
 +
 
 +
#  cat access.log
 +
# 192.168.1.1 - - [01/Jan/2023:00:00:00 +0000] "GET /index.html HTTP/1.1"
 +
 
 +
# python nglog.py access.log
 +
# IP 访问量统计
 +
# IP: 192.168.1.1, Accesses: 2
 +
# IP: 192.168.1.2, Accesses: 1
 +
 
 +
for ip, count in ip_counter.most_common():
 +
ip_counter.most_common() 的作用
 +
 
 +
    ip_counter 是一个 Counter 对象,它存储了每个 IP 地址的出现次数。
 +
        例如:Counter({'192.168.1.1': 5, '192.168.1.2': 3, '10.0.0.1': 2})
 +
    most_common() 是 Counter 的方法,用于按计数从高到低排序元素。
 +
        返回值:一个包含元组的列表,每个元组格式为 (元素, 计数)。
 +
        例如:[('192.168.1.1', 5), ('192.168.1.2', 3), ('10.0.0.1', 2)]
 +
 
 +
循环变量解构
 +
 
 +
    for ip, count in ...:
 +
        ip:每次循环中存储当前 IP 地址(元组的第一个元素)。
 +
        count:存储该 IP 的访问次数(元组的第二个元素)。
 +
 
 +
第 3 行:print(f"IP: {ip}, Accesses: {count}")
 +
f-string 格式化输出
 +
 
 +
    语法:f"字符串{变量}" 会将 {} 内的变量值插入字符串。
 +
    示例:
 +
        当 ip = '192.168.1.1' 且 count = 5 时,输出:
 +
        plaintext
 +
 
 +
        IP: 192.168.1.1, Accesses: 5
 +
 
 +
 
 +
完整执行流程示例
 +
假设 ip_counter 的内容为:
 +
 
 +
python
 +
运行
 +
 
 +
Counter({'192.168.1.1': 3, '10.0.0.1': 2, '192.168.1.100': 1})
 +
 
 +
 
 +
    第一次循环:
 +
        ip = '192.168.1.1',count = 3
 +
        输出:IP: 192.168.1.1, Accesses: 3
 +
    第二次循环:
 +
        ip = '10.0.0.1',count = 2
 +
        输出:IP: 10.0.0.1, Accesses: 2
 +
    第三次循环:
 +
        ip = '192.168.1.100',count = 1
 +
        输出:IP: 192.168.1.100, Accesses: 1
 +
 
 +
为什么用 most_common()?
 +
 
 +
    默认行为:most_common() 不指定参数时,会返回所有元素,并按计数从高到低排序。
 +
    等价写法:
 +
    python
 +
 
 +
运行
 +
 
 +
# 手动排序(效果相同)
 +
for ip, count in sorted(ip_counter.items(), key=lambda x: x[1], reverse=True):
 +
    print(f"IP: {ip}, Accesses: {count}")
 +
 
 +
 
 +
 
 +
最终输出示例
 +
plaintext
 +
 
 +
IP 访问量统计
 +
IP: 192.168.1.1, Accesses: 3
 +
IP: 10.0.0.1, Accesses: 2
 +
IP: 192.168.1.100, Accesses: 1
 +
 
 +
 
 +
 
 +
这两行代码的核心作用是将统计结果以易读的格式展示出来,并按访问量从高到低排序,帮助用户快速定位高频访问的 IP 地址。
 +
</pre>
 +
=day 6=
 +
Day 6 错误处理 try...except...finally 模拟连接失败的错误处理
 +
<pre>
 +
import time
 +
def connect_to_resource(url,max_retries=3):
 +
    retries = 0
 +
    while  retries < max_retries:
 +
        try:
 +
            print(f"尝试连接到 {url}, 第 {retries+1} 次尝试")
 +
            should_fail =  (retries < 2)
 +
            if should_fail:
 +
                raise Exception("Failed to connect to resource")
 +
            else:
 +
                print("连接成功 {url}")
 +
                return True
 +
        except Exception as e:
 +
            print(f"连接失败 {url}, 错误信息: {e}")
 +
            retries += 1
 +
            print(f"等 2 秒后重试")
 +
            time.sleep(1)
 +
        finally:
 +
            print("无论连接成功与不,这里都会执行清理操作")
 +
            print("-" * 20)
 +
 
 +
    print(f"尝试了 {max_retries} 次,连接失败")
 +
    return False
 +
 
 +
if __name__ == "__main__":
 +
    resource_url = "https://wiki.linuxchina.net"
 +
    connection_successful = connect_to_resource(resource_url)
 +
 
 +
    if connection_successful:
 +
        print("连接成功")
 +
    else:
 +
        print("连接失败")
 +
 
 +
 
 +
       
 +
 
 +
尝试连接到 https://wiki.linuxchina.net, 第 1 次尝试
 +
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
 +
等 2 秒后重试
 +
无论连接成功与不,这里都会执行清理操作
 +
--------------------
 +
尝试连接到 https://wiki.linuxchina.net, 第 2 次尝试
 +
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
 +
等 2 秒后重试
 +
无论连接成功与不,这里都会执行清理操作
 +
--------------------
 +
尝试连接到 https://wiki.linuxchina.net, 第 3 次尝试
 +
连接成功 {url}
 +
无论连接成功与不,这里都会执行清理操作
 +
--------------------
 +
连接成功
 +
 
 +
</pre>
 +
=day 7=
 +
Day 7 模块与包 import, sys.path, 自定义模块 把昨天的脚本封装为一个模块
 +
 
 +
=day 8=
 +
Day 8 subprocess 与 os 执行 shell 命令,获取结果 写一个“批量检测服务状态”的脚本
 +
 
 +
== [[利用python检测远程 IP和端口是否可连接并钉钉报警|socket version 利用python检测远程 IP和端口是否可连接并钉钉报警]]==
 +
== nc version==
 +
<pre>
 +
 
 +
import subprocess
 +
 
 +
def check_port(host,port):
 +
    try:
 +
        result = subprocess.run(
 +
            ['nc','-zv',host,str(port)],
 +
            stdout = subprocess.PIPE,
 +
            stderr = subprocess.STDOUT,
 +
            timeout = 3,
 +
            text=True
 +
        )
 +
        if "succeeded" in result.stdout or "open" in result.stdout:
 +
            return f"{host}:{port} is UP"
 +
        else:
 +
            return f"{host}:{port} is DOWN"
 +
    except subprocess.TimeoutExpired:
 +
        return f"{host}:{port} Timeout"
 +
    except Exception as e:
 +
        return f"{host}:{port} Error - {str(e)}"
 +
   
 +
if __name__ =='__main__':
 +
    targets = [
 +
        ('127.0.0.1',22),
 +
        ('8.8.8.8',53),
 +
        ('www.bing.com',80),
 +
    ]
 +
    for host, port in targets:
 +
        print(check_port(host,port))
 +
 
 +
 
 +
"""
 +
py check_port.py   
 +
127.0.0.1:22 is UP
 +
8.8.8.8:53 is UP
 +
www.bing.com:80 is UP
 +
 
 +
socket version and telnet version
 +
"""
 +
</pre>
 +
 
 +
== telnet version==
 +
<pre>
 +
import telnetlib
 +
import socket
 +
 
 +
def check_port(host,port,timeout=3):
 +
    try:
 +
        with telnetlib.Telnet(host,port,timeout):
 +
            return True
 +
    except socket.timeout:
 +
        return False
 +
    except Exception:
 +
        return False
 +
 
 +
if __name__ == '__main__':
 +
    targets = [
 +
        ("127.0.0.1",22),
 +
        ("linuxsa.org",80)
 +
    ]
 +
 
 +
    for host,port in targets:
 +
        status = "UP" if check_port(host,port) else "DOWN"
 +
        print(f"{host}:{port} is {status}")
 +
</pre>
 +
===telnet 报警版 ===
 +
<pre>
 +
import telnetlib
 +
import socket
 +
import yaml
 +
import requests
 +
import json
 +
from datetime import datetime
 +
 
 +
DING_WEBHOOK ='https://oapi.dingtalk.com/robot/send?access_token=4578bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73d'
 +
 
 +
def load_from_yaml(file_path):
 +
    with open(file_path,'r') as f:
 +
        data = yaml.safe_load(f)
 +
        return data.get("targets",[])
 +
   
 +
def send_ding_alert(message):
 +
    headers = {'Content-Type': 'application/json; charset=utf-8'}
 +
    data = {
 +
        "msgtype": "text",
 +
        "text": {
 +
            "content": message
 +
        }
 +
    }
 +
    try:
 +
        response = requests.post(DING_WEBHOOK, headers=headers, data=json.dumps(data),timeout=5)
 +
    except Exception as e:
 +
        print(f"Failed to send ding alert: {e}")
 +
     
 +
 +
def check_port(host,port,timeout=3):
 +
    try:
 +
        with telnetlib.Telnet(host,port,timeout):
 +
            return True
 +
    except socket.timeout:
 +
        return False
 +
    except Exception:
 +
        return False
 +
def main():
 +
    ips = load_from_yaml("/home/evan/data/tmp/py2025/ips.yaml")
 +
    down_list=[]
 +
 
 +
    for item in ips:
 +
        host = item.get("host")
 +
        port = item.get("port")
 +
        if not check_port(host,port):
 +
            down_list.append(f"{host}:{port}")
 +
   
 +
    if down_list:
 +
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 +
        message = f"[{now}]  Port detection anomaly端口检测异常 {now} 报警: {', '.join(down_list)} 🚨"
 +
        send_ding_alert(message)
 +
    else:
 +
        print("All ports are up")
 +
 
 +
if __name__ == '__main__':
 +
    main()
 +
 
 +
ips.yaml
 +
targets:
 +
  - host: 127.0.0.1
 +
    port: 24
 +
  - host: 8.8.8.8
 +
    port: 53
 +
  - host: www.bing.com
 +
    port: 89
 +
 
 +
</pre>
 +
 
 +
=day 9=
 +
Day 9 requests 库 API 请求、返回解析、header/cookie 用钉钉/企业微信接口推送消息
 +
 
 +
pre:
 +
都是图形操作,你事先有个钉钉群,没有的话创建一个
 +
进入钉钉群 → 添加一个「自定义机器人」
 +
 
 +
设置关键词(如:报警)
 +
 
 +
复制 Webhook 地址(例子如下)
 +
 
 +
 
 +
<pre>
 +
import requests
 +
import json
 +
 
 +
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=6678bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73dc155'
 +
 
 +
message_text = "报警: 服务器磁盘使用率超过阈值!🚨- test message"
 +
 
 +
headers = {
 +
    "Content-Type": "application/json",
 +
}
 +
 
 +
payload = {
 +
    "msgtype": "text",
 +
    "text": {
 +
        "content":message_text
 +
    }
 +
}
 +
 
 +
response = requests.post(url=webhook_url,headers=headers, data=json.dumps(payload))
 +
 
 +
if response.status_code == 200:
 +
    result = response.json()
 +
    if result.get("errcode") == 0:
 +
        print("message sent successfully")
 +
    else:
 +
        print("message sent failed, error code: ", result)
 +
else:
 +
    print("HTTP err message sent failed, status code: ", response.status_code)
 +
</pre>
 +
 
 +
=Day 10 =
 +
paramiko + fabric SSH 自动执行远程命令 写一个“批量部署脚本”上传+执行
 +
<pre>
 +
 
 +
from fabric import Connection
 +
from invoke import Responder 
 +
 
 +
servers = [
 +
    {"host": "192.168.10.5", "user": "root", "password": "xxxxxxx"},
 +
    {"host": "192.168.1.101", "user": "root", "password": "123456"},
 +
 
 +
]
 +
 
 +
local_script = "test36.sh"
 +
remote_path = "/tmp/test36.sh"
 +
 
 +
remote_cmd = f"bash {remote_path}"
 +
 
 +
for server in servers:
 +
    print(f"uploading script to {server['host']}")
 +
 
 +
    conn = Connection(
 +
        host=server['host'],
 +
        user=server['user'],
 +
        connect_kwargs={"password": server['password']}
 +
    )
 +
 
 +
    print(f"uploading {local_script} to {remote_path}")
 +
    conn.put(local_script, remote_path)
 +
 
 +
    conn.run(f"chmod +x {remote_path}")
 +
 
 +
    print("running remote script")
 +
    result = conn.run(remote_cmd,hide=True)
 +
    print(result.stdout)
 +
 
 +
    conn.close()
 +
 
 +
    print(f"Done with {server['host']}")
 +
 
 +
 
 +
 
 +
 
 +
 
 +
 
 +
 
 +
 
 +
 
 +
# cat /tmp/test36.sh
 +
#!/bin/bash
 +
echo "Deploying service..."
 +
# 示例命令,可改为拉 Git、重启服务等
 +
systemctl restart nginx
 +
 
 +
 
 +
 
 +
 
 +
 
 +
 
 +
</pre>
 +
 
 +
==trouble shooting ==
 +
<pre>
 +
    from .loader import FilesystemLoader  # noqa
 +
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 +
  File "/usr/lib/python3/dist-packages/invoke/loader.py", line 3, in <module>
 +
    import imp
 +
ModuleNotFoundError: No module named 'imp'
 +
 
 +
你使用的 Fabric 版本太老,不兼容你当前的 Python 版本(Python 3.12+ 已经移除了 imp 模块)。
 +
 
 +
pip3 install --upgrade fabric invoke
 +
 
 +
 
 +
 
 +
 
 +
</pre>
 +
 
 +
=Day 11 =
 +
argparse 与定时任务 解析命令参数、结合 crontab 写一个“CLI磁盘告警工具”定时运行
 +
=Day 12=
 +
JSON/YAML 配置文件 json, yaml,配置与数据保存 写一个可配置的“日志清理工具”
 +
 
 +
<pre>
 +
import os 
 +
import glob
 +
import time 
 +
import argparse
 +
import json
 +
import  yaml
 +
from datetime import datetime,timedelta
 +
 
 +
def load_config(file_path):
 +
    with open(file_path, 'r') as f:
 +
        if file_path.endswith('.json'):
 +
            return json.load(f)
 +
        elif file_path.endswith('.yaml') or file_path.endswith('.yml'):
 +
            return yaml.safe_load(f)
 +
        else:
 +
            raise ValueError('Unsupported file type')
 +
       
 +
def clean_logs(path,pattern,keep_days):
 +
    now = time.time()
 +
    cutoff = now - (keep_days * 24 * 60 * 60)
 +
    full_patern = os.path.join(path,pattern)
 +
    deleted = []
 +
 
 +
    for file in glob.glob(full_patern):
 +
        if os.path.isfile(file):
 +
            file_mtime = os.path.getmtime(file)
 +
            if file_mtime < cutoff:
 +
                os.remove(file)
 +
                deleted.append(file)
 +
 
 +
    return deleted
 +
 
 +
def main():
 +
    #创建一个命令行解析器对象 parser description 是给这个工具添加一个简短说明,在 --help 里会看到
 +
    parser = argparse.ArgumentParser(description="clean log files")
 +
 
 +
    #添加一个参数 --config(必须提供,required=True)
 +
    parser.add_argument('--config',required=True,help="配置文件路径 (.json/.yaml)")
 +
    args = parser.parse_args()
 +
 
 +
    config = load_config(args.config)
 +
    log_items = config.get("log_paths",[])
 +
 
 +
    for item in log_items:
 +
        path = item.get("path")
 +
        pattern = item.get("pattern","*.log")
 +
        keep_days = item.get("keep_days",7)
 +
 
 +
        deleted = clean_logs(path,pattern,keep_days)
 +
        for f in deleted:
 +
            print(f"deleted {f}")
 +
 
 +
if __name__ == "__main__":
 +
    main()
 +
 
 +
 
 +
#      sudo python  log_cleaner.py --config  config.yaml
 +
# deleted /var/log/nginx/access.log
 +
# deleted /var/log/nginx/error.log
 +
# ➜  py2025
 +
 
 +
"""
 +
假设你的配置文件写的是:
 +
 
 +
log_paths:
 +
  - path: "/var/log/nginx"
 +
    pattern: "*.log"
 +
    keep_days: 5
 +
  - path: "/var/log/app"
 +
    keep_days: 10
 +
 
 +
这段代码会这样处理:
 +
 
 +
    第一个 item:path="/var/log/nginx",pattern="*.log",keep_days=5
 +
 
 +
    第二个 item:path="/var/log/app",pattern="*.log"(默认值),keep_days=10
 +
"""
 +
</pre>
 +
 
 +
=Day 13=
 +
简易项目实战 综合使用上述技能 “自动部署 + 告警 + 日志备份”工具原型
 +
=Day 14 =
 +
回顾 + 面试准备 简述项目、准备英文表达 准备好一个 Python 项目介绍 + 中英文回答模板
 +
=other=
 +
<pre>
 +
 
 +
import  os 
 +
import time
 +
 
 +
log_dir= "/var/log/nginx"
 +
now = time.time()
 +
 
 +
for filename in  os.listdir(log_dir):
 +
    if filename.endswith(".log"):
 +
        filepath = os.path.join(log_dir,filename)
 +
        if os.stat(filepath).st_mtime < now - 3600*24*7: # delete log files older than 7 days
 +
            os.remove(filepath)
 +
            print(f"Deleted {filepath}")
 +
 
 +
 
 +
 
 +
#shell
 +
#!/bin/bash
 +
 
 +
#LOG_DIR=$1
 +
LOG_DIR="/var/log/apache2"
 +
 
 +
find "$LOG_DIR" -name "*.log" -type f -mtime +7 -exec rm -f {} \;
 +
 
 +
</pre>

2025年5月19日 (一) 03:12的最新版本


Day 1

Day 1 Python 语法入门 变量、print、类型、输入输出 写一个 CLI 工具,输入姓名打印问候语

写一个 CLI 工具,输入姓名打印问候语

v1 cat  echo.py 
name = 'evan'
print( name + "  have a good day")

v2
➜  py2025 cat echo.py 
name = input("please input your name")
print( name + "  have a good day")

 py  echo.py 
please input your name evan
 evan  have a good day
➜  py2025 cat echo.py 


v3
import sys
if  len(sys.argv) < 2:
    print("Usage: python echo.py <name>")
else:
    name = sys.argv[1]
    print(f"Hi,  {name}    have a good day")

➜  py2025 py  echo.py 
Usage: python echo.py <name>
➜  py2025 py  echo.py  evan
Hi,  evan    have a good d

v4 
➜  py2025 cat greet.py 
import argparse 
def main():
    parser = argparse.ArgumentParser(description="Say Hi to someone")
    parser.add_argument("-n","--name",required=True,help="The name of the person to greet.")
    args = parser.parse_args()
    print(f"Hello,{args.name} hope you have a  good day")

if __name__ == "__main__":
    main()


➜  py2025 python greet.py -n evan 
Hello,evan hope you have a  good day

day2

Day 2 流程控制 if, for, while, break, continue 判断磁盘使用率是否超过阈值

num = [1,2,3,4,5,6,]
for nu in num:
    if  nu % 2 == 0:
        print(f"{num} 偶数")
        continue
    elif nu == 5:
        print(f"找到 5 quit loop")
        break
    print(f"{num} 奇数")

day3

Day 3 数据结构 list, dict, set, tuple 常用操作 把 log 中的错误代码用 dict 统计次数

  py2025 cat errors.log 
ERR001
ERR002
ERR001
ERR003
ERR002
ERR001
➜  py2025 py 1.py 
ERR001: 3次
ERR002: 2次
ERR003: 1次
➜  py2025 tail  1.py 
error_counts = {}
with open("errors.log","r") as file:
    for line in file:
        error_code =  line.strip()
        error_counts[error_code] = error_counts.get(error_code,0) + 1

for code,count in error_counts.items():
    print(f"{code}: {count}次")


这ai 解说还真的有意思 

for code, count in error_counts.items(): 这一行:

    error_counts 是一个字典,用于存储错误代码及其出现的次数。例如,error_counts 可能是 {"E001": 3, "E002": 2} 这样的形式。
    items() 是字典的一个方法,它会返回一个包含多个元组的可迭代对象,每个元组由字典的键和对应的值组成。对于 {"E001": 3, "E002": 2} 这个字典,error_counts.items() 的结果就是 [("E001", 3), ("E002", 2)] 这样的形式。
    for code, count in error_counts.items(): 是一个循环语句,code 和 count 是自定义的变量名。在每次循环中,code 会被赋值为字典的键(也就是错误代码,如 "E001"),count 会被赋值为对应的值(也就是该错误代码出现的次数,如 3)。也就是说,第一次循环时,code 是 "E001",count 是 3;第二次循环时,code 是 "E002",count 是 2。

print(f"{code}: {count} 次") 这一行:

    print() 是 Python 内置的函数,用于在控制台输出信息。
    f"{code}: {count} 次" 是一个格式化字符串(也称为 f-string)。在这个字符串中,{code} 和 {count} 是占位符,会被 code 和 count 变量的值所替换。例如,当 code 是 "E001",count 是 3 时,f"{code}: {count} 次" 就会被解析为 "E001: 3 次",然后 print() 函数会将这个字符串输出到控制台。

第 7 行:error_code = line.strip()

    strip() 方法移除字符串首尾的空白字符(如换行符 \n、空格)。
    示例:
        "E001\n".strip() → "E001"
        " E002 ".strip() → "E002"

第 9 行:error_counts[error_code] = error_counts.get(error_code, 0) + 1

    核心逻辑:统计错误代码的出现次数。
    get(key, default) 方法:
        如果 key 存在于字典中,返回对应的值。
        如果 key 不存在,返回默认值 default(这里是 0)。
    执行过程:
        第一次遇到 "E001":
            error_counts.get("E001", 0) 返回 0
            0 + 1 = 1,因此 error_counts["E001"] = 1
        第二次遇到 "E001":
            error_counts.get("E001", 0) 返回 1
            1 + 1 = 2,因此 error_counts["E001"] = 2

day 4

Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数

def  calculate_server_load(cpu_usage,memory_used,disk_io, total_memory=32,max_io=32):
    """
    計算伺服器負載。
    參數:
        cpu_usage (float): CPU 使用率(%)
        memory_used (float): 記憶體使用量(GB)
        disk_io (float): 磁碟 I/O 速率(MB/s)
        total_memory (float): 總記憶體(GB,預設 32)
        max_io (float): 最大 I/O 速率(MB/s,預設 200)
    返回:
        float: 伺服器負載(%)
    """

    if not all(isinstance(x,(int,float)) for x in  [cpu_usage,memory_used, disk_io, total_memory,max_io]):
        raise ValueError("參數必須為數字")
    if cpu_usage < 0 or  memory_used < 0  or disk_io < 0  or total_memory <=  0 or max_io <= 0:
        raise  ValueError("參數不能為負數,且總記憶體與最大 I/O 不能為 0 ")
    if  cpu_usage > 100:
        raise ValueError("CPU 使用率不能大於 100%")
    if  memory_used > total_memory:
        raise ValueError("記憶體使用量不能大於總記憶體")

    cpu_load = cpu_usage * 0.4 
    memory_load = (memory_used / total_memory * 100) * 0.4
    io_load = (disk_io / max_io * 100) * 0.2
    total_load = cpu_load + memory_load + io_load 

    return round(total_load,2)

try:
    load = calculate_server_load(80,20,100,32,200)
    print(f"伺服器負載為 {load}%")
except ValueError as e:
    print(f"err{e}")

day 5

Day 5 文件操作 with open, 读写、逐行处理 写一个“读取Nginx日志并分析IP访问量”的脚本

from collections  import Counter 
import sys  

if len(sys.argv) < 2:
    print("Usage: python nglog.py log_file")
    sys.exit(1)

log_file = sys.argv[1]
ip_counter = Counter()

try:
    with open(log_file,'r') as file:
        for line in file:
            if line.strip():
                ip = line.lstrip().split(' ',1)[0]
                ip_counter[ip] += 1
except FileNotFoundError:
    print(f"Error: file '{log_file}' not found.")
    sys.exit(1)

print("IP 访问量统计")
for ip, count in ip_counter.most_common():
    print(f"IP: {ip}, Accesses: {count}")


# IP 地址提取:

#     line.lstrip().split(' ', 1)[0]:
#         lstrip() 去除行首空白字符
#         split(' ', 1) 按第一个空格分割字符串
#         [0] 取分割后的第一个部分(即 IP 地址)

#  cat access.log 
# 192.168.1.1 - - [01/Jan/2023:00:00:00 +0000] "GET /index.html HTTP/1.1"

# python nglog.py access.log 
# IP 访问量统计
# IP: 192.168.1.1, Accesses: 2
# IP: 192.168.1.2, Accesses: 1

for ip, count in ip_counter.most_common():
ip_counter.most_common() 的作用

    ip_counter 是一个 Counter 对象,它存储了每个 IP 地址的出现次数。
        例如:Counter({'192.168.1.1': 5, '192.168.1.2': 3, '10.0.0.1': 2})
    most_common() 是 Counter 的方法,用于按计数从高到低排序元素。
        返回值:一个包含元组的列表,每个元组格式为 (元素, 计数)。
        例如:[('192.168.1.1', 5), ('192.168.1.2', 3), ('10.0.0.1', 2)]

循环变量解构

    for ip, count in ...:
        ip:每次循环中存储当前 IP 地址(元组的第一个元素)。
        count:存储该 IP 的访问次数(元组的第二个元素)。

第 3 行:print(f"IP: {ip}, Accesses: {count}")
f-string 格式化输出

    语法:f"字符串{变量}" 会将 {} 内的变量值插入字符串。
    示例:
        当 ip = '192.168.1.1' 且 count = 5 时,输出:
        plaintext

        IP: 192.168.1.1, Accesses: 5


完整执行流程示例
假设 ip_counter 的内容为:

python
运行

Counter({'192.168.1.1': 3, '10.0.0.1': 2, '192.168.1.100': 1})


    第一次循环:
        ip = '192.168.1.1',count = 3
        输出:IP: 192.168.1.1, Accesses: 3
    第二次循环:
        ip = '10.0.0.1',count = 2
        输出:IP: 10.0.0.1, Accesses: 2
    第三次循环:
        ip = '192.168.1.100',count = 1
        输出:IP: 192.168.1.100, Accesses: 1

为什么用 most_common()?

    默认行为:most_common() 不指定参数时,会返回所有元素,并按计数从高到低排序。
    等价写法:
    python

运行

# 手动排序(效果相同)
for ip, count in sorted(ip_counter.items(), key=lambda x: x[1], reverse=True):
    print(f"IP: {ip}, Accesses: {count}")



最终输出示例
plaintext

IP 访问量统计
IP: 192.168.1.1, Accesses: 3
IP: 10.0.0.1, Accesses: 2
IP: 192.168.1.100, Accesses: 1



这两行代码的核心作用是将统计结果以易读的格式展示出来,并按访问量从高到低排序,帮助用户快速定位高频访问的 IP 地址。

day 6

Day 6 错误处理 try...except...finally 模拟连接失败的错误处理

import time 
def connect_to_resource(url,max_retries=3):
    retries = 0
    while  retries < max_retries:
        try:
            print(f"尝试连接到 {url}, 第 {retries+1} 次尝试")
            should_fail =  (retries < 2)
            if should_fail:
                raise Exception("Failed to connect to resource")
            else:
                print("连接成功 {url}")
                return True 
        except Exception as e:
            print(f"连接失败 {url}, 错误信息: {e}")
            retries += 1
            print(f"等 2 秒后重试")
            time.sleep(1)
        finally:
            print("无论连接成功与不,这里都会执行清理操作")
            print("-" * 20)

    print(f"尝试了 {max_retries} 次,连接失败")
    return False 

if __name__ == "__main__":
    resource_url = "https://wiki.linuxchina.net"
    connection_successful = connect_to_resource(resource_url)

    if connection_successful:
        print("连接成功")
    else:
        print("连接失败")


        

尝试连接到 https://wiki.linuxchina.net, 第 1 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 2 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 3 次尝试
连接成功 {url}
无论连接成功与不,这里都会执行清理操作
--------------------
连接成功

day 7

Day 7 模块与包 import, sys.path, 自定义模块 把昨天的脚本封装为一个模块

day 8

Day 8 subprocess 与 os 执行 shell 命令,获取结果 写一个“批量检测服务状态”的脚本

socket version 利用python检测远程 IP和端口是否可连接并钉钉报警

nc version

import subprocess 

def check_port(host,port):
    try:
        result = subprocess.run(
            ['nc','-zv',host,str(port)],
            stdout = subprocess.PIPE,
            stderr = subprocess.STDOUT,
            timeout = 3, 
            text=True
        )
        if "succeeded" in result.stdout or "open" in result.stdout:
            return f"{host}:{port} is UP"
        else:
            return f"{host}:{port} is DOWN"
    except subprocess.TimeoutExpired:
        return f"{host}:{port} Timeout"
    except Exception as e:
        return f"{host}:{port} Error - {str(e)}"
    
if __name__ =='__main__':
    targets = [
        ('127.0.0.1',22),
        ('8.8.8.8',53),
        ('www.bing.com',80),
    ]
    for host, port in targets:
        print(check_port(host,port))


"""
 py check_port.py     
127.0.0.1:22 is UP
8.8.8.8:53 is UP
www.bing.com:80 is UP

socket version and telnet version
"""

telnet version

import telnetlib 
import socket 

def check_port(host,port,timeout=3):
    try:
        with telnetlib.Telnet(host,port,timeout):
            return True 
    except socket.timeout:
        return False
    except Exception:
        return False

if __name__ == '__main__':
    targets = [
        ("127.0.0.1",22),
        ("linuxsa.org",80)
    ]

    for host,port in targets:
        status = "UP" if check_port(host,port) else "DOWN"
        print(f"{host}:{port} is {status}")

telnet 报警版

import telnetlib 
import socket 
import yaml 
import requests 
import json 
from datetime import datetime 

DING_WEBHOOK ='https://oapi.dingtalk.com/robot/send?access_token=4578bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73d'

def load_from_yaml(file_path):
    with open(file_path,'r') as f:
        data = yaml.safe_load(f)
        return data.get("targets",[])
    
def send_ding_alert(message):
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        response = requests.post(DING_WEBHOOK, headers=headers, data=json.dumps(data),timeout=5)
    except Exception as e:
        print(f"Failed to send ding alert: {e}")
       
 
def check_port(host,port,timeout=3):
    try:
        with telnetlib.Telnet(host,port,timeout):
            return True 
    except socket.timeout:
        return False
    except Exception:
        return False
def main():
    ips = load_from_yaml("/home/evan/data/tmp/py2025/ips.yaml")
    down_list=[]

    for item in ips:
        host = item.get("host")
        port = item.get("port")
        if not check_port(host,port):
            down_list.append(f"{host}:{port}")
    
    if down_list:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        message = f"[{now}]  Port detection anomaly端口检测异常 {now} 报警: {', '.join(down_list)} 🚨"
        send_ding_alert(message)
    else:
        print("All ports are up")

if __name__ == '__main__':
    main()

ips.yaml 
targets:
  - host: 127.0.0.1
    port: 24
  - host: 8.8.8.8
    port: 53
  - host: www.bing.com
    port: 89

day 9

Day 9 requests 库 API 请求、返回解析、header/cookie 用钉钉/企业微信接口推送消息

pre: 都是图形操作,你事先有个钉钉群,没有的话创建一个 进入钉钉群 → 添加一个「自定义机器人」

设置关键词(如:报警)

复制 Webhook 地址(例子如下)


import requests 
import json 

webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=6678bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73dc155'

message_text = "报警: 服务器磁盘使用率超过阈值!🚨- test message"

headers = {
    "Content-Type": "application/json",
}

payload = {
    "msgtype": "text",
    "text": {
        "content":message_text 
    }
}

response = requests.post(url=webhook_url,headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    result = response.json()
    if result.get("errcode") == 0:
        print("message sent successfully")
    else:
        print("message sent failed, error code: ", result)
else:
    print("HTTP err message sent failed, status code: ", response.status_code)

Day 10

paramiko + fabric SSH 自动执行远程命令 写一个“批量部署脚本”上传+执行

from fabric import Connection 
from invoke import Responder  

servers = [
    {"host": "192.168.10.5", "user": "root", "password": "xxxxxxx"},
    {"host": "192.168.1.101", "user": "root", "password": "123456"},

]

local_script = "test36.sh"
remote_path = "/tmp/test36.sh"

remote_cmd = f"bash {remote_path}"

for server in servers:
    print(f"uploading script to {server['host']}")

    conn = Connection(
        host=server['host'],
        user=server['user'],
        connect_kwargs={"password": server['password']}
    )

    print(f"uploading {local_script} to {remote_path}")
    conn.put(local_script, remote_path)

    conn.run(f"chmod +x {remote_path}")

    print("running remote script")
    result = conn.run(remote_cmd,hide=True)
    print(result.stdout)

    conn.close()

    print(f"Done with {server['host']}")









# cat /tmp/test36.sh 
#!/bin/bash
echo "Deploying service..."
# 示例命令,可改为拉 Git、重启服务等
systemctl restart nginx






trouble shooting

    from .loader import FilesystemLoader  # noqa
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/invoke/loader.py", line 3, in <module>
    import imp
ModuleNotFoundError: No module named 'imp'

你使用的 Fabric 版本太老,不兼容你当前的 Python 版本(Python 3.12+ 已经移除了 imp 模块)。

pip3 install --upgrade fabric invoke




Day 11

argparse 与定时任务 解析命令参数、结合 crontab 写一个“CLI磁盘告警工具”定时运行

Day 12

JSON/YAML 配置文件 json, yaml,配置与数据保存 写一个可配置的“日志清理工具”

import os  
import glob 
import time  
import argparse 
import json 
import  yaml 
from datetime import datetime,timedelta 

def load_config(file_path):
    with open(file_path, 'r') as f:
        if file_path.endswith('.json'):
            return json.load(f)
        elif file_path.endswith('.yaml') or file_path.endswith('.yml'):
            return yaml.safe_load(f)
        else:
            raise ValueError('Unsupported file type')
        
def clean_logs(path,pattern,keep_days):
    now = time.time()
    cutoff = now - (keep_days * 24 * 60 * 60)
    full_patern = os.path.join(path,pattern)
    deleted = []

    for file in glob.glob(full_patern):
        if os.path.isfile(file):
            file_mtime = os.path.getmtime(file)
            if file_mtime < cutoff:
                os.remove(file)
                deleted.append(file)

    return deleted 

def main():
    #创建一个命令行解析器对象 parser description 是给这个工具添加一个简短说明,在 --help 里会看到
    parser = argparse.ArgumentParser(description="clean log files") 

    #添加一个参数 --config(必须提供,required=True)
    parser.add_argument('--config',required=True,help="配置文件路径 (.json/.yaml)")
    args = parser.parse_args()

    config = load_config(args.config)
    log_items = config.get("log_paths",[])

    for item in log_items:
        path = item.get("path")
        pattern = item.get("pattern","*.log")
        keep_days = item.get("keep_days",7)

        deleted = clean_logs(path,pattern,keep_days)
        for f in deleted:
            print(f"deleted {f}")

if __name__ == "__main__":
    main()


#      sudo python  log_cleaner.py --config  config.yaml
# deleted /var/log/nginx/access.log
# deleted /var/log/nginx/error.log
# ➜  py2025 

"""
假设你的配置文件写的是:

log_paths:
  - path: "/var/log/nginx"
    pattern: "*.log"
    keep_days: 5
  - path: "/var/log/app"
    keep_days: 10

这段代码会这样处理:

    第一个 item:path="/var/log/nginx",pattern="*.log",keep_days=5

    第二个 item:path="/var/log/app",pattern="*.log"(默认值),keep_days=10
"""

Day 13

简易项目实战 综合使用上述技能 “自动部署 + 告警 + 日志备份”工具原型

Day 14

回顾 + 面试准备 简述项目、准备英文表达 准备好一个 Python 项目介绍 + 中英文回答模板

other

import  os  
import time 

log_dir= "/var/log/nginx"
now = time.time()

for filename in  os.listdir(log_dir):
    if filename.endswith(".log"):
        filepath = os.path.join(log_dir,filename)
        if os.stat(filepath).st_mtime < now - 3600*24*7: # delete log files older than 7 days
            os.remove(filepath)
            print(f"Deleted {filepath}")



#shell
#!/bin/bash

#LOG_DIR=$1
LOG_DIR="/var/log/apache2"

find "$LOG_DIR" -name "*.log" -type f -mtime +7 -exec rm -f {} \;