import os
import json
import asyncio
import argparse
import subprocess
import sys
import httpx
import pandas as pd
import akshare as ak
from openai import AsyncOpenAI
import glob
# System prompt placed at the start of every conversation (placeholder text).
SYSTEM_PROMPT = r""" Add your system prompt here """

# Locally implemented tools, declared in the chat-completions "function" tool
# schema. They are offered to the model alongside the remote formula tools.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_us_stock_index",
            "description": "Get US stock index historical market data. Call this tool when the user requests US stock index data or market information.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "US stock index symbol. Must be one of: '.IXIC' (NASDAQ Composite), '.DJI' (Dow Jones Industrial Average), '.INX' (S&P 500), '.NDX' (NASDAQ 100). Default is '.INX' if not specified.",
                        "enum": [".IXIC", ".DJI", ".INX", ".NDX"],
                    },
                },
                "required": ["symbol"],
            },
        },
    },
]

# Import preamble that is prepended to code_runner code before it is executed
# locally (see FormulaChatClient.execute_code_runner).
common_code = (
    "\n"
    "import matplotlib.pyplot as plt\n"
    "import matplotlib\n"
    "import pandas as pd\n"
    "import numpy as np\n"
    "import seaborn as sns\n"
    "from datetime import datetime, timedelta\n"
    "import os\n"
    "import json\n"
)
def get_us_stock_index(symbol: str) -> str:
    """Fetch historical market data for a US stock index through AKShare.

    Args:
        symbol: Index symbol, one of ".IXIC", ".DJI", ".INX" or ".NDX".
            A falsy value falls back to ".INX"; surrounding whitespace is
            stripped before validation.

    Returns:
        On success, a JSON string with the keys "index_symbol",
        "index_name", "total_records", "data" (one record per row),
        "summary", and, when the data was cut to the most recent rows,
        "note". For an invalid symbol, an empty result, or any exception,
        a plain-text message is returned instead; this function does not
        raise.
    """
    # Single source of truth for the accepted symbols and their names.
    index_names = {
        ".IXIC": "NASDAQ Composite Index",
        ".DJI": "Dow Jones Industrial Average",
        ".INX": "S&P 500 Index",
        ".NDX": "NASDAQ 100 Index",
    }
    max_records = 1000

    try:
        symbol = (symbol or ".INX").strip()

        if symbol not in index_names:
            return (
                f"Invalid index symbol '{symbol}'. "
                f"Valid symbols are: {', '.join(index_names)}. "
                f"Symbol meanings: .IXIC (NASDAQ Composite), .DJI (Dow Jones), "
                f".INX (S&P 500), .NDX (NASDAQ 100)"
            )

        index_df = ak.index_us_stock_sina(symbol=symbol)
        if index_df is None or index_df.empty:
            return (
                f"Unable to retrieve index data for '{symbol}' right now. "
                "Please try again later."
            )

        # Remember the full size BEFORE truncating so the note below does not
        # need a second (slow, failure-prone) request just to count rows.
        total_available = len(index_df)
        truncated = total_available > max_records
        if truncated:
            # Keep only the most recent rows.
            index_df = index_df.tail(max_records)

        result = {
            "index_symbol": symbol,
            "index_name": index_names.get(symbol, "Unknown Index"),
            "total_records": len(index_df),
            "data": index_df.to_dict(orient="records"),
        }

        if truncated:
            result["note"] = (
                f"Returned most recent {max_records} records. "
                f"Total available records: {total_available}"
            )

        # Summary of the returned window; the frame is known to be non-empty
        # here because the empty case returned above.
        latest_close = index_df["close"].iloc[-1]
        latest_volume = index_df["volume"].iloc[-1]
        latest_amount = index_df["amount"].iloc[-1]
        result["summary"] = {
            "date_range": {
                "start": str(index_df["date"].iloc[0]),
                "end": str(index_df["date"].iloc[-1]),
            },
            "latest_close": float(latest_close) if pd.notna(latest_close) else None,
            "latest_volume": float(latest_volume) if pd.notna(latest_volume) else None,
            "latest_amount": float(latest_amount) if pd.notna(latest_amount) else None,
        }

        # default=str stringifies values json cannot encode natively
        # (for example date objects in the records).
        return json.dumps(result, ensure_ascii=False, indent=2, default=str)
    except Exception as e:
        # The result is fed back to the model as tool output, so failures are
        # reported as text rather than raised.
        return f"Error getting index data: {str(e)}"
class FormulaChatClient:
    """Chat client that lets a chat model call local and remote "formula" tools.

    Chat completions go through an ``AsyncOpenAI`` client; formula tool
    discovery and execution go through an ``httpx.AsyncClient`` that targets
    ``/formulas/{uri}/tools`` and ``/formulas/{uri}/fibers`` on the same base
    URL.
    """

    def __init__(self, moonshot_base_url: str, api_key: str):
        """Create the API clients.

        Args:
            moonshot_base_url: Base URL used for both chat completions and
                formula endpoints.
            api_key: API key, sent as a bearer token.
        """
        self.openai = AsyncOpenAI(base_url=moonshot_base_url, api_key=api_key)
        self.httpx = httpx.AsyncClient(
            base_url=moonshot_base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30.0,
        )
        self.model = "kimi-k2.5"
        self.max_tokens = 32768
        # Substrings that make execute_code_runner also run code_runner code
        # locally (they indicate the code writes files such as plots/exports).
        self.local_execution_keywords = ["plt.savefig", "plt.save", ".to_excel", "open(", ".to_csv", "pdf.", ".tex"]

    async def get_tools(self, formula_uri: str):
        """Return the tool definitions of a formula ([] if the reply has none)."""
        response = await self.httpx.get(f"/formulas/{formula_uri}/tools")
        return response.json().get("tools", [])

    async def call_tool(self, formula_uri: str, function: str, args: dict):
        """Run a formula tool remotely and return its output.

        Args:
            formula_uri: Formula that owns the tool.
            function: Tool (function) name.
            args: Tool arguments; sent JSON-encoded as a string.

        Returns:
            The fiber's ``output`` (or ``encrypted_output``) when its status is
            "succeeded", otherwise a string starting with "Error: ".
        """
        response = await self.httpx.post(
            f"/formulas/{formula_uri}/fibers",
            json={"name": function, "arguments": json.dumps(args)},
        )
        fiber = response.json()
        # Tolerate a missing/null context in both branches instead of raising
        # KeyError on the success path only.
        context = fiber.get("context") or {}

        if fiber.get("status") == "succeeded":
            return context.get("output") or context.get("encrypted_output")

        error_msg = (
            fiber.get("error")
            or context.get("error")
            or context.get("output")
            or "Unknown error"
        )
        return f"Error: {error_msg}"

    async def handle_response(self, response, messages, all_tools, tool_to_uri):
        """Process one completion, executing tool calls until the model answers.

        The assistant message is appended to ``messages``. Without tool calls
        its content is printed and the method returns. Otherwise every tool
        call is executed, one "tool" message per call is appended, a follow-up
        completion is requested and handled recursively.

        Args:
            response: Chat completion returned by the OpenAI client.
            messages: Conversation history; mutated in place.
            all_tools: Tool definitions passed to follow-up completions.
            tool_to_uri: Maps tool name to the formula URI that provides it.

        Raises:
            ValueError: If a non-local tool has no entry in ``tool_to_uri``,
                or if the tool-call arguments are not valid JSON.
        """
        message = response.choices[0].message
        messages.append(message)

        if not message.tool_calls:
            print(f"\n{message.content}")
            return

        print(f"\n[Calling tools: {len(message.tool_calls)} tools]")
        for call in message.tool_calls:
            func_name = call.function.name
            # An empty/missing arguments string is treated as "no arguments"
            # rather than failing inside json.loads.
            args = json.loads(call.function.arguments or "{}")
            print(f"β {func_name}")

            # Locally implemented tool.
            if func_name == "get_us_stock_index":
                symbol = args.get("symbol", ".INX")
                result = get_us_stock_index(symbol)
                print(f"Index data: {result}")
                messages.append({"role": "tool", "tool_call_id": call.id, "content": result})
                continue

            # Remote formula tool.
            uri = tool_to_uri.get(func_name)
            if not uri:
                raise ValueError(f"No URI found for tool {func_name}")

            if func_name == "code_runner":
                # Mirror file-producing code locally before the remote run.
                self.execute_code_runner(args)

            result = await self.call_tool(uri, func_name, args)
            messages.append({"role": "tool", "tool_call_id": call.id, "content": result})

        next_response = await self.openai.chat.completions.create(
            model=self.model, messages=messages, tools=all_tools, max_tokens=self.max_tokens
        )
        await self.handle_response(next_response, messages, all_tools, tool_to_uri)

    def convert_tex_to_pdf(self, tex_file):
        """Compile a .tex file to PDF with xelatex and report the outcome.

        xelatex runs twice inside the file's own directory so cross-references
        resolve. Success or the failure reason is printed; nothing is raised.
        The .aux/.log/.out by-products are removed afterwards on a best-effort
        basis.

        Args:
            tex_file: Path of the .tex file (relative or absolute).
        """
        work_dir = os.path.dirname(os.path.abspath(tex_file))
        tex_name = os.path.basename(tex_file)
        stem = os.path.splitext(tex_name)[0]
        # splitext swaps only the extension; str.replace would also rewrite
        # any other ".tex" occurring in the path.
        pdf_file = os.path.splitext(tex_file)[0] + ".pdf"

        try:
            for _ in range(2):
                subprocess.run(
                    # The bare file name is used because cwd is already the
                    # file's directory; a relative tex_file would otherwise be
                    # resolved against the wrong directory.
                    ["xelatex", "-interaction=nonstopmode", tex_name],
                    capture_output=True,
                    text=True,
                    # LaTeX output may contain bytes that are not valid in the
                    # locale encoding; do not fail the conversion on those.
                    errors="replace",
                    cwd=work_dir,
                    # Without check=True a failed compilation was reported as
                    # success and the CalledProcessError branch was dead code.
                    check=True,
                )
            print(f" [PDF generated: {pdf_file}]")
        except FileNotFoundError:
            print(" [PDF conversion failed: xelatex not installed]")
        except subprocess.CalledProcessError as e:
            print(" [PDF conversion failed: LaTeX compilation error]")
            if e.stdout:
                print(f" Error output: {e.stdout[-500:]}")
        except Exception as e:
            print(f" [PDF conversion failed: {str(e)}]")
        finally:
            for ext in (".aux", ".log", ".out"):
                temp_file = os.path.join(work_dir, stem + ext)
                try:
                    os.remove(temp_file)
                except OSError:
                    # Best-effort cleanup: a missing or locked by-product is
                    # not worth reporting.
                    pass

    def execute_code_runner(self, args):
        """Run code_runner code locally when it appears to write files.

        The code is executed only if it contains one of
        ``self.local_execution_keywords``. It is prefixed with the module-level
        ``common_code`` preamble and run in a separate Python process. Any
        ``*.tex`` file that newly appears in the current working directory is
        then converted to PDF.

        SECURITY NOTE: this executes model-generated code on the local machine
        with the current user's privileges and no sandboxing.

        Args:
            args: Tool arguments; a dict with a "code" entry, or any other
                value, which is converted with ``str`` (falsy values become
                an empty string).
        """
        code = args.get("code", "") if isinstance(args, dict) else str(args or "")
        if not code or not any(keyword in code for keyword in self.local_execution_keywords):
            return

        before_tex_files = set(glob.glob("*.tex"))
        try:
            subprocess.run(
                [sys.executable, "-c", common_code + code],
                capture_output=True,
                text=True,
                check=True,
            )
        except Exception as e:
            print(f" [Local execution failed: {e}]")
            return

        new_tex_files = set(glob.glob("*.tex")) - before_tex_files
        for tex_file in sorted(new_tex_files):
            self.convert_tex_to_pdf(tex_file)

    async def chat(self, question, messages, all_tools, tool_to_uri):
        """Append a user question, request a completion and handle the reply.

        Args:
            question: User input text.
            messages: Conversation history; mutated in place.
            all_tools: Tool definitions offered to the model.
            tool_to_uri: Maps tool name to the formula URI that provides it.
        """
        messages.append({"role": "user", "content": question})
        response = await self.openai.chat.completions.create(
            model=self.model, messages=messages, tools=all_tools, max_tokens=self.max_tokens
        )
        await self.handle_response(response, messages, all_tools, tool_to_uri)

    async def close(self):
        """Close the underlying httpx client."""
        await self.httpx.aclose()
def normalize_formula_uri(uri: str) -> str:
    """Fill in the default namespace and tag of a formula URI.

    A URI without "/" is prefixed with "moonshot/"; a URI without ":" gets
    ":latest" appended. Fully qualified URIs are returned unchanged.
    """
    namespaced = uri if "/" in uri else f"moonshot/{uri}"
    return namespaced if ":" in namespaced else f"{namespaced}:latest"
async def main():
    """Command-line entry point: load tools, then answer one question or chat.

    Reads MOONSHOT_BASE_URL (default "https://api.moonshot.ai/v1") and the
    required MOONSHOT_API_KEY from the environment. With ``--question`` a
    single question is asked; otherwise an interactive loop runs until the
    user enters "q" or input reaches end-of-file.
    """
    parser = argparse.ArgumentParser(description="Formula chat client")
    parser.add_argument(
        "--formula",
        action="append",
        # NOTE: with action="append" and a non-empty default, values given on
        # the command line are added to these defaults, not substituted.
        default=["moonshot/web-search:latest", "moonshot/rethink:latest", "moonshot/code-runner:latest"],
        help="Formula URIs",
    )
    parser.add_argument("--question", help="Question to ask")
    args = parser.parse_args()

    # Normalize, then de-duplicate while keeping first-seen order.
    normalized_formulas = [normalize_formula_uri(uri) for uri in args.formula]
    unique_formulas = list(dict.fromkeys(normalized_formulas))

    moonshot_base_url = os.getenv("MOONSHOT_BASE_URL", "https://api.moonshot.ai/v1")
    api_key = os.getenv("MOONSHOT_API_KEY")
    if not api_key:
        print("Error: MOONSHOT_API_KEY environment variable is required")
        return

    client = FormulaChatClient(moonshot_base_url, api_key)
    # Everything after client creation runs under try/finally so the HTTP
    # client is closed even if tool loading fails or returns early.
    try:
        all_tools = []
        tool_to_uri = {}

        # Local tools are registered first, so a remote tool with the same
        # name is skipped below.
        for tool in TOOLS:
            func = tool.get("function")
            if func:
                func_name = func.get("name")
                all_tools.append(tool)
                tool_to_uri[func_name] = "custom"

        for uri in unique_formulas:
            tools = await client.get_tools(uri)
            for tool in tools:
                func = tool.get("function")
                if not func:
                    continue
                func_name = func.get("name")
                if not func_name or func_name in tool_to_uri:
                    continue
                all_tools.append(tool)
                tool_to_uri[func_name] = uri

        if not all_tools:
            print("Error: No tools loaded")
            return
        print(f"Loaded {len(all_tools)} tools")

        messages = [{"role": "system", "content": SYSTEM_PROMPT}]
        if args.question:
            print(f"\nQuestion: {args.question}")
            await client.chat(args.question, messages, all_tools, tool_to_uri)
        else:
            print("\nDialog mode (enter 'q' to exit)\n")
            while True:
                try:
                    question = input("Q: ").strip()
                except EOFError:
                    # Closed or piped stdin: leave the loop like "q" would.
                    break
                if question.lower() == "q":
                    break
                if question:
                    await client.chat(question, messages, all_tools, tool_to_uri)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())