Create ai

This commit is contained in:
2025-11-03 17:28:39 +08:00
committed by GitHub
parent fda332eb3e
commit 11dde135a7

391
ai Normal file
View File

@@ -0,0 +1,391 @@
#!/usr/bin/env python3
"""
AI驱动智能防御系统 - 标准化指令响应
"""
import requests
import time
import re
import json
import subprocess
from datetime import datetime
import threading
import sqlite3
import os
class AIDrivenDefenseSystem:
def __init__(self, api_key, log_file_path="/var/log/auth.log"):
self.api_key = api_key
self.api_url = "https://api.deepseek.com/v1/chat/completions"
self.log_file_path = log_file_path
self.last_position = 0
# AI指令映射表
self.ai_commands = {
# 监控指令
"高危攻击": self.defend_critical_attack,
"立即封锁": self.defend_immediate_block,
"暴力破解": self.defend_bruteforce,
"端口扫描": self.defend_port_scan,
"可疑行为": self.defend_suspicious,
"持续监控": self.defend_monitor_only,
# 反击指令(合法范围内)
"反向追踪": self.defend_traceback,
"流量限制": self.defend_rate_limit,
"服务隐藏": self.defend_service_hide,
"蜜罐诱捕": self.defend_honeypot,
}
# 初始化数据库
self.init_database()
def init_database(self):
"""初始化防御数据库"""
self.conn = sqlite3.connect('defense_actions.db', check_same_thread=False)
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS defense_logs (
id INTEGER PRIMARY KEY,
timestamp TIMESTAMP,
ip TEXT,
ai_command TEXT,
action_taken TEXT,
threat_level TEXT
)
''')
self.conn.commit()
def get_log_summary(self, log_lines):
"""从日志中提取关键信息"""
summary = {
'failed_logins': [],
'suspicious_ips': [],
'port_scan_signs': [],
'error_messages': [],
'timeline': []
}
for line in log_lines[-100:]: # 分析最近100行
line = line.strip()
# SSH相关检测
if "Failed password" in line:
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
ip = ip_match.group(1)
summary['failed_logins'].append(f"SSH失败: {ip}")
if ip not in summary['suspicious_ips']:
summary['suspicious_ips'].append(ip)
# 端口扫描检测
elif "Connection reset by peer" in line or "refused connect" in line.lower():
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
summary['port_scan_signs'].append(f"端口扫描: {ip_match.group(1)}")
# 其他可疑行为
elif "invalid user" in line.lower() or "authentication failure" in line.lower():
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line) or re.search(r'rhost=(\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
summary['suspicious_ips'].append(ip_match.group(1))
summary['error_messages'].append(line)
return summary
def ask_ai_for_command(self, log_summary):
"""询问AI获取标准化防御指令"""
prompt = f"""
请分析以下系统日志摘要,并返回一个标准化的防御指令:
日志摘要:
{json.dumps(log_summary, indent=2, ensure_ascii=False)}
请从以下指令中选择最合适的一个返回(只返回指令关键词):
- "高危攻击":检测到严重入侵企图
- "立即封锁"需要立即封锁IP
- "暴力破解":检测到暴力破解攻击
- "端口扫描":检测到端口扫描行为
- "可疑行为":一般可疑活动,需要监控
- "持续监控":无明显威胁,继续监控
- "反向追踪":需要追踪攻击源
- "流量限制"限制该IP的访问频率
- "服务隐藏":隐藏服务端口
- "蜜罐诱捕":设置蜜罐进行反制
同时请分析:
1. 攻击的严重程度
2. 是否需要立即响应
3. 建议的具体防御措施
只返回指令关键词,不要其他内容。
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "你是一个网络安全专家,专门分析日志和提供防御指令。只返回标准化指令关键词。"
},
{
"role": "user",
"content": prompt
}
],
"temperature": 0.1,
"max_tokens": 50
}
try:
response = requests.post(self.api_url, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
ai_response = result['choices'][0]['message']['content'].strip()
# 提取指令关键词
for command in self.ai_commands.keys():
if command in ai_response:
return command
# 如果没有匹配的指令,返回默认指令
return "持续监控"
except Exception as e:
print(f"AI API调用错误: {e}")
return "持续监控"
def execute_defense_command(self, command, ip_address=None, log_data=None):
"""执行AI指令对应的防御动作"""
print(f"🎯 执行AI指令: {command} | 目标IP: {ip_address}")
if command in self.ai_commands:
# 记录到数据库
self.log_defense_action(ip_address, command, "开始执行")
# 执行对应的防御函数
result = self.ai_commands[command](ip_address, log_data)
# 更新日志
self.log_defense_action(ip_address, command, f"执行完成: {result}")
return result
else:
print(f"未知指令: {command}")
return "未知指令"
def defend_critical_attack(self, ip, log_data):
"""高危攻击响应"""
actions = []
# 1. 立即封锁IP
actions.append(self.block_ip_iptables(ip))
# 2. 记录到黑名单
actions.append(self.add_to_blacklist(ip))
# 3. 发送紧急警报
actions.append(self.send_alert(f"高危攻击检测", f"IP: {ip} 被判定为高危攻击"))
# 4. 收集攻击证据
actions.append(self.collect_evidence(ip))
return " | ".join(actions)
def defend_immediate_block(self, ip, log_data):
"""立即封锁响应"""
return self.block_ip_iptables(ip)
def defend_bruteforce(self, ip, log_data):
"""暴力破解响应"""
actions = []
actions.append(self.block_ip_iptables(ip))
actions.append(self.add_to_blacklist(ip))
actions.append(self.change_ssh_port()) # 更改SSH端口
return " | ".join(actions)
def defend_port_scan(self, ip, log_data):
"""端口扫描响应"""
actions = []
actions.append(self.rate_limit_ip(ip))
actions.append(self.hide_services())
actions.append(self.monitor_ip(ip))
return " | ".join(actions)
def defend_suspicious(self, ip, log_data):
"""可疑行为响应"""
return self.monitor_ip(ip)
def defend_monitor_only(self, ip, log_data):
"""持续监控"""
return "保持监控状态"
def defend_traceback(self, ip, log_data):
"""反向追踪(合法方式)"""
try:
# 使用traceroute进行路径追踪
result = subprocess.run(
f"traceroute -m 10 {ip}",
shell=True, capture_output=True, text=True, timeout=30
)
trace_info = result.stdout[:500] # 只保存前500字符
# 保存追踪结果
with open(f'traceback_{ip}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', 'w') as f:
f.write(trace_info)
return f"反向追踪完成: {ip}"
except Exception as e:
return f"追踪失败: {e}"
def defend_rate_limit(self, ip, log_data):
"""流量限制"""
try:
# 使用iptables限制连接频率
cmd = f"sudo iptables -A INPUT -s {ip} -m limit --limit 10/minute -j ACCEPT"
subprocess.run(cmd, shell=True, check=True)
cmd = f"sudo iptables -A INPUT -s {ip} -j DROP"
subprocess.run(cmd, shell=True, check=True)
return f"流量限制已设置: {ip}"
except Exception as e:
return f"流量限制失败: {e}"
def defend_service_hide(self, ip, log_data):
"""服务隐藏"""
try:
# 更改SSH端口示例
cmd = "sudo sed -i 's/#Port 22/Port 2222/' /etc/ssh/sshd_config"
subprocess.run(cmd, shell=True, check=True)
subprocess.run("sudo systemctl restart sshd", shell=True, check=True)
return "SSH服务已隐藏到2222端口"
except Exception as e:
return f"服务隐藏失败: {e}"
def defend_honeypot(self, ip, log_data):
"""蜜罐诱捕"""
try:
# 创建简单的蜜罐服务
cmd = "sudo nohup python3 -m http.server 8080 --directory /tmp/ &"
subprocess.run(cmd, shell=True, check=True)
return "蜜罐服务已在8080端口启动"
except Exception as e:
return f"蜜罐设置失败: {e}"
# 具体的防御动作实现
def block_ip_iptables(self, ip):
"""使用iptables封锁IP"""
try:
check_cmd = f"sudo iptables -C INPUT -s {ip} -j DROP 2>/dev/null"
result = subprocess.run(check_cmd, shell=True, capture_output=True)
if result.returncode != 0:
block_cmd = f"sudo iptables -A INPUT -s {ip} -j DROP"
subprocess.run(block_cmd, shell=True, check=True)
return f"IP已封锁: {ip}"
else:
return f"IP已存在封锁规则: {ip}"
except Exception as e:
return f"封锁失败: {e}"
def add_to_blacklist(self, ip):
"""添加到黑名单文件"""
try:
with open('/tmp/ip_blacklist.txt', 'a') as f:
f.write(f"{ip} # Blocked at {datetime.now()}\n")
return "已添加至黑名单"
except Exception as e:
return f"黑名单添加失败: {e}"
def change_ssh_port(self):
"""更改SSH端口"""
return "建议手动更改SSH端口配置"
def hide_services(self):
"""隐藏服务"""
return "服务隐藏策略已执行"
def monitor_ip(self, ip):
"""监控IP"""
return f"开始重点监控: {ip}"
def send_alert(self, title, message):
"""发送警报"""
print(f"🚨 警报: {title} - {message}")
return "警报已发送"
def collect_evidence(self, ip):
"""收集证据"""
try:
# 收集网络连接信息
cmd = f"netstat -an | grep {ip} > /tmp/evidence_{ip}.log"
subprocess.run(cmd, shell=True)
return "证据收集完成"
except:
return "证据收集失败"
def log_defense_action(self, ip, command, action):
"""记录防御动作到数据库"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO defense_logs (timestamp, ip, ai_command, action_taken, threat_level)
VALUES (?, ?, ?, ?, ?)
''', (datetime.now(), ip, command, action, "high" if "封锁" in command else "medium"))
self.conn.commit()
def monitor_loop(self):
"""主监控循环"""
print("🤖 AI驱动防御系统启动...")
print("📋 可用指令:", list(self.ai_commands.keys()))
while True:
try:
# 模拟获取日志(实际使用时替换为真实日志读取)
sample_logs = [
f"{datetime.now()} - Failed password for root from 192.168.1.100",
f"{datetime.now()} - Connection reset by peer from 10.0.0.50",
]
# 分析日志
log_summary = self.get_log_summary(sample_logs)
if log_summary['suspicious_ips']:
print(f"🔍 发现可疑IP: {log_summary['suspicious_ips']}")
# 询问AI获取指令
ai_command = self.ask_ai_for_command(log_summary)
# 对每个可疑IP执行指令
for ip in log_summary['suspicious_ips']:
result = self.execute_defense_command(ai_command, ip, log_summary)
print(f"✅ 执行结果: {result}")
time.sleep(30) # 每30秒检查一次
except KeyboardInterrupt:
print("\n🛑 防御系统已停止")
break
except Exception as e:
print(f"❌ 监控错误: {e}")
time.sleep(30)
def main():
API_KEY = "您的DeepSeek_API_Key"
# 检查权限
try:
subprocess.run(['sudo', 'iptables', '-L'], capture_output=True)
print("✅ 具备防御操作权限")
except:
print("⚠️ 需要root权限执行防御动作")
defense_system = AIDrivenDefenseSystem(API_KEY)
defense_system.monitor_loop()
if __name__ == "__main__":
main()