AIGC - Stable Diffusion 搭建【从零到一】
AIGC - Stable Diffusion 搭建
文章目录
AIGC - Stable Diffusion 搭建 1. Huggingface Token 2. Cloud GPU 3. autocover.txt 4. userinput_write_pyfile.py: 5. SSH API 6. ffmpeg 7. Piano Audio To Midi 8. Spleeter1. Huggingface Token
url = ’‘ 1. 注册获取账号 2. 生成获取用户项目token
2. Cloud GPU
url = '' # 可租用矩池云 1. 注册获取账号 2. 选择GPU档次及预装型号(建议huggingface,Pytorch) 3. huggingface-cli login # Cloud服务器通过token验证 | 下载模型 4. JupyterLab pip3 install diffusers pip3 install accelerate pip3 install spacy ftfy==4.4.3
3. autocover.txt
import diffusers, torch, os from time import strftime, localtime model_id = "CompVis/stable-diffusion-v1-4" device = torch.device("cuda") pipe = diffusers.StableDiffusionPipeline.from_pretrained(model_id, use_auth_token=True) pipe = pipe.to(device) image = pipe(prompt).images[0] timeprint = strftime("%Y%m%d%H%M%S", localtime()) fldpath = os.path.join('/mnt/', 'xxx/img') if not os.path.exists(fldpath): os.mkdir(fldpath) img_filenm = f'{timeprint}.png' image.save(os.path.join(fldpath, img_filenm)) print(f'{timeprint}.png')
4. userinput_write_pyfile.py:
def userinput_write_pyfile(user_input,local_put_path): ''' - 将用户输入的文字信息写入远程py待执行文件中 ''' import os f = open(local_put_path,'r') index = 0 lines = [] for line in f.readlines(): index +=1 if index == 3: prompt = f'prompt = "{user_input}"' lines.append(prompt) lines.append(line) usr_path = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/upload') pyfilepath = os.path.join(usr_path,'autocover.py') pf = open(pyfilepath,'w') for line in lines: pf.write(''.join(line)) pf.close() return pyfilepath
5. SSH API
# -*- coding:utf-8 -*- def get_SSH_account(): host = "" # 远程服务器ip port = 0000 # 远程服务器端口 password = "" # 远程服务器密码 user = "" # 远程服务器用户名 account_info = [host,port,password,user] # 返回SSH登陆信息 return account_info def SSH_connection(user_input,accounts,paths,write_py, specail_fname,is_DownloadALL,filter_fname): import paramiko,os,time (host, port, password, user) = accounts (remote_get_path, remote_put_paths, local_get_path, local_put_paths, pyfilepath, python_path) = paths class Pysftp(object): """ python3 自动上传、下载文件 """ global local_put_paths, local_put_path, local_get_path, remote_put_paths,remote_put_path, remote_get_path, pyfilepath, python_path def __init__(self, host, port, user, password): self.host = host self.port = port self.user = user self.password = password def connect(self): """ 建立ssh连接 """ self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect(self.host, self.port, self.user, self.password) print("[SSH] -> 连接成功") # def cmd (self, cmd): # """ # 需要执行的命令 # """ # cmd = "ls" # stdin, stdout, stderr = self.ssh.exec_command(cmd) # print(stdout.read().decode("utf8")) # def mkdir(self): # """ # 创建本地文件夹,存放上传、下载的文件 # """ # for lp in [local_put_path, local_get_path]: # if not os.path.exists(lp): # os.makedirs(lp, 666) # print("创建本地文件夹:{}".format(lp)) # else: # print("本地文件夹:{}已存在".format(lp)) def run_pyfile(self,pyfilepath): ''' 执行python文件 ''' cmd = f"export PATH={python_path}:$PATH;python3 -u {pyfilepath}" stdin, stdout, stderr = self.ssh.exec_command(cmd) for line in stderr : print(line.rstrip()) flnm = stdout.readlines()[0].replace('\n', '') print("[SSH] ->生成成功") return flnm def put(self,user_input,local_put_path,remote_put_path,path_i): """ 上传文件 """ sftp = paramiko.SFTPClient.from_transport(self.ssh.get_transport()) # sftp = self.ssh.open_sftp() if path_i == 0: pyfile_path = write_py(user_input,local_put_path) # ♻️ local_full_name = pyfile_path fname = os.path.basename(pyfile_path) else: local_full_name = local_put_path fname = specail_fname # local_done_write(local_full_name) # 本地文件已写入完成,可以上传了 sftp.put(local_full_name, os.path.join(remote_put_path, fname)) print(f'[{fname}] -> 上传成功') # os.remove(local_full_name) # print("{}\n上传成功:本地文件:{}====>远程{}:{},已删除该本地文件\n".format(datetime.datetime.now(), local_full_name, self.host, remote_put_path)) def get(self,flnm,is_DownloadALL,filter_fname): """ 下载文件 """ sftp = paramiko.SFTPClient.from_transport(self.ssh.get_transport()) flidx = 0 for fname in sftp.listdir(remote_get_path): flidx += 1 try: if is_DownloadALL: print(f'[{fname}] -> 正在下载') remote_full_name = os.path.join(remote_get_path, fname) self.remote_done_transffer(remote_full_name) sftp.get(remote_full_name, os.path.join(local_get_path, fname)) # sftp.remove(remote_full_name) print(f'[{fname}] -> 下载成功') else: if len(filter_fname) > 0: if filter_fname in fname: print(f'[{fname}] -> 正在下载') remote_full_name = os.path.join(remote_get_path, fname) self.remote_done_transffer(remote_full_name) from time import strftime,localtime timeprint = strftime("%Y%m%d%H%M%S", localtime()) nfname = f'{timeprint}{filter_fname}' sftp.get(remote_full_name, os.path.join(local_get_path, nfname)) # sftp.remove(remote_full_name) print(f'[{fname}] -> 下载成功') flnm = nfname break else: print(f'[{fname}] -> [{filter_fname}]') if flidx == len(sftp.listdir(remote_get_path)) - 1: print(f'[{fname}] -> [{filter_fname}] 无法找寻A') else: if flnm in fname: print(f'[{fname}] -> 正在下载') remote_full_name = os.path.join(remote_get_path, fname) self.remote_done_transffer(remote_full_name) sftp.get(remote_full_name, os.path.join(local_get_path, fname)) # sftp.remove(remote_full_name) print(f'[{fname}] -> 下载成功') break else: if flidx == len(sftp.listdir(remote_get_path))-1: print(f'[{fname}] -> [{flnm}] 无法找寻B') except Exception as e: print(e) return flnm def stat(self, fpath): """ 检查远程服务器文件状态 :param fpath:文件绝对路径 """ sftp = paramiko.SFTPClient.from_transport(self.ssh.get_transport()) # sftp = self.ssh.open_sftp() return sftp.stat(fpath) def remote_done_transffer(self, fpath): """ 检查文件是否传输完成 :param fpath:远程服务器上待下载文件绝对路径 """ while True: old_size = self.stat(fpath).st_size time.sleep(3) new_size = self.stat(fpath).st_size if new_size <= old_size: # 传输已完成 return def close(self): """ 关闭ssh连接 """ self.ssh.close() print("[SSH] -> 连接关闭") def local_done_write(self, fpath,old_size): """ 检查本地文件是否已写入完成 :param fpath:本地待上传文件绝对路径 """ new_size = os.stat(fpath).st_size if new_size > old_size: print('[SSH] -> ? 获取成功') return else: print('[SSH] -> 没有新的写入') def transffer(local_get_path): """ 传输函数 """ old_size = os.stat(local_get_path).st_size obj = Pysftp(host, port, user, password) obj.connect() # SSH连接 for path_i in range(len(local_put_paths)): # 上传py文件/其它附件 local_put_path = local_put_paths[path_i] remote_put_path = remote_put_paths[path_i] obj.put(user_input,local_put_path,remote_put_path,path_i) flnm = obj.run_pyfile(pyfilepath) # 执行py文件 flnm = obj.get(flnm,is_DownloadALL,filter_fname)# 下载文件 obj.local_done_write(local_get_path,old_size) # 下载检查 obj.close() # 关闭连接 return flnm flnm = transffer(local_get_path) flpath = os.path.join(local_get_path,flnm) return flpath
6. ffmpeg
解决音频内容GPU运算时遇到ffmpeg引起问题
# conda terminal install conda config --add channels conda-forge conda install ffmpeg pip3 install ffmpy # 找到ffmpeg安装的位置 which ffmpeg # 寻找ffdec.py文件并修改33行 【/root/miniconda3/envs/myconda/lib/python3.8/site-packages/audioread/】 ori_COMMANDS = ('ffmpeg', 'avconv') new_COMMANDS = ('/root/miniconda3/envs/myconda/bin/ffmpeg', 'avconv')
7. Piano Audio To Midi
# pip3 install pip3 install piano_transcription_inference # 将训练集放置在指定位置
8. Spleeter
write_py
# pip3 install # pip3 install Spleeter # pip3 install pydub # 将训练集放置在指定位置 def auto_spleeter_pywrite(userinput,local_put_path): import os f = open(local_put_path, 'r') index = 0 lines = [] for line in f.readlines(): index +=1 if index == 1: song_convert_path = f'song_convert_path = "/mnt/aigc_music/spleeter/source/song/"' bounce_path = f'bounce_path = "/mnt/aigc_music/spleeter/bounce"' lines.append(song_convert_path) lines.append('\n') lines.append(bounce_path) lines.append('\n') lines.append(line) mainpath = os.path.join(os.path.expanduser('~'), 'Desktop/SSH') usr_path = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/upload') downloadpath = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/download') paths = [mainpath, usr_path, downloadpath] for path in paths: if not os.path.exists(path): os.mkdir(path) pyfilepath = os.path.join(usr_path, 'audio_spleeter.py') pf = open(pyfilepath, 'w') for line in lines: pf.write(''.join(line)) pf.close() return pyfilepath
audio_spleeter.txt
import os,subprocess,spleeter audio_path = os.path.dirname(song_convert_path) + '/' result_path = bounce_path # 执行spleeter cmd = 'for f in ' + audio_path + '*.mp3; do spleeter separate -o ' + result_path + ' -p spleeter:5stems-16kHz $f -B tensorflow; done' subprocess.call(cmd, shell=True) # 压缩音频文件 def wav_to_mp3(): from pydub import AudioSegment wav_foldepath = [os.path.join(bounce_path,i) for i in os.listdir(bounce_path) if not i.startswith('.')][0] wav_flodepath = '/mnt/aigc_music/spleeter/bounce/temp_split' # os.path.dirname(wav_foldepath) wav_files = [os.path.join(wav_foldepath,i) for i in os.listdir(wav_flodepath) if i.endswith('.wav')] for w in wav_files: sound = AudioSegment.from_wav(w) mp3_flnm = w.replace('.wav', '.mp3') sound.export(mp3_flnm, format='mp3', bitrate='320k') wav_to_mp3() # zip打包分轨 def get_zip_filepath(): zipfolders = [os.path.join(bounce_path,i) for i in os.listdir(bounce_path) if not i.startswith('.')][0] zipfolders = '/mnt/aigc_music/spleeter/bounce/temp_split' # os.path.dirname(zipfolders) + '/' zipsource_files = [os.path.join(zipfolders,i) for i in os.listdir(zipfolders) if i.endswith('.mp3')] zip_file = os.path.join(bounce_path+'/temp_split/','zip_temp.zip') def file2zip(zip_file, zipsource_files): import zipfile, os with zipfile.ZipFile(zip_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for file in zipsource_files: parent_path, name = os.path.split(file) zf.write(file, arcname=name) file2zip( zip_file=zip_file, zipsource_files=zipsource_files, ) # ♻️ zip_tmpfolderpath = zipfolders if not os.path.exists(zip_tmpfolderpath): os.mkdir(zip_tmpfolderpath) zip_path = os.path.join(zip_tmpfolderpath,'zip_temp.zip') return zip_path zip_path = get_zip_filepath() # ♻️ print(zip_path)
sshcodewordmp3apppython服务器aigctokenclidiffusiongpuhuggingfacepromptrpaerp文件夹urliospngstable diffusioniconapistem用户名音频内容codingjupyterflowstablediffusionshellpytorchscripttensorflow