Files
dock/ai
2025-11-03 17:55:41 +08:00

392 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
AI驱动智能防御系统 - 标准化指令响应
"""
import requests
import time
import re
import json
import subprocess
from datetime import datetime
import threading
import sqlite3
import os
class AIDrivenDefenseSystem:
def __init__(self, api_key, log_file_path="/var/log/auth.log"):
self.api_key = api_key
self.api_url = "https://api.deepseek.com/v1/chat/completions"
self.log_file_path = log_file_path
self.last_position = 0
# AI指令映射表
self.ai_commands = {
# 监控指令
"高危攻击": self.defend_critical_attack,
"立即封锁": self.defend_immediate_block,
"暴力破解": self.defend_bruteforce,
"端口扫描": self.defend_port_scan,
"可疑行为": self.defend_suspicious,
"持续监控": self.defend_monitor_only,
# 反击指令(合法范围内)
"反向追踪": self.defend_traceback,
"流量限制": self.defend_rate_limit,
"服务隐藏": self.defend_service_hide,
"蜜罐诱捕": self.defend_honeypot,
}
# 初始化数据库
self.init_database()
def init_database(self):
"""初始化防御数据库"""
self.conn = sqlite3.connect('defense_actions.db', check_same_thread=False)
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS defense_logs (
id INTEGER PRIMARY KEY,
timestamp TIMESTAMP,
ip TEXT,
ai_command TEXT,
action_taken TEXT,
threat_level TEXT
)
''')
self.conn.commit()
def get_log_summary(self, log_lines):
"""从日志中提取关键信息"""
summary = {
'failed_logins': [],
'suspicious_ips': [],
'port_scan_signs': [],
'error_messages': [],
'timeline': []
}
for line in log_lines[-100:]: # 分析最近100行
line = line.strip()
# SSH相关检测
if "Failed password" in line:
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
ip = ip_match.group(1)
summary['failed_logins'].append(f"SSH失败: {ip}")
if ip not in summary['suspicious_ips']:
summary['suspicious_ips'].append(ip)
# 端口扫描检测
elif "Connection reset by peer" in line or "refused connect" in line.lower():
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
summary['port_scan_signs'].append(f"端口扫描: {ip_match.group(1)}")
# 其他可疑行为
elif "invalid user" in line.lower() or "authentication failure" in line.lower():
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) or re.search(r'rhost=(\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
summary['suspicious_ips'].append(ip_match.group(1))
summary['error_messages'].append(line)
return summary
def ask_ai_for_command(self, log_summary):
"""询问AI获取标准化防御指令"""
prompt = f"""
请分析以下系统日志摘要,并返回一个标准化的防御指令:
日志摘要:
{json.dumps(log_summary, indent=2, ensure_ascii=False)}
请从以下指令中选择最合适的一个返回(只返回指令关键词):
- "高危攻击":检测到严重入侵企图
- "立即封锁"需要立即封锁IP
- "暴力破解":检测到暴力破解攻击
- "端口扫描":检测到端口扫描行为
- "可疑行为":一般可疑活动,需要监控
- "持续监控":无明显威胁,继续监控
- "反向追踪":需要追踪攻击源
- "流量限制"限制该IP的访问频率
- "服务隐藏":隐藏服务端口
- "蜜罐诱捕":设置蜜罐进行反制
同时请分析:
1. 攻击的严重程度
2. 是否需要立即响应
3. 建议的具体防御措施
只返回指令关键词,不要其他内容。
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "你是一个网络安全专家,专门分析日志和提供防御指令。只返回标准化指令关键词。"
},
{
"role": "user",
"content": prompt
}
],
"temperature": 0.1,
"max_tokens": 50
}
try:
response = requests.post(self.api_url, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
ai_response = result['choices'][0]['message']['content'].strip()
# 提取指令关键词
for command in self.ai_commands.keys():
if command in ai_response:
return command
# 如果没有匹配的指令,返回默认指令
return "持续监控"
except Exception as e:
print(f"AI API调用错误: {e}")
return "持续监控"
def execute_defense_command(self, command, ip_address=None, log_data=None):
"""执行AI指令对应的防御动作"""
print(f"🎯 执行AI指令: {command} | 目标IP: {ip_address}")
if command in self.ai_commands:
# 记录到数据库
self.log_defense_action(ip_address, command, "开始执行")
# 执行对应的防御函数
result = self.ai_commands[command](ip_address, log_data)
# 更新日志
self.log_defense_action(ip_address, command, f"执行完成: {result}")
return result
else:
print(f"未知指令: {command}")
return "未知指令"
def defend_critical_attack(self, ip, log_data):
"""高危攻击响应"""
actions = []
# 1. 立即封锁IP
actions.append(self.block_ip_iptables(ip))
# 2. 记录到黑名单
actions.append(self.add_to_blacklist(ip))
# 3. 发送紧急警报
actions.append(self.send_alert(f"高危攻击检测", f"IP: {ip} 被判定为高危攻击"))
# 4. 收集攻击证据
actions.append(self.collect_evidence(ip))
return " | ".join(actions)
def defend_immediate_block(self, ip, log_data):
"""立即封锁响应"""
return self.block_ip_iptables(ip)
def defend_bruteforce(self, ip, log_data):
"""暴力破解响应"""
actions = []
actions.append(self.block_ip_iptables(ip))
actions.append(self.add_to_blacklist(ip))
actions.append(self.change_ssh_port()) # 更改SSH端口
return " | ".join(actions)
def defend_port_scan(self, ip, log_data):
"""端口扫描响应"""
actions = []
actions.append(self.rate_limit_ip(ip))
actions.append(self.hide_services())
actions.append(self.monitor_ip(ip))
return " | ".join(actions)
def defend_suspicious(self, ip, log_data):
"""可疑行为响应"""
return self.monitor_ip(ip)
def defend_monitor_only(self, ip, log_data):
"""持续监控"""
return "保持监控状态"
def defend_traceback(self, ip, log_data):
"""反向追踪(合法方式)"""
try:
# 使用traceroute进行路径追踪
result = subprocess.run(
f"traceroute -m 10 {ip}",
shell=True, capture_output=True, text=True, timeout=30
)
trace_info = result.stdout[:500] # 只保存前500字符
# 保存追踪结果
with open(f'traceback_{ip}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', 'w') as f:
f.write(trace_info)
return f"反向追踪完成: {ip}"
except Exception as e:
return f"追踪失败: {e}"
def defend_rate_limit(self, ip, log_data):
"""流量限制"""
try:
# 使用iptables限制连接频率
cmd = f"sudo iptables -A INPUT -s {ip} -m limit --limit 10/minute -j ACCEPT"
subprocess.run(cmd, shell=True, check=True)
cmd = f"sudo iptables -A INPUT -s {ip} -j DROP"
subprocess.run(cmd, shell=True, check=True)
return f"流量限制已设置: {ip}"
except Exception as e:
return f"流量限制失败: {e}"
def defend_service_hide(self, ip, log_data):
"""服务隐藏"""
try:
# 更改SSH端口示例
cmd = "sudo sed -i 's/#Port 22/Port 2222/' /etc/ssh/sshd_config"
subprocess.run(cmd, shell=True, check=True)
subprocess.run("sudo systemctl restart sshd", shell=True, check=True)
return "SSH服务已隐藏到2222端口"
except Exception as e:
return f"服务隐藏失败: {e}"
def defend_honeypot(self, ip, log_data):
"""蜜罐诱捕"""
try:
# 创建简单的蜜罐服务
cmd = "sudo nohup python3 -m http.server 8080 --directory /tmp/ &"
subprocess.run(cmd, shell=True, check=True)
return "蜜罐服务已在8080端口启动"
except Exception as e:
return f"蜜罐设置失败: {e}"
# 具体的防御动作实现
def block_ip_iptables(self, ip):
"""使用iptables封锁IP"""
try:
check_cmd = f"sudo iptables -C INPUT -s {ip} -j DROP 2>/dev/null"
result = subprocess.run(check_cmd, shell=True, capture_output=True)
if result.returncode != 0:
block_cmd = f"sudo iptables -A INPUT -s {ip} -j DROP"
subprocess.run(block_cmd, shell=True, check=True)
return f"IP已封锁: {ip}"
else:
return f"IP已存在封锁规则: {ip}"
except Exception as e:
return f"封锁失败: {e}"
def add_to_blacklist(self, ip):
"""添加到黑名单文件"""
try:
with open('/tmp/ip_blacklist.txt', 'a') as f:
f.write(f"{ip} # Blocked at {datetime.now()}\n")
return "已添加至黑名单"
except Exception as e:
return f"黑名单添加失败: {e}"
def change_ssh_port(self):
"""更改SSH端口"""
return "建议手动更改SSH端口配置"
def hide_services(self):
"""隐藏服务"""
return "服务隐藏策略已执行"
def monitor_ip(self, ip):
"""监控IP"""
return f"开始重点监控: {ip}"
def send_alert(self, title, message):
"""发送警报"""
print(f"🚨 警报: {title} - {message}")
return "警报已发送"
def collect_evidence(self, ip):
"""收集证据"""
try:
# 收集网络连接信息
cmd = f"netstat -an | grep {ip} > /tmp/evidence_{ip}.log"
subprocess.run(cmd, shell=True)
return "证据收集完成"
except:
return "证据收集失败"
def log_defense_action(self, ip, command, action):
"""记录防御动作到数据库"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO defense_logs (timestamp, ip, ai_command, action_taken, threat_level)
VALUES (?, ?, ?, ?, ?)
''', (datetime.now(), ip, command, action, "high" if "封锁" in command else "medium"))
self.conn.commit()
def monitor_loop(self):
"""主监控循环"""
print("🤖 AI驱动防御系统启动...")
print("📋 可用指令:", list(self.ai_commands.keys()))
while True:
try:
# 模拟获取日志(实际使用时替换为真实日志读取)
sample_logs = [
f"{datetime.now()} - Failed password for root from 192.168.1.100",
f"{datetime.now()} - Connection reset by peer from 10.0.0.50",
]
# 分析日志
log_summary = self.get_log_summary(sample_logs)
if log_summary['suspicious_ips']:
print(f"🔍 发现可疑IP: {log_summary['suspicious_ips']}")
# 询问AI获取指令
ai_command = self.ask_ai_for_command(log_summary)
# 对每个可疑IP执行指令
for ip in log_summary['suspicious_ips']:
result = self.execute_defense_command(ai_command, ip, log_summary)
print(f"✅ 执行结果: {result}")
time.sleep(30) # 每30秒检查一次
except KeyboardInterrupt:
print("\n🛑 防御系统已停止")
break
except Exception as e:
print(f"❌ 监控错误: {e}")
time.sleep(30)
def main():
API_KEY = "您的DeepSeek_API_Key"
# 检查权限
try:
subprocess.run(['sudo', 'iptables', '-L'], capture_output=True)
print("✅ 具备防御操作权限")
except:
print("⚠️ 需要root权限执行防御动作")
defense_system = sk-61d6716fe6b2452d94a0cee3bc5c4e2e
defense_system.monitor_loop()
if __name__ == "__main__":
main()