1 changed files with 269 additions and 0 deletions
@ -0,0 +1,269 @@ |
|||
#!/usr/bin/python3 |
|||
# -- coding: utf-8 -- |
|||
# ------------------------------- |
|||
# @Author : github@limoruirui https://github.com/limoruirui |
|||
# @Time : 2022/8/10 13:23 |
|||
# ------------------------------- |
|||
# cron "30 9 * * *" script-path=xxx.py,tag=匹配cron用 |
|||
# const $ = new Env('某通阅读'); |
|||
""" |
|||
联通app抽奖 入口:联通app 搜索 阅读专区 进入话费派送中 |
|||
1. 脚本仅供学习交流使用, 请在下载后24h内删除 |
|||
2. 需要第三方库 pycryptodome 支持 命令行安装 pip3 install pycryptodome或者根据自己环境自行安装 |
|||
3. 环境变量说明 PHONE_NUM(必需) UNICOM_LOTTER(选填) UNICOM_USERAGENT(选填) 自行新建环境变量添加 |
|||
PHONE_NUM 为你的手机号 |
|||
UNICOM_LOTTER 默认自动抽奖, 若不需要 则添加环境变量值为 False |
|||
UNICOM_USERAGENT 联通app的useragent 随便一个数据包的请求头里应该都有 建议自己抓一个填上 不填也能跑 数据内容参考 line 44 双引号的内容 |
|||
推送通知的变量同青龙 只写了tgbot(支持反代api)和pushplus |
|||
""" |
|||
"""updateTime: 2023.1.1 log: 更新aes加密key |
|||
updateTime: 2022.12.1 log: 活动重新上架 改用 pycryptodome 替代 cryptography 进行aes加密 |
|||
updateTime: 2022.9.1 log: 每个月活动id改变更新 |
|||
""" |
|||
|
|||
from requests import post, get |
|||
from time import sleep, time |
|||
from datetime import datetime |
|||
from hashlib import md5 as md5Encode |
|||
from random import randint, uniform, choice |
|||
from os import environ |
|||
from sys import stdout, exit |
|||
from base64 import b64encode |
|||
from json import dumps |
|||
|
|||
from tools.encrypt_symmetric import Crypt |
|||
from tools.send_msg import push |
|||
from tools.tool import get_environ, random_sleep |
|||
# random_sleep(0, 1600) |
|||
|
|||
|
|||
"""主类""" |
|||
class China_Unicom: |
|||
def __init__(self, phone_num): |
|||
self.phone_num = phone_num |
|||
default_ua = f"Mozilla/5.0 (Linux; Android {randint(8, 13)}; SM-S908U Build/TP1A.220810.014; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/{randint(95, 108)}.0.5359.128 Mobile Safari/537.36; unicom{{version:android@9.0{randint(0,6)}00,desmobile:{self.phone_num}}};devicetype{{deviceBrand:,deviceModel:}};{{yw_code:}}" |
|||
run_ua = get_environ(key="UNICOM_USERAGENT", default=default_ua, output=False) |
|||
self.headers = { |
|||
"Host": "10010.woread.com.cn", |
|||
"Accept": "application/json, text/plain, */*", |
|||
"Accept-Language": "zh-CN,zh-Hans;q=0.9", |
|||
"Accept-Encoding": "gzip, deflate, br", |
|||
"Content-Type": "application/json;charset=utf-8", |
|||
"Origin": "https://10010.woread.com.cn", |
|||
"User-Agent": run_ua, |
|||
"Connection": "keep-alive", |
|||
"Referer": "https://10010.woread.com.cn/ng_woread/", |
|||
} |
|||
self.fail_num = 0 |
|||
|
|||
def timestamp(self): |
|||
return round(time() * 1000) |
|||
|
|||
def print_now(self, content): |
|||
print(content) |
|||
stdout.flush() |
|||
|
|||
def md5(self, str): |
|||
m = md5Encode(str.encode(encoding='utf-8')) |
|||
return m.hexdigest() |
|||
|
|||
def req(self, url, crypt_text, retry_num=5): |
|||
while retry_num > 0: |
|||
body = { |
|||
"sign": b64encode(Crypt(crypt_type="AES", key="update!@#1234567", iv="16-Bytes--String", mode="CBC").encrypt(crypt_text).encode()).decode() |
|||
} |
|||
self.headers["Content-Length"] = str(len(dumps(body).replace(" ", ""))) |
|||
try: |
|||
res = post(url, headers=self.headers, json=body) |
|||
data = res.json() |
|||
return data |
|||
except Exception as e: |
|||
print(f"本次请求失败, 正在重新发送请求 剩余次数{retry_num}") |
|||
print(f"本次请求失败原因------{e}") |
|||
retry_num -= 1 |
|||
sleep(5) |
|||
return self.req(url, crypt_text, retry_num) |
|||
|
|||
def referer_login(self): |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
timestamp = self.timestamp() |
|||
url = f"https://10010.woread.com.cn/ng_woread_service/rest/app/auth/10000002/{timestamp}/{self.md5(f'100000027k1HcDL8RKvc{timestamp}')}" |
|||
crypt_text = f'{{"timestamp":"{date}"}}' |
|||
body = { |
|||
"sign": b64encode(Crypt(crypt_type="AES", key="1234567890abcdef").encrypt(crypt_text).encode()).decode() |
|||
} |
|||
self.headers["Content-Length"] = str(len(str(body)) - 1) |
|||
data = post(url, headers=self.headers, json=body).json() |
|||
if data["code"] == "0000": |
|||
self.headers["accesstoken"] = data["data"]["accesstoken"] |
|||
else: |
|||
self.print_now(f"设备登录失败,日志为{data}") |
|||
exit(0) |
|||
|
|||
def get_userinfo(self): |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/account/login" |
|||
crypt_text = f'{{"phone":"{self.phone_num}","timestamp":"{date}"}}' |
|||
data = self.req(url, crypt_text) |
|||
if data["code"] == "0000": |
|||
self.userinfo = data["data"] |
|||
else: |
|||
self.print_now(f"手机号登录失败, 日志为{data}") |
|||
exit(0) |
|||
|
|||
def watch_video(self): |
|||
self.print_now("看广告获取积分任务: ") |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/obtainScoreByAd" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"value":"947728124","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
for i in range(3): |
|||
data = self.req(url, crypt_text) |
|||
self.print_now(data) |
|||
if self.fail_num == 3: |
|||
self.print_now("当前任务出现异常 且错误次数达到3次 请手动检查") |
|||
push("某通阅读", "当前任务出现异常 且错误次数达到3次 请手动检查") |
|||
exit(0) |
|||
if data["code"] == "9999": |
|||
self.print_now("当前任务出现异常 正在重新执行") |
|||
self.fail_num += 1 |
|||
self.main() |
|||
return False |
|||
sleep(uniform(2, 8)) |
|||
return True |
|||
|
|||
def read_novel(self): |
|||
self.get_cardid() |
|||
self.get_cntindex() |
|||
self.get_chapterallindex() |
|||
self.print_now(f"{self.phone_num} :正在执行观看300次小说, 此过程较久, 最大时长为300 * 8s = 40min") |
|||
for i in range(300): |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
chapterAllIndex = choice(self.chapterallindex_list) |
|||
url = f"https://10010.woread.com.cn/ng_woread_service/rest/cnt/wordsDetail?catid={self.catid}&pageIndex={self.pageIndex}&cardid={randint(10000, 99999)}&cntindex={self.cntindex}&chapterallindex={chapterAllIndex}&chapterseno=3" |
|||
crypt_text = f'{{"chapterAllIndex":{chapterAllIndex},"cntIndex":{self.cntindex},"cntTypeFlag":"1","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
if self.fail_num == 3: |
|||
self.print_now("当前任务出现异常 且错误次数达到3次 请手动检查") |
|||
push("某通阅读", "阅读任务出现异常 且错误次数达到3次 请手动检查") |
|||
exit(0) |
|||
if data.get("code") != "0000": |
|||
self.print_now("阅读小说发生异常, 正在重新登录执行, 接口返回") |
|||
self.print_now(data) |
|||
return self.main() |
|||
sleep(uniform(2, 4)) |
|||
|
|||
def query_score(self): |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/queryUserScore" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"activeIndex":{self.activeIndex},"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
total_score = data["data"]["validScore"] |
|||
self.lotter_num = int(total_score / 50) |
|||
self.print_now(f"{self.phone_num} 当前有积分{total_score}, 可以抽奖{self.lotter_num}次") |
|||
|
|||
def get_activetion_id(self): |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/queryActiveInfo" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
if data["code"] == "0000": |
|||
self.activeIndex = data["data"]["activeindex"] |
|||
else: |
|||
self.print_now(f"活动id获取失败 将影响抽奖和查询积分") |
|||
def get_cardid(self): |
|||
""" |
|||
获取cardid |
|||
:return: |
|||
""" |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/basics/getIntellectRecommend" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"cntsize":8,"recommendsize":5,"recommendid":0,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
# print(data) |
|||
self.pageIndex = data["data"]["recommendindex"] if "recommendindex" in data["data"] else "10725" |
|||
self.cardid = data["data"]["catindex"] if "catindex" in data["data"] else "119056" |
|||
def get_cntindex(self): |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/basics/recommposdetail/12279" |
|||
self.headers.pop("Content-Length", "no") |
|||
data = get(url, headers=self.headers).json() |
|||
booklist = data["data"]["booklist"]["message"] |
|||
book_num = len(booklist) |
|||
self.catid = booklist[0]["catindex"] if "catindex" in booklist[0] else "119411" |
|||
self.cntindex = booklist[randint(0, book_num - 1)]["cntindex"] |
|||
def get_chapterallindex(self): |
|||
url = f"https://10010.woread.com.cn/ng_woread_service/rest/cnt/chalist?catid=119411&pageIndex=10725&cardid=12279&cntindex={self.cntindex}" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"curPage":1,"limit":30,"index":"{self.cntindex}","sort":0,"finishFlag":1,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
chapterallindexlist = data["list"][0]["charptercontent"] |
|||
chapterallindex_num = len(chapterallindexlist) |
|||
self.chapterallindex_list = [0] * chapterallindex_num |
|||
i = 0 |
|||
while i < chapterallindex_num: |
|||
self.chapterallindex_list[i] = chapterallindexlist[i]["chapterallindex"] |
|||
i += 1 |
|||
def lotter(self): |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/handleDrawLottery" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"activeIndex":{self.activeIndex},"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
if data["code"] == "0000": |
|||
self.print_now(f"抽奖成功, 获得{data['data']['prizename']}") |
|||
else: |
|||
self.print_now(f"抽奖失败, 日志为{data}") |
|||
|
|||
def watch_ad(self): |
|||
self.print_now("观看广告得话费红包: ") |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/userTakeActive" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"activeIndex":6880,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
self.print_now(data) |
|||
|
|||
def exchange(self): |
|||
# ticketValue activeid来自于https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/getSysConfig get请求 |
|||
# {"ticketValue":"300","activeid":"61yd210901","timestamp":"20220816213709","token":"","userId":"","userIndex":,"userAccount":"","verifyCode":""} |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/exchange" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"ticketValue":"300","activeid":"61yd210901","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
print(data) |
|||
|
|||
def query_red(self): |
|||
url = "https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/queryTicketAccount" |
|||
date = datetime.today().__format__("%Y%m%d%H%M%S") |
|||
crypt_text = f'{{"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}' |
|||
data = self.req(url, crypt_text) |
|||
if data["code"] == "0000": |
|||
can_use_red = data["data"]["usableNum"] / 100 |
|||
if can_use_red >= 3: |
|||
self.print_now(f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 可以去兑换了") |
|||
push("某通阅读", f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 可以去兑换了") |
|||
else: |
|||
self.print_now(f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 不足兑换的最低额度") |
|||
push("某通阅读", f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 不足兑换的最低额度") |
|||
|
|||
def main(self): |
|||
self.referer_login() |
|||
self.get_userinfo() |
|||
if not self.watch_video(): |
|||
return |
|||
self.get_activetion_id() |
|||
self.read_novel() |
|||
self.query_score() |
|||
self.watch_ad() |
|||
if unicom_lotter: |
|||
for i in range(self.lotter_num): |
|||
self.lotter() |
|||
sleep(2) |
|||
self.query_score() |
|||
self.query_red() |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
"""读取环境变量 get_environ("PHONE_NUM")""" |
|||
phone_num ="13072885408" |
|||
unicom_lotter = get_environ("UNICOM_LOTTER", default=True) |
|||
if phone_num == "": |
|||
exit(0) |
|||
China_Unicom(phone_num).main() |
Loading…
Reference in new issue