This commit is contained in:
guorong.zheng 2024-07-04 17:59:53 +08:00
parent 2dda28352d
commit 7c1a6f1a3e
20 changed files with 1027 additions and 987 deletions


@@ -62,22 +62,6 @@ Fork 本项目并开启工作流更新
[更新日志](./CHANGELOG.md)
## 免责声明
本项目是为了提供编程学习和研究的资源。项目中收集的数据来源于网络,开发者不对数据的准确性、完整性或可靠性做任何保证。
开发者不对任何可能因使用这些代码或数据而产生的任何直接或间接损失负责。使用者应自行判断其使用的合法性和风险。
本项目的代码和数据仅供学习和研究使用,不得用于任何商业用途。任何人或组织在使用时,应遵守相关法律法规,尊重并保护开发者的权益。
如果您使用了本项目的代码或数据,即表示您已了解并同意此免责声明。如果您不同意此免责声明,您应立即停止使用本项目的代码和数据。
此外,本项目的代码和数据可能会不定期进行更新,但不保证更新的及时性和准确性,也不保证代码的稳定性和功能性。
在任何情况下,因使用或无法使用本项目的代码或数据所产生的任何损害或其他责任,开发者和任何贡献者都不承担任何责任。
使用本项目的代码或数据即表示您已经了解并接受这些条款。
## 许可证
[MIT](./LICENSE) License © 2024-PRESENT [Govin](https://github.com/guovin)


@@ -62,22 +62,6 @@ If you don't want to bother, and my configuration just meets your needs, you can
[Changelog](./CHANGELOG.md)
## Disclaimer
This project is provided as a resource for programming learning and research. The data collected in this project comes from the internet, and the developer makes no guarantee of its accuracy, completeness, or reliability.
The developer is not responsible for any direct or indirect losses that may arise from the use of this code or data. Users should judge the legality and risks of their own use.
The code and data of this project are for learning and research use only and must not be used for any commercial purpose. Any person or organization using them should comply with the relevant laws and regulations, and respect and protect the rights and interests of the developer.
If you use the code or data of this project, you are deemed to have read and agreed to this disclaimer. If you do not agree with this disclaimer, you should stop using the code and data of this project immediately.
In addition, the code and data of this project may be updated from time to time, but there is no guarantee of the timeliness or accuracy of such updates, nor of the stability or functionality of the code.
In no event shall the developer or any contributor be liable for any damage or other liability arising from the use of, or inability to use, the code or data of this project.
By using the code or data of this project, you acknowledge that you have read and accepted these terms.
## License
[MIT](./LICENSE) License © 2024-PRESENT [Govin](https://github.com/guovin)

0
driver/__init__.py Normal file

23
driver/setup.py Normal file

@@ -0,0 +1,23 @@
from selenium import webdriver
def setup_driver(proxy=None):
"""
Setup the driver for selenium
"""
options = webdriver.ChromeOptions()
options.add_argument("start-maximized")
options.add_argument("--headless")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--log-level=3")
options.add_argument("--ignore-certificate-errors")
options.add_argument("--allow-running-insecure-content")
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
if proxy:
options.add_argument("--proxy-server=%s" % proxy)
driver = webdriver.Chrome(options=options)
return driver
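
A minimal usage sketch for the helper above, assuming a matching chromedriver is available on PATH; the proxy address and target URL are placeholders.

from driver.setup import setup_driver

driver = setup_driver(proxy="http://127.0.0.1:8080")  # proxy is optional; omit for a direct connection
try:
    driver.get("https://example.com")
    print(driver.title)
finally:
    driver.quit()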

0
fofa/__init__.py Normal file

116
fofa/request.py Normal file

@@ -0,0 +1,116 @@
from utils.config import get_config
from tqdm.asyncio import tqdm_asyncio
from time import time
from asyncio import Queue, get_running_loop
from requests import get
from concurrent.futures import ThreadPoolExecutor
import fofa_map
from driver.setup import setup_driver
import re
from utils.retry import retry_func
from utils.channel import format_channel_name
from utils.utils import merge_objects, get_pbar_remaining
config = get_config()
timeout = 10
def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = getattr(config, "region_list", [])
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list:
urls = [url for url in region_url.values() if url]
else:
for region in region_list:
if region in region_url:
urls.append(region_url[region])
return urls
async def get_channels_by_fofa(callback):
"""
Get the channel by FOFA
"""
fofa_urls = get_fofa_urls_from_region_list()
fofa_urls_len = len(fofa_urls)
pbar = tqdm_asyncio(total=fofa_urls_len)
start_time = time()
fofa_results = {}
pbar.set_description(f"Processing multicast, {fofa_urls_len} regions remaining")
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
fofa_queue = Queue()
for fofa_url in fofa_urls:
await fofa_queue.put(fofa_url)
def process_fofa_channels(fofa_url, fofa_urls_len, callback):
driver = setup_driver()
try:
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
driver.get(fofa_url)
fofa_source = re.sub(r"<!--.*?-->", "", driver.page_source, flags=re.DOTALL)
urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
channels = {}
for url in urls:
try:
final_url = url + "/iptv/live/1000.json?key=txiptv"
# response = retry_func(
# lambda: get(final_url, timeout=timeout),
# name=final_url,
# )
response = get(final_url, timeout=timeout)
try:
json_data = response.json()
if json_data["code"] == 0:
try:
for item in json_data["data"]:
if isinstance(item, dict):
item_name = format_channel_name(
item.get("name")
)
item_url = item.get("url").strip()
if item_name and item_url:
total_url = url + item_url
if item_name not in channels:
channels[item_name] = [total_url]
else:
channels[item_name].append(total_url)
except Exception as e:
# print(f"Error on fofa: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
merge_objects(fofa_results, channels)
except Exception as e:
# print(e)
pass
finally:
fofa_queue.task_done()
pbar.update()
remain = fofa_urls_len - pbar.n
pbar.set_description(f"Processing multicast, {remain} regions remaining")
callback(
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
)
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
driver.quit()
with ThreadPoolExecutor(max_workers=10) as pool:
while not fofa_queue.empty():
loop = get_running_loop()
fofa_url = await fofa_queue.get()
loop.run_in_executor(
pool, process_fofa_channels, fofa_url, fofa_urls_len, callback
)
print("Finish processing fofa url")
pbar.close()
return fofa_results
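
A rough calling sketch for the coroutine above, importing it from the request submodule where it is defined; the callback signature (status message, percent complete) is inferred from how it is invoked here, and driving it with asyncio.run is an assumption about the caller. The regions searched come from config.region_list.

import asyncio
from fofa.request import get_channels_by_fofa

def progress(message, percent):
    # a human-readable status string plus an integer 0-100, matching how callback is used above
    print(f"[{percent}%] {message}")

fofa_results = asyncio.run(get_channels_by_fofa(progress))
print(len(fofa_results), "channel names collected")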

28
main.py

@@ -1,20 +1,20 @@
import asyncio
from utils import (
from utils.config import get_config
from utils.channel import (
get_channel_items,
update_channel_urls_txt,
update_file,
sort_urls_by_speed_and_resolution,
get_total_urls_from_info_list,
get_channels_by_subscribe_urls,
check_url_by_patterns,
get_channels_by_fofa,
get_channels_by_online_search,
format_channel_name,
resource_path,
load_external_config,
)
from utils.utils import (
update_file,
check_url_by_patterns,
get_pbar_remaining,
get_ip_address,
)
from utils.speed import sort_urls_by_speed_and_resolution, get_total_urls_from_info_list
from subscribe import get_channels_by_subscribe_urls
from fofa import get_channels_by_fofa
from online_search import get_channels_by_online_search
import logging
from logging.handlers import RotatingFileHandler
import os
@@ -24,13 +24,7 @@ from time import time
from flask import Flask, render_template_string
import sys
config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config("user_config.py")
if os.path.exists(config_path)
else load_external_config("config.py")
)
config = get_config()
app = Flask(__name__)


173
online_search/request.py Normal file

@@ -0,0 +1,173 @@
from asyncio import create_task, gather
from utils.speed import get_speed
from utils.channel import format_channel_name
from utils.utils import (
check_url_by_patterns,
get_pbar_remaining,
get_results_from_soup,
)
from utils.config import get_config
from proxy.request import get_proxy_list, get_proxy_list_with_test
from time import time, sleep
from driver.setup import setup_driver
from utils.retry import (
retry_func,
locate_element_with_retry,
find_clickable_element_with_retry,
)
from selenium.webdriver.common.by import By
import re
from bs4 import BeautifulSoup
from asyncio import Queue, get_running_loop
from tqdm.asyncio import tqdm_asyncio
from concurrent.futures import ThreadPoolExecutor
config = get_config()
async def use_accessible_url(callback):
"""
Check if the url is accessible
"""
callback(f"正在获取最优的在线检索节点", 0)
baseUrl1 = "https://www.foodieguide.com/iptvsearch/"
baseUrl2 = "http://tonkiang.us/"
task1 = create_task(get_speed(baseUrl1, timeout=30))
task2 = create_task(get_speed(baseUrl2, timeout=30))
task_results = await gather(task1, task2)
callback(f"获取在线检索节点完成", 100)
if task_results[0] == float("inf") and task_results[1] == float("inf"):
return None
if task_results[0] < task_results[1]:
return baseUrl1
else:
return baseUrl2
async def get_channels_by_online_search(names, callback):
"""
Get the channels by online search
"""
channels = {}
pageUrl = await use_accessible_url(callback)
if not pageUrl:
return channels
if config.open_proxy:
proxy_list = await get_proxy_list(3)
proxy_list_test = (
await get_proxy_list_with_test(pageUrl, proxy_list) if proxy_list else []
)
proxy_index = 0
start_time = time()
def process_channel_by_online_search(name, proxy=None):
driver = setup_driver(proxy)
info_list = []
try:
retry_func(lambda: driver.get(pageUrl), name=f"online search:{name}")
search_box = locate_element_with_retry(
driver, (By.XPATH, '//input[@type="text"]')
)
if not search_box:
return
search_box.clear()
search_box.send_keys(name)
submit_button = find_clickable_element_with_retry(
driver, (By.XPATH, '//input[@type="submit"]')
)
if not submit_button:
return
sleep(3)
driver.execute_script("arguments[0].click();", submit_button)
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
retry_limit = 3
for page in range(1, pageNum + 1):
retries = 0
while retries < retry_limit:
try:
if page > 1:
page_link = find_clickable_element_with_retry(
driver,
(
By.XPATH,
f'//a[contains(@href, "={page}") and contains(@href, "{name}")]',
),
)
if not page_link:
break
sleep(3)
driver.execute_script("arguments[0].click();", page_link)
sleep(3)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
if soup:
results = get_results_from_soup(soup, name)
print(name, "page:", page, "results num:", len(results))
if len(results) == 0 and retries < retry_limit - 1:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
for result in results:
url, date, resolution = result
if url and check_url_by_patterns(url):
info_list.append((url, date, resolution))
break
else:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
except Exception as e:
print(f"{name}:Error on page {page}: {e}")
break
if retries == retry_limit:
print(f"{name}:Reached retry limit, moving to next page")
except Exception as e:
print(f"{name}:Error on search: {e}")
pass
finally:
channels[format_channel_name(name)] = info_list
names_queue.task_done()
pbar.update()
pbar.set_description(
f"Processing online search, {names_len - pbar.n} channels remaining"
)
callback(
f"正在线上查询更新, 剩余{names_len - pbar.n}个频道待查询, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / names_len) * 100),
)
driver.quit()
names_queue = Queue()
for name in names:
await names_queue.put(name)
names_len = names_queue.qsize()
pbar = tqdm_asyncio(total=names_len)
pbar.set_description(f"Processing online search, {names_len} channels remaining")
callback(f"正在线上查询更新, 共{names_len}个频道", 0)
with ThreadPoolExecutor(max_workers=10) as pool:
while not names_queue.empty():
loop = get_running_loop()
name = await names_queue.get()
proxy = (
proxy_list_test[0] if config.open_proxy and proxy_list_test else None
)
if config.open_proxy and proxy_list_test:
proxy_index = (proxy_index + 1) % len(proxy_list_test)
loop.run_in_executor(pool, process_channel_by_online_search, name, proxy)
print("Finished processing online search")
pbar.close()
return channels
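
A sketch in the same spirit for the search coroutine above; the channel names are hypothetical, and proxy usage is governed by config.open_proxy.

import asyncio
from online_search.request import get_channels_by_online_search

def progress(message, percent):
    print(f"[{percent}%] {message}")

names = ["CCTV1", "CCTV5+"]  # hypothetical channel names to look up
channels = asyncio.run(get_channels_by_online_search(names, progress))
for name, info_list in channels.items():
    print(name, "->", len(info_list), "candidate urls")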

0
proxy/__init__.py Normal file

91
proxy/request.py Normal file

@@ -0,0 +1,91 @@
from asyncio import Queue, get_running_loop, Semaphore
import re
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
from concurrent.futures import ThreadPoolExecutor
from driver.setup import setup_driver
from utils.retry import retry_func
from time import sleep
from utils.speed import get_speed
async def get_proxy_list(page_count=1):
"""
Get proxy list, parameter page_count is the number of pages to get
"""
url_pattern = [
"https://www.zdaye.com/free/{}/",
"https://www.kuaidaili.com/free/inha/{}/",
"https://www.kuaidaili.com/free/intr/{}/",
]
proxy_list = []
url_queue = Queue()
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)
await url_queue.put(url)
pbar = tqdm_asyncio(total=url_queue.qsize(), desc="Getting proxy list")
def get_proxy(url):
driver = setup_driver()
try:
url = pattern.format(page_index)
retry_func(lambda: driver.get(url), name=url)
sleep(1)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
table = soup.find("table")
trs = table.find_all("tr") if table else []
for tr in trs[1:]:
tds = tr.find_all("td")
ip = tds[0].get_text().strip()
port = tds[1].get_text().strip()
proxy = f"http://{ip}:{port}"
proxy_list.append(proxy)
finally:
driver.quit()
url_queue.task_done()
pbar.update()
with ThreadPoolExecutor(max_workers=10) as executor:
while not url_queue.empty():
loop = get_running_loop()
url = await url_queue.get()
loop.run_in_executor(executor, get_proxy, url)
pbar.close()
return proxy_list
async def get_proxy_list_with_test(base_url, proxy_list):
"""
Get the proxy list with speed test
"""
if not proxy_list:
return []
semaphore = Semaphore(10)
async def get_speed_task(url, timeout, proxy):
async with semaphore:
return await get_speed(url, timeout=timeout, proxy=proxy)
response_times = await tqdm_asyncio.gather(
*(get_speed_task(base_url, timeout=30, proxy=url) for url in proxy_list),
desc="Testing proxy speed",
)
proxy_list_with_test = [
(proxy, response_time)
for proxy, response_time in zip(proxy_list, response_times)
if response_time != float("inf")
]
if not proxy_list_with_test:
print("No valid proxy found")
return []
proxy_list_with_test.sort(key=lambda x: x[1])
proxy_urls = [url for url, _ in proxy_list_with_test]
print(f"{len(proxy_urls)} valid proxy found")
return proxy_urls
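
A sketch of how the two helpers above might be chained; the reachability target reuses the tonkiang.us address from online_search/request.py, and the page count is arbitrary.

import asyncio
from proxy.request import get_proxy_list, get_proxy_list_with_test

async def main():
    proxies = await get_proxy_list(page_count=1)
    # keep only proxies that can actually reach the target page, fastest first
    usable = await get_proxy_list_with_test("http://tonkiang.us/", proxies)
    print("usable proxies:", usable[:3])

asyncio.run(main())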

0
subscribe/__init__.py Normal file

84
subscribe/request.py Normal file

@@ -0,0 +1,84 @@
from utils.config import get_config
from tqdm.asyncio import tqdm_asyncio
from time import time
from asyncio import Queue
from requests import get, exceptions
from utils.retry import retry_func
import re
from utils.channel import format_channel_name
from utils.utils import get_pbar_remaining
from concurrent.futures import ThreadPoolExecutor
from asyncio import get_running_loop
config = get_config()
timeout = 10
async def get_channels_by_subscribe_urls(callback):
"""
Get the channels by subscribe urls
"""
channels = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(config.subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len)
start_time = time()
pbar.set_description(f"Processing subscribe, {subscribe_urls_len} urls remaining")
callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
subscribe_queue = Queue()
for subscribe_url in config.subscribe_urls:
await subscribe_queue.put(subscribe_url)
def process_subscribe_channels(subscribe_url):
try:
response = None
try:
response = retry_func(
lambda: get(subscribe_url, timeout=timeout),
name=subscribe_url,
)
except exceptions.Timeout:
print(f"Timeout on subscribe: {subscribe_url}")
if response:
content = response.text
lines = content.split("\n")
for line in lines:
matcher = re.match(pattern, line)
if matcher is not None:
key = matcher.group(1)
resolution_match = re.search(r"_(\((.*?)\))", key)
resolution = (
resolution_match.group(2)
if resolution_match is not None
else None
)
url = matcher.group(2)
value = (url, None, resolution)
name = format_channel_name(key)
if name in channels:
if value not in channels[name]:
channels[name].append(value)
else:
channels[name] = [value]
except Exception as e:
print(f"Error on {subscribe_url}: {e}")
finally:
subscribe_queue.task_done()
pbar.update()
remain = subscribe_urls_len - pbar.n
pbar.set_description(f"Processing subscribe, {remain} urls remaining")
callback(
f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / subscribe_urls_len) * 100),
)
if config.open_online_search and pbar.n / subscribe_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
with ThreadPoolExecutor(max_workers=10) as pool:
loop = get_running_loop()
subscribe_url = await subscribe_queue.get()
loop.run_in_executor(pool, process_subscribe_channels, subscribe_url)
print("Finished processing subscribe urls")
pbar.close()
return channels
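
For reference, a small sketch of what the channel pattern and the resolution regex above extract from a single subscribe line; the sample line is made up.

import re

pattern = r"^(.*?),(?!#genre#)(.*?)$"
sample = "CCTV1_(1080p),http://192.168.1.1:8080/hls/1/index.m3u8"  # made-up subscribe line
matcher = re.match(pattern, sample)
if matcher:
    key, url = matcher.group(1), matcher.group(2)
    resolution_match = re.search(r"_(\((.*?)\))", key)
    resolution = resolution_match.group(2) if resolution_match else None
    print(key, url, resolution)  # CCTV1_(1080p) http://192.168.1.1:8080/hls/1/index.m3u8 1080p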

938
utils.py

@@ -1,938 +0,0 @@
from selenium import webdriver
import aiohttp
import asyncio
from time import time
import re
import datetime
import os
import urllib.parse
import ipaddress
from urllib.parse import urlparse
import requests
import re
from bs4 import BeautifulSoup
from bs4 import NavigableString
import fofa_map
from collections import defaultdict
from tqdm.asyncio import tqdm_asyncio
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import concurrent.futures
import sys
import importlib.util
from time import sleep
import socket
timeout = 10
max_retries = 3
def retry_func(func, retries=max_retries + 1, name=""):
"""
Retry the function
"""
for i in range(retries):
try:
sleep(3)
return func()
except Exception as e:
count = retries - 1
if name and i < count:
print(f"Failed to connect to the {name}. Retrying {i+1}...")
if i == count:
break
else:
continue
def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retries):
"""
Locate the element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.presence_of_element_located(locator))
except TimeoutException:
driver.refresh()
return None
def find_clickable_element_with_retry(
driver, locator, timeout=timeout, retries=max_retries
):
"""
Find the clickable element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.element_to_be_clickable(locator))
except TimeoutException:
driver.refresh()
return None
def resource_path(relative_path, persistent=False):
"""
Get the resource path
"""
base_path = os.path.abspath(".")
total_path = os.path.join(base_path, relative_path)
if persistent or os.path.exists(total_path):
return total_path
else:
try:
base_path = sys._MEIPASS
return os.path.join(base_path, relative_path)
except Exception:
return total_path
def load_external_config(name):
"""
Load the external config file
"""
config = None
config_path = name
config_filename = os.path.join(os.path.dirname(sys.executable), config_path)
if os.path.exists(config_filename):
spec = importlib.util.spec_from_file_location(name, config_filename)
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
else:
import config
return config
config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config("user_config.py")
if os.path.exists(config_path)
else load_external_config("config.py")
)
def setup_driver(proxy=None):
"""
Setup the driver for selenium
"""
options = webdriver.ChromeOptions()
options.add_argument("start-maximized")
options.add_argument("--headless")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--log-level=3")
options.add_argument("--ignore-certificate-errors")
options.add_argument("--allow-running-insecure-content")
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
if proxy:
options.add_argument("--proxy-server=%s" % proxy)
driver = webdriver.Chrome(options=options)
return driver
async def get_proxy_list(page_count=1):
"""
Get proxy list, parameter page_count is the number of pages to get
"""
url_pattern = [
"https://www.zdaye.com/free/{}/",
"https://www.kuaidaili.com/free/inha/{}/",
"https://www.kuaidaili.com/free/intr/{}/",
]
proxy_list = []
url_queue = asyncio.Queue()
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)
await url_queue.put(url)
pbar = tqdm_asyncio(total=url_queue.qsize(), desc="Getting proxy list")
def get_proxy(url):
driver = setup_driver()
try:
url = pattern.format(page_index)
retry_func(lambda: driver.get(url), name=url)
sleep(1)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
table = soup.find("table")
trs = table.find_all("tr") if table else []
for tr in trs[1:]:
tds = tr.find_all("td")
ip = tds[0].get_text().strip()
port = tds[1].get_text().strip()
proxy = f"http://{ip}:{port}"
proxy_list.append(proxy)
finally:
driver.quit()
url_queue.task_done()
pbar.update()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
while not url_queue.empty():
loop = asyncio.get_running_loop()
url = await url_queue.get()
loop.run_in_executor(executor, get_proxy, url)
pbar.close()
return proxy_list
async def get_proxy_list_with_test(base_url, proxy_list):
"""
Get the proxy list with speed test
"""
if not proxy_list:
return []
semaphore = asyncio.Semaphore(10)
async def get_speed_task(url, timeout, proxy):
async with semaphore:
return await get_speed(url, timeout=timeout, proxy=proxy)
response_times = await tqdm_asyncio.gather(
*(get_speed_task(base_url, timeout=30, proxy=url) for url in proxy_list),
desc="Testing proxy speed",
)
proxy_list_with_test = [
(proxy, response_time)
for proxy, response_time in zip(proxy_list, response_times)
if response_time != float("inf")
]
if not proxy_list_with_test:
print("No valid proxy found")
return []
proxy_list_with_test.sort(key=lambda x: x[1])
proxy_urls = [url for url, _ in proxy_list_with_test]
print(f"{len(proxy_urls)} valid proxy found")
return proxy_urls
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
sub_pattern = (
r"-|_|\((.*?)\)|\[(.*?)\]| |频道|标清|高清|HD|hd|超清|超高|超高清|中央|央视|台"
)
name = re.sub(sub_pattern, "", name)
name = name.replace("plus", "+")
name = name.replace("PLUS", "+")
name = name.replace("", "+")
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
return name.lower()
def get_channel_items():
"""
Get the channel items from the source file
"""
# Open the source file and read all lines.
user_source_file = (
"user_" + config.source_file
if os.path.exists("user_" + config.source_file)
else getattr(config, "source_file", "demo.txt")
)
# Create a dictionary to store the channels.
channels = defaultdict(lambda: defaultdict(list))
current_category = ""
pattern = r"^(.*?),(?!#genre#)(.*?)$"
with open(resource_path(user_source_file), "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "#genre#" in line:
# This is a new channel, create a new key in the dictionary.
current_category = line.split(",")[0]
else:
# This is a url, add it to the list of urls for the current channel.
match = re.search(pattern, line)
if match is not None:
name = match.group(1).strip()
url = match.group(2).strip()
if url and url not in channels[current_category][name]:
channels[current_category][name].append(url)
return channels
def get_pbar_remaining(pbar, start_time):
"""
Get the remaining time of the progress bar
"""
try:
elapsed = time() - start_time
completed_tasks = pbar.n
if completed_tasks > 0:
avg_time_per_task = elapsed / completed_tasks
remaining_tasks = pbar.total - completed_tasks
remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
else:
remaining_time = "未知"
return remaining_time
except Exception as e:
print(f"Error: {e}")
async def get_channels_by_subscribe_urls(callback):
"""
Get the channels by subscribe urls
"""
channels = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(config.subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len)
start_time = time()
pbar.set_description(f"Processing subscribe, {subscribe_urls_len} urls remaining")
callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
subscribe_queue = asyncio.Queue()
for subscribe_url in config.subscribe_urls:
await subscribe_queue.put(subscribe_url)
def process_subscribe_channels(subscribe_url):
try:
response = None
try:
response = retry_func(
lambda: requests.get(subscribe_url, timeout=timeout),
name=subscribe_url,
)
except requests.exceptions.Timeout:
print(f"Timeout on subscribe: {subscribe_url}")
if response:
content = response.text
lines = content.split("\n")
for line in lines:
matcher = re.match(pattern, line)
if matcher is not None:
key = matcher.group(1)
resolution_match = re.search(r"_(\((.*?)\))", key)
resolution = (
resolution_match.group(2)
if resolution_match is not None
else None
)
url = matcher.group(2)
value = (url, None, resolution)
name = format_channel_name(key)
if name in channels:
if value not in channels[name]:
channels[name].append(value)
else:
channels[name] = [value]
except Exception as e:
print(f"Error on {subscribe_url}: {e}")
finally:
subscribe_queue.task_done()
pbar.update()
remain = subscribe_urls_len - pbar.n
pbar.set_description(f"Processing subscribe, {remain} urls remaining")
callback(
f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / subscribe_urls_len) * 100),
)
if config.open_online_search and pbar.n / subscribe_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
loop = asyncio.get_running_loop()
subscribe_url = await subscribe_queue.get()
loop.run_in_executor(pool, process_subscribe_channels, subscribe_url)
print("Finished processing subscribe urls")
pbar.close()
return channels
async def get_channels_by_online_search(names, callback):
"""
Get the channels by online search
"""
channels = {}
pageUrl = await use_accessible_url(callback)
if not pageUrl:
return channels
if config.open_proxy:
proxy_list = await get_proxy_list(3)
proxy_list_test = (
await get_proxy_list_with_test(pageUrl, proxy_list) if proxy_list else []
)
proxy_index = 0
start_time = time()
def process_channel_by_online_search(name, proxy=None):
driver = setup_driver(proxy)
info_list = []
try:
retry_func(lambda: driver.get(pageUrl), name=f"online search:{name}")
search_box = locate_element_with_retry(
driver, (By.XPATH, '//input[@type="text"]')
)
if not search_box:
return
search_box.clear()
search_box.send_keys(name)
submit_button = find_clickable_element_with_retry(
driver, (By.XPATH, '//input[@type="submit"]')
)
if not submit_button:
return
sleep(3)
driver.execute_script("arguments[0].click();", submit_button)
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
retry_limit = 3
for page in range(1, pageNum + 1):
retries = 0
while retries < retry_limit:
try:
if page > 1:
page_link = find_clickable_element_with_retry(
driver,
(
By.XPATH,
f'//a[contains(@href, "={page}") and contains(@href, "{name}")]',
),
)
if not page_link:
break
sleep(3)
driver.execute_script("arguments[0].click();", page_link)
sleep(3)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
if soup:
results = get_results_from_soup(soup, name)
print(name, "page:", page, "results num:", len(results))
if len(results) == 0 and retries < retry_limit - 1:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
for result in results:
url, date, resolution = result
if url and check_url_by_patterns(url):
info_list.append((url, date, resolution))
break
else:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
except Exception as e:
print(f"{name}:Error on page {page}: {e}")
break
if retries == retry_limit:
print(f"{name}:Reached retry limit, moving to next page")
except Exception as e:
print(f"{name}:Error on search: {e}")
pass
finally:
channels[format_channel_name(name)] = info_list
names_queue.task_done()
pbar.update()
pbar.set_description(
f"Processing online search, {names_len - pbar.n} channels remaining"
)
callback(
f"正在线上查询更新, 剩余{names_len - pbar.n}个频道待查询, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / names_len) * 100),
)
driver.quit()
names_queue = asyncio.Queue()
for name in names:
await names_queue.put(name)
names_len = names_queue.qsize()
pbar = tqdm_asyncio(total=names_len)
pbar.set_description(f"Processing online search, {names_len} channels remaining")
callback(f"正在线上查询更新, 共{names_len}个频道", 0)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
while not names_queue.empty():
loop = asyncio.get_running_loop()
name = await names_queue.get()
proxy = (
proxy_list_test[0] if config.open_proxy and proxy_list_test else None
)
if config.open_proxy and proxy_list_test:
proxy_index = (proxy_index + 1) % len(proxy_list_test)
loop.run_in_executor(pool, process_channel_by_online_search, name, proxy)
print("Finished processing online search")
pbar.close()
return channels
def update_channel_urls_txt(cate, name, urls):
"""
Update the category and channel urls to the final file
"""
genre_line = cate + ",#genre#\n"
filename = "result_new.txt"
if not os.path.exists(filename):
open(filename, "w").close()
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
with open(filename, "a", encoding="utf-8") as f:
if genre_line not in content:
f.write(genre_line)
for url in urls:
if url is not None:
f.write(name + "," + url + "\n")
def update_file(final_file, old_file):
"""
Update the file
"""
old_file_path = resource_path(old_file, persistent=True)
final_file_path = resource_path(final_file, persistent=True)
if os.path.exists(old_file_path):
os.replace(old_file_path, final_file_path)
def get_channel_url(element):
"""
Get the url, date and resolution
"""
url = None
urlRegex = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
url_search = re.search(
urlRegex,
element.get_text(strip=True),
)
if url_search:
url = url_search.group()
return url
def get_channel_info(element):
"""
Get the channel info
"""
date, resolution = None, None
info_text = element.get_text(strip=True)
if info_text:
date, resolution = (
(info_text.partition(" ")[0] if info_text.partition(" ")[0] else None),
(
info_text.partition(" ")[2].partition("")[2]
if info_text.partition(" ")[2].partition("")[2]
else None
),
)
return date, resolution
def get_results_from_soup(soup, name):
"""
Get the results from the soup
"""
results = []
for element in soup.descendants:
if isinstance(element, NavigableString):
url = get_channel_url(element)
if url and not any(item[0] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if url_element:
name_element = url_element.find_previous_sibling()
if name_element:
channel_name = name_element.get_text(strip=True)
if format_channel_name(name) == format_channel_name(
channel_name
):
info_element = url_element.find_next_sibling()
date, resolution = get_channel_info(info_element)
results.append((url, date, resolution))
return results
async def get_speed(url, timeout=timeout, proxy=None):
"""
Get the speed of the url
"""
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False), trust_env=True
) as session:
start = time()
end = None
try:
async with session.get(url, timeout=timeout, proxy=proxy) as response:
resStatus = response.status
if resStatus == 200:
end = time()
else:
return float("inf")
except Exception as e:
return float("inf")
return int(round((end - start) * 1000)) if end else float("inf")
async def sort_urls_by_speed_and_resolution(infoList):
"""
Sort by speed and resolution
"""
response_times = await asyncio.gather(*(get_speed(url) for url, _, _ in infoList))
valid_responses = [
(info, rt) for info, rt in zip(infoList, response_times) if rt != float("inf")
]
def extract_resolution(resolution_str):
numbers = re.findall(r"\d+x\d+", resolution_str)
if numbers:
width, height = map(int, numbers[0].split("x"))
return width * height
else:
return 0
default_response_time_weight = 0.5
default_resolution_weight = 0.5
response_time_weight = getattr(
config, "response_time_weight", default_response_time_weight
)
resolution_weight = getattr(config, "resolution_weight", default_resolution_weight)
# Check if weights are valid
if not (
0 <= response_time_weight <= 1
and 0 <= resolution_weight <= 1
and response_time_weight + resolution_weight == 1
):
response_time_weight = default_response_time_weight
resolution_weight = default_resolution_weight
def combined_key(item):
(_, _, resolution), response_time = item
resolution_value = extract_resolution(resolution) if resolution else 0
return (
-(response_time_weight * response_time)
+ resolution_weight * resolution_value
)
sorted_res = sorted(valid_responses, key=combined_key, reverse=True)
return sorted_res
def filter_by_date(data):
"""
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = getattr(config, "recent_days", 30)
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
recent_data = []
unrecent_data = []
for (url, date, resolution), response_time in data:
item = ((url, date, resolution), response_time)
if date:
date = datetime.datetime.strptime(date, "%m-%d-%Y")
if date >= start_date:
recent_data.append(item)
else:
unrecent_data.append(item)
else:
unrecent_data.append(item)
recent_data_len = len(recent_data)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < config.urls_limit:
recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
return recent_data
def get_total_urls_from_info_list(infoList):
"""
Get the total urls from info list
"""
total_urls = [url for url, _, _ in infoList]
return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
def get_total_urls_from_sorted_data(data):
"""
Get the total urls with filter by date and depulicate from sorted data
"""
total_urls = []
if len(data) > config.urls_limit:
total_urls = [url for (url, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _), _ in data]
return list(dict.fromkeys(total_urls))[: config.urls_limit]
def is_ipv6(url):
"""
Check if the url is ipv6
"""
try:
host = urllib.parse.urlparse(url).hostname
ipaddress.IPv6Address(host)
return True
except ValueError:
return False
def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv_type = getattr(config, "ipv_type", "ipv4")
if ipv_type == "ipv4":
return not is_ipv6(url)
elif ipv_type == "ipv6":
return is_ipv6(url)
else:
return True
def check_by_domain_blacklist(url):
"""
Check by domain blacklist
"""
domain_blacklist = [
urlparse(domain).netloc if urlparse(domain).scheme else domain
for domain in getattr(config, "domain_blacklist", [])
]
return urlparse(url).netloc not in domain_blacklist
def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
return not any(keyword in url for keyword in url_keywords_blacklist)
def check_url_by_patterns(url):
"""
Check the url by patterns
"""
return (
check_url_ipv_type(url)
and check_by_domain_blacklist(url)
and check_by_url_keywords_blacklist(url)
)
def filter_urls_by_patterns(urls):
"""
Filter urls by patterns
"""
urls = [url for url in urls if check_url_ipv_type(url)]
urls = [url for url in urls if check_by_domain_blacklist(url)]
urls = [url for url in urls if check_by_url_keywords_blacklist(url)]
return urls
async def use_accessible_url(callback):
"""
Check if the url is accessible
"""
callback(f"正在获取最优的在线检索节点", 0)
baseUrl1 = "https://www.foodieguide.com/iptvsearch/"
baseUrl2 = "http://tonkiang.us/"
task1 = asyncio.create_task(get_speed(baseUrl1, timeout=30))
task2 = asyncio.create_task(get_speed(baseUrl2, timeout=30))
task_results = await asyncio.gather(task1, task2)
callback(f"获取在线检索节点完成", 100)
if task_results[0] == float("inf") and task_results[1] == float("inf"):
return None
if task_results[0] < task_results[1]:
return baseUrl1
else:
return baseUrl2
def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = getattr(config, "region_list", [])
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list:
urls = [url for url in region_url.values() if url]
else:
for region in region_list:
if region in region_url:
urls.append(region_url[region])
return urls
async def get_channels_by_fofa(callback):
"""
Get the channel by FOFA
"""
fofa_urls = get_fofa_urls_from_region_list()
fofa_urls_len = len(fofa_urls)
pbar = tqdm_asyncio(total=fofa_urls_len)
start_time = time()
fofa_results = {}
pbar.set_description(f"Processing multicast, {fofa_urls_len} regions remaining")
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
fofa_queue = asyncio.Queue()
for fofa_url in fofa_urls:
await fofa_queue.put(fofa_url)
def process_fofa_channels(fofa_url, fofa_urls_len, callback):
driver = setup_driver()
try:
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
driver.get(fofa_url)
fofa_source = re.sub(r"<!--.*?-->", "", driver.page_source, flags=re.DOTALL)
urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
channels = {}
for url in urls:
try:
final_url = url + "/iptv/live/1000.json?key=txiptv"
# response = retry_func(
# lambda: requests.get(final_url, timeout=timeout),
# name=final_url,
# )
response = requests.get(final_url, timeout=timeout)
try:
json_data = response.json()
if json_data["code"] == 0:
try:
for item in json_data["data"]:
if isinstance(item, dict):
item_name = format_channel_name(
item.get("name")
)
item_url = item.get("url").strip()
if item_name and item_url:
total_url = url + item_url
if item_name not in channels:
channels[item_name] = [total_url]
else:
channels[item_name].append(total_url)
except Exception as e:
# print(f"Error on fofa: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
merge_objects(fofa_results, channels)
except Exception as e:
# print(e)
pass
finally:
fofa_queue.task_done()
pbar.update()
remain = fofa_urls_len - pbar.n
pbar.set_description(f"Processing multicast, {remain} regions remaining")
callback(
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
)
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
driver.quit()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
while not fofa_queue.empty():
loop = asyncio.get_running_loop()
fofa_url = await fofa_queue.get()
loop.run_in_executor(
pool, process_fofa_channels, fofa_url, fofa_urls_len, callback
)
print("Finish processing fofa url")
pbar.close()
return fofa_results
def merge_objects(*objects):
"""
Merge objects
"""
merged_dict = {}
for obj in objects:
if not isinstance(obj, dict):
raise TypeError("All input objects must be dictionaries")
for key, value in obj.items():
if key not in merged_dict:
merged_dict[key] = set()
if isinstance(value, set):
merged_dict[key].update(value)
elif isinstance(value, list):
for item in value:
merged_dict[key].add(item)
else:
merged_dict[key].add(value)
for key, value in merged_dict.items():
merged_dict[key] = list(value)
return merged_dict
def get_ip_address():
"""
Get the IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("10.255.255.255", 1))
IP = s.getsockname()[0]
except Exception:
IP = "127.0.0.1"
finally:
s.close()
return f"http://{IP}:8000"

0
utils/__init__.py Normal file

136
utils/channel.py Normal file

@@ -0,0 +1,136 @@
from utils.config import get_config, resource_path
import os
from collections import defaultdict
import re
config = get_config()
def get_channel_items():
"""
Get the channel items from the source file
"""
# Open the source file and read all lines.
user_source_file = (
"user_" + config.source_file
if os.path.exists("user_" + config.source_file)
else getattr(config, "source_file", "demo.txt")
)
# Create a dictionary to store the channels.
channels = defaultdict(lambda: defaultdict(list))
current_category = ""
pattern = r"^(.*?),(?!#genre#)(.*?)$"
with open(resource_path(user_source_file), "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "#genre#" in line:
# This is a new channel, create a new key in the dictionary.
current_category = line.split(",")[0]
else:
# This is a url, add it to the list of urls for the current channel.
match = re.search(pattern, line)
if match is not None:
name = match.group(1).strip()
url = match.group(2).strip()
if url and url not in channels[current_category][name]:
channels[current_category][name].append(url)
return channels
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
sub_pattern = (
r"-|_|\((.*?)\)|\[(.*?)\]| |频道|标清|高清|HD|hd|超清|超高|超高清|中央|央视|台"
)
name = re.sub(sub_pattern, "", name)
name = name.replace("plus", "+")
name = name.replace("PLUS", "+")
name = name.replace("", "+")
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
return name.lower()
def update_channel_urls_txt(cate, name, urls):
"""
Update the category and channel urls to the final file
"""
genre_line = cate + ",#genre#\n"
filename = "result_new.txt"
if not os.path.exists(filename):
open(filename, "w").close()
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
with open(filename, "a", encoding="utf-8") as f:
if genre_line not in content:
f.write(genre_line)
for url in urls:
if url is not None:
f.write(name + "," + url + "\n")
def get_channel_url(element):
"""
Get the url, date and resolution
"""
url = None
urlRegex = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
url_search = re.search(
urlRegex,
element.get_text(strip=True),
)
if url_search:
url = url_search.group()
return url
def get_channel_info(element):
"""
Get the channel info
"""
date, resolution = None, None
info_text = element.get_text(strip=True)
if info_text:
date, resolution = (
(info_text.partition(" ")[0] if info_text.partition(" ")[0] else None),
(
info_text.partition(" ")[2].partition("")[2]
if info_text.partition(" ")[2].partition("")[2]
else None
),
)
return date, resolution
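
To make get_channel_items above concrete, a standalone sketch of the source-file format it parses: category headers marked with #genre#, followed by name,url lines. The sample content is made up.

import re
from collections import defaultdict

sample = """央视频道,#genre#
CCTV1,http://192.168.1.1:8080/hls/1/index.m3u8
CCTV1,http://192.168.1.2:8080/hls/1/index.m3u8
卫视频道,#genre#
湖南卫视,http://192.168.1.3:8080/hls/2/index.m3u8"""

channels = defaultdict(lambda: defaultdict(list))
current_category = ""
pattern = r"^(.*?),(?!#genre#)(.*?)$"
for line in sample.splitlines():
    line = line.strip()
    if "#genre#" in line:
        current_category = line.split(",")[0]
    else:
        match = re.search(pattern, line)
        if match is not None:
            channels[current_category][match.group(1).strip()].append(match.group(2).strip())

print(dict(channels["央视频道"]))  # CCTV1 keeps both urls, in file order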

50
utils/config.py Normal file

@@ -0,0 +1,50 @@
from os import path
from sys import executable
from importlib import util
def resource_path(relative_path, persistent=False):
"""
Get the resource path
"""
base_path = path.abspath(".")
total_path = path.join(base_path, relative_path)
if persistent or path.exists(total_path):
return total_path
else:
try:
from sys import _MEIPASS  # only defined when running from a PyInstaller bundle
base_path = _MEIPASS
return path.join(base_path, relative_path)
except Exception:
return total_path
def load_external_config(name):
"""
Load the external config file
"""
config = None
config_path = name
config_filename = path.join(path.dirname(executable), config_path)
if path.exists(config_filename):
spec = util.spec_from_file_location(name, config_filename)
config = util.module_from_spec(spec)
spec.loader.exec_module(config)
else:
import config
return config
def get_config():
"""
Get the config
"""
config_path = resource_path("user_config.py")
config = (
load_external_config("user_config.py")
if path.exists(config_path)
else load_external_config("config.py")
)
return config
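
get_config above loads user_config.py when one is present and falls back to the bundled config.py otherwise, so callers only ever see one module. A minimal consumption sketch; the getattr fallback values here are placeholders, not values from config.py.

from utils.config import get_config

config = get_config()
# option names used elsewhere in this commit; the fallback values are placeholders
urls_limit = getattr(config, "urls_limit", 10)
open_proxy = getattr(config, "open_proxy", False)
print(urls_limit, open_proxy)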

53
utils/retry.py Normal file

@@ -0,0 +1,53 @@
from time import sleep
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
max_retries = 3
timeout = 10
def retry_func(func, retries=max_retries + 1, name=""):
"""
Retry the function
"""
for i in range(retries):
try:
sleep(3)
return func()
except Exception as e:
count = retries - 1
if name and i < count:
print(f"Failed to connect to the {name}. Retrying {i+1}...")
if i == count:
break
else:
continue
def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retries):
"""
Locate the element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.presence_of_element_located(locator))
except TimeoutException:
driver.refresh()
return None
def find_clickable_element_with_retry(
driver, locator, timeout=timeout, retries=max_retries
):
"""
Find the clickable element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.element_to_be_clickable(locator))
except TimeoutException:
driver.refresh()
return None
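
A small sketch of wrapping a flaky call with retry_func above; note it sleeps three seconds before every attempt and returns None once retries are exhausted. The URL is a placeholder.

from requests import get
from utils.retry import retry_func

# retry_func sleeps 3 seconds, calls the lambda, and retries on any exception
response = retry_func(lambda: get("https://example.com", timeout=10), name="example.com")
if response is not None:
    print(response.status_code)
else:
    print("gave up after all retries")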

74
utils/speed.py Normal file

@@ -0,0 +1,74 @@
from aiohttp import ClientSession, TCPConnector
from time import time
from asyncio import gather
import re
from utils.config import get_config
config = get_config()
timeout = 10
async def get_speed(url, timeout=timeout, proxy=None):
"""
Get the speed of the url
"""
async with ClientSession(
connector=TCPConnector(verify_ssl=False), trust_env=True
) as session:
start = time()
end = None
try:
async with session.get(url, timeout=timeout, proxy=proxy) as response:
resStatus = response.status
if resStatus == 200:
end = time()
else:
return float("inf")
except Exception as e:
return float("inf")
return int(round((end - start) * 1000)) if end else float("inf")
async def sort_urls_by_speed_and_resolution(infoList):
"""
Sort by speed and resolution
"""
response_times = await gather(*(get_speed(url) for url, _, _ in infoList))
valid_responses = [
(info, rt) for info, rt in zip(infoList, response_times) if rt != float("inf")
]
def extract_resolution(resolution_str):
numbers = re.findall(r"\d+x\d+", resolution_str)
if numbers:
width, height = map(int, numbers[0].split("x"))
return width * height
else:
return 0
default_response_time_weight = 0.5
default_resolution_weight = 0.5
response_time_weight = getattr(
config, "response_time_weight", default_response_time_weight
)
resolution_weight = getattr(config, "resolution_weight", default_resolution_weight)
# Check if weights are valid
if not (
0 <= response_time_weight <= 1
and 0 <= resolution_weight <= 1
and response_time_weight + resolution_weight == 1
):
response_time_weight = default_response_time_weight
resolution_weight = default_resolution_weight
def combined_key(item):
(_, _, resolution), response_time = item
resolution_value = extract_resolution(resolution) if resolution else 0
return (
-(response_time_weight * response_time)
+ resolution_weight * resolution_value
)
sorted_res = sorted(valid_responses, key=combined_key, reverse=True)
return sorted_res
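
A worked example of the combined_key weighting above with the default 0.5/0.5 weights: response time counts against an entry while pixel count counts for it, so a 100 ms 1920x1080 stream scores -(0.5*100) + 0.5*(1920*1080) = 1036750 and outranks a 50 ms 1280x720 stream at 460775. A throwaway sketch that mirrors the formula:

def combined_key_demo(response_time_ms, width, height):
    # mirrors combined_key above with the default 0.5 / 0.5 weights
    return -(0.5 * response_time_ms) + 0.5 * (width * height)

print(combined_key_demo(100, 1920, 1080))  # 1036750.0
print(combined_key_demo(50, 1280, 720))    # 460775.0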

216
utils/utils.py Normal file

@@ -0,0 +1,216 @@
from time import time
import datetime
import os
import urllib.parse
import ipaddress
from urllib.parse import urlparse
from bs4 import NavigableString
import socket
from utils.config import get_config, resource_path
from utils.channel import get_channel_url, get_channel_info, format_channel_name
config = get_config()
timeout = 10
def get_pbar_remaining(pbar, start_time):
"""
Get the remaining time of the progress bar
"""
try:
elapsed = time() - start_time
completed_tasks = pbar.n
if completed_tasks > 0:
avg_time_per_task = elapsed / completed_tasks
remaining_tasks = pbar.total - completed_tasks
remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
else:
remaining_time = "未知"
return remaining_time
except Exception as e:
print(f"Error: {e}")
def update_file(final_file, old_file):
"""
Update the file
"""
old_file_path = resource_path(old_file, persistent=True)
final_file_path = resource_path(final_file, persistent=True)
if os.path.exists(old_file_path):
os.replace(old_file_path, final_file_path)
def get_results_from_soup(soup, name):
"""
Get the results from the soup
"""
results = []
for element in soup.descendants:
if isinstance(element, NavigableString):
url = get_channel_url(element)
if url and not any(item[0] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if url_element:
name_element = url_element.find_previous_sibling()
if name_element:
channel_name = name_element.get_text(strip=True)
if format_channel_name(name) == format_channel_name(
channel_name
):
info_element = url_element.find_next_sibling()
date, resolution = get_channel_info(info_element)
results.append((url, date, resolution))
return results
def filter_by_date(data):
"""
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = getattr(config, "recent_days", 30)
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
recent_data = []
unrecent_data = []
for (url, date, resolution), response_time in data:
item = ((url, date, resolution), response_time)
if date:
date = datetime.datetime.strptime(date, "%m-%d-%Y")
if date >= start_date:
recent_data.append(item)
else:
unrecent_data.append(item)
else:
unrecent_data.append(item)
recent_data_len = len(recent_data)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < config.urls_limit:
recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
return recent_data
def get_total_urls_from_info_list(infoList):
"""
Get the total urls from info list
"""
total_urls = [url for url, _, _ in infoList]
return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
def get_total_urls_from_sorted_data(data):
"""
Get the total urls with filter by date and depulicate from sorted data
"""
total_urls = []
if len(data) > config.urls_limit:
total_urls = [url for (url, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _), _ in data]
return list(dict.fromkeys(total_urls))[: config.urls_limit]
def is_ipv6(url):
"""
Check if the url is ipv6
"""
try:
host = urllib.parse.urlparse(url).hostname
ipaddress.IPv6Address(host)
return True
except ValueError:
return False
def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv_type = getattr(config, "ipv_type", "ipv4")
if ipv_type == "ipv4":
return not is_ipv6(url)
elif ipv_type == "ipv6":
return is_ipv6(url)
else:
return True
def check_by_domain_blacklist(url):
"""
Check by domain blacklist
"""
domain_blacklist = [
urlparse(domain).netloc if urlparse(domain).scheme else domain
for domain in getattr(config, "domain_blacklist", [])
]
return urlparse(url).netloc not in domain_blacklist
def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
return not any(keyword in url for keyword in url_keywords_blacklist)
def check_url_by_patterns(url):
"""
Check the url by patterns
"""
return (
check_url_ipv_type(url)
and check_by_domain_blacklist(url)
and check_by_url_keywords_blacklist(url)
)
def filter_urls_by_patterns(urls):
"""
Filter urls by patterns
"""
urls = [url for url in urls if check_url_ipv_type(url)]
urls = [url for url in urls if check_by_domain_blacklist(url)]
urls = [url for url in urls if check_by_url_keywords_blacklist(url)]
return urls
def merge_objects(*objects):
"""
Merge objects
"""
merged_dict = {}
for obj in objects:
if not isinstance(obj, dict):
raise TypeError("All input objects must be dictionaries")
for key, value in obj.items():
if key not in merged_dict:
merged_dict[key] = set()
if isinstance(value, set):
merged_dict[key].update(value)
elif isinstance(value, list):
for item in value:
merged_dict[key].add(item)
else:
merged_dict[key].add(value)
for key, value in merged_dict.items():
merged_dict[key] = list(value)
return merged_dict
def get_ip_address():
"""
Get the IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("10.255.255.255", 1))
IP = s.getsockname()[0]
except Exception:
IP = "127.0.0.1"
finally:
s.close()
return f"http://{IP}:8000"