可通过以下命令方式查看:
sudo cat /var/log/auth.log | grep sshd # 过滤出SSH相关日志
实时监控日志
sudo tail -f /var/log/auth.log | grep sshd # 实时输出最新SSH日志
分析登录失败记录
sudo grep "Failed password" /var/log/auth.log # 提取登录失败信息
import re
from collections import defaultdict
from datetime import datetime
# 日志文件路径(根据实际情况调整)
LOG_FILE = 'auth.log'
def parse_log_line(line):
"""
解析日志行,提取日期和相关信息
"""
# 匹配日期部分 (Aug 10 00:05:01)
date_match = re.match(r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', line)
if date_match:
date_str = date_match.group(1)
# 添加年份以便解析
date_str_with_year = f"{datetime.now().year} {date_str}"
try:
date_obj = datetime.strptime(date_str_with_year, '%Y %b %d %H:%M:%S')
return date_obj, line
except ValueError:
pass
return None, line
def analyze_ssh_log():
"""
分析SSH登录日志,统计成功和失败的登录尝试
"""
# 存储每天的统计数据
daily_stats = defaultdict(lambda: {
'successful_logins': 0,
'failed_logins': 0,
'failed_ips': defaultdict(int),
'successful_ips': defaultdict(int),
'failed_users': defaultdict(int)
})
# 存储总统计数据
total_stats = {
'successful_logins': 0,
'failed_logins': 0,
'unique_ips': set(),
'unique_failed_ips': set()
}
try:
with open(LOG_FILE, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
date_obj, log_line = parse_log_line(line.strip())
if not date_obj:
continue
date_key = date_obj.strftime('%Y-%m-%d')
# 检查登录成功的模式
# 1. 公钥认证成功
if 'Accepted publickey' in log_line:
total_stats['successful_logins'] += 1
daily_stats[date_key]['successful_logins'] += 1
# 提取IP地址
ip_match = re.search(r'from ([\d\.]+) port', log_line)
if ip_match:
ip = ip_match.group(1)
daily_stats[date_key]['successful_ips'][ip] += 1
total_stats['unique_ips'].add(ip)
# 2. session opened for user (非CRON)
elif 'sshd:session' in log_line and 'session opened for user' in log_line:
total_stats['successful_logins'] += 1
daily_stats[date_key]['successful_logins'] += 1
# 提取IP地址
ip_match = re.search(r'from ([\d\.]+) port', log_line)
if ip_match:
ip = ip_match.group(1)
daily_stats[date_key]['successful_ips'][ip] += 1
total_stats['unique_ips'].add(ip)
# 检查登录失败的模式
elif 'Failed password' in log_line:
total_stats['failed_logins'] += 1
daily_stats[date_key]['failed_logins'] += 1
# 提取IP地址
ip_match = re.search(r'from ([\d\.]+) port', log_line)
if ip_match:
ip = ip_match.group(1)
daily_stats[date_key]['failed_ips'][ip] += 1
total_stats['unique_failed_ips'].add(ip)
# 提取用户名
user_match = re.search(r'Failed password for (?:invalid user )?(\S+) from', log_line)
if user_match:
user = user_match.group(1)
daily_stats[date_key]['failed_users'][user] += 1
# 检查无效用户登录尝试
elif 'Invalid user' in log_line:
total_stats['failed_logins'] += 1
daily_stats[date_key]['failed_logins'] += 1
# 提取IP地址
ip_match = re.search(r'from ([\d\.]+) port', log_line)
if ip_match:
ip = ip_match.group(1)
daily_stats[date_key]['failed_ips'][ip] += 1
total_stats['unique_failed_ips'].add(ip)
except FileNotFoundError:
print(f"错误: 找不到日志文件 {LOG_FILE}")
return
except Exception as e:
print(f"处理日志文件时出错: {e}")
return
# 输出统计结果
print("=" * 70)
print("SSH 登录统计分析报告")
print("=" * 70)
print(f"总成功登录次数: {total_stats['successful_logins']}")
print(f"总失败登录次数: {total_stats['failed_logins']}")
print(f"唯一成功登录IP数: {len(total_stats['unique_ips'])}")
print(f"唯一失败登录IP数: {len(total_stats['unique_failed_ips'])}")
print()
# 按日期排序输出每日统计
print("每日登录统计:")
print("-" * 70)
print(f"{'日期':<12} {'成功登录':<10} {'失败登录':<10} {'成功率':<10} {'失败率':<10}")
print("-" * 70)
sorted_dates = sorted(daily_stats.keys())
for date in sorted_dates:
stats = daily_stats[date]
total_attempts = stats['successful_logins'] + stats['failed_logins']
success_rate = (stats['successful_logins'] / total_attempts * 100) if total_attempts > 0 else 0
failure_rate = (stats['failed_logins'] / total_attempts * 100) if total_attempts > 0 else 0
print(f"{date:<12} {stats['successful_logins']:<10} {stats['failed_logins']:<10} "
f"{success_rate:<9.1f}% {failure_rate:<9.1f}%")
# 输出最近几天的详细信息
print("\n最近几天的详细信息:")
print("-" * 70)
for date in sorted(daily_stats.keys(), reverse=True)[:5]: # 显示最近5天
stats = daily_stats[date]
print(f"\n{date} 详细信息:")
print(f" 成功登录: {stats['successful_logins']}")
print(f" 失败登录: {stats['failed_logins']}")
if stats['failed_users']:
print(" 最常见的失败登录用户:")
sorted_users = sorted(stats['failed_users'].items(), key=lambda x: x[1], reverse=True)[:5]
for user, count in sorted_users:
print(f" {user}: {count} 次")
if stats['failed_ips']:
print(" 最常见的失败登录IP:")
sorted_ips = sorted(stats['failed_ips'].items(), key=lambda x: x[1], reverse=True)[:5]
for ip, count in sorted_ips:
print(f" {ip}: {count} 次")
if stats['successful_ips']:
print(" 成功登录IP:")
sorted_succ_ips = sorted(stats['successful_ips'].items(), key=lambda x: x[1], reverse=True)[:5]
for ip, count in sorted_succ_ips:
print(f" {ip}: {count} 次")
# 输出安全建议
print("\n安全建议:")
print("-" * 70)
if total_stats['failed_logins'] > total_stats['successful_logins'] * 2:
print(" ⚠️ 失败登录次数远超成功登录次数,可能正在遭受暴力破解攻击")
all_failed_ips = set()
for date in daily_stats:
all_failed_ips.update(daily_stats[date]['failed_ips'].keys())
if len(all_failed_ips) > 50:
print(" ⚠️ 检测到来自大量不同IP的登录尝试,可能正在遭受分布式攻击")
# 检查是否有特别频繁的IP
ip_counts = defaultdict(int)
for date in daily_stats:
for ip, count in daily_stats[date]['failed_ips'].items():
ip_counts[ip] += count
frequent_ips = [ip for ip, count in ip_counts.items() if count > 100]
if frequent_ips:
print(f" ⚠️ 检测到来自以下IP的频繁登录尝试 (>100次): {', '.join(frequent_ips)}")
if __name__ == "__main__":
analyze_ssh_log()
西风骑士团「火花骑士」,可莉,前来报到。呃,后面该说什么词来着?可莉背不下来啦。
早啊,带可莉出去玩吧,我们一起来冒险。