feat: 新增 musicinfos 接口用于批量查询歌曲信息

This commit is contained in:
涵曦 2024-09-23 01:10:04 +08:00
parent 7c9576874b
commit 44860d495e
4 changed files with 139 additions and 89 deletions

View File

@ -5,7 +5,7 @@
groups = ["default", "dev", "lint"] groups = ["default", "dev", "lint"]
strategy = ["inherit_metadata"] strategy = ["inherit_metadata"]
lock_version = "4.5.0" lock_version = "4.5.0"
content_hash = "sha256:743f0a2ac59e1902f4f5389375ec5df7e2469502b0dff7cef40d391febd1ad92" content_hash = "sha256:d7209c5b89041122b16847a761d8f522ea404543d50b859fb283f9061d4e9f36"
[[metadata.targets]] [[metadata.targets]]
requires_python = "==3.10.12" requires_python = "==3.10.12"
@ -757,6 +757,17 @@ files = [
{file = "mutagen-1.47.0.tar.gz", hash = "sha256:719fadef0a978c31b4cf3c956261b3c58b6948b32023078a2117b1de09f0fc99"}, {file = "mutagen-1.47.0.tar.gz", hash = "sha256:719fadef0a978c31b4cf3c956261b3c58b6948b32023078a2117b1de09f0fc99"},
] ]
[[package]]
name = "opencc-python-reimplemented"
version = "0.1.7"
summary = "OpenCC made with Python"
groups = ["default"]
marker = "python_full_version == \"3.10.12\""
files = [
{file = "opencc-python-reimplemented-0.1.7.tar.gz", hash = "sha256:4f777ea3461a25257a7b876112cfa90bb6acabc6dfb843bf4d11266e43579dee"},
{file = "opencc_python_reimplemented-0.1.7-py2.py3-none-any.whl", hash = "sha256:41b3b92943c7bed291f448e9c7fad4b577c8c2eae30fcfe5a74edf8818493aa6"},
]
[[package]] [[package]]
name = "packaging" name = "packaging"
version = "24.1" version = "24.1"

View File

@ -12,7 +12,7 @@ from dataclasses import asdict
from typing import Annotated from typing import Annotated
import aiofiles import aiofiles
from fastapi import Depends, FastAPI, HTTPException, Request, status from fastapi import Depends, FastAPI, HTTPException, Query, Request, status
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.security import HTTPBasic, HTTPBasicCredentials
@ -241,6 +241,25 @@ async def musicinfo(
return info return info
@app.get("/musicinfos")
async def musicinfos(
name: list[str] = Query(None),
musictag: bool = False,
Verifcation=Depends(verification),
):
ret = []
for music_name in name:
url = xiaomusic.get_music_url(music_name)
info = {
"name": music_name,
"url": url,
}
if musictag:
info["tags"] = xiaomusic.get_music_tags(music_name)
ret.append(info)
return ret
@app.get("/curplaylist") @app.get("/curplaylist")
async def curplaylist(did: str = "", Verifcation=Depends(verification)): async def curplaylist(did: str = "", Verifcation=Depends(verification)):
if not xiaomusic.did_exist(did): if not xiaomusic.did_exist(did):

View File

@ -16,6 +16,7 @@ import string
import subprocess import subprocess
import tempfile import tempfile
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from dataclasses import dataclass
from http.cookies import SimpleCookie from http.cookies import SimpleCookie
from urllib.parse import urlparse from urllib.parse import urlparse
@ -28,13 +29,12 @@ from mutagen.mp3 import MP3
from mutagen.mp4 import MP4 from mutagen.mp4 import MP4
from mutagen.oggvorbis import OggVorbis from mutagen.oggvorbis import OggVorbis
from mutagen.wave import WAVE from mutagen.wave import WAVE
from opencc import OpenCC
from requests.utils import cookiejar_from_dict from requests.utils import cookiejar_from_dict
from xiaomusic.const import SUPPORT_MUSIC_TYPE from xiaomusic.const import SUPPORT_MUSIC_TYPE
from opencc import OpenCC
cc = OpenCC("t2s") # convert from Traditional Chinese to Simplified Chinese
cc = OpenCC('t2s') # convert from Traditional Chinese to Simplified Chinese
### HELP FUNCTION ### ### HELP FUNCTION ###
@ -90,8 +90,9 @@ def validate_proxy(proxy_str: str) -> bool:
# 模糊搜索 # 模糊搜索
def fuzzyfinder(user_input, collection, extra_search_index=None): def fuzzyfinder(user_input, collection, extra_search_index=None):
return find_best_match(user_input, collection, cutoff=0.1, n=10, return find_best_match(
extra_search_index=extra_search_index) user_input, collection, cutoff=0.1, n=10, extra_search_index=extra_search_index
)
def traditional_to_simple(to_convert: str): def traditional_to_simple(to_convert: str):
@ -107,11 +108,11 @@ def keyword_detection(user_input, str_list, n):
matched.append(item) matched.append(item)
else: else:
remains.append(item) remains.append(item)
# 如果 n 是 -1如果 n 大于匹配的数量,返回所有匹配的结果 # 如果 n 是 -1如果 n 大于匹配的数量,返回所有匹配的结果
if n == -1 or n > len(matched): if n == -1 or n > len(matched):
return matched, remains return matched, remains
# 随机选择 n 个匹配的结果 # 随机选择 n 个匹配的结果
return random.sample(matched, n), remains return random.sample(matched, n), remains
@ -120,14 +121,14 @@ def real_search(prompt, candidates, cutoff, n):
matches, remains = keyword_detection(prompt, candidates, n=n) matches, remains = keyword_detection(prompt, candidates, n=n)
if len(matches) < n: if len(matches) < n:
# 如果没有准确关键词匹配,开始模糊匹配 # 如果没有准确关键词匹配,开始模糊匹配
matches += difflib.get_close_matches( matches += difflib.get_close_matches(prompt, remains, n=n, cutoff=cutoff)
prompt, remains, n=n, cutoff=cutoff
)
return matches return matches
def find_best_match(user_input, collection, cutoff=0.6, n=1, extra_search_index=None): def find_best_match(user_input, collection, cutoff=0.6, n=1, extra_search_index=None):
lower_collection = {traditional_to_simple(item.lower()): item for item in collection} lower_collection = {
traditional_to_simple(item.lower()): item for item in collection
}
user_input = traditional_to_simple(user_input.lower()) user_input = traditional_to_simple(user_input.lower())
matches = real_search(user_input, lower_collection.keys(), cutoff, n) matches = real_search(user_input, lower_collection.keys(), cutoff, n)
cur_matched_collection = [lower_collection[match] for match in matches] cur_matched_collection = [lower_collection[match] for match in matches]
@ -141,8 +142,8 @@ def find_best_match(user_input, collection, cutoff=0.6, n=1, extra_search_index=
matches = real_search(user_input, lower_extra_search_index.keys(), cutoff, n) matches = real_search(user_input, lower_extra_search_index.keys(), cutoff, n)
cur_matched_collection += [lower_extra_search_index[match] for match in matches] cur_matched_collection += [lower_extra_search_index[match] for match in matches]
return cur_matched_collection[:n] return cur_matched_collection[:n]
# 歌曲排序 # 歌曲排序
def custom_sort_key(s): def custom_sort_key(s):
# 使用正则表达式分别提取字符串的数字前缀和数字后缀 # 使用正则表达式分别提取字符串的数字前缀和数字后缀
@ -506,113 +507,112 @@ def get_audio_metadata(file_path):
raise ValueError("Unsupported file type") raise ValueError("Unsupported file type")
@dataclass
class Metadata:
title: str = ""
artist: str = ""
album: str = ""
year: str = ""
genre: str = ""
picture: str = ""
lyrics: str = ""
def get_mp3_metadata(file_path): def get_mp3_metadata(file_path):
audio = MP3(file_path, ID3=ID3) audio = MP3(file_path, ID3=ID3)
tags = audio.tags tags = audio.tags
if tags is None: if tags is None:
return None return Metadata()
metadata = { metadata = Metadata(
"title": tags.get("TIT2", [""])[0] if "TIT2" in tags else "", title=tags.get("TIT2", [""])[0] if "TIT2" in tags else "",
"artist": tags.get("TPE1", [""])[0] if "TPE1" in tags else "", artist=tags.get("TPE1", [""])[0] if "TPE1" in tags else "",
"album": tags.get("TALB", [""])[0] if "TALB" in tags else "", album=tags.get("TALB", [""])[0] if "TALB" in tags else "",
"year": tags.get("TDRC", [""])[0] if "TDRC" in tags else "", year=tags.get("TDRC", [""])[0] if "TDRC" in tags else "",
"genre": tags.get("TCON", [""])[0] if "TCON" in tags else "", genre=tags.get("TCON", [""])[0] if "TCON" in tags else "",
"picture": "", )
"lyrics": "",
}
for tag in tags.values(): for tag in tags.values():
if isinstance(tag, APIC): if isinstance(tag, APIC):
metadata["picture"] = base64.b64encode(tag.data).decode("utf-8") metadata.picture = base64.b64encode(tag.data).decode("utf-8")
break break
lyrics = tags.getall("USLT") lyrics = tags.getall("USLT")
if lyrics: if lyrics:
metadata["lyrics"] = lyrics[0] metadata.lyrics = lyrics[0]
return metadata return metadata
def get_flac_metadata(file_path): def get_flac_metadata(file_path):
audio = FLAC(file_path) audio = FLAC(file_path)
metadata = { metadata = Metadata(
"title": audio.get("title", [""])[0], title=audio.get("title", [""])[0],
"artist": audio.get("artist", [""])[0], artist=audio.get("artist", [""])[0],
"album": audio.get("album", [""])[0], album=audio.get("album", [""])[0],
"year": audio.get("date", [""])[0], year=audio.get("date", [""])[0],
"genre": audio.get("genre", [""])[0], genre=audio.get("genre", [""])[0],
"picture": "", )
"lyrics": "",
}
if audio.pictures: if audio.pictures:
picture = audio.pictures[0] picture = audio.pictures[0]
metadata["picture"] = base64.b64encode(picture.data).decode("utf-8") metadata.picture = base64.b64encode(picture.data).decode("utf-8")
if "lyrics" in audio: if "lyrics" in audio:
metadata["lyrics"] = audio["lyrics"][0] metadata.lyrics = audio["lyrics"][0]
return metadata return metadata
def get_wav_metadata(file_path): def get_wav_metadata(file_path):
audio = WAVE(file_path) audio = WAVE(file_path)
metadata = { metadata = Metadata(
"title": audio.get("TIT2", [""])[0], title=audio.get("TIT2", [""])[0],
"artist": audio.get("TPE1", [""])[0], artist=audio.get("TPE1", [""])[0],
"album": audio.get("TALB", [""])[0], album=audio.get("TALB", [""])[0],
"year": audio.get("TDRC", [""])[0], year=audio.get("TDRC", [""])[0],
"genre": audio.get("TCON", [""])[0], genre=audio.get("TCON", [""])[0],
"picture": "", )
"lyrics": "",
}
return metadata return metadata
def get_ape_metadata(file_path): def get_ape_metadata(file_path):
audio = MonkeysAudio(file_path) audio = MonkeysAudio(file_path)
metadata = { metadata = Metadata(
"title": audio.get("TIT2", [""])[0], title=audio.get("TIT2", [""])[0],
"artist": audio.get("TPE1", [""])[0], artist=audio.get("TPE1", [""])[0],
"album": audio.get("TALB", [""])[0], album=audio.get("TALB", [""])[0],
"year": audio.get("TDRC", [""])[0], year=audio.get("TDRC", [""])[0],
"genre": audio.get("TCON", [""])[0], genre=audio.get("TCON", [""])[0],
"picture": "", )
"lyrics": "",
}
return metadata return metadata
def get_ogg_metadata(file_path): def get_ogg_metadata(file_path):
audio = OggVorbis(file_path) audio = OggVorbis(file_path)
metadata = { metadata = Metadata(
"title": audio.get("title", [""])[0], title=audio.get("title", [""])[0],
"artist": audio.get("artist", [""])[0], artist=audio.get("artist", [""])[0],
"album": audio.get("album", [""])[0], album=audio.get("album", [""])[0],
"year": audio.get("date", [""])[0], year=audio.get("date", [""])[0],
"genre": audio.get("genre", [""])[0], genre=audio.get("genre", [""])[0],
"picture": "", )
"lyrics": "",
}
return metadata return metadata
def get_m4a_metadata(file_path): def get_m4a_metadata(file_path):
audio = MP4(file_path) audio = MP4(file_path)
metadata = { metadata = Metadata(
"title": audio.tags.get("\xa9nam", [""])[0], title=audio.tags.get("\xa9nam", [""])[0],
"artist": audio.tags.get("\xa9ART", [""])[0], artist=audio.tags.get("\xa9ART", [""])[0],
"album": audio.tags.get("\xa9alb", [""])[0], album=audio.tags.get("\xa9alb", [""])[0],
"year": audio.tags.get("\xa9day", [""])[0], year=audio.tags.get("\xa9day", [""])[0],
"genre": audio.tags.get("\xa9gen", [""])[0], genre=audio.tags.get("\xa9gen", [""])[0],
"picture": "", )
"lyrics": "",
}
if "covr" in audio.tags: if "covr" in audio.tags:
cover = audio.tags["covr"][0] cover = audio.tags["covr"][0]
metadata["picture"] = base64.b64encode(cover).decode("utf-8") metadata.picture = base64.b64encode(cover).decode("utf-8")
return metadata return metadata

View File

@ -9,10 +9,10 @@ import random
import re import re
import time import time
import urllib.parse import urllib.parse
from collections import OrderedDict
from dataclasses import asdict from dataclasses import asdict
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
from pathlib import Path from pathlib import Path
from collections import OrderedDict
from aiohttp import ClientSession, ClientTimeout from aiohttp import ClientSession, ClientTimeout
from miservice import MiAccount, MiNAService from miservice import MiAccount, MiNAService
@ -36,6 +36,7 @@ from xiaomusic.const import (
from xiaomusic.crontab import Crontab from xiaomusic.crontab import Crontab
from xiaomusic.plugin import PluginManager from xiaomusic.plugin import PluginManager
from xiaomusic.utils import ( from xiaomusic.utils import (
Metadata,
chinese_to_number, chinese_to_number,
convert_file_to_mp3, convert_file_to_mp3,
custom_sort_key, custom_sort_key,
@ -46,11 +47,11 @@ from xiaomusic.utils import (
get_local_music_duration, get_local_music_duration,
get_web_music_duration, get_web_music_duration,
is_mp3, is_mp3,
list2str,
parse_cookie_string, parse_cookie_string,
parse_str_to_dict, parse_str_to_dict,
remove_id3_tags, remove_id3_tags,
traverse_music_directory, traverse_music_directory,
list2str,
) )
@ -395,7 +396,7 @@ class XiaoMusic:
return sec, url return sec, url
def get_music_tags(self, name): def get_music_tags(self, name):
return self.all_music_tags.get(name, {}) return self.all_music_tags.get(name, Metadata())
def get_music_url(self, name): def get_music_url(self, name):
if self.is_web_music(name): if self.is_web_music(name):
@ -731,14 +732,20 @@ class XiaoMusic:
all_music_list = list(self.all_music.keys()) all_music_list = list(self.all_music.keys())
real_names = find_best_match( real_names = find_best_match(
name, all_music_list, cutoff=self.config.fuzzy_match_cutoff, n=n, name,
all_music_list,
cutoff=self.config.fuzzy_match_cutoff,
n=n,
extra_search_index=self._extra_index_search, extra_search_index=self._extra_index_search,
) )
if real_names: if real_names:
if n > 1: if n > 1:
# 扩大范围再找,最后保留随机 n 个 # 扩大范围再找,最后保留随机 n 个
real_names = find_best_match( real_names = find_best_match(
name, all_music_list, cutoff=self.config.fuzzy_match_cutoff, n=n * 2, name,
all_music_list,
cutoff=self.config.fuzzy_match_cutoff,
n=n * 2,
extra_search_index=self._extra_index_search, extra_search_index=self._extra_index_search,
) )
random.shuffle(real_names) random.shuffle(real_names)
@ -793,7 +800,9 @@ class XiaoMusic:
# 模糊搜一个播放列表(只需要一个,不需要 extra index # 模糊搜一个播放列表(只需要一个,不需要 extra index
real_name = find_best_match( real_name = find_best_match(
list_name, self.music_list, cutoff=self.config.fuzzy_match_cutoff, list_name,
self.music_list,
cutoff=self.config.fuzzy_match_cutoff,
n=1, n=1,
)[0] )[0]
if real_name: if real_name:
@ -1095,9 +1104,10 @@ class XiaoMusicDevice:
# 没有重置 list 且非初始化 # 没有重置 list 且非初始化
if self.device.cur_playlist == "临时搜索列表" and len(self._play_list) > 0: if self.device.cur_playlist == "临时搜索列表" and len(self._play_list) > 0:
# 更新总播放列表为了UI显示 # 更新总播放列表为了UI显示
self.xiaomusic.music_list['临时搜索列表'] = copy.copy(self._play_list) self.xiaomusic.music_list["临时搜索列表"] = copy.copy(self._play_list)
elif (self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0 elif (
) or (self.device.cur_playlist not in self.xiaomusic.music_list): self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0
) or (self.device.cur_playlist not in self.xiaomusic.music_list):
self.device.cur_playlist = "全部" self.device.cur_playlist = "全部"
else: else:
pass # 指定了已知的播放列表名称 pass # 指定了已知的播放列表名称
@ -1108,12 +1118,18 @@ class XiaoMusicDevice:
if reorder: if reorder:
if self.device.play_type == PLAY_TYPE_RND: if self.device.play_type == PLAY_TYPE_RND:
random.shuffle(self._play_list) random.shuffle(self._play_list)
self.log.info(f"随机打乱 {list_name} {list2str(self._play_list, self.config.verbose)}") self.log.info(
f"随机打乱 {list_name} {list2str(self._play_list, self.config.verbose)}"
)
else: else:
self._play_list = sorted(self._play_list) self._play_list = sorted(self._play_list)
self.log.info(f"没打乱 {list_name} {list2str(self._play_list, self.config.verbose)}") self.log.info(
f"没打乱 {list_name} {list2str(self._play_list, self.config.verbose)}"
)
else: else:
self.log.info(f"更新 {list_name} {list2str(self._play_list, self.config.verbose)}") self.log.info(
f"更新 {list_name} {list2str(self._play_list, self.config.verbose)}"
)
# 播放歌曲 # 播放歌曲
async def play(self, name="", search_key=""): async def play(self, name="", search_key=""):
@ -1144,7 +1160,9 @@ class XiaoMusicDevice:
self.device.cur_playlist = "临时搜索列表" self.device.cur_playlist = "临时搜索列表"
self.update_playlist(reorder=False) self.update_playlist(reorder=False)
name = names[0] name = names[0]
self.log.debug(f"当前播放列表为:{list2str(self._play_list, self.config.verbose)}") self.log.debug(
f"当前播放列表为:{list2str(self._play_list, self.config.verbose)}"
)
elif not self.xiaomusic.is_music_exist(name): elif not self.xiaomusic.is_music_exist(name):
if self.config.disable_download: if self.config.disable_download:
await self.do_tts(f"本地不存在歌曲{name}") await self.do_tts(f"本地不存在歌曲{name}")
@ -1222,7 +1240,9 @@ class XiaoMusicDevice:
self.device.cur_playlist = "临时搜索列表" self.device.cur_playlist = "临时搜索列表"
self.update_playlist(reorder=False) self.update_playlist(reorder=False)
name = names[0] name = names[0]
self.log.debug(f"当前播放列表为:{list2str(self._play_list, self.config.verbose)}") self.log.debug(
f"当前播放列表为:{list2str(self._play_list, self.config.verbose)}"
)
elif not self.xiaomusic.is_music_exist(name): elif not self.xiaomusic.is_music_exist(name):
await self.do_tts(f"本地不存在歌曲{name}") await self.do_tts(f"本地不存在歌曲{name}")
return return