feat: 新增自定义口令功能 #105

This commit is contained in:
涵曦 2024-07-05 04:20:16 +00:00
parent c151b826f7
commit f3e57789fa
8 changed files with 224 additions and 36 deletions

View File

@ -9,8 +9,9 @@
"conf_path": null,
"hostname": "192.168.2.5",
"port": 8090,
"public_port": 0,
"proxy": null,
"search_prefix": "ytsearch:",
"search_prefix": "bilisearch:",
"ffmpeg_location": "./ffmpeg/bin",
"active_cmd": "play,random_play,playlocal,play_music_list,stop",
"exclude_dirs": "@eaDir",
@ -21,12 +22,60 @@
"music_list_url": "",
"music_list_json": "",
"disable_download": false,
"key_word_dict": {
"播放歌曲": "play",
"播放本地歌曲": "playlocal",
"关机": "stop",
"下一首": "play_next",
"单曲循环": "set_play_type_one",
"全部循环": "set_play_type_all",
"随机播放": "random_play",
"分钟后关机": "stop_after_minute",
"播放列表": "play_music_list",
"刷新列表": "gen_music_list",
"set_volume#": "set_volume",
"get_volume#": "get_volume",
"本地播放歌曲": "playlocal",
"放歌曲": "play",
"暂停": "stop",
"停止": "stop",
"停止播放": "stop",
"测试自定义口令": "exec#code1(\"hello\")",
"测试链接": "exec#httpget(\"https://github.com/hanxi/xiaomusic\")"
},
"key_match_order": [
"set_volume#",
"get_volume#",
"分钟后关机",
"播放歌曲",
"下一首",
"单曲循环",
"全部循环",
"随机播放",
"关机",
"刷新列表",
"播放列表",
"播放本地歌曲",
"本地播放歌曲",
"放歌曲",
"暂停",
"停止",
"停止播放",
"测试自定义口令",
"测试链接"
],
"use_music_api": false,
"use_music_audio_id": "1582971365183456177",
"use_music_id": "355454500",
"log_file": "/tmp/xiaomusic.txt",
"fuzzy_match_cutoff": 0.6,
"enable_fuzzy_match": true,
"stop_tts_msg": "收到,再见",
"keywords_playlocal": "播放本地歌曲,本地播放歌曲",
"keywords_play": "播放歌曲,放歌曲",
"keywords_stop": "关机,暂停,停止,停止播放"
"keywords_stop": "关机,暂停,停止,停止播放",
"user_key_word_dict": {
"测试自定义口令": "exec#code1(\"hello\")",
"测试链接": "exec#httpget(\"https://github.com/hanxi/xiaomusic\")"
}
}

0
plugins/__init__.py Normal file
View File

4
plugins/code1.py Normal file
View File

@ -0,0 +1,4 @@
async def code1(arg1):
global log, xiaomusic
log.info(f"code1:{arg1}")
await xiaomusic.do_tts("你好,我是自定义的测试口令")

10
plugins/httpget.py Normal file
View File

@ -0,0 +1,10 @@
import requests
def httpget(url):
global log
# 发起请求
response = requests.get(url, timeout=5) # 增加超时以避免长时间挂起
response.raise_for_status() # 如果响应不是200引发HTTPError异常
log.info(f"httpget url:{url} response:{response.text}")

View File

@ -3,13 +3,15 @@ from __future__ import annotations
import argparse
import json
import os
from dataclasses import dataclass
from dataclasses import asdict, dataclass, field
from typing import get_type_hints
from xiaomusic.utils import validate_proxy
# 默认口令
DEFAULT_KEY_WORD_DICT = {
def default_key_word_dict():
return {
"播放歌曲": "play",
"播放本地歌曲": "playlocal",
"关机": "stop",
@ -22,15 +24,25 @@ DEFAULT_KEY_WORD_DICT = {
"刷新列表": "gen_music_list",
"set_volume#": "set_volume",
"get_volume#": "get_volume",
}
}
def default_user_key_word_dict():
return {
"测试自定义口令": 'exec#code1("hello")',
"测试链接": 'exec#httpget("https://github.com/hanxi/xiaomusic")',
}
# 命令参数在前面
KEY_WORD_ARG_BEFORE_DICT = {
"分钟后关机": True,
}
# 口令匹配优先级
DEFAULT_KEY_MATCH_ORDER = [
def default_key_match_order():
return [
"set_volume#",
"get_volume#",
"分钟后关机",
@ -42,7 +54,7 @@ DEFAULT_KEY_MATCH_ORDER = [
"关机",
"刷新列表",
"播放列表",
]
]
@dataclass
@ -78,8 +90,8 @@ class Config:
disable_download: bool = (
os.getenv("XIAOMUSIC_DISABLE_DOWNLOAD", "false").lower() == "true"
)
key_word_dict = DEFAULT_KEY_WORD_DICT.copy()
key_match_order = DEFAULT_KEY_MATCH_ORDER.copy()
key_word_dict: dict[str, str] = field(default_factory=default_key_word_dict)
key_match_order: list[str] = field(default_factory=default_key_match_order)
use_music_api: bool = (
os.getenv("XIAOMUSIC_USE_MUSIC_API", "false").lower() == "true"
)
@ -101,6 +113,9 @@ class Config:
)
keywords_play: str = os.getenv("XIAOMUSIC_KEYWORDS_PLAY", "播放歌曲,放歌曲")
keywords_stop: str = os.getenv("XIAOMUSIC_KEYWORDS_STOP", "关机,暂停,停止,停止播放")
user_key_word_dict: dict[str, str] = field(
default_factory=default_user_key_word_dict
)
def append_keyword(self, keys, action):
for key in keys.split(","):
@ -108,17 +123,26 @@ class Config:
if key not in self.key_match_order:
self.key_match_order.append(key)
def append_user_keyword(self):
for k, v in self.user_key_word_dict.items():
self.key_word_dict[k] = v
self.key_match_order.append(k)
def __post_init__(self) -> None:
if self.proxy:
validate_proxy(self.proxy)
self.append_keyword(self.keywords_playlocal, "playlocal")
self.append_keyword(self.keywords_play, "play")
self.append_keyword(self.keywords_stop, "stop")
self.append_user_keyword()
# 保存配置到 config-example.json 文件
# with open("config-example.json", "w") as f:
# data = asdict(self)
# json.dump(data, f, ensure_ascii=False, indent=4)
with open("config-example.json", "w") as f:
data = asdict(self)
print(data)
json.dump(data, f, ensure_ascii=False, indent=4)
@classmethod
def from_options(cls, options: argparse.Namespace) -> Config:

69
xiaomusic/plugin.py Normal file
View File

@ -0,0 +1,69 @@
import importlib
import inspect
import pkgutil
class PluginManager:
def __init__(self, xiaomusic, plugin_dir="plugins"):
self.xiaomusic = xiaomusic
self.log = xiaomusic.log
self._funcs = {}
self._load_plugins(plugin_dir)
def _load_plugins(self, plugin_dir):
# 假设 plugins 已经在搜索路径上
package_name = plugin_dir
package = importlib.import_module(package_name)
# 遍历 package 中所有模块并动态导入它们
for _, modname, _ in pkgutil.iter_modules(package.__path__, package_name + "."):
# 跳过__init__文件
if modname.endswith("__init__"):
continue
module = importlib.import_module(modname)
# 将 log 和 xiaomusic 注入模块的命名空间
module.log = self.log
module.xiaomusic = self.xiaomusic
# 动态获取模块中与文件名同名的函数
function_name = modname.split(".")[-1] # 从模块全名提取函数名
if hasattr(module, function_name):
self._funcs[function_name] = getattr(module, function_name)
else:
self.log.error(
f"No function named '{function_name}' found in module {modname}"
)
def get_func(self, plugin_name):
"""根据插件名获取插件函数"""
return self._funcs.get(plugin_name)
def get_local_namespace(self):
"""返回包含所有插件函数的字典,可以用作 exec 要执行的代码的命名空间"""
return self._funcs.copy()
async def execute_plugin(self, code):
"""
执行指定的插件代码插件函数可以是同步或异步
:param code: 需要执行的插件函数代码例如 'plugin1("hello")'
"""
# 分解代码字符串以获取函数名
func_name = code.split("(")[0]
# 根据解析出的函数名从插件字典中获取函数
plugin_func = self.get_func(func_name)
if not plugin_func:
raise ValueError(f"No plugin function named '{func_name}' found.")
# 检查函数是否是异步函数
global_namespace = globals().copy()
local_namespace = self.get_local_namespace()
if inspect.iscoroutinefunction(plugin_func):
# 如果是异步函数,构建执行用的协程对象
coroutine = eval(code, global_namespace, local_namespace)
# 等待协程执行
await coroutine
else:
# 如果是普通函数,直接执行代码
eval(code, global_namespace, local_namespace)

View File

@ -27,13 +27,33 @@ function postJSON() {
}
});
}
</script>
function sendDebugCmd() {
var cmd = $("#cmd").val();
$.ajax({
type: "POST",
url: "/cmd",
contentType: "application/json; charset=utf-8",
data: JSON.stringify({cmd: cmd}),
success: () => {
},
error: () => {
// 请求失败时执行的操作
}
});
}
</script>
</head>
<body>
<h1>Debug For XiaoMusic</h1>
<textarea id="post-input" rows="10" cols="50" placeholder="粘贴json数据..."></textarea><br>
<button onclick="postJSON()">提交</button><br>
<hr>
<input id="cmd" type="text"></input>
<button onclick="sendDebugCmd()">测试自定义口令</button><br>
</body>
<footer>
<p>Powered by <a href="https://github.com/hanxi/xiaomusic" target="_blank">xiaomusic</a></p>
</footer>

View File

@ -28,6 +28,7 @@ from xiaomusic.const import (
SUPPORT_MUSIC_TYPE,
)
from xiaomusic.httpserver import StartHTTPServer
from xiaomusic.plugin import PluginManager
from xiaomusic.utils import (
custom_sort_key,
deepcopy_data_no_sensitive_info,
@ -94,6 +95,9 @@ class XiaoMusic:
# 启动时初始化获取声音
self.set_last_record("get_volume#")
# 初始化插件
self.plugin_manager = PluginManager(self)
debug_config = deepcopy_data_no_sensitive_info(self.config)
self.log.info(f"Startup OK. {debug_config}")
@ -670,6 +674,10 @@ class XiaoMusic:
opvalue = self.check_full_match_cmd(query, ctrl_panel)
if opvalue:
self.log.info(f"完全匹配指令. query:{query} opvalue:{opvalue}")
# 自定义口令
if opvalue.startswith("exec#"):
code = opvalue.split("#", 1)[1]
return ("exec", code)
return (opvalue, "")
for opkey in self.config.key_match_order:
@ -1096,3 +1104,7 @@ class XiaoMusic:
"mediaplayer",
data,
)
async def exec(self, arg1=None):
code = arg1 if arg1 else 'code1("hello")'
await self.plugin_manager.execute_plugin(code)