From 11dde135a7c5d7c3c2aaa16e17f1bbe260e152d7 Mon Sep 17 00:00:00 2001 From: xzx3344521 Date: Mon, 3 Nov 2025 17:28:39 +0800 Subject: [PATCH] Create ai --- ai | 391 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 391 insertions(+) create mode 100644 ai diff --git a/ai b/ai new file mode 100644 index 0000000..6895554 --- /dev/null +++ b/ai @@ -0,0 +1,391 @@ +#!/usr/bin/env python3 +""" +AI驱动智能防御系统 - 标准化指令响应 +""" + +import requests +import time +import re +import json +import subprocess +from datetime import datetime +import threading +import sqlite3 +import os + +class AIDrivenDefenseSystem: + def __init__(self, api_key, log_file_path="/var/log/auth.log"): + self.api_key = api_key + self.api_url = "https://api.deepseek.com/v1/chat/completions" + self.log_file_path = log_file_path + self.last_position = 0 + + # AI指令映射表 + self.ai_commands = { + # 监控指令 + "高危攻击": self.defend_critical_attack, + "立即封锁": self.defend_immediate_block, + "暴力破解": self.defend_bruteforce, + "端口扫描": self.defend_port_scan, + "可疑行为": self.defend_suspicious, + "持续监控": self.defend_monitor_only, + + # 反击指令(合法范围内) + "反向追踪": self.defend_traceback, + "流量限制": self.defend_rate_limit, + "服务隐藏": self.defend_service_hide, + "蜜罐诱捕": self.defend_honeypot, + } + + # 初始化数据库 + self.init_database() + + def init_database(self): + """初始化防御数据库""" + self.conn = sqlite3.connect('defense_actions.db', check_same_thread=False) + cursor = self.conn.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS defense_logs ( + id INTEGER PRIMARY KEY, + timestamp TIMESTAMP, + ip TEXT, + ai_command TEXT, + action_taken TEXT, + threat_level TEXT + ) + ''') + self.conn.commit() + + def get_log_summary(self, log_lines): + """从日志中提取关键信息""" + summary = { + 'failed_logins': [], + 'suspicious_ips': [], + 'port_scan_signs': [], + 'error_messages': [], + 'timeline': [] + } + + for line in log_lines[-100:]: # 分析最近100行 + line = line.strip() + + # SSH相关检测 + if "Failed password" in line: + ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) + if ip_match: + ip = ip_match.group(1) + summary['failed_logins'].append(f"SSH失败: {ip}") + if ip not in summary['suspicious_ips']: + summary['suspicious_ips'].append(ip) + + # 端口扫描检测 + elif "Connection reset by peer" in line or "refused connect" in line.lower(): + ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) + if ip_match: + summary['port_scan_signs'].append(f"端口扫描: {ip_match.group(1)}") + + # 其他可疑行为 + elif "invalid user" in line.lower() or "authentication failure" in line.lower(): + ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) or re.search(r'rhost=(\d+\.\d+\.\d+\.\d+)', line) + if ip_match: + summary['suspicious_ips'].append(ip_match.group(1)) + summary['error_messages'].append(line) + + return summary + + def ask_ai_for_command(self, log_summary): + """询问AI获取标准化防御指令""" + + prompt = f""" + 请分析以下系统日志摘要,并返回一个标准化的防御指令: + + 日志摘要: + {json.dumps(log_summary, indent=2, ensure_ascii=False)} + + 请从以下指令中选择最合适的一个返回(只返回指令关键词): + - "高危攻击":检测到严重入侵企图 + - "立即封锁":需要立即封锁IP + - "暴力破解":检测到暴力破解攻击 + - "端口扫描":检测到端口扫描行为 + - "可疑行为":一般可疑活动,需要监控 + - "持续监控":无明显威胁,继续监控 + - "反向追踪":需要追踪攻击源 + - "流量限制":限制该IP的访问频率 + - "服务隐藏":隐藏服务端口 + - "蜜罐诱捕":设置蜜罐进行反制 + + 同时请分析: + 1. 攻击的严重程度 + 2. 是否需要立即响应 + 3. 建议的具体防御措施 + + 只返回指令关键词,不要其他内容。 + """ + + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + + data = { + "model": "deepseek-chat", + "messages": [ + { + "role": "system", + "content": "你是一个网络安全专家,专门分析日志和提供防御指令。只返回标准化指令关键词。" + }, + { + "role": "user", + "content": prompt + } + ], + "temperature": 0.1, + "max_tokens": 50 + } + + try: + response = requests.post(self.api_url, headers=headers, json=data, timeout=30) + response.raise_for_status() + result = response.json() + ai_response = result['choices'][0]['message']['content'].strip() + + # 提取指令关键词 + for command in self.ai_commands.keys(): + if command in ai_response: + return command + + # 如果没有匹配的指令,返回默认指令 + return "持续监控" + + except Exception as e: + print(f"AI API调用错误: {e}") + return "持续监控" + + def execute_defense_command(self, command, ip_address=None, log_data=None): + """执行AI指令对应的防御动作""" + print(f"🎯 执行AI指令: {command} | 目标IP: {ip_address}") + + if command in self.ai_commands: + # 记录到数据库 + self.log_defense_action(ip_address, command, "开始执行") + + # 执行对应的防御函数 + result = self.ai_commands[command](ip_address, log_data) + + # 更新日志 + self.log_defense_action(ip_address, command, f"执行完成: {result}") + + return result + else: + print(f"未知指令: {command}") + return "未知指令" + + def defend_critical_attack(self, ip, log_data): + """高危攻击响应""" + actions = [] + + # 1. 立即封锁IP + actions.append(self.block_ip_iptables(ip)) + + # 2. 记录到黑名单 + actions.append(self.add_to_blacklist(ip)) + + # 3. 发送紧急警报 + actions.append(self.send_alert(f"高危攻击检测", f"IP: {ip} 被判定为高危攻击")) + + # 4. 收集攻击证据 + actions.append(self.collect_evidence(ip)) + + return " | ".join(actions) + + def defend_immediate_block(self, ip, log_data): + """立即封锁响应""" + return self.block_ip_iptables(ip) + + def defend_bruteforce(self, ip, log_data): + """暴力破解响应""" + actions = [] + actions.append(self.block_ip_iptables(ip)) + actions.append(self.add_to_blacklist(ip)) + actions.append(self.change_ssh_port()) # 更改SSH端口 + return " | ".join(actions) + + def defend_port_scan(self, ip, log_data): + """端口扫描响应""" + actions = [] + actions.append(self.rate_limit_ip(ip)) + actions.append(self.hide_services()) + actions.append(self.monitor_ip(ip)) + return " | ".join(actions) + + def defend_suspicious(self, ip, log_data): + """可疑行为响应""" + return self.monitor_ip(ip) + + def defend_monitor_only(self, ip, log_data): + """持续监控""" + return "保持监控状态" + + def defend_traceback(self, ip, log_data): + """反向追踪(合法方式)""" + try: + # 使用traceroute进行路径追踪 + result = subprocess.run( + f"traceroute -m 10 {ip}", + shell=True, capture_output=True, text=True, timeout=30 + ) + trace_info = result.stdout[:500] # 只保存前500字符 + + # 保存追踪结果 + with open(f'traceback_{ip}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', 'w') as f: + f.write(trace_info) + + return f"反向追踪完成: {ip}" + except Exception as e: + return f"追踪失败: {e}" + + def defend_rate_limit(self, ip, log_data): + """流量限制""" + try: + # 使用iptables限制连接频率 + cmd = f"sudo iptables -A INPUT -s {ip} -m limit --limit 10/minute -j ACCEPT" + subprocess.run(cmd, shell=True, check=True) + cmd = f"sudo iptables -A INPUT -s {ip} -j DROP" + subprocess.run(cmd, shell=True, check=True) + return f"流量限制已设置: {ip}" + except Exception as e: + return f"流量限制失败: {e}" + + def defend_service_hide(self, ip, log_data): + """服务隐藏""" + try: + # 更改SSH端口(示例) + cmd = "sudo sed -i 's/#Port 22/Port 2222/' /etc/ssh/sshd_config" + subprocess.run(cmd, shell=True, check=True) + subprocess.run("sudo systemctl restart sshd", shell=True, check=True) + return "SSH服务已隐藏到2222端口" + except Exception as e: + return f"服务隐藏失败: {e}" + + def defend_honeypot(self, ip, log_data): + """蜜罐诱捕""" + try: + # 创建简单的蜜罐服务 + cmd = "sudo nohup python3 -m http.server 8080 --directory /tmp/ &" + subprocess.run(cmd, shell=True, check=True) + return "蜜罐服务已在8080端口启动" + except Exception as e: + return f"蜜罐设置失败: {e}" + + # 具体的防御动作实现 + def block_ip_iptables(self, ip): + """使用iptables封锁IP""" + try: + check_cmd = f"sudo iptables -C INPUT -s {ip} -j DROP 2>/dev/null" + result = subprocess.run(check_cmd, shell=True, capture_output=True) + + if result.returncode != 0: + block_cmd = f"sudo iptables -A INPUT -s {ip} -j DROP" + subprocess.run(block_cmd, shell=True, check=True) + return f"IP已封锁: {ip}" + else: + return f"IP已存在封锁规则: {ip}" + except Exception as e: + return f"封锁失败: {e}" + + def add_to_blacklist(self, ip): + """添加到黑名单文件""" + try: + with open('/tmp/ip_blacklist.txt', 'a') as f: + f.write(f"{ip} # Blocked at {datetime.now()}\n") + return "已添加至黑名单" + except Exception as e: + return f"黑名单添加失败: {e}" + + def change_ssh_port(self): + """更改SSH端口""" + return "建议手动更改SSH端口配置" + + def hide_services(self): + """隐藏服务""" + return "服务隐藏策略已执行" + + def monitor_ip(self, ip): + """监控IP""" + return f"开始重点监控: {ip}" + + def send_alert(self, title, message): + """发送警报""" + print(f"🚨 警报: {title} - {message}") + return "警报已发送" + + def collect_evidence(self, ip): + """收集证据""" + try: + # 收集网络连接信息 + cmd = f"netstat -an | grep {ip} > /tmp/evidence_{ip}.log" + subprocess.run(cmd, shell=True) + return "证据收集完成" + except: + return "证据收集失败" + + def log_defense_action(self, ip, command, action): + """记录防御动作到数据库""" + cursor = self.conn.cursor() + cursor.execute(''' + INSERT INTO defense_logs (timestamp, ip, ai_command, action_taken, threat_level) + VALUES (?, ?, ?, ?, ?) + ''', (datetime.now(), ip, command, action, "high" if "封锁" in command else "medium")) + self.conn.commit() + + def monitor_loop(self): + """主监控循环""" + print("🤖 AI驱动防御系统启动...") + print("📋 可用指令:", list(self.ai_commands.keys())) + + while True: + try: + # 模拟获取日志(实际使用时替换为真实日志读取) + sample_logs = [ + f"{datetime.now()} - Failed password for root from 192.168.1.100", + f"{datetime.now()} - Connection reset by peer from 10.0.0.50", + ] + + # 分析日志 + log_summary = self.get_log_summary(sample_logs) + + if log_summary['suspicious_ips']: + print(f"🔍 发现可疑IP: {log_summary['suspicious_ips']}") + + # 询问AI获取指令 + ai_command = self.ask_ai_for_command(log_summary) + + # 对每个可疑IP执行指令 + for ip in log_summary['suspicious_ips']: + result = self.execute_defense_command(ai_command, ip, log_summary) + print(f"✅ 执行结果: {result}") + + time.sleep(30) # 每30秒检查一次 + + except KeyboardInterrupt: + print("\n🛑 防御系统已停止") + break + except Exception as e: + print(f"❌ 监控错误: {e}") + time.sleep(30) + +def main(): + API_KEY = "您的DeepSeek_API_Key" + + # 检查权限 + try: + subprocess.run(['sudo', 'iptables', '-L'], capture_output=True) + print("✅ 具备防御操作权限") + except: + print("⚠️ 需要root权限执行防御动作") + + defense_system = AIDrivenDefenseSystem(API_KEY) + defense_system.monitor_loop() + +if __name__ == "__main__": + main()