refactor:config(#226)

This commit is contained in:
guorong.zheng 2024-08-12 15:55:00 +08:00
parent f656074028
commit e46c025251
17 changed files with 320 additions and 272 deletions

View file

@ -35,10 +35,12 @@ jobs:
run: |
echo "OPEN_DRIVER=$(python -c '
try:
import user_config as config
except ImportError:
import config
print(config.open_driver)')" >> $GITHUB_ENV
from utils.config import get_config
config = get_config()
open_driver = config.getboolean("Settings", "open_driver")
except:
open_driver = False
print(open_driver)')" >> $GITHUB_ENV
- name: Install Selenium
if: env.OPEN_DRIVER == 'True'
run: |
@ -72,10 +74,12 @@ jobs:
git diff
final_file=$(python -c '
try:
import user_config as config
except ImportError:
import config
print(config.final_file)')
from utils.config import get_config
config = get_config()
final_file = config.get("Settings", "final_file")
except:
final_file = "output/result.txt"
print(final_file)')
if [[ -f "$final_file" ]]; then
git add -f "$final_file"
fi

View file

@ -1,41 +0,0 @@
open_update = True
open_use_old_result = True
source_file = "demo.txt"
final_file = "result.txt"
favorite_list = [
"广东珠江",
"CCTV-1",
"CCTV-5",
"CCTV-5+",
"CCTV-13",
"广东体育",
"广东卫视",
"大湾区卫视",
"浙江卫视",
"湖南卫视",
"翡翠台",
]
open_online_search = False
favorite_page_num = 5
default_page_num = 3
urls_limit = 15
open_keep_all = False
open_sort = True
response_time_weight = 0.5
resolution_weight = 0.5
recent_days = 30
ipv_type = "ipv4"
domain_blacklist = ["epg.pw"]
url_keywords_blacklist = []
open_subscribe = False
subscribe_urls = [
"https://m3u.ibert.me/txt/fmml_dv6.txt",
"https://m3u.ibert.me/txt/o_cn.txt",
"https://m3u.ibert.me/txt/j_iptv.txt",
"https://github.moeyy.xyz/https://raw.githubusercontent.com/PizazzGY/TVBox/main/live.txt",
]
open_multicast = True
region_list = ["广东"]
open_proxy = False
open_driver = False
open_use_old_result = True

30
config/config.ini Normal file
View file

@ -0,0 +1,30 @@
[Settings]
open_update = True
open_use_old_result = True
source_file = config/demo.txt
final_file = output/result.txt
favorite_list = 广东珠江,CCTV-1,CCTV-5,CCTV-5+,CCTV-13,广东体育,广东卫视,大湾区卫视,浙江卫视,湖南卫视,翡翠台
open_online_search = False
favorite_page_num = 5
default_page_num = 3
urls_limit = 15
open_keep_all = False
open_sort = True
response_time_weight = 0.5
resolution_weight = 0.5
recent_days = 30
ipv_type = ipv4
domain_blacklist = epg.pw
url_keywords_blacklist =
open_subscribe = False
subscribe_urls = https://m3u.ibert.me/txt/fmml_dv6.txt,https://m3u.ibert.me/txt/o_cn.txt,https://m3u.ibert.me/txt/j_iptv.txt,https://github.moeyy.xyz/https://raw.githubusercontent.com/PizazzGY/TVBox/main/live.txt
open_multicast = True
region_list = 广东
open_proxy = False
open_driver = False

View file

@ -1,7 +1,4 @@
from selenium import webdriver
from utils.config import get_config
config = get_config()
def setup_driver(proxy=None):

28
main.py
View file

@ -30,7 +30,7 @@ app = Flask(__name__)
@app.route("/")
def show_result():
user_final_file = getattr(config, "final_file", "result.txt")
user_final_file = config.get("Settings", "final_file")
with open(user_final_file, "r", encoding="utf-8") as file:
content = file.read()
return render_template_string("<pre>{{ content }}</pre>", content=content)
@ -51,19 +51,19 @@ class UpdateSource:
self.start_time = None
async def visit_page(self, channel_names=None):
if config.open_subscribe:
if config.getboolean("Settings", "open_subscribe"):
subscribe_task = asyncio.create_task(
get_channels_by_subscribe_urls(callback=self.update_progress)
)
self.tasks.append(subscribe_task)
self.subscribe_result = await subscribe_task
if config.open_multicast:
if config.getboolean("Settings", "open_multicast"):
multicast_task = asyncio.create_task(
get_channels_by_multicast(channel_names, self.update_progress)
)
self.tasks.append(multicast_task)
self.multicast_result = await multicast_task
if config.open_online_search:
if config.getboolean("Settings", "open_online_search"):
online_search_task = asyncio.create_task(
get_channels_by_online_search(channel_names, self.update_progress)
)
@ -94,7 +94,7 @@ class UpdateSource:
self.multicast_result,
self.online_search_result,
)
if config.open_sort:
if config.getboolean("Settings", "open_sort"):
is_ffmpeg = is_ffmpeg_installed()
if not is_ffmpeg:
print("FFmpeg is not installed, using requests for sorting.")
@ -137,15 +137,15 @@ class UpdateSource:
lambda: self.pbar_update("写入结果"),
)
self.pbar.close()
user_final_file = getattr(config, "final_file", "result.txt")
update_file(user_final_file, "result_new.txt")
if config.open_sort:
user_log_file = (
user_final_file = config.get("Settings", "final_file")
update_file(user_final_file, "output/result_new.txt")
if config.getboolean("Settings", "open_sort"):
user_log_file = "output/" + (
"user_result.log"
if os.path.exists("user_config.py")
if os.path.exists("config/user_config.ini")
else "result.log"
)
update_file(user_log_file, "result_new.log")
update_file(user_log_file, "output/result_new.log")
print(f"Update completed! Please check the {user_final_file} file!")
if not os.environ.get("GITHUB_ACTIONS"):
print(f"You can access the result at {get_ip_address()}")
@ -165,10 +165,10 @@ class UpdateSource:
self.update_progress = callback or default_callback
self.run_ui = True if callback else False
if config.open_update:
if config.getboolean("Settings", "open_update"):
await self.main()
if self.run_ui:
if not config.open_update:
if not config.getboolean("Settings", "open_update"):
print(f"You can access the result at {get_ip_address()}")
self.update_progress(
f"服务启动成功, 可访问以下链接:",
@ -187,7 +187,7 @@ class UpdateSource:
def scheduled_task():
if config.open_update:
if config.getboolean("Settings", "open_update"):
update_source = UpdateSource()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

View file

@ -2,7 +2,6 @@ import requests
import re
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from time import sleep
headers = {
"Accept": "*/*",

View file

@ -3,20 +3,14 @@ from tkinter import messagebox
from tkinter import scrolledtext
from tkinter import ttk
from tkinter import filedialog
from utils.config import resource_path, load_external_config
from utils.config import get_config, resource_path
from main import UpdateSource
import os
import asyncio
import threading
import webbrowser
config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config("user_config.py")
if os.path.exists(config_path)
else load_external_config("config.py")
)
config = get_config()
class TkinterUI:
@ -56,14 +50,13 @@ class TkinterUI:
]
self.result_url = None
def format_list(self, text):
return [f"{item.strip()}" for item in text.split(",") if item.strip()]
def update_open_update(self):
config.open_update = self.open_update_var.get()
config.set("Settings", "open_update", str(self.open_update_var.get()))
def update_open_use_old_result(self):
config.open_use_old_result = self.open_use_old_result_var.get()
config.set(
"Settings", "open_use_old_result", str(self.open_use_old_result_var.get())
)
def select_source_file(self):
filepath = filedialog.askopenfilename(
@ -72,7 +65,7 @@ class TkinterUI:
if filepath:
self.source_file_entry.delete(0, tk.END)
self.source_file_entry.insert(0, filepath)
config.source_file = filepath
config.set("Settings", "source_file", filepath)
def select_final_file(self):
filepath = filedialog.askopenfilename(
@ -81,77 +74,95 @@ class TkinterUI:
if filepath:
self.final_file_entry.delete(0, tk.END)
self.final_file_entry.insert(0, filepath)
config.final_file = filepath
config.set("Settings", "final_file", filepath)
def update_open_subscribe(self):
config.open_subscribe = self.open_subscribe_var.get()
config.set("Settings", "open_subscribe", str(self.open_subscribe_var.get()))
def update_open_multicast(self):
config.open_multicast = self.open_multicast_var.get()
config.set("Settings", "open_multicast", str(self.open_multicast_var.get()))
def update_open_online_search(self):
config.open_online_search = self.open_online_search_var.get()
config.set(
"Settings", "open_online_search", str(self.open_online_search_var.get())
)
def update_open_driver(self):
config.open_driver = self.open_driver_var.get()
config.set("Settings", "open_driver", str(self.open_driver_var.get()))
def update_open_proxy(self):
config.open_proxy = self.open_proxy_var.get()
config.set("Settings", "open_proxy", str(self.open_proxy_var.get()))
def update_open_keep_all(self):
config.open_keep_all = self.open_keep_all_var.get()
config.set("Settings", "open_keep_all", str(self.open_keep_all_var.get()))
def update_open_sort(self):
config.open_sort = self.open_sort_var.get()
config.set("Settings", "open_sort", str(self.open_sort_var.get()))
def update_favorite_list(self, event):
config.favorite_list = self.format_list(
self.favorite_list_text.get(1.0, tk.END)
config.set(
"Settings",
"favorite_list",
self.favorite_list_text.get(1.0, tk.END),
)
def update_favorite_page_num(self, event):
config.favorite_page_num = self.favorite_page_num_entry.get()
config.set("Settings", "favorite_page_num", self.favorite_page_num_entry.get())
def update_default_page_num(self, event):
config.default_page_num = self.default_page_num_entry.get()
config.set("Settings", "default_page_num", self.default_page_num_entry.get())
def update_urls_limit(self, event):
config.urls_limit = self.urls_limit_entry.get()
config.set("Settings", "urls_limit", self.urls_limit_entry.get())
def update_response_time_weight(self, event):
config.response_time_weight = self.response_time_weight_entry.get()
config.set(
"Settings", "response_time_weight", self.response_time_weight_entry.get()
)
def update_resolution_weight(self, event):
config.resolution_weight = self.resolution_weight_entry.get()
config.set("Settings", "resolution_weight", self.resolution_weight_entry.get())
def update_ipv_type(self, event):
config.ipv_type = f'"{self.ipv_type_combo.get()}"'
config.set("Settings", "ipv_type", self.ipv_type_combo.get())
def update_recent_days(self, event):
config.recent_days = self.recent_days_entry.get()
config.set("Settings", "recent_days", self.recent_days_entry.get())
def update_url_keywords_blacklist(self, event):
config.url_keywords_blacklist = self.format_list(
self.url_keywords_blacklist_text.get(1.0, tk.END)
config.set(
"Settings",
"url_keywords_blacklist",
self.url_keywords_blacklist_text.get(1.0, tk.END),
)
def update_domain_blacklist(self, event):
config.domain_blacklist = self.format_list(
self.domain_blacklist_text.get(1.0, tk.END)
config.set(
"Settings",
"domain_blacklist",
self.domain_blacklist_text.get(1.0, tk.END),
)
def update_url_keywords_blacklist(self, event):
config.url_keywords_blacklist = self.format_list(
self.url_keywords_blacklist_text.get(1.0, tk.END)
config.set(
"Settings",
"url_keywords_blacklist",
self.url_keywords_blacklist_text.get(1.0, tk.END),
)
def update_subscribe_urls(self, event):
config.subscribe_urls = self.format_list(
self.subscribe_urls_text.get(1.0, tk.END)
config.set(
"Settings",
"subscribe_urls",
self.subscribe_urls_text.get(1.0, tk.END),
)
def update_region_list(self, event):
config.region_list = self.format_list(self.region_list_text.get(1.0, tk.END))
config.set(
"Settings",
"region_list",
self.region_list_text.get(1.0, tk.END),
)
def view_result_link_callback(self, event):
webbrowser.open_new_tab(self.result_url)
@ -160,9 +171,9 @@ class TkinterUI:
config_values = {
"open_update": self.open_update_var.get(),
"open_use_old_result": self.open_use_old_result_var.get(),
"source_file": f'"{self.source_file_entry.get()}"',
"final_file": f'"{self.final_file_entry.get()}"',
"favorite_list": self.format_list(self.favorite_list_text.get(1.0, tk.END)),
"source_file": self.source_file_entry.get(),
"final_file": self.final_file_entry.get(),
"favorite_list": self.favorite_list_text.get(1.0, tk.END),
"open_online_search": self.open_online_search_var.get(),
"favorite_page_num": self.favorite_page_num_entry.get(),
"default_page_num": self.default_page_num_entry.get(),
@ -174,31 +185,24 @@ class TkinterUI:
"response_time_weight": self.response_time_weight_entry.get(),
"resolution_weight": self.resolution_weight_entry.get(),
"recent_days": self.recent_days_entry.get(),
"ipv_type": f'"{self.ipv_type_combo.get()}"',
"domain_blacklist": self.format_list(
self.domain_blacklist_text.get(1.0, tk.END)
),
"url_keywords_blacklist": self.format_list(
self.url_keywords_blacklist_text.get(1.0, tk.END)
),
"ipv_type": self.ipv_type_combo.get(),
"domain_blacklist": self.domain_blacklist_text.get(1.0, tk.END),
"url_keywords_blacklist": self.url_keywords_blacklist_text.get(1.0, tk.END),
"open_subscribe": self.open_subscribe_var.get(),
"subscribe_urls": self.format_list(
self.subscribe_urls_text.get(1.0, tk.END)
),
"subscribe_urls": self.subscribe_urls_text.get(1.0, tk.END),
"open_multicast": self.open_multicast_var.get(),
"region_list": self.format_list(self.region_list_text.get(1.0, tk.END)),
"region_list": self.region_list_text.get(1.0, tk.END),
}
for key, value in config_values.items():
setattr(config, key, value)
user_config_file = (
"user_config.py" if os.path.exists("user_config.py") else "config.py"
config.set("Settings", key, str(value))
user_config_file = "config/" + (
"user_config.ini" if os.path.exists("user_config.ini") else "config.ini"
)
with open(
resource_path(user_config_file, persistent=True), "w", encoding="utf-8"
) as f:
for key, value in config_values.items():
f.write(f"{key} = {value}\n")
) as configfile:
config.write(configfile)
messagebox.showinfo("提示", "保存成功")
async def run_update(self):
@ -274,7 +278,9 @@ class TkinterUI:
frame1_open_update_column1, text="开启更新:", width=8
)
self.open_update_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_update_var = tk.BooleanVar(value=config.open_update)
self.open_update_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_update")
)
self.open_update_checkbutton = ttk.Checkbutton(
frame1_open_update_column1,
variable=self.open_update_var,
@ -289,7 +295,9 @@ class TkinterUI:
frame1_open_update_column2, text="使用历史结果:", width=12
)
self.open_use_old_result_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_use_old_result_var = tk.BooleanVar(value=config.open_use_old_result)
self.open_use_old_result_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_use_old_result")
)
self.open_use_old_result_checkbutton = ttk.Checkbutton(
frame1_open_update_column2,
variable=self.open_use_old_result_var,
@ -307,7 +315,7 @@ class TkinterUI:
self.source_file_entry = tk.Entry(frame1_source_file)
self.source_file_label.pack(side=tk.LEFT, padx=4, pady=8)
self.source_file_entry.pack(fill=tk.X, padx=4, expand=True)
self.source_file_entry.insert(0, config.source_file)
self.source_file_entry.insert(0, config.get("Settings", "source_file"))
frame1_source_file_select = tk.Frame(frame1)
frame1_source_file_select.pack(fill=tk.X)
@ -324,7 +332,7 @@ class TkinterUI:
self.final_file_entry = tk.Entry(frame1_final_file)
self.final_file_label.pack(side=tk.LEFT, padx=4, pady=8)
self.final_file_entry.pack(fill=tk.X, padx=4, expand=True)
self.final_file_entry.insert(0, config.final_file)
self.final_file_entry.insert(0, config.get("Settings", "final_file"))
frame1_final_file_select = tk.Frame(frame1)
frame1_final_file_select.pack(fill=tk.X)
@ -345,7 +353,9 @@ class TkinterUI:
frame1_mode_params_column1, text="浏览器模式:", width=12
)
self.open_driver_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_driver_var = tk.BooleanVar(value=config.open_driver)
self.open_driver_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_driver")
)
self.open_driver_checkbutton = ttk.Checkbutton(
frame1_mode_params_column1,
variable=self.open_driver_var,
@ -360,7 +370,9 @@ class TkinterUI:
frame1_mode_params_column2, text="开启代理:", width=12
)
self.open_proxy_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_proxy_var = tk.BooleanVar(value=config.open_proxy)
self.open_proxy_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_proxy")
)
self.open_proxy_checkbutton = ttk.Checkbutton(
frame1_mode_params_column2,
variable=self.open_proxy_var,
@ -384,7 +396,7 @@ class TkinterUI:
self.urls_limit_label.pack(side=tk.LEFT, padx=4, pady=8)
self.urls_limit_entry = tk.Entry(frame1_channel_column1)
self.urls_limit_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.urls_limit_entry.insert(15, config.urls_limit)
self.urls_limit_entry.insert(15, config.getint("Settings", "urls_limit"))
self.urls_limit_entry.bind("<KeyRelease>", self.update_urls_limit)
self.ipv_type_label = tk.Label(
@ -408,7 +420,9 @@ class TkinterUI:
frame1_sort_column1, text="保留模式:", width=12
)
self.open_keep_all_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_keep_all_var = tk.BooleanVar(value=config.open_keep_all)
self.open_keep_all_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_keep_all")
)
self.open_keep_all_checkbutton = ttk.Checkbutton(
frame1_sort_column1,
variable=self.open_keep_all_var,
@ -423,7 +437,9 @@ class TkinterUI:
frame1_sort_column2, text="开启测速排序:", width=12
)
self.open_sort_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_sort_var = tk.BooleanVar(value=config.open_sort)
self.open_sort_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_sort")
)
self.open_sort_checkbutton = ttk.Checkbutton(
frame1_sort_column2,
variable=self.open_sort_var,
@ -446,7 +462,9 @@ class TkinterUI:
self.response_time_weight_label.pack(side=tk.LEFT, padx=4, pady=8)
self.response_time_weight_entry = tk.Entry(frame1_sort_params_column1)
self.response_time_weight_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.response_time_weight_entry.insert(0, config.response_time_weight)
self.response_time_weight_entry.insert(
0, config.getfloat("Settings", "response_time_weight")
)
self.response_time_weight_entry.bind(
"<KeyRelease>", self.update_response_time_weight
)
@ -457,7 +475,9 @@ class TkinterUI:
self.resolution_weight_label.pack(side=tk.LEFT, padx=4, pady=8)
self.resolution_weight_entry = tk.Entry(frame1_sort_params_column2)
self.resolution_weight_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.resolution_weight_entry.insert(0, config.resolution_weight)
self.resolution_weight_entry.insert(
0, config.getfloat("Settings", "resolution_weight")
)
self.resolution_weight_entry.bind("<KeyRelease>", self.update_resolution_weight)
frame1_domain_blacklist = tk.Frame(frame1)
@ -473,7 +493,9 @@ class TkinterUI:
self.domain_blacklist_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.domain_blacklist_text.insert(tk.END, ",".join(config.domain_blacklist))
self.domain_blacklist_text.insert(
tk.END, config.get("Settings", "domain_blacklist")
)
self.domain_blacklist_text.bind("<KeyRelease>", self.update_domain_blacklist)
frame1_url_keywords_blacklist = tk.Frame(frame1)
@ -490,7 +512,7 @@ class TkinterUI:
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.url_keywords_blacklist_text.insert(
tk.END, ",".join(config.url_keywords_blacklist)
tk.END, config.get("Settings", "url_keywords_blacklist")
)
self.url_keywords_blacklist_text.bind(
"<KeyRelease>", self.update_url_keywords_blacklist
@ -503,7 +525,9 @@ class TkinterUI:
frame2_open_online_search, text="开启在线搜索:", width=13
)
self.open_online_search_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_online_search_var = tk.BooleanVar(value=config.open_online_search)
self.open_online_search_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_online_search")
)
self.open_online_search_checkbutton = ttk.Checkbutton(
frame2_open_online_search,
variable=self.open_online_search_var,
@ -526,7 +550,7 @@ class TkinterUI:
self.favorite_list_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.favorite_list_text.insert(tk.END, ",".join(config.favorite_list))
self.favorite_list_text.insert(tk.END, config.get("Settings", "favorite_list"))
self.favorite_list_text.bind("<KeyRelease>", self.update_favorite_list)
frame2_favorite_page_num = tk.Frame(frame2)
@ -538,7 +562,9 @@ class TkinterUI:
self.favorite_page_num_label.pack(side=tk.LEFT, padx=4, pady=8)
self.favorite_page_num_entry = tk.Entry(frame2_favorite_page_num)
self.favorite_page_num_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.favorite_page_num_entry.insert(0, config.favorite_page_num)
self.favorite_page_num_entry.insert(
0, config.getint("Settings", "favorite_page_num")
)
self.favorite_page_num_entry.bind("<KeyRelease>", self.update_favorite_page_num)
frame2_default_page_num = tk.Frame(frame2)
@ -550,7 +576,9 @@ class TkinterUI:
self.default_page_num_label.pack(side=tk.LEFT, padx=4, pady=8)
self.default_page_num_entry = tk.Entry(frame2_default_page_num)
self.default_page_num_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.default_page_num_entry.insert(0, config.default_page_num)
self.default_page_num_entry.insert(
0, config.getint("Settings", "default_page_num")
)
self.default_page_num_entry.bind("<KeyRelease>", self.update_default_page_num)
frame2_recent_days = tk.Frame(frame2)
@ -562,7 +590,7 @@ class TkinterUI:
self.recent_days_label.pack(side=tk.LEFT, padx=4, pady=8)
self.recent_days_entry = tk.Entry(frame2_recent_days)
self.recent_days_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.recent_days_entry.insert(30, config.recent_days)
self.recent_days_entry.insert(30, config.getint("Settings", "recent_days"))
self.recent_days_entry.bind("<KeyRelease>", self.update_recent_days)
frame3_open_subscribe = tk.Frame(frame3)
@ -572,7 +600,9 @@ class TkinterUI:
frame3_open_subscribe, text="开启订阅源:", width=9
)
self.open_subscribe_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_subscribe_var = tk.BooleanVar(value=config.open_subscribe)
self.open_subscribe_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_subscribe")
)
self.open_subscribe_checkbutton = ttk.Checkbutton(
frame3_open_subscribe,
variable=self.open_subscribe_var,
@ -595,7 +625,9 @@ class TkinterUI:
self.subscribe_urls_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.subscribe_urls_text.insert(tk.END, ",".join(config.subscribe_urls))
self.subscribe_urls_text.insert(
tk.END, config.get("Settings", "subscribe_urls")
)
self.subscribe_urls_text.bind("<KeyRelease>", self.update_subscribe_urls)
frame4_multicast = tk.Frame(frame4)
@ -605,7 +637,9 @@ class TkinterUI:
frame4_multicast, text="开启组播源:", width=9
)
self.open_multicast_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_multicast_var = tk.BooleanVar(value=config.open_multicast)
self.open_multicast_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_multicast")
)
self.open_multicast_checkbutton = ttk.Checkbutton(
frame4_multicast,
variable=self.open_multicast_var,
@ -624,7 +658,7 @@ class TkinterUI:
self.region_list_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.region_list_text.insert(tk.END, ",".join(config.region_list))
self.region_list_text.insert(tk.END, config.get("Settings", "region_list"))
self.region_list_text.bind("<KeyRelease>", self.update_region_list)
root_operate = tk.Frame(self.root)

View file

@ -20,7 +20,7 @@ def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = getattr(config, "region_list", [])
region_list = config.get("Settings", "region_list").split(",")
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list:
@ -43,19 +43,21 @@ async def get_channels_by_fofa(callback):
fofa_results = {}
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
proxy = None
if config.open_proxy:
open_proxy = config.getboolean("Settings", "open_proxy")
open_driver = config.getboolean("Settings", "open_driver")
if open_proxy:
proxy = await get_proxy(fofa_urls[0], best=True, with_test=True)
def process_fofa_channels(fofa_url):
nonlocal proxy, fofa_urls_len
nonlocal proxy, fofa_urls_len, open_driver
results = {}
try:
if config.open_driver:
if open_driver:
driver = setup_driver(proxy)
try:
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
except Exception as e:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
driver.close()
driver.quit()
@ -76,7 +78,7 @@ async def get_channels_by_fofa(callback):
except Exception as e:
print(e)
finally:
if config.open_driver:
if open_driver:
driver.close()
driver.quit()
pbar.update()
@ -85,18 +87,16 @@ async def get_channels_by_fofa(callback):
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
)
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
return results
max_workers = 3 if config.open_driver else 10
max_workers = 3 if open_driver else 10
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(process_fofa_channels, fofa_url) for fofa_url in fofa_urls
]
for future in futures:
fofa_results = merge_objects(fofa_results, future.result())
if not config.open_driver:
if not open_driver:
close_session()
pbar.close()
return fofa_results

View file

@ -88,7 +88,7 @@ def get_region_urls_from_IPTV_Multicast_source():
region_url[name]["联通"] = unicom
region_url[name]["移动"] = mobile
region_url[name]["电信"] = telecom
with open("multicast/multicast_map.json", "w", encoding="utf-8") as f:
with open("updates/multicast/multicast_map.json", "w", encoding="utf-8") as f:
json.dump(region_url, f, ensure_ascii=False, indent=4)
@ -96,9 +96,9 @@ def get_multicast_urls_info_from_region_list():
"""
Get the multicast urls info from region
"""
region_list = getattr(config, "region_list", [])
region_list = config.get("Settings", "region_list").split(",")
urls_info = []
with open("multicast/multicast_map.json", "r", encoding="utf-8") as f:
with open("updates/multicast/multicast_map.json", "r", encoding="utf-8") as f:
region_url = json.load(f)
if "all" in region_list:
urls_info = [
@ -125,7 +125,9 @@ async def get_multicast_region_result():
multicast_result = await get_channels_by_subscribe_urls(
urls=multicast_region_urls_info, multicast=True
)
with open("multicast/multicast_region_result.json", "w", encoding="utf-8") as f:
with open(
"updates/multicast/multicast_region_result.json", "w", encoding="utf-8"
) as f:
json.dump(multicast_result, f, ensure_ascii=False, indent=4)
@ -139,10 +141,15 @@ async def get_channels_by_multicast(names, callback):
# if not pageUrl:
# return channels
proxy = None
if config.open_proxy:
open_proxy = config.getboolean("Settings", "open_proxy")
open_driver = config.getboolean("Settings", "open_driver")
default_page_num = config.getint("Settings", "default_page_num")
if open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()
with open("multicast/multicast_region_result.json", "r", encoding="utf-8") as f:
with open(
"updates/multicast/multicast_region_result.json", "r", encoding="utf-8"
) as f:
multicast_region_result = json.load(f)
name_region_type_result = get_channel_multicast_name_region_type_result(
multicast_region_result, names
@ -150,18 +157,18 @@ async def get_channels_by_multicast(names, callback):
region_type_list = get_channel_multicast_region_type_list(name_region_type_result)
def process_channel_by_multicast(region, type):
nonlocal proxy, open_driver, default_page_num
name = f"{region}{type}"
info_list = []
nonlocal proxy
try:
if config.open_driver:
if open_driver:
driver = setup_driver(proxy)
try:
retry_func(
lambda: driver.get(pageUrl), name=f"multicast search:{name}"
)
except Exception as e:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
driver.close()
driver.quit()
@ -178,7 +185,7 @@ async def get_channels_by_multicast(names, callback):
name=f"multicast search:{name}",
)
except Exception as e:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
page_soup = get_soup_requests(pageUrl, data=post_form, proxy=proxy)
if not page_soup:
@ -192,19 +199,16 @@ async def get_channels_by_multicast(names, callback):
code = parse_qs(parsed_url.query).get("code", [None])[0]
if code:
break
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
pageNum = default_page_num
# retry_limit = 3
for page in range(1, pageNum + 1):
# retries = 0
# if not config.open_driver and page == 1:
# if not open_driver and page == 1:
# retries = 2
# while retries < retry_limit:
try:
if page > 1:
if config.open_driver:
if open_driver:
page_link = find_clickable_element_with_retry(
driver,
(
@ -226,26 +230,22 @@ async def get_channels_by_multicast(names, callback):
name=f"multicast search:{name}, page:{page}",
)
sleep(1)
soup = (
get_soup(driver.page_source)
if config.open_driver
else page_soup
)
soup = get_soup(driver.page_source) if open_driver else page_soup
if soup:
results = (
get_results_from_multicast_soup(soup)
if config.open_driver
if open_driver
else get_results_from_multicast_soup_requests(soup)
)
print(name, "page:", page, "results num:", len(results))
if len(results) == 0:
print(f"{name}:No results found")
# if config.open_driver:
# if open_driver:
# driver.refresh()
# retries += 1
# continue
# elif len(results) <= 3:
# if config.open_driver:
# if open_driver:
# next_page_link = find_clickable_element_with_retry(
# driver,
# (
@ -255,7 +255,7 @@ async def get_channels_by_multicast(names, callback):
# retries=1,
# )
# if next_page_link:
# if config.open_proxy:
# if open_proxy:
# proxy = get_proxy_next()
# driver.close()
# driver.quit()
@ -267,7 +267,7 @@ async def get_channels_by_multicast(names, callback):
# break
else:
print(f"{name}:No results found")
# if config.open_driver:
# if open_driver:
# driver.refresh()
# retries += 1
# continue
@ -281,7 +281,7 @@ async def get_channels_by_multicast(names, callback):
print(f"{name}:Error on search: {e}")
pass
finally:
if config.open_driver:
if open_driver:
driver.close()
driver.quit()
pbar.update()
@ -318,7 +318,7 @@ async def get_channels_by_multicast(names, callback):
channels = get_channel_multicast_result(
name_region_type_result, search_region_type_result
)
if not config.open_driver:
if not open_driver:
close_session()
pbar.close()
return channels

View file

@ -70,22 +70,31 @@ async def get_channels_by_online_search(names, callback):
if not pageUrl:
return channels
proxy = None
if config.open_proxy:
open_proxy = config.getboolean("Settings", "open_proxy")
open_driver = config.getboolean("Settings", "open_driver")
favorite_list = [
favorite
for favorite in config.get("Settings", "favorite_list").split(",")
if favorite.strip()
]
favorite_page_num = config.getint("Settings", "favorite_page_num")
default_page_num = config.getint("Settings", "default_page_num")
if open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()
def process_channel_by_online_search(name):
nonlocal proxy, open_proxy, open_driver, favorite_list, favorite_page_num, default_page_num
info_list = []
nonlocal proxy
try:
if config.open_driver:
if open_driver:
driver = setup_driver(proxy)
try:
retry_func(
lambda: driver.get(pageUrl), name=f"online search:{name}"
)
except Exception as e:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
driver.close()
driver.quit()
@ -101,25 +110,22 @@ async def get_channels_by_online_search(names, callback):
name=f"online search:{name}",
)
except Exception as e:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
page_soup = get_soup_requests(request_url, proxy=proxy)
if not page_soup:
print(f"{name}:Request fail.")
return
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
pageNum = favorite_page_num if name in favorite_list else default_page_num
retry_limit = 3
for page in range(1, pageNum + 1):
retries = 0
if not config.open_driver and page == 1:
if not open_driver and page == 1:
retries = 2
while retries < retry_limit:
try:
if page > 1:
if config.open_driver:
if open_driver:
page_link = find_clickable_element_with_retry(
driver,
(
@ -141,14 +147,12 @@ async def get_channels_by_online_search(names, callback):
)
sleep(1)
soup = (
get_soup(driver.page_source)
if config.open_driver
else page_soup
get_soup(driver.page_source) if open_driver else page_soup
)
if soup:
results = (
get_results_from_soup(soup, name)
if config.open_driver
if open_driver
else get_results_from_soup_requests(soup, name)
)
print(name, "page:", page, "results num:", len(results))
@ -156,12 +160,12 @@ async def get_channels_by_online_search(names, callback):
print(
f"{name}:No results found, refreshing page and retrying..."
)
if config.open_driver:
if open_driver:
driver.refresh()
retries += 1
continue
elif len(results) <= 3:
if config.open_driver:
if open_driver:
next_page_link = find_clickable_element_with_retry(
driver,
(
@ -171,7 +175,7 @@ async def get_channels_by_online_search(names, callback):
retries=1,
)
if next_page_link:
if config.open_proxy:
if open_proxy:
proxy = get_proxy_next()
driver.close()
driver.quit()
@ -188,7 +192,7 @@ async def get_channels_by_online_search(names, callback):
print(
f"{name}:No results found, refreshing page and retrying..."
)
if config.open_driver:
if open_driver:
driver.refresh()
retries += 1
continue
@ -201,7 +205,7 @@ async def get_channels_by_online_search(names, callback):
print(f"{name}:Error on search: {e}")
pass
finally:
if config.open_driver:
if open_driver:
driver.close()
driver.quit()
pbar.update()
@ -224,7 +228,7 @@ async def get_channels_by_online_search(names, callback):
data = result.get("data", [])
if name:
channels[name] = data
if not config.open_driver:
if not open_driver:
close_session()
pbar.close()
return channels

View file

@ -22,6 +22,7 @@ def get_proxy_list(page_count=1):
]
proxy_list = []
urls = []
open_driver = config.getboolean("Settings", "open_driver")
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)
@ -29,9 +30,10 @@ def get_proxy_list(page_count=1):
pbar = tqdm(total=len(urls), desc="Getting proxy list")
def get_proxy(url):
nonlocal open_driver
proxys = []
try:
if config.open_driver:
if open_driver:
soup = retry_func(lambda: get_soup_driver(url), name=url)
else:
try:
@ -50,12 +52,12 @@ def get_proxy_list(page_count=1):
pbar.update()
return proxys
max_workers = 3 if config.open_driver else 10
max_workers = 3 if open_driver else 10
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(get_proxy, url) for url in urls]
for future in futures:
proxy_list.extend(future.result())
if not config.open_driver:
if not open_driver:
close_session()
pbar.close()
return proxy_list

View file

@ -19,7 +19,12 @@ async def get_channels_by_subscribe_urls(urls=None, multicast=False, callback=No
"""
subscribe_results = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(urls if urls else config.subscribe_urls)
subscribe_urls = [
url
for url in config.get("Settings", "subscribe_urls").split(",")
if url.strip()
]
subscribe_urls_len = len(urls if urls else subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len, desc="Processing subscribe")
start_time = time()
if callback:
@ -84,7 +89,7 @@ async def get_channels_by_subscribe_urls(urls=None, multicast=False, callback=No
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [
executor.submit(process_subscribe_channels, subscribe_url)
for subscribe_url in (urls if urls else config.subscribe_urls)
for subscribe_url in (urls if urls else subscribe_urls)
]
for future in futures:
subscribe_results = merge_objects(subscribe_results, future.result())

View file

@ -11,7 +11,14 @@ from opencc import OpenCC
config = get_config()
handler = RotatingFileHandler("result_new.log", encoding="utf-8")
log_dir = "output"
log_file = "result_new.log"
log_path = os.path.join(log_dir, log_file)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
handler = RotatingFileHandler(log_path, encoding="utf-8")
logging.basicConfig(
handlers=[handler],
format="%(message)s",
@ -46,28 +53,17 @@ def get_channel_items():
"""
Get the channel items from the source file
"""
# Open the source file and read all lines.
user_source_file = (
"user_" + config.source_file
if os.path.exists("user_" + config.source_file)
else getattr(config, "source_file", "demo.txt")
)
# Open the old final file and read all lines.
user_final_file = (
"user_" + config.final_file
if os.path.exists("user_" + config.final_file)
else getattr(config, "final_file", "result.txt")
)
# Create a dictionary to store the channels.
user_source_file = config.get("Settings", "source_file")
user_final_file = config.get("Settings", "final_file")
channels = defaultdict(lambda: defaultdict(list))
if os.path.exists(resource_path(user_source_file)):
with open(resource_path(user_source_file), "r", encoding="utf-8") as file:
channels = get_channel_data_from_file(channels, file)
if config.open_use_old_result and os.path.exists(resource_path(user_final_file)):
if config.getboolean("Settings", "open_use_old_result") and os.path.exists(
resource_path(user_final_file)
):
with open(resource_path(user_final_file), "r", encoding="utf-8") as file:
channels = get_channel_data_from_file(channels, file)
@ -78,7 +74,7 @@ def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
if config.open_keep_all:
if config.getboolean("Settings", "open_keep_all"):
return name
sub_pattern = (
r"-|_|\((.*?)\)|\[(.*?)\]| |频道|标清|高清|HD|hd|超清|超高|超高清|中央|央视|台"
@ -122,7 +118,7 @@ def channel_name_is_equal(name1, name2):
"""
Check if the channel name is equal
"""
if config.open_keep_all:
if config.getboolean("Settings", "open_keep_all"):
return True
cc = OpenCC("t2s")
name1_converted = cc.convert(format_channel_name(name1))
@ -214,7 +210,7 @@ def get_channel_multicast_region_type_list(result):
"""
Get the channel multicast region type list from result
"""
config_region_list = set(getattr(config, "region_list", []))
config_region_list = set(config.get("Settings", "region_list").split(","))
region_type_list = {
(region, type)
for region_type in result.values()
@ -376,7 +372,7 @@ def update_channel_urls_txt(cate, name, urls):
Update the category and channel urls to the final file
"""
genre_line = cate + ",#genre#\n"
filename = "result_new.txt"
filename = "output/result_new.txt"
if not os.path.exists(filename):
open(filename, "w").close()
@ -465,7 +461,7 @@ def append_total_data(*args, **kwargs):
"""
Append total channel data
"""
if config.open_keep_all:
if config.getboolean("Settings", "open_keep_all"):
return append_all_method_data_keep_all(*args, **kwargs)
else:
return append_all_method_data(*args, **kwargs)
@ -484,7 +480,7 @@ def append_all_method_data(
("multicast", multicast_result),
("online_search", online_search_result),
]:
if getattr(config, f"open_{method}"):
if config.getboolean("Settings", f"open_{method}"):
data = append_data_to_info_data(
data,
cate,
@ -497,7 +493,9 @@ def append_all_method_data(
len(get_channel_results_by_name(name, result)),
)
total_channel_data_len = len(data.get(cate, {}).get(name, []))
if total_channel_data_len == 0 or config.open_use_old_result:
if total_channel_data_len == 0 or config.getboolean(
"Settings", "open_use_old_result"
):
data = append_data_to_info_data(
data,
cate,
@ -524,11 +522,11 @@ def append_all_method_data_keep_all(
("multicast", multicast_result),
("online_search", online_search_result),
]:
if result and getattr(config, f"open_{result_name}"):
if result and config.getboolean("Settings", f"open_{result_name}"):
for name, urls in result.items():
data = append_data_to_info_data(data, cate, name, urls)
print(name, f"{result_name.capitalize()} num:", len(urls))
if config.open_use_old_result:
if config.getboolean("Settings", "open_use_old_result"):
old_urls = channel_obj.get(name, [])
data = append_data_to_info_data(
data,

View file

@ -1,6 +1,8 @@
from os import path
import sys
from importlib import util
# from importlib import util
import configparser
def resource_path(relative_path, persistent=False):
@ -19,29 +21,33 @@ def resource_path(relative_path, persistent=False):
return total_path
def load_external_config(config_path):
"""
Load the external config file
"""
config = None
if path.exists(config_path):
spec = util.spec_from_file_location("config", config_path)
config = util.module_from_spec(spec)
spec.loader.exec_module(config)
else:
import config
return config
# def load_external_config(config_path):
# """
# Load the external config file
# """
# config = None
# if path.exists(config_path):
# spec = util.spec_from_file_location("config", config_path)
# config = util.module_from_spec(spec)
# spec.loader.exec_module(config)
# else:
# import config.config as config
# return config
def get_config():
"""
Get the config
"""
user_config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config(user_config_path)
if path.exists(user_config_path)
else load_external_config(default_config_path)
)
return config
config_parser = configparser.ConfigParser()
user_config_path = resource_path("config/user_config.ini")
default_config_path = resource_path("config/config.ini")
config_files = [user_config_path, default_config_path]
for config_file in config_files:
if path.exists(config_file):
with open(config_file, "r", encoding="utf-8") as f:
config_parser.read_file(f)
break
return config_parser

View file

@ -160,10 +160,13 @@ async def sort_urls_by_speed_and_resolution(infoList, ffmpeg=False):
default_response_time_weight = 0.5
default_resolution_weight = 0.5
response_time_weight = getattr(
config, "response_time_weight", default_response_time_weight
response_time_weight = (
config.getfloat("Settings", "response_time_weight")
or default_response_time_weight
)
resolution_weight = (
config.getfloat("Settings", "resolution_weight") or default_resolution_weight
)
resolution_weight = getattr(config, "resolution_weight", default_resolution_weight)
# Check if weights are valid
if not (
0 <= response_time_weight <= 1

View file

@ -45,7 +45,7 @@ def filter_by_date(data):
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = getattr(config, "recent_days", 30)
use_recent_days = config.getint("Settings", "recent_days")
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
@ -64,8 +64,10 @@ def filter_by_date(data):
recent_data_len = len(recent_data)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < config.urls_limit:
recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
elif recent_data_len < config.getint("Settings", "urls_limit"):
recent_data.extend(
unrecent_data[: config.getint("Settings", "urls_limit") - len(recent_data)]
)
return recent_data
@ -88,7 +90,7 @@ def get_total_urls_from_info_list(infoList):
Get the total urls from info list
"""
total_urls = [url for url, _, _ in infoList]
return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
return list(dict.fromkeys(total_urls))[: config.getint("Settings", "urls_limit")]
def get_total_urls_from_sorted_data(data):
@ -96,11 +98,11 @@ def get_total_urls_from_sorted_data(data):
Get the total urls with filter by date and depulicate from sorted data
"""
total_urls = []
if len(data) > config.urls_limit:
if len(data) > config.getint("Settings", "urls_limit"):
total_urls = [url for (url, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _), _ in data]
return list(dict.fromkeys(total_urls))[: config.urls_limit]
return list(dict.fromkeys(total_urls))[: config.getint("Settings", "urls_limit")]
def is_ipv6(url):
@ -119,7 +121,7 @@ def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv_type = getattr(config, "ipv_type", "ipv4")
ipv_type = config.get("Settings", "ipv_type")
if ipv_type == "ipv4":
return not is_ipv6(url)
elif ipv_type == "ipv6":
@ -134,7 +136,8 @@ def check_by_domain_blacklist(url):
"""
domain_blacklist = [
urlparse(domain).netloc if urlparse(domain).scheme else domain
for domain in getattr(config, "domain_blacklist", [])
for domain in config.get("Settings", "domain_blacklist").split(",")
if domain.strip()
]
return urlparse(url).netloc not in domain_blacklist
@ -143,7 +146,11 @@ def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
url_keywords_blacklist = [
keyword
for keyword in config.get("Settings", "url_keywords_blacklist").split(",")
if keyword.strip()
]
return not any(keyword in url for keyword in url_keywords_blacklist)