This post is about 534 characters; estimated reading time: 2 minutes.

Today's reading:

Today's software:

Nitter

Let me walk through deploying Nitter, since the process still has a few pitfalls in it.

I recommend building from source with Nim, because for the time being some special circumstances require the guest_accounts branch.

First, grab the one-click install script from the Nim website; run it, accept the prompts, and it installs automatically. Remember to add the nimble command to your bash PATH.

image1
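
For reference, a minimal sketch of that step, assuming the standard choosenim installer from the Nim website:

# One-line installer (choosenim); accept the prompts
curl https://nim-lang.org/choosenim/init.sh -sSf | sh

# Make nim and nimble available in bash (append to ~/.bashrc)
export PATH=$HOME/.nimble/bin:$PATH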

After that, clone the project and build it:

git clone https://github.com/zedeus/nitter
cd nitter
git checkout guest_accounts
nimble build -d:release
nimble scss
nimble md
cp nitter.example.conf nitter.conf

Remember to set the service port and the proxy details in the conf file (the proxy is required when hosting inside China).
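
The relevant keys look roughly like this; a sketch with placeholder values, key names following nitter.example.conf:

[Server]
address = "0.0.0.0"
port = 8080                 # port the service listens on

[Config]
proxy = "localhost:7890"    # HTTP proxy for outgoing requests (example address)
proxyAuth = ""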

Then create a bash script to fetch a guest account token; the method for obtaining it was reverse-engineered from the app's API endpoints:

#!/bin/bash

# Request a guest token using the app's public Bearer token
guest_token=$(curl -s -XPOST https://api.twitter.com/1.1/guest/activate.json -H 'Authorization: Bearer AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F' | jq -r '.guest_token')

# Start the onboarding flow to get a flow_token
flow_token=$(curl -s -XPOST 'https://api.twitter.com/1.1/onboarding/task.json?flow_name=welcome' \
            -H 'Authorization: Bearer AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F' \
            -H 'Content-Type: application/json' \
            -H "User-Agent: TwitterAndroid/10.10.0" \
            -H "X-Guest-Token: ${guest_token}" \
            -d '{"flow_token":null,"input_flow_data":{"flow_context":{"start_location":{"location":"splash_screen"}}}}' | jq -r .flow_token)

# Complete the flow and print the guest account's oauth_token and oauth_token_secret
curl -s -XPOST 'https://api.twitter.com/1.1/onboarding/task.json' \
            -H 'Authorization: Bearer AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F' \
            -H 'Content-Type: application/json' \
            -H "User-Agent: TwitterAndroid/10.10.0" \
            -H "X-Guest-Token: ${guest_token}" \
            -d "{\"flow_token\":\"${flow_token}\",\"subtask_inputs\":[{\"open_link\":{\"link\":\"next_link\"},\"subtask_id\":\"NextTaskOpenLink\"}]}" | jq -c -r '.subtasks[0]|if(.open_account) then {oauth_token: .open_account.oauth_token, oauth_token_secret: .open_account.oauth_token_secret} else empty end'

Save it as get_guest_accounts.sh, make it executable, write its output to guest_accounts.jsonl, and start Nitter:

chmod +x get_guest_accounts.sh
./get_guest_accounts.sh > guest_accounts.jsonl
./nitter

And with that, the instance is up and running.
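
Note that ./nitter runs in the foreground; to keep it alive after you log out, one simple option (a sketch, any process manager would do) is:

nohup ./nitter > nitter.log 2>&1 &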

According to the official description, you only get 500 requests per 15 minutes; go over that and you get blocked.
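
Given that limit, one option is to bank a few extra guest accounts by running the script more than once; a minimal sketch (the count and the delay are arbitrary, and each run appends one JSON line):

# Collect several guest accounts into the same .jsonl file
for i in $(seq 1 5); do
    ./get_guest_accounts.sh >> guest_accounts.jsonl
    sleep 3
done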

image2

Today's code:

A script, built on top of Nitter, that automatically fetches media tweets via RSS. It's not perfect; I'll keep polishing it.

Come to think of it, I used to follow a project called ntscraper, so I've collided with an existing idea yet again.

twitter_rss.py
import os
import re
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor

import feedparser
import requests

# Proxy settings
proxies = {
    'http': 'http://localhost:7890',
    'https': 'http://localhost:7890'
}

# Create the tweets table up front, so the very first lookup doesn't fail
def init_db(db_path):
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS tweets
                        (tweet_id TEXT PRIMARY KEY, creator TEXT, pub_date TEXT, saved_id TEXT)''')
    connection.commit()
    connection.close()

# Check whether a tweet has already been downloaded
def is_tweet_downloaded(tweet_id, db_path):
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute("SELECT COUNT(1) FROM tweets WHERE tweet_id = ?", (tweet_id,))
    exists = cursor.fetchone()[0] > 0
    connection.close()
    return exists

# Parse the RSS feed and extract tweet info
def parse_rss(url, db_path):
    try:
        # Fetch the RSS data with requests so the proxy is applied
        response = requests.get(url, proxies=proxies)
        response.raise_for_status()  # make sure the request succeeded

        # Parse the fetched data with feedparser
        feed = feedparser.parse(response.content)
        tweets = []
        for entry in feed.entries:
            # The guid looks like .../status/<id>#m; keep only the numeric id
            tweet_id = entry.id.split('/')[-1].split('#')[0]
            # Skip tweets that were already downloaded
            if not is_tweet_downloaded(tweet_id, db_path):
                media_urls = re.findall(r'src="(https://[^"]+)"', entry.description)
                tweets.append({
                    'tweet_id': tweet_id,
                    'creator': entry.author,
                    'pubDate': entry.published,
                    'media_urls': media_urls,
                    'content': entry.title
                })
        return tweets
    except requests.exceptions.RequestException as e:
        print(f"Error fetching RSS feed for {url}: {e}")
        return []

# Download the media files and save the tweet text
def download_and_save(tweet, base_dir):
    account_dir = os.path.join(base_dir, tweet['creator'])
    tweet_dir = os.path.join(account_dir, tweet['tweet_id'])
    os.makedirs(tweet_dir, exist_ok=True)

    for i, url in enumerate(tweet['media_urls']):
        response = requests.get(url, proxies=proxies)
        if response.status_code == 200:
            # Everything is saved as .jpg for now, even video thumbnails
            media_path = os.path.join(tweet_dir, f"{tweet['tweet_id']}_img{i+1}.jpg")
            with open(media_path, 'wb') as file:
                file.write(response.content)

    text_path = os.path.join(tweet_dir, f"{tweet['tweet_id']}_txt.txt")
    with open(text_path, 'w', encoding='utf-8') as file:
        file.write(tweet['content'])

    return tweet['tweet_id']

# Record the saved tweet in the database
def log_tweet_info(db_path, tweet, saved_id):
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute('''INSERT OR REPLACE INTO tweets
                        (tweet_id, creator, pub_date, saved_id) VALUES (?, ?, ?, ?)''',
                    (tweet['tweet_id'], tweet['creator'], tweet['pubDate'], saved_id))
    connection.commit()
    connection.close()

# Process one RSS feed
def process_rss_feed(url, base_dir, db_path):
    tweets = parse_rss(url, db_path)
    with ThreadPoolExecutor() as executor:
        for tweet in tweets:
            # Pause between tweets to stay well under the instance's rate limit;
            # calling .result() right away keeps the downloads sequential
            time.sleep(60)
            saved_id = executor.submit(download_and_save, tweet, base_dir).result()
            print(f"Saved tweet {saved_id} Time: {tweet['pubDate']}")
            log_tweet_info(db_path, tweet, saved_id)

# Periodically poll every user's media feed
def periodic_task(interval, user_list, base_dir, db_path):
    while True:
        for user in user_list:
            rss_url = f"http://<instance-address>/{user}/media/rss"
            process_rss_feed(rss_url, base_dir, db_path)
        time.sleep(interval)

# Example usage
if __name__ == "__main__":
    usernames = []
    base_directory = "./femboy_twitter/twitter_downloads"
    database_path = "./femboy_twitter/twitter_log.db"
    init_db(database_path)
    periodic_task(600, usernames, base_directory, database_path)
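
The script only needs two third-party packages; everything else is in the standard library:

pip install requests feedparser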

Today's news:

With the 3G switch-off approaching, 2.7 million people in the UK are still using 3G-only phones.

Microsoft is adding ChatGPT AI to Notepad in Windows 11.

Amazon's Twitch is laying off 500 staff, about 35% of its workforce.

Today's rambling:

Picked up a cheap Chongqing Mobile IPv6 KVM mini VPS: a dedicated IPv6 address, one core, 521 MB of RAM, plus 5 GB. Apart from running a proxy and hosting a few services, it doesn't seem to have much other use.

But at only ¥5 a month, it's genuinely dirt cheap.