diff --git a/ai b/ai
index 7f9e646..fb00840 100644
--- a/ai
+++ b/ai
@@ -1,391 +1,321 @@ -#!/usr/bin/env python3 -""" -AI驱动智能防御系统 - 标准化指令响应 -""" +#!/bin/bash -import requests -import time -import re -import json -import subprocess -from datetime import datetime -import threading -import sqlite3 -import os +# AI驱动的问题修复脚本 - 自动识别并解决问题 -class AIDrivenDefenseSystem: - def __init__(self, api_key, log_file_path="/var/log/auth.log"): - self.api_key = api_key - self.api_url = "https://api.deepseek.com/v1/chat/completions" - self.log_file_path = log_file_path - self.last_position = 0 - - # AI指令映射表 - self.ai_commands = { - # 监控指令 - "高危攻击": self.defend_critical_attack, - "立即封锁": self.defend_immediate_block, - "暴力破解": self.defend_bruteforce, - "端口扫描": self.defend_port_scan, - "可疑行为": self.defend_suspicious, - "持续监控": self.defend_monitor_only, - - # 反击指令(合法范围内) - "反向追踪": self.defend_traceback, - "流量限制": self.defend_rate_limit, - "服务隐藏": self.defend_service_hide, - "蜜罐诱捕": self.defend_honeypot, - } - - # 初始化数据库 - self.init_database() - - def init_database(self): - """初始化防御数据库""" - self.conn = sqlite3.connect('defense_actions.db', check_same_thread=False) - cursor = self.conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS defense_logs ( - id INTEGER PRIMARY KEY, - timestamp TIMESTAMP, - ip TEXT, - ai_command TEXT, - action_taken TEXT, - threat_level TEXT - ) - ''') - self.conn.commit() +set -e - def get_log_summary(self, log_lines): - """从日志中提取关键信息""" - summary = { - 'failed_logins': [], - 'suspicious_ips': [], - 'port_scan_signs': [], - 'error_messages': [], - 'timeline': [] - } - - for line in log_lines[-100:]: # 分析最近100行 - line = line.strip() - - # SSH相关检测 - if "Failed password" in line: - ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) - if ip_match: - ip = ip_match.group(1) - summary['failed_logins'].append(f"SSH失败: {ip}") - if ip not in summary['suspicious_ips']: - summary['suspicious_ips'].append(ip) - - # 端口扫描检测 - elif "Connection reset by peer" in line or "refused connect" in line.lower(): - ip_match = re.search(r'from 
(\d+\.\d+\.\d+\.\d+)', line) - if ip_match: - summary['port_scan_signs'].append(f"端口扫描: {ip_match.group(1)}") - - # 其他可疑行为 - elif "invalid user" in line.lower() or "authentication failure" in line.lower(): - ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) or re.search(r'rhost=(\d+\.\d+\.\d+\.\d+)', line) - if ip_match: - summary['suspicious_ips'].append(ip_match.group(1)) - summary['error_messages'].append(line) - - return summary +# 配置 +DEEPSEEK_API_KEY="你的DeepSeek_API密钥" +LOG_FILE="/var/log/ai_fixer.log" +TEMP_DIR="/tmp/ai_fixer" - def ask_ai_for_command(self, log_summary): - """询问AI获取标准化防御指令""" - - prompt = f""" - 请分析以下系统日志摘要,并返回一个标准化的防御指令: +# 颜色定义 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' - 日志摘要: - {json.dumps(log_summary, indent=2, ensure_ascii=False)} +# 初始化 +init() { + echo -e "${BLUE}🚀 AI驱动问题修复系统启动...${NC}" + mkdir -p "$TEMP_DIR" + sudo touch "$LOG_FILE" + sudo chmod 644 "$LOG_FILE" + log "系统初始化完成" +} - 请从以下指令中选择最合适的一个返回(只返回指令关键词): - - "高危攻击":检测到严重入侵企图 - - "立即封锁":需要立即封锁IP - - "暴力破解":检测到暴力破解攻击 - - "端口扫描":检测到端口扫描行为 - - "可疑行为":一般可疑活动,需要监控 - - "持续监控":无明显威胁,继续监控 - - "反向追踪":需要追踪攻击源 - - "流量限制":限制该IP的访问频率 - - "服务隐藏":隐藏服务端口 - - "蜜罐诱捕":设置蜜罐进行反制 +# 日志记录 +log() { + local message="$1" + echo "$(date '+%Y-%m-%d %H:%M:%S') - $message" | sudo tee -a "$LOG_FILE" + echo -e "${BLUE}[AI修复]${NC} $message" +} - 同时请分析: - 1. 攻击的严重程度 - 2. 是否需要立即响应 - 3. 
建议的具体防御措施 - - 只返回指令关键词,不要其他内容。 - """ - - headers = { - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json" - } - - data = { - "model": "deepseek-chat", +# 调用DeepSeek API +call_ai() { + local context="$1" + local problem="$2" + + log "咨询AI助手解决问题: $problem" + + # 构建API请求 + local api_response=$(curl -s -X POST "https://api.deepseek.com/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $DEEPSEEK_API_KEY" \ + -d '{ + "model": "deepseek-coder", "messages": [ { - "role": "system", - "content": "你是一个网络安全专家,专门分析日志和提供防御指令。只返回标准化指令关键词。" + "role": "system", + "content": "你是一个Linux系统和编程专家。请直接给出解决问题的具体命令或代码,用中文简要解释。当前系统:Debian Linux" }, { "role": "user", - "content": prompt + "content": "'"问题上下文: $context\n\n具体问题: $problem\n\n请提供具体的修复命令或脚本:"'" } ], - "temperature": 0.1, - "max_tokens": 50 - } - - try: - response = requests.post(self.api_url, headers=headers, json=data, timeout=30) - response.raise_for_status() - result = response.json() - ai_response = result['choices'][0]['message']['content'].strip() - - # 提取指令关键词 - for command in self.ai_commands.keys(): - if command in ai_response: - return command - - # 如果没有匹配的指令,返回默认指令 - return "持续监控" - - except Exception as e: - print(f"AI API调用错误: {e}") - return "持续监控" - - def execute_defense_command(self, command, ip_address=None, log_data=None): - """执行AI指令对应的防御动作""" - print(f"🎯 执行AI指令: {command} | 目标IP: {ip_address}") - - if command in self.ai_commands: - # 记录到数据库 - self.log_defense_action(ip_address, command, "开始执行") - - # 执行对应的防御函数 - result = self.ai_commands[command](ip_address, log_data) - - # 更新日志 - self.log_defense_action(ip_address, command, f"执行完成: {result}") - - return result - else: - print(f"未知指令: {command}") - return "未知指令" - - def defend_critical_attack(self, ip, log_data): - """高危攻击响应""" - actions = [] - - # 1. 立即封锁IP - actions.append(self.block_ip_iptables(ip)) - - # 2. 记录到黑名单 - actions.append(self.add_to_blacklist(ip)) - - # 3. 
发送紧急警报 - actions.append(self.send_alert(f"高危攻击检测", f"IP: {ip} 被判定为高危攻击")) - - # 4. 收集攻击证据 - actions.append(self.collect_evidence(ip)) - - return " | ".join(actions) - - def defend_immediate_block(self, ip, log_data): - """立即封锁响应""" - return self.block_ip_iptables(ip) - - def defend_bruteforce(self, ip, log_data): - """暴力破解响应""" - actions = [] - actions.append(self.block_ip_iptables(ip)) - actions.append(self.add_to_blacklist(ip)) - actions.append(self.change_ssh_port()) # 更改SSH端口 - return " | ".join(actions) - - def defend_port_scan(self, ip, log_data): - """端口扫描响应""" - actions = [] - actions.append(self.rate_limit_ip(ip)) - actions.append(self.hide_services()) - actions.append(self.monitor_ip(ip)) - return " | ".join(actions) - - def defend_suspicious(self, ip, log_data): - """可疑行为响应""" - return self.monitor_ip(ip) - - def defend_monitor_only(self, ip, log_data): - """持续监控""" - return "保持监控状态" - - def defend_traceback(self, ip, log_data): - """反向追踪(合法方式)""" - try: - # 使用traceroute进行路径追踪 - result = subprocess.run( - f"traceroute -m 10 {ip}", - shell=True, capture_output=True, text=True, timeout=30 - ) - trace_info = result.stdout[:500] # 只保存前500字符 - - # 保存追踪结果 - with open(f'traceback_{ip}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', 'w') as f: - f.write(trace_info) - - return f"反向追踪完成: {ip}" - except Exception as e: - return f"追踪失败: {e}" - - def defend_rate_limit(self, ip, log_data): - """流量限制""" - try: - # 使用iptables限制连接频率 - cmd = f"sudo iptables -A INPUT -s {ip} -m limit --limit 10/minute -j ACCEPT" - subprocess.run(cmd, shell=True, check=True) - cmd = f"sudo iptables -A INPUT -s {ip} -j DROP" - subprocess.run(cmd, shell=True, check=True) - return f"流量限制已设置: {ip}" - except Exception as e: - return f"流量限制失败: {e}" - - def defend_service_hide(self, ip, log_data): - """服务隐藏""" - try: - # 更改SSH端口(示例) - cmd = "sudo sed -i 's/#Port 22/Port 2222/' /etc/ssh/sshd_config" - subprocess.run(cmd, shell=True, check=True) - subprocess.run("sudo systemctl restart sshd", 
shell=True, check=True) - return "SSH服务已隐藏到2222端口" - except Exception as e: - return f"服务隐藏失败: {e}" - - def defend_honeypot(self, ip, log_data): - """蜜罐诱捕""" - try: - # 创建简单的蜜罐服务 - cmd = "sudo nohup python3 -m http.server 8080 --directory /tmp/ &" - subprocess.run(cmd, shell=True, check=True) - return "蜜罐服务已在8080端口启动" - except Exception as e: - return f"蜜罐设置失败: {e}" - - # 具体的防御动作实现 - def block_ip_iptables(self, ip): - """使用iptables封锁IP""" - try: - check_cmd = f"sudo iptables -C INPUT -s {ip} -j DROP 2>/dev/null" - result = subprocess.run(check_cmd, shell=True, capture_output=True) - - if result.returncode != 0: - block_cmd = f"sudo iptables -A INPUT -s {ip} -j DROP" - subprocess.run(block_cmd, shell=True, check=True) - return f"IP已封锁: {ip}" - else: - return f"IP已存在封锁规则: {ip}" - except Exception as e: - return f"封锁失败: {e}" - - def add_to_blacklist(self, ip): - """添加到黑名单文件""" - try: - with open('/tmp/ip_blacklist.txt', 'a') as f: - f.write(f"{ip} # Blocked at {datetime.now()}\n") - return "已添加至黑名单" - except Exception as e: - return f"黑名单添加失败: {e}" - - def change_ssh_port(self): - """更改SSH端口""" - return "建议手动更改SSH端口配置" - - def hide_services(self): - """隐藏服务""" - return "服务隐藏策略已执行" - - def monitor_ip(self, ip): - """监控IP""" - return f"开始重点监控: {ip}" - - def send_alert(self, title, message): - """发送警报""" - print(f"🚨 警报: {title} - {message}") - return "警报已发送" - - def collect_evidence(self, ip): - """收集证据""" - try: - # 收集网络连接信息 - cmd = f"netstat -an | grep {ip} > /tmp/evidence_{ip}.log" - subprocess.run(cmd, shell=True) - return "证据收集完成" - except: - return "证据收集失败" - - def log_defense_action(self, ip, command, action): - """记录防御动作到数据库""" - cursor = self.conn.cursor() - cursor.execute(''' - INSERT INTO defense_logs (timestamp, ip, ai_command, action_taken, threat_level) - VALUES (?, ?, ?, ?, ?) 
- ''', (datetime.now(), ip, command, action, "high" if "封锁" in command else "medium")) - self.conn.commit() - - def monitor_loop(self): - """主监控循环""" - print("🤖 AI驱动防御系统启动...") - print("📋 可用指令:", list(self.ai_commands.keys())) - - while True: - try: - # 模拟获取日志(实际使用时替换为真实日志读取) - sample_logs = [ - f"{datetime.now()} - Failed password for root from 192.168.1.100", - f"{datetime.now()} - Connection reset by peer from 10.0.0.50", - ] - - # 分析日志 - log_summary = self.get_log_summary(sample_logs) - - if log_summary['suspicious_ips']: - print(f"🔍 发现可疑IP: {log_summary['suspicious_ips']}") - - # 询问AI获取指令 - ai_command = self.ask_ai_for_command(log_summary) - - # 对每个可疑IP执行指令 - for ip in log_summary['suspicious_ips']: - result = self.execute_defense_command(ai_command, ip, log_summary) - print(f"✅ 执行结果: {result}") - - time.sleep(30) # 每30秒检查一次 - - except KeyboardInterrupt: - print("\n🛑 防御系统已停止") - break - except Exception as e: - print(f"❌ 监控错误: {e}") - time.sleep(30) - -def main(): - API_KEY = "您的DeepSeek_API_Key" + "temperature": 0.3 + }') - # 检查权限 - try: - subprocess.run(['sudo', 'iptables', '-L'], capture_output=True) - print("✅ 具备防御操作权限") - except: - print("⚠️ 需要root权限执行防御动作") + # 提取AI回复 + local ai_response=$(echo "$api_response" | grep -o '"content":"[^"]*"' | cut -d'"' -f4) - defense_system = sk-61d6716fe6b2452d94a0cee3bc5c4e2e - defense_system.monitor_loop() + if [ -z "$ai_response" ]; then + echo "AI请求失败,使用备用方案" + return 1 + fi + + echo "$ai_response" +} -if __name__ == "__main__": - main() +# 分析错误信息 +analyze_error() { + local error_output="$1" + log "分析错误信息..." 
+ + echo "$error_output" > "$TEMP_DIR/error.log" + + # 检测错误类型 + if echo "$error_output" | grep -q "command not found"; then + echo "Python脚本被当作bash执行" + return 1 + elif echo "$error_output" | grep -q "import.*command not found"; then + echo "Python代码在bash中执行错误" + return 2 + elif echo "$error_output" | grep -q "syntax error"; then + echo "语法错误" + return 3 + else + echo "未知错误类型" + return 4 + fi +} + +# 自动修复Python脚本问题 +fix_python_script() { + local script_url="$1" + log "修复Python脚本: $script_url" + + # 下载脚本 + local original_content=$(curl -sSL "$script_url") + + if [ -z "$original_content" ]; then + log "下载脚本失败" + return 1 + fi + + echo "$original_content" > "$TEMP_DIR/original_script" + + # 分析问题并获取AI修复方案 + local context="原始脚本内容:\n$original_content\n\n错误: Python代码被当作bash执行" + local problem="这是一个Python脚本,但被用bash命令执行了,导致import等Python语句报错" + + local ai_solution=$(call_ai "$context" "$problem") + + if [ $? -eq 0 ]; then + echo -e "${GREEN}🤖 AI修复方案:${NC}" + echo "$ai_solution" + echo "$ai_solution" > "$TEMP_DIR/ai_solution" + + # 执行AI的修复命令 + execute_ai_solution "$ai_solution" + else + # 备用修复方案 + backup_fix "$original_content" + fi +} + +# 执行AI的解决方案 +execute_ai_solution() { + local solution="$1" + log "执行AI解决方案..." + + # 提取命令部分(假设AI返回中包含具体命令) + local commands=$(echo "$solution" | grep -E "^(sudo |bash |python |curl |wget |apt)" || echo "$solution") + + # 执行每个找到的命令 + while IFS= read -r line; do + if [[ "$line" =~ ^(sudo|bash|python|curl|wget|apt|pip) ]]; then + log "执行: $line" + eval "$line" 2>&1 | sudo tee -a "$LOG_FILE" + fi + done <<< "$commands" +} + +# 备用修复方案 +backup_fix() { + local original_content="$1" + log "使用备用方案修复..." 
+ + # 检测脚本类型 + if echo "$original_content" | grep -q "#!/usr/bin/env python"; then + echo -e "${YELLOW}检测到Python脚本,重新下载并正确执行...${NC}" + + # 下载为Python文件 + curl -sSL "$script_url" -o "$TEMP_DIR/script.py" + chmod +x "$TEMP_DIR/script.py" + + # 安装必要的Python依赖 + sudo apt update + sudo apt install -y python3 python3-pip + + # 尝试执行Python脚本 + python3 "$TEMP_DIR/script.py" + + elif echo "$original_content" | grep -q "import.*"; then + echo -e "${YELLOW}这明显是Python代码,创建正确的Python文件...${NC}" + + # 创建正确的Python脚本 + cat > "$TEMP_DIR/fixed_script.py" << EOF +#!/usr/bin/env python3 +$original_content +EOF + + chmod +x "$TEMP_DIR/fixed_script.py" + python3 "$TEMP_DIR/fixed_script.py" + + else + echo -e "${RED}无法自动识别脚本类型${NC}" + return 1 + fi +} + +# 创建智能启动器 +create_smart_launcher() { + log "创建智能脚本启动器..." + + cat > /usr/local/bin/ai-run << 'EOF' +#!/bin/bash + +# AI智能脚本启动器 - 自动识别脚本类型并正确执行 + +SCRIPT_URL="$1" +TEMP_SCRIPT="/tmp/ai_script_$$" + +# 颜色定义 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +echo -e "${BLUE}🤖 AI智能启动器运行中...${NC}" + +# 下载脚本 +if ! curl -sSL "$SCRIPT_URL" -o "$TEMP_SCRIPT"; then + echo -e "${RED}❌ 下载脚本失败${NC}" + exit 1 +fi + +# 检测脚本类型 +detect_script_type() { + local script_content=$(cat "$TEMP_SCRIPT") + + # 检查shebang + if head -1 "$TEMP_SCRIPT" | grep -q "^#!/usr/bin/env python"; then + echo "python" + elif head -1 "$TEMP_SCRIPT" | grep -q "^#!/bin/bash"; then + echo "bash" + elif head -1 "$TEMP_SCRIPT" | grep -q "^#!/bin/sh"; then + echo "shell" + elif echo "$script_content" | grep -q "import.*"; then + echo "python" + elif echo "$script_content" | grep -q "def.*("; then + echo "python" + else + echo "unknown" + fi +} + +SCRIPT_TYPE=$(detect_script_type) + +echo -e "${YELLOW}检测到脚本类型: $SCRIPT_TYPE${NC}" + +# 根据类型执行 +case "$SCRIPT_TYPE" in + "python") + echo -e "${GREEN}执行Python脚本...${NC}" + # 确保有Python环境 + if ! 
command -v python3 &> /dev/null; then + sudo apt update && sudo apt install -y python3 + fi + python3 "$TEMP_SCRIPT" + ;; + "bash"|"shell") + echo -e "${GREEN}执行Bash脚本...${NC}" + chmod +x "$TEMP_SCRIPT" + bash "$TEMP_SCRIPT" + ;; + *) + echo -e "${YELLOW}未知类型,尝试智能执行...${NC}" + # 尝试作为bash执行 + if bash -n "$TEMP_SCRIPT" 2>/dev/null; then + echo -e "${GREEN}作为Bash脚本执行${NC}" + chmod +x "$TEMP_SCRIPT" + bash "$TEMP_SCRIPT" + else + echo -e "${GREEN}作为Python脚本执行${NC}" + python3 "$TEMP_SCRIPT" + fi + ;; +esac + +# 清理 +rm -f "$TEMP_SCRIPT" +EOF + + chmod +x /usr/local/bin/ai-run + echo -e "${GREEN}✅ 智能启动器安装完成${NC}" + echo -e "${YELLOW}使用方法: ai-run <脚本URL>${NC}" +} + +# 主修复函数 +main_fix() { + local script_url="https://github.com/xzx3344521/dock/raw/refs/heads/main/ai" + + echo -e "${BLUE}🔍 开始分析问题...${NC}" + log "目标脚本: $script_url" + + # 分析问题 + analyze_error "Python脚本被当作bash执行,import语句报错" + + # 修复问题 + fix_python_script "$script_url" + + # 创建智能启动器防止未来问题 + create_smart_launcher + + echo -e "${GREEN}✅ 修复完成!${NC}" + echo -e "${YELLOW}🎯 现在你可以使用: ai-run https://github.com/xzx3344521/dock/raw/refs/heads/main/ai${NC}" +} + +# 显示使用说明 +show_help() { + echo -e "${GREEN}AI驱动问题修复系统${NC}" + echo "使用方法:" + echo " $0 fix - 修复当前问题" + echo " $0 run - 智能运行脚本" + echo " $0 help - 显示此帮助" +} + +# 主程序 +case "${1:-}" in + "fix") + init + main_fix + ;; + "run") + if [ -z "$2" ]; then + echo "请提供脚本URL" + exit 1 + fi + /usr/local/bin/ai-run "$2" + ;; + "help"|"") + show_help + ;; + *) + echo "未知命令: $1" + show_help + ;; +esac
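Review bot: `execute_ai_solution` hands each line of the model's reply to `eval "$line"` (piped through `sudo tee`), so whatever the API returns — or whatever is embedded in the downloaded script that `fix_python_script` pastes verbatim into `$context` — runs as shell on this host, and the `^(sudo|bash|python|curl|wget|apt|pip)` prefix test constrains only the first word, never the arguments. Since the reply is already saved to `$TEMP_DIR/ai_solution`, the safer shape is to stop there and show the proposed commands for a person to approve, e.g. replace the `eval` line with `log "建议执行(未自动运行): $line"`, or at minimum gate it behind an explicit `read -rp` confirmation. A related defect: `local ai_solution=$(call_ai "$context" "$problem")` makes the following `$?` the status of `local` (always 0), so the `backup_fix` branch is unreachable and on API failure the text "AI请求失败,使用备用方案" is what reaches `execute_ai_solution`; write it as `local ai_solution` followed by `if ai_solution=$(call_ai "$context" "$problem"); then`. The same masking applies to `local api_response=$(curl ...)` in `call_ai`, and `$context` is spliced into the JSON body unescaped, so any double quote, backslash or raw newline in the downloaded script produces an invalid request. Separately, the value assigned at `defense_system = sk-...` on the removed side looks like a live API key; it remains in git history, so it should be treated as exposed and rotated.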