1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
import os import io from fastapi import FastAPI, HTTPException, Request,Form import shutil from fastapi.responses import JSONResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN", "office.local") FILE_DIR = os.getenv("FILE_DIR", "./files") TEMPLATE_DIR = os.getenv("TEMPLATE_DIR", "./templates") os.makedirs(FILE_DIR, exist_ok=True) os.makedirs(TEMPLATE_DIR, exist_ok=True)
DEFAULT_TEMPLATES = { "docx": "empty.docx", "xlsx": "empty.xlsx", "pptx": "empty.pptx" }
app = FastAPI(title="FastAPI WOPI Host (HTTP)")
app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], )
def verify_token(token: str): if token != ACCESS_TOKEN: raise HTTPException(status_code=401, detail="Invalid access token")
def file_path(file_id: str): return os.path.join(FILE_DIR, file_id)
def file_version(file_id: str): path = file_path(file_id) if os.path.exists(path): return str(int(os.path.getmtime(path))) return "0"
@app.get("/wopi/files/{file_id}") async def check_file_info(file_id: str, access_token: str): verify_token(access_token) path = file_path(file_id) if not os.path.exists(path): raise HTTPException(status_code=404, detail="File not found") return JSONResponse({ "BaseFileName": os.path.basename(path), "OwnerId": "user1", "UserId": "user1", "Size": os.path.getsize(path), "Version": file_version(file_id), "ReadOnly": False, "UserCanWrite": True, "SupportsUpdate": True })
@app.get("/wopi/files/{file_id}/contents") async def get_file(file_id: str, access_token: str): verify_token(access_token) path = file_path(file_id) if not os.path.exists(path): raise HTTPException(status_code=404, detail="File not found") return StreamingResponse(open(path, "rb"), media_type="application/octet-stream")
@app.post("/wopi/files/{file_id}/contents") async def put_file(file_id: str, request: Request, access_token: str): verify_token(access_token) path = file_path(file_id) data = await request.body() with open(path, "wb") as f: f.write(data) return {"status": "success", "version": file_version(file_id)}
@app.post("/wopi/files/new") async def create_new_file(filetype: str = Form(...)): """ 新建一个文件(docx / xlsx / pptx) """ filetype = filetype.lower() if filetype not in DEFAULT_TEMPLATES: raise HTTPException(status_code=400, detail="不支持的文件类型")
template_file = os.path.join(TEMPLATE_DIR, DEFAULT_TEMPLATES[filetype]) if not os.path.exists(template_file): open(template_file, "wb").close()
existing = [f for f in os.listdir(FILE_DIR) if f.endswith("." + filetype)] new_name = f"new_{len(existing)+1}.{filetype}" new_path = os.path.join(FILE_DIR, new_name)
shutil.copyfile(template_file, new_path)
wopi_src = f"http://192.168.0.105:8000/wopi/files/{new_name}" return JSONResponse({ "filename": new_name, "url": wopi_src, "access_token": ACCESS_TOKEN })
LOCKS = {}
@app.post("/wopi/files/{file_id}/lock") async def lock_file(file_id: str, request: Request, access_token: str): verify_token(access_token) LOCKS[file_id] = True return {"Lock": "locked"}
@app.post("/wopi/files/{file_id}/unlock") async def unlock_file(file_id: str, request: Request, access_token: str): verify_token(access_token) LOCKS[file_id] = False return {"Lock": "unlocked"}
|