1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
| import os import random import re from utils.http_utils import AsyncHttpx from configs.path_config import IMAGE_PATH, DATA_PATH from services.log import logger from utils.message_builder import image, face from configs.config import Config, NICKNAME from .utils import ai_message_manager from copy import deepcopy from transformers import GPT2TokenizerFast import openai
openai.api_key = "xxxxxxxxxxxxxx"
try: import ujson as json except ModuleNotFoundError: import json
session_config = { 'preset': '你是一个大型语言模型,可以回答我的问题。如果我有任何问题,请随时告诉你,你会尽力为我解答。', 'context': '' } sessions = {} tokenizer = GPT2TokenizerFast.from_pretrained("gpt2-large") check_url = "https://v2.alapi.cn/api/censor/text" index = 0 anime_data = json.load(open(DATA_PATH / "anime.json", "r", encoding="utf8"))
def get_chat_session(sessionid): if sessionid not in sessions: config = deepcopy(session_config) config['id'] = sessionid sessions[sessionid] = config return sessions[sessionid] def chat_with_gpt(prompt): try: resp = openai.Completion.create( model = "text-davinci-003", temperature = 0.9, max_tokens=3000, top_p=1, presence_penalty=0, frequency_penalty=0, prompt=prompt) resp = resp['choices'][0]['text'] except openai.OpenAIError as e: print('openai 接口报错: ' + str(e)) resp = str(e) return resp
async def get_chat_result(text: str, img_url: str, user_id: int, nickname: str) -> str: """ 获取 AI 返回值,顺序: 特殊回复 -> GPT3 -> 青云客 :param text: 问题 :param img_url: 图片链接 :param user_id: 用户id :param nickname: 用户昵称 :return: 回答 """ global index ai_message_manager.add_message(user_id, text) special_rst = await ai_message_manager.get_result(user_id, nickname) if special_rst: ai_message_manager.add_result(user_id, special_rst) return special_rst if index == 5: index = 0 if len(text) < 6 and random.random() < 0.6: keys = anime_data.keys() for key in keys: if text.find(key) != -1: return random.choice(anime_data[key]).replace("你", nickname) rst = await GPT_3(text, user_id) if not rst: rst = await xie_ai(text) if not rst: return no_result() if nickname: if len(nickname) < 5: if random.random() < 0.5: nickname = "~".join(nickname) + "~" if random.random() < 0.2: if nickname.find("大人") == -1: nickname += "大~人~" rst = str(rst).replace("小主人", nickname).replace("小朋友", nickname) ai_message_manager.add_result(user_id, rst) return rst
async def GPT_3(msg: str, sessionid: int) -> str: """ 获取GPT3接口的回复 指令如下(群内需@机器人):1.[重置会话] 请发送 重置会话2.[设置人格] 请发送 设置人格+人格描述3.[重置人格] 请发送 重置人格。 注意:重置会话不会清空人格,重置人格会重置会话!设置人格后人格将一直存在,除非重置人格或重启逻辑端! """ try: if msg.strip() == '': return '您好,我是人工智能助手,如果您有任何问题,请随时告诉我,我将尽力回答。\n如果您需要重置我们的会话,请回复`重置会话`' session = get_chat_session(sessionid) if '重置会话' == msg.strip(): session['context'] = '' return "会话已重置" if '重置人格' == msg.strip(): session['context'] = '' session['preset'] = session_config['preset'] return '人格已重置' if msg.strip().startswith('设置人格'): session['preset'] = msg.strip().replace('设置人格', '') session['context'] = '' token_limit = 4096 - 3000 - len(tokenizer.encode(session['preset'])) - 3 session['context'] = session['context'] + "\n\nQ:" + msg + "\nA:" ids = tokenizer.encode(session['context']) tokens = tokenizer.decode(ids[-token_limit:]) char_limit = len(''.join(tokens)) session['context'] = session['context'][-char_limit:] pos = session['context'].find('Q:') session['context'] = session['context'][pos:] msg = session['preset'] + '\n\n' + session['context'] print(msg) message = chat_with_gpt(msg) print("会话ID: " + str(sessionid)) print("ChatGPT返回内容: ") print(message) return message except Exception as error: traceback.print_exc() return str('异常: ' + str(error))
async def xie_ai(text: str) -> str: """ 获取青云客回复 :param text: 问题 :return: 青云可回复 """ res = await AsyncHttpx.get(f"http://api.qingyunke.com/api.php?key=free&appid=0&msg={text}") content = "" try: data = json.loads(res.text) if data["result"] == 0: content = data["content"] if "菲菲" in content: content = content.replace("菲菲", NICKNAME) if "艳儿" in content: content = content.replace("艳儿", NICKNAME) if "公众号" in content: content = "" if "{br}" in content: content = content.replace("{br}", "\n") if "提示" in content: content = content[: content.find("提示")] if "淘宝" in content or "taobao.com" in content: return "" while True: r = re.search("{face:(.*)}", content) if r: id_ = r.group(1) content = content.replace( "{" + f"face:{id_}" + "}", str(face(int(id_))) ) else: break return ( content if not content and not Config.get_config("ai", "ALAPI_AI_CHECK") else await check_text(content) ) except Exception as e: logger.error(f"Ai xie_ai 发生错误 {type(e)}:{e}") return ""
def hello() -> str: """ 一些打招呼的内容 """ result = random.choice( ( "哦豁?!", "你好!Ov<", f"库库库,呼唤{NICKNAME}做什么呢", "我在呢!", "呼呼,叫俺干嘛", ) ) img = random.choice(os.listdir(IMAGE_PATH / "zai")) if img[-4:] == ".gif": result += image(img, "zai") else: result += image(img, "zai") return result
def no_result() -> str: """ 没有回答时的回复 """ return ( random.choice( [ "你在说啥子?", f"纯洁的{NICKNAME}没听懂", "下次再告诉你(下次一定)", "你觉得我听懂了吗?嗯?", "我!不!知!道!", ] ) + image(random.choice(os.listdir(IMAGE_PATH / "noresult")), "noresult") )
async def check_text(text: str) -> str: """ ALAPI文本检测,主要针对青云客API,检测为恶俗文本改为无回复的回答 :param text: 回复 """ if not Config.get_config("alapi", "ALAPI_TOKEN"): return text params = {"token": Config.get_config("alapi", "ALAPI_TOKEN"), "text": text} try: data = (await AsyncHttpx.get(check_url, timeout=2, params=params)).json() if data["code"] == 200: if data["data"]["conclusion_type"] == 2: return "" except Exception as e: logger.error(f"检测违规文本错误...{type(e)}:{e}") return text
|