2025py重学计划

来自linux中国网wiki
跳到导航 跳到搜索


Day 1

Day 1 Python 语法入门 变量、print、类型、输入输出 写一个 CLI 工具,输入姓名打印问候语

写一个 CLI 工具,输入姓名打印问候语

v1 cat  echo.py 
name = 'evan'
print( name + "  have a good day")

v2
➜  py2025 cat echo.py 
name = input("please input your name")
print( name + "  have a good day")

 py  echo.py 
please input your name evan
 evan  have a good day
➜  py2025 cat echo.py 


v3
import sys
if  len(sys.argv) < 2:
    print("Usage: python echo.py <name>")
else:
    name = sys.argv[1]
    print(f"Hi,  {name}    have a good day")

➜  py2025 py  echo.py 
Usage: python echo.py <name>
➜  py2025 py  echo.py  evan
Hi,  evan    have a good d

v4 
➜  py2025 cat greet.py 
import argparse 
def main():
    parser = argparse.ArgumentParser(description="Say Hi to someone")
    parser.add_argument("-n","--name",required=True,help="The name of the person to greet.")
    args = parser.parse_args()
    print(f"Hello,{args.name} hope you have a  good day")

if __name__ == "__main__":
    main()


➜  py2025 python greet.py -n evan 
Hello,evan hope you have a  good day

day2

Day 2 流程控制 if, for, while, break, continue 判断磁盘使用率是否超过阈值

num = [1,2,3,4,5,6,]
for nu in num:
    if  nu % 2 == 0:
        print(f"{num} 偶数")
        continue
    elif nu == 5:
        print(f"找到 5 quit loop")
        break
    print(f"{num} 奇数")

day3

Day 3 数据结构 list, dict, set, tuple 常用操作 把 log 中的错误代码用 dict 统计次数

  py2025 cat errors.log 
ERR001
ERR002
ERR001
ERR003
ERR002
ERR001
➜  py2025 py 1.py 
ERR001: 3次
ERR002: 2次
ERR003: 1次
➜  py2025 tail  1.py 
error_counts = {}
with open("errors.log","r") as file:
    for line in file:
        error_code =  line.strip()
        error_counts[error_code] = error_counts.get(error_code,0) + 1

for code,count in error_counts.items():
    print(f"{code}: {count}次")


这ai 解说还真的有意思 

for code, count in error_counts.items(): 这一行:

    error_counts 是一个字典,用于存储错误代码及其出现的次数。例如,error_counts 可能是 {"E001": 3, "E002": 2} 这样的形式。
    items() 是字典的一个方法,它会返回一个包含多个元组的可迭代对象,每个元组由字典的键和对应的值组成。对于 {"E001": 3, "E002": 2} 这个字典,error_counts.items() 的结果就是 [("E001", 3), ("E002", 2)] 这样的形式。
    for code, count in error_counts.items(): 是一个循环语句,code 和 count 是自定义的变量名。在每次循环中,code 会被赋值为字典的键(也就是错误代码,如 "E001"),count 会被赋值为对应的值(也就是该错误代码出现的次数,如 3)。也就是说,第一次循环时,code 是 "E001",count 是 3;第二次循环时,code 是 "E002",count 是 2。

print(f"{code}: {count} 次") 这一行:

    print() 是 Python 内置的函数,用于在控制台输出信息。
    f"{code}: {count} 次" 是一个格式化字符串(也称为 f-string)。在这个字符串中,{code} 和 {count} 是占位符,会被 code 和 count 变量的值所替换。例如,当 code 是 "E001",count 是 3 时,f"{code}: {count} 次" 就会被解析为 "E001: 3 次",然后 print() 函数会将这个字符串输出到控制台。

第 7 行:error_code = line.strip()

    strip() 方法移除字符串首尾的空白字符(如换行符 \n、空格)。
    示例:
        "E001\n".strip() → "E001"
        " E002 ".strip() → "E002"

第 9 行:error_counts[error_code] = error_counts.get(error_code, 0) + 1

    核心逻辑:统计错误代码的出现次数。
    get(key, default) 方法:
        如果 key 存在于字典中,返回对应的值。
        如果 key 不存在,返回默认值 default(这里是 0)。
    执行过程:
        第一次遇到 "E001":
            error_counts.get("E001", 0) 返回 0
            0 + 1 = 1,因此 error_counts["E001"] = 1
        第二次遇到 "E001":
            error_counts.get("E001", 0) 返回 1
            1 + 1 = 2,因此 error_counts["E001"] = 2

day 4

Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数

def  calculate_server_load(cpu_usage,memory_used,disk_io, total_memory=32,max_io=32):
    """
    計算伺服器負載。
    參數:
        cpu_usage (float): CPU 使用率(%)
        memory_used (float): 記憶體使用量(GB)
        disk_io (float): 磁碟 I/O 速率(MB/s)
        total_memory (float): 總記憶體(GB,預設 32)
        max_io (float): 最大 I/O 速率(MB/s,預設 200)
    返回:
        float: 伺服器負載(%)
    """

    if not all(isinstance(x,(int,float)) for x in  [cpu_usage,memory_used, disk_io, total_memory,max_io]):
        raise ValueError("參數必須為數字")
    if cpu_usage < 0 or  memory_used < 0  or disk_io < 0  or total_memory <=  0 or max_io <= 0:
        raise  ValueError("參數不能為負數,且總記憶體與最大 I/O 不能為 0 ")
    if  cpu_usage > 100:
        raise ValueError("CPU 使用率不能大於 100%")
    if  memory_used > total_memory:
        raise ValueError("記憶體使用量不能大於總記憶體")

    cpu_load = cpu_usage * 0.4 
    memory_load = (memory_used / total_memory * 100) * 0.4
    io_load = (disk_io / max_io * 100) * 0.2
    total_load = cpu_load + memory_load + io_load 

    return round(total_load,2)

try:
    load = calculate_server_load(80,20,100,32,200)
    print(f"伺服器負載為 {load}%")
except ValueError as e:
    print(f"err{e}")

day 5

Day 5 文件操作 with open, 读写、逐行处理 写一个“读取Nginx日志并分析IP访问量”的脚本

from collections  import Counter 
import sys  

if len(sys.argv) < 2:
    print("Usage: python nglog.py log_file")
    sys.exit(1)

log_file = sys.argv[1]
ip_counter = Counter()

try:
    with open(log_file,'r') as file:
        for line in file:
            if line.strip():
                ip = line.lstrip().split(' ',1)[0]
                ip_counter[ip] += 1
except FileNotFoundError:
    print(f"Error: file '{log_file}' not found.")
    sys.exit(1)

print("IP 访问量统计")
for ip, count in ip_counter.most_common():
    print(f"IP: {ip}, Accesses: {count}")


# IP 地址提取:

#     line.lstrip().split(' ', 1)[0]:
#         lstrip() 去除行首空白字符
#         split(' ', 1) 按第一个空格分割字符串
#         [0] 取分割后的第一个部分(即 IP 地址)

#  cat access.log 
# 192.168.1.1 - - [01/Jan/2023:00:00:00 +0000] "GET /index.html HTTP/1.1"

# python nglog.py access.log 
# IP 访问量统计
# IP: 192.168.1.1, Accesses: 2
# IP: 192.168.1.2, Accesses: 1

for ip, count in ip_counter.most_common():
ip_counter.most_common() 的作用

    ip_counter 是一个 Counter 对象,它存储了每个 IP 地址的出现次数。
        例如:Counter({'192.168.1.1': 5, '192.168.1.2': 3, '10.0.0.1': 2})
    most_common() 是 Counter 的方法,用于按计数从高到低排序元素。
        返回值:一个包含元组的列表,每个元组格式为 (元素, 计数)。
        例如:[('192.168.1.1', 5), ('192.168.1.2', 3), ('10.0.0.1', 2)]

循环变量解构

    for ip, count in ...:
        ip:每次循环中存储当前 IP 地址(元组的第一个元素)。
        count:存储该 IP 的访问次数(元组的第二个元素)。

第 3 行:print(f"IP: {ip}, Accesses: {count}")
f-string 格式化输出

    语法:f"字符串{变量}" 会将 {} 内的变量值插入字符串。
    示例:
        当 ip = '192.168.1.1' 且 count = 5 时,输出:
        plaintext

        IP: 192.168.1.1, Accesses: 5


完整执行流程示例
假设 ip_counter 的内容为:

python
运行

Counter({'192.168.1.1': 3, '10.0.0.1': 2, '192.168.1.100': 1})


    第一次循环:
        ip = '192.168.1.1',count = 3
        输出:IP: 192.168.1.1, Accesses: 3
    第二次循环:
        ip = '10.0.0.1',count = 2
        输出:IP: 10.0.0.1, Accesses: 2
    第三次循环:
        ip = '192.168.1.100',count = 1
        输出:IP: 192.168.1.100, Accesses: 1

为什么用 most_common()?

    默认行为:most_common() 不指定参数时,会返回所有元素,并按计数从高到低排序。
    等价写法:
    python

运行

# 手动排序(效果相同)
for ip, count in sorted(ip_counter.items(), key=lambda x: x[1], reverse=True):
    print(f"IP: {ip}, Accesses: {count}")



最终输出示例
plaintext

IP 访问量统计
IP: 192.168.1.1, Accesses: 3
IP: 10.0.0.1, Accesses: 2
IP: 192.168.1.100, Accesses: 1



这两行代码的核心作用是将统计结果以易读的格式展示出来,并按访问量从高到低排序,帮助用户快速定位高频访问的 IP 地址。

day 6

Day 6 错误处理 try...except...finally 模拟连接失败的错误处理

import time 
def connect_to_resource(url,max_retries=3):
    retries = 0
    while  retries < max_retries:
        try:
            print(f"尝试连接到 {url}, 第 {retries+1} 次尝试")
            should_fail =  (retries < 2)
            if should_fail:
                raise Exception("Failed to connect to resource")
            else:
                print("连接成功 {url}")
                return True 
        except Exception as e:
            print(f"连接失败 {url}, 错误信息: {e}")
            retries += 1
            print(f"等 2 秒后重试")
            time.sleep(1)
        finally:
            print("无论连接成功与不,这里都会执行清理操作")
            print("-" * 20)

    print(f"尝试了 {max_retries} 次,连接失败")
    return False 

if __name__ == "__main__":
    resource_url = "https://wiki.linuxchina.net"
    connection_successful = connect_to_resource(resource_url)

    if connection_successful:
        print("连接成功")
    else:
        print("连接失败")


        

尝试连接到 https://wiki.linuxchina.net, 第 1 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 2 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 3 次尝试
连接成功 {url}
无论连接成功与不,这里都会执行清理操作
--------------------
连接成功

day 7

Day 7 模块与包 import, sys.path, 自定义模块 把昨天的脚本封装为一个模块

day 8

Day 8 subprocess 与 os 执行 shell 命令,获取结果 写一个“批量检测服务状态”的脚本

socket version 利用python检测远程 IP和端口是否可连接并钉钉报警

nc version

import subprocess 

def check_port(host,port):
    try:
        result = subprocess.run(
            ['nc','-zv',host,str(port)],
            stdout = subprocess.PIPE,
            stderr = subprocess.STDOUT,
            timeout = 3, 
            text=True
        )
        if "succeeded" in result.stdout or "open" in result.stdout:
            return f"{host}:{port} is UP"
        else:
            return f"{host}:{port} is DOWN"
    except subprocess.TimeoutExpired:
        return f"{host}:{port} Timeout"
    except Exception as e:
        return f"{host}:{port} Error - {str(e)}"
    
if __name__ =='__main__':
    targets = [
        ('127.0.0.1',22),
        ('8.8.8.8',53),
        ('www.bing.com',80),
    ]
    for host, port in targets:
        print(check_port(host,port))


"""
 py check_port.py     
127.0.0.1:22 is UP
8.8.8.8:53 is UP
www.bing.com:80 is UP

socket version and telnet version
"""

telnet version

import telnetlib 
import socket 

def check_port(host,port,timeout=3):
    try:
        with telnetlib.Telnet(host,port,timeout):
            return True 
    except socket.timeout:
        return False
    except Exception:
        return False

if __name__ == '__main__':
    targets = [
        ("127.0.0.1",22),
        ("linuxsa.org",80)
    ]

    for host,port in targets:
        status = "UP" if check_port(host,port) else "DOWN"
        print(f"{host}:{port} is {status}")

telnet 报警版

import telnetlib 
import socket 
import yaml 
import requests 
import json 
from datetime import datetime 

DING_WEBHOOK ='https://oapi.dingtalk.com/robot/send?access_token=4578bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73d'

def load_from_yaml(file_path):
    with open(file_path,'r') as f:
        data = yaml.safe_load(f)
        return data.get("targets",[])
    
def send_ding_alert(message):
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        response = requests.post(DING_WEBHOOK, headers=headers, data=json.dumps(data),timeout=5)
    except Exception as e:
        print(f"Failed to send ding alert: {e}")
       
 
def check_port(host,port,timeout=3):
    try:
        with telnetlib.Telnet(host,port,timeout):
            return True 
    except socket.timeout:
        return False
    except Exception:
        return False
def main():
    ips = load_from_yaml("/home/evan/data/tmp/py2025/ips.yaml")
    down_list=[]

    for item in ips:
        host = item.get("host")
        port = item.get("port")
        if not check_port(host,port):
            down_list.append(f"{host}:{port}")
    
    if down_list:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        message = f"[{now}]  Port detection anomaly端口检测异常 {now} 报警: {', '.join(down_list)} 🚨"
        send_ding_alert(message)
    else:
        print("All ports are up")

if __name__ == '__main__':
    main()

ips.yaml 
targets:
  - host: 127.0.0.1
    port: 24
  - host: 8.8.8.8
    port: 53
  - host: www.bing.com
    port: 89

day 9

Day 9 requests 库 API 请求、返回解析、header/cookie 用钉钉/企业微信接口推送消息

pre: 都是图形操作,你事先有个钉钉群,没有的话创建一个 进入钉钉群 → 添加一个「自定义机器人」

设置关键词(如:报警)

复制 Webhook 地址(例子如下)


import requests 
import json 

webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=6678bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73dc155'

message_text = "报警: 服务器磁盘使用率超过阈值!🚨- test message"

headers = {
    "Content-Type": "application/json",
}

payload = {
    "msgtype": "text",
    "text": {
        "content":message_text 
    }
}

response = requests.post(url=webhook_url,headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    result = response.json()
    if result.get("errcode") == 0:
        print("message sent successfully")
    else:
        print("message sent failed, error code: ", result)
else:
    print("HTTP err message sent failed, status code: ", response.status_code)

Day 10

paramiko + fabric SSH 自动执行远程命令 写一个“批量部署脚本”上传+执行

from fabric import Connection 
from invoke import Responder  

servers = [
    {"host": "192.168.10.5", "user": "root", "password": "xxxxxxx"},
    {"host": "192.168.1.101", "user": "root", "password": "123456"},

]

local_script = "test36.sh"
remote_path = "/tmp/test36.sh"

remote_cmd = f"bash {remote_path}"

for server in servers:
    print(f"uploading script to {server['host']}")

    conn = Connection(
        host=server['host'],
        user=server['user'],
        connect_kwargs={"password": server['password']}
    )

    print(f"uploading {local_script} to {remote_path}")
    conn.put(local_script, remote_path)

    conn.run(f"chmod +x {remote_path}")

    print("running remote script")
    result = conn.run(remote_cmd,hide=True)
    print(result.stdout)

    conn.close()

    print(f"Done with {server['host']}")









# cat /tmp/test36.sh 
#!/bin/bash
echo "Deploying service..."
# 示例命令,可改为拉 Git、重启服务等
systemctl restart nginx






trouble shooting

    from .loader import FilesystemLoader  # noqa
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/invoke/loader.py", line 3, in <module>
    import imp
ModuleNotFoundError: No module named 'imp'

你使用的 Fabric 版本太老,不兼容你当前的 Python 版本(Python 3.12+ 已经移除了 imp 模块)。

pip3 install --upgrade fabric invoke




Day 11

argparse 与定时任务 解析命令参数、结合 crontab 写一个“CLI磁盘告警工具”定时运行

Day 12

JSON/YAML 配置文件 json, yaml,配置与数据保存 写一个可配置的“日志清理工具”

import os  
import glob 
import time  
import argparse 
import json 
import  yaml 
from datetime import datetime,timedelta 

def load_config(file_path):
    with open(file_path, 'r') as f:
        if file_path.endswith('.json'):
            return json.load(f)
        elif file_path.endswith('.yaml') or file_path.endswith('.yml'):
            return yaml.safe_load(f)
        else:
            raise ValueError('Unsupported file type')
        
def clean_logs(path,pattern,keep_days):
    now = time.time()
    cutoff = now - (keep_days * 24 * 60 * 60)
    full_patern = os.path.join(path,pattern)
    deleted = []

    for file in glob.glob(full_patern):
        if os.path.isfile(file):
            file_mtime = os.path.getmtime(file)
            if file_mtime < cutoff:
                os.remove(file)
                deleted.append(file)

    return deleted 

def main():
    #创建一个命令行解析器对象 parser description 是给这个工具添加一个简短说明,在 --help 里会看到
    parser = argparse.ArgumentParser(description="clean log files") 

    #添加一个参数 --config(必须提供,required=True)
    parser.add_argument('--config',required=True,help="配置文件路径 (.json/.yaml)")
    args = parser.parse_args()

    config = load_config(args.config)
    log_items = config.get("log_paths",[])

    for item in log_items:
        path = item.get("path")
        pattern = item.get("pattern","*.log")
        keep_days = item.get("keep_days",7)

        deleted = clean_logs(path,pattern,keep_days)
        for f in deleted:
            print(f"deleted {f}")

if __name__ == "__main__":
    main()


#      sudo python  log_cleaner.py --config  config.yaml
# deleted /var/log/nginx/access.log
# deleted /var/log/nginx/error.log
# ➜  py2025 

"""
假设你的配置文件写的是:

log_paths:
  - path: "/var/log/nginx"
    pattern: "*.log"
    keep_days: 5
  - path: "/var/log/app"
    keep_days: 10

这段代码会这样处理:

    第一个 item:path="/var/log/nginx",pattern="*.log",keep_days=5

    第二个 item:path="/var/log/app",pattern="*.log"(默认值),keep_days=10
"""

Day 13

简易项目实战 综合使用上述技能 “自动部署 + 告警 + 日志备份”工具原型

Day 14

回顾 + 面试准备 简述项目、准备英文表达 准备好一个 Python 项目介绍 + 中英文回答模板

other

import  os  
import time 

log_dir= "/var/log/nginx"
now = time.time()

for filename in  os.listdir(log_dir):
    if filename.endswith(".log"):
        filepath = os.path.join(log_dir,filename)
        if os.stat(filepath).st_mtime < now - 3600*24*7: # delete log files older than 7 days
            os.remove(filepath)
            print(f"Deleted {filepath}")



#shell
#!/bin/bash

#LOG_DIR=$1
LOG_DIR="/var/log/apache2"

find "$LOG_DIR" -name "*.log" -type f -mtime +7 -exec rm -f {} \;