mirror of https://github.com/openclaw/openclaw.git
chore: format python scripts
This commit is contained in:
parent
4c56672e97
commit
8b6dbfb7a1
|
|
@ -5,7 +5,6 @@ import pathlib
|
|||
import re
|
||||
import sys
|
||||
|
||||
|
||||
INPUT_INTERPOLATION_RE = re.compile(r"\$\{\{\s*inputs\.")
|
||||
RUN_LINE_RE = re.compile(r"^(\s*)run:\s*(.*)$")
|
||||
USING_COMPOSITE_RE = re.compile(r"^\s*using:\s*composite\s*$", re.MULTILINE)
|
||||
|
|
|
|||
|
|
@ -9,12 +9,12 @@ from __future__ import annotations
|
|||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
from typing import Any
|
||||
|
||||
|
||||
def positive_int(value: str) -> int:
|
||||
|
|
@ -31,7 +31,7 @@ def eprint(msg: str) -> None:
|
|||
print(msg, file=sys.stderr)
|
||||
|
||||
|
||||
def run_codexbar_cost(provider: str) -> List[Dict[str, Any]]:
|
||||
def run_codexbar_cost(provider: str) -> list[dict[str, Any]]:
|
||||
cmd = ["codexbar", "cost", "--format", "json", "--provider", provider]
|
||||
try:
|
||||
output = subprocess.check_output(cmd, text=True)
|
||||
|
|
@ -48,12 +48,12 @@ def run_codexbar_cost(provider: str) -> List[Dict[str, Any]]:
|
|||
return payload
|
||||
|
||||
|
||||
def load_payload(input_path: Optional[str], provider: str) -> Dict[str, Any]:
|
||||
def load_payload(input_path: str | None, provider: str) -> dict[str, Any]:
|
||||
if input_path:
|
||||
if input_path == "-":
|
||||
raw = sys.stdin.read()
|
||||
else:
|
||||
with open(input_path, "r", encoding="utf-8") as handle:
|
||||
with open(input_path, encoding="utf-8") as handle:
|
||||
raw = handle.read()
|
||||
data = json.loads(raw)
|
||||
else:
|
||||
|
|
@ -77,7 +77,7 @@ class ModelCost:
|
|||
cost: float
|
||||
|
||||
|
||||
def parse_daily_entries(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
def parse_daily_entries(payload: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
daily = payload.get("daily")
|
||||
if not daily:
|
||||
return []
|
||||
|
|
@ -86,18 +86,18 @@ def parse_daily_entries(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|||
return [entry for entry in daily if isinstance(entry, dict)]
|
||||
|
||||
|
||||
def parse_date(value: str) -> Optional[date]:
|
||||
def parse_date(value: str) -> date | None:
|
||||
try:
|
||||
return datetime.strptime(value, "%Y-%m-%d").date()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def filter_by_days(entries: List[Dict[str, Any]], days: Optional[int]) -> List[Dict[str, Any]]:
|
||||
def filter_by_days(entries: list[dict[str, Any]], days: int | None) -> list[dict[str, Any]]:
|
||||
if not days:
|
||||
return entries
|
||||
cutoff = date.today() - timedelta(days=days - 1)
|
||||
filtered: List[Dict[str, Any]] = []
|
||||
filtered: list[dict[str, Any]] = []
|
||||
for entry in entries:
|
||||
day = entry.get("date")
|
||||
if not isinstance(day, str):
|
||||
|
|
@ -108,8 +108,8 @@ def filter_by_days(entries: List[Dict[str, Any]], days: Optional[int]) -> List[D
|
|||
return filtered
|
||||
|
||||
|
||||
def aggregate_costs(entries: Iterable[Dict[str, Any]]) -> Dict[str, float]:
|
||||
totals: Dict[str, float] = {}
|
||||
def aggregate_costs(entries: Iterable[dict[str, Any]]) -> dict[str, float]:
|
||||
totals: dict[str, float] = {}
|
||||
for entry in entries:
|
||||
breakdowns = entry.get("modelBreakdowns")
|
||||
if not breakdowns:
|
||||
|
|
@ -129,7 +129,7 @@ def aggregate_costs(entries: Iterable[Dict[str, Any]]) -> Dict[str, float]:
|
|||
return totals
|
||||
|
||||
|
||||
def pick_current_model(entries: List[Dict[str, Any]]) -> Tuple[Optional[str], Optional[str]]:
|
||||
def pick_current_model(entries: list[dict[str, Any]]) -> tuple[str | None, str | None]:
|
||||
if not entries:
|
||||
return None, None
|
||||
sorted_entries = sorted(
|
||||
|
|
@ -139,7 +139,7 @@ def pick_current_model(entries: List[Dict[str, Any]]) -> Tuple[Optional[str], Op
|
|||
for entry in reversed(sorted_entries):
|
||||
breakdowns = entry.get("modelBreakdowns")
|
||||
if isinstance(breakdowns, list) and breakdowns:
|
||||
scored: List[ModelCost] = []
|
||||
scored: list[ModelCost] = []
|
||||
for item in breakdowns:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
|
|
@ -158,13 +158,13 @@ def pick_current_model(entries: List[Dict[str, Any]]) -> Tuple[Optional[str], Op
|
|||
return None, None
|
||||
|
||||
|
||||
def usd(value: Optional[float]) -> str:
|
||||
def usd(value: float | None) -> str:
|
||||
if value is None:
|
||||
return "—"
|
||||
return f"${value:,.2f}"
|
||||
|
||||
|
||||
def latest_day_cost(entries: List[Dict[str, Any]], model: str) -> Tuple[Optional[str], Optional[float]]:
|
||||
def latest_day_cost(entries: list[dict[str, Any]], model: str) -> tuple[str | None, float | None]:
|
||||
if not entries:
|
||||
return None, None
|
||||
sorted_entries = sorted(
|
||||
|
|
@ -188,10 +188,10 @@ def latest_day_cost(entries: List[Dict[str, Any]], model: str) -> Tuple[Optional
|
|||
def render_text_current(
|
||||
provider: str,
|
||||
model: str,
|
||||
latest_date: Optional[str],
|
||||
total_cost: Optional[float],
|
||||
latest_cost: Optional[float],
|
||||
latest_cost_date: Optional[str],
|
||||
latest_date: str | None,
|
||||
total_cost: float | None,
|
||||
latest_cost: float | None,
|
||||
latest_cost_date: str | None,
|
||||
entry_count: int,
|
||||
) -> str:
|
||||
lines = [f"Provider: {provider}", f"Current model: {model}"]
|
||||
|
|
@ -204,7 +204,7 @@ def render_text_current(
|
|||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_text_all(provider: str, totals: Dict[str, float]) -> str:
|
||||
def render_text_all(provider: str, totals: dict[str, float]) -> str:
|
||||
lines = [f"Provider: {provider}", "Models:"]
|
||||
for model, cost in sorted(totals.items(), key=lambda item: item[1], reverse=True):
|
||||
lines.append(f"- {model}: {usd(cost)}")
|
||||
|
|
@ -214,12 +214,12 @@ def render_text_all(provider: str, totals: Dict[str, float]) -> str:
|
|||
def build_json_current(
|
||||
provider: str,
|
||||
model: str,
|
||||
latest_date: Optional[str],
|
||||
total_cost: Optional[float],
|
||||
latest_cost: Optional[float],
|
||||
latest_cost_date: Optional[str],
|
||||
latest_date: str | None,
|
||||
total_cost: float | None,
|
||||
latest_cost: float | None,
|
||||
latest_cost_date: str | None,
|
||||
entry_count: int,
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"provider": provider,
|
||||
"mode": "current",
|
||||
|
|
@ -232,7 +232,7 @@ def build_json_current(
|
|||
}
|
||||
|
||||
|
||||
def build_json_all(provider: str, totals: Dict[str, float]) -> Dict[str, Any]:
|
||||
def build_json_all(provider: str, totals: dict[str, float]) -> dict[str, Any]:
|
||||
return {
|
||||
"provider": provider,
|
||||
"mode": "all",
|
||||
|
|
|
|||
|
|
@ -70,42 +70,32 @@ def choose_output_resolution(
|
|||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Generate images using Nano Banana Pro (Gemini 3 Pro Image)"
|
||||
)
|
||||
parser = argparse.ArgumentParser(description="Generate images using Nano Banana Pro (Gemini 3 Pro Image)")
|
||||
parser.add_argument("--prompt", "-p", required=True, help="Image description/prompt")
|
||||
parser.add_argument("--filename", "-f", required=True, help="Output filename (e.g., sunset-mountains.png)")
|
||||
parser.add_argument(
|
||||
"--prompt", "-p",
|
||||
required=True,
|
||||
help="Image description/prompt"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--filename", "-f",
|
||||
required=True,
|
||||
help="Output filename (e.g., sunset-mountains.png)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--input-image", "-i",
|
||||
"--input-image",
|
||||
"-i",
|
||||
action="append",
|
||||
dest="input_images",
|
||||
metavar="IMAGE",
|
||||
help="Input image path(s) for editing/composition. Can be specified multiple times (up to 14 images)."
|
||||
help="Input image path(s) for editing/composition. Can be specified multiple times (up to 14 images).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--resolution", "-r",
|
||||
"--resolution",
|
||||
"-r",
|
||||
choices=["1K", "2K", "4K"],
|
||||
default=None,
|
||||
help="Output resolution: 1K, 2K, or 4K. If omitted with input images, auto-detect from largest image dimension."
|
||||
help="Output resolution: 1K, 2K, or 4K. If omitted with input images, auto-detect from largest image dimension.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--aspect-ratio", "-a",
|
||||
"--aspect-ratio",
|
||||
"-a",
|
||||
choices=SUPPORTED_ASPECT_RATIOS,
|
||||
default=None,
|
||||
help=f"Output aspect ratio (default: model decides). Options: {', '.join(SUPPORTED_ASPECT_RATIOS)}"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--api-key", "-k",
|
||||
help="Gemini API key (overrides GEMINI_API_KEY env var)"
|
||||
help=f"Output aspect ratio (default: model decides). Options: {', '.join(SUPPORTED_ASPECT_RATIOS)}",
|
||||
)
|
||||
parser.add_argument("--api-key", "-k", help="Gemini API key (overrides GEMINI_API_KEY env var)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
|
@ -158,10 +148,7 @@ def main():
|
|||
has_input_images=bool(input_images),
|
||||
)
|
||||
if auto_detected:
|
||||
print(
|
||||
f"Auto-detected resolution: {output_resolution} "
|
||||
f"(from max input dimension {max_input_dim})"
|
||||
)
|
||||
print(f"Auto-detected resolution: {output_resolution} (from max input dimension {max_input_dim})")
|
||||
|
||||
# Build contents (images first if editing, prompt only if generating)
|
||||
if input_images:
|
||||
|
|
@ -182,9 +169,8 @@ def main():
|
|||
model="gemini-3-pro-image-preview",
|
||||
contents=contents,
|
||||
config=types.GenerateContentConfig(
|
||||
response_modalities=["TEXT", "IMAGE"],
|
||||
image_config=types.ImageConfig(**image_cfg_kwargs)
|
||||
)
|
||||
response_modalities=["TEXT", "IMAGE"], image_config=types.ImageConfig(**image_cfg_kwargs)
|
||||
),
|
||||
)
|
||||
|
||||
# Process response and convert to PNG
|
||||
|
|
@ -201,19 +187,20 @@ def main():
|
|||
if isinstance(image_data, str):
|
||||
# If it's a string, it might be base64
|
||||
import base64
|
||||
|
||||
image_data = base64.b64decode(image_data)
|
||||
|
||||
image = PILImage.open(BytesIO(image_data))
|
||||
|
||||
# Ensure RGB mode for PNG (convert RGBA to RGB with white background if needed)
|
||||
if image.mode == 'RGBA':
|
||||
rgb_image = PILImage.new('RGB', image.size, (255, 255, 255))
|
||||
if image.mode == "RGBA":
|
||||
rgb_image = PILImage.new("RGB", image.size, (255, 255, 255))
|
||||
rgb_image.paste(image, mask=image.split()[3])
|
||||
rgb_image.save(str(output_path), 'PNG')
|
||||
elif image.mode == 'RGB':
|
||||
image.save(str(output_path), 'PNG')
|
||||
rgb_image.save(str(output_path), "PNG")
|
||||
elif image.mode == "RGB":
|
||||
image.save(str(output_path), "PNG")
|
||||
else:
|
||||
image.convert('RGB').save(str(output_path), 'PNG')
|
||||
image.convert("RGB").save(str(output_path), "PNG")
|
||||
image_saved = True
|
||||
|
||||
if image_saved:
|
||||
|
|
|
|||
|
|
@ -58,9 +58,7 @@ def pick_prompts(count: int) -> list[str]:
|
|||
]
|
||||
prompts: list[str] = []
|
||||
for _ in range(count):
|
||||
prompts.append(
|
||||
f"{random.choice(styles)} of {random.choice(subjects)}, {random.choice(lighting)}"
|
||||
)
|
||||
prompts.append(f"{random.choice(styles)} of {random.choice(subjects)}, {random.choice(lighting)}")
|
||||
return prompts
|
||||
|
||||
|
||||
|
|
@ -100,9 +98,7 @@ def normalize_optional_flag(
|
|||
value = aliases.get(value, value)
|
||||
|
||||
if value not in allowed:
|
||||
raise ValueError(
|
||||
f"Invalid --{flag_name} '{raw_value}'. Allowed values: {allowed_text}."
|
||||
)
|
||||
raise ValueError(f"Invalid --{flag_name} '{raw_value}'. Allowed values: {allowed_text}.")
|
||||
return value
|
||||
|
||||
|
||||
|
|
@ -115,10 +111,7 @@ def normalize_background(model: str, background: str) -> str:
|
|||
supported=lambda candidate: candidate.startswith("gpt-image"),
|
||||
allowed={"transparent", "opaque", "auto"},
|
||||
allowed_text="transparent, opaque, auto",
|
||||
unsupported_message=(
|
||||
"Warning: --background is only supported for gpt-image models; "
|
||||
"ignoring for '{model}'."
|
||||
),
|
||||
unsupported_message=("Warning: --background is only supported for gpt-image models; ignoring for '{model}'."),
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -131,9 +124,7 @@ def normalize_style(model: str, style: str) -> str:
|
|||
supported=lambda candidate: candidate == "dall-e-3",
|
||||
allowed={"vivid", "natural"},
|
||||
allowed_text="vivid, natural",
|
||||
unsupported_message=(
|
||||
"Warning: --style is only supported for dall-e-3; ignoring for '{model}'."
|
||||
),
|
||||
unsupported_message=("Warning: --style is only supported for dall-e-3; ignoring for '{model}'."),
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -147,8 +138,7 @@ def normalize_output_format(model: str, output_format: str) -> str:
|
|||
allowed={"png", "jpeg", "webp"},
|
||||
allowed_text="png, jpeg, webp",
|
||||
unsupported_message=(
|
||||
"Warning: --output-format is only supported for gpt-image models; "
|
||||
"ignoring for '{model}'."
|
||||
"Warning: --output-format is only supported for gpt-image models; ignoring for '{model}'."
|
||||
),
|
||||
aliases={"jpg": "jpeg"},
|
||||
)
|
||||
|
|
@ -245,9 +235,15 @@ def main() -> int:
|
|||
ap.add_argument("--prompt", help="Single prompt. If omitted, random prompts are generated.")
|
||||
ap.add_argument("--count", type=int, default=8, help="How many images to generate.")
|
||||
ap.add_argument("--model", default="gpt-image-1", help="Image model id.")
|
||||
ap.add_argument("--size", default="", help="Image size (e.g. 1024x1024, 1536x1024). Defaults based on model if not specified.")
|
||||
ap.add_argument("--quality", default="", help="Image quality (e.g. high, standard). Defaults based on model if not specified.")
|
||||
ap.add_argument("--background", default="", help="Background transparency (GPT models only): transparent, opaque, or auto.")
|
||||
ap.add_argument(
|
||||
"--size", default="", help="Image size (e.g. 1024x1024, 1536x1024). Defaults based on model if not specified."
|
||||
)
|
||||
ap.add_argument(
|
||||
"--quality", default="", help="Image quality (e.g. high, standard). Defaults based on model if not specified."
|
||||
)
|
||||
ap.add_argument(
|
||||
"--background", default="", help="Background transparency (GPT models only): transparent, opaque, or auto."
|
||||
)
|
||||
ap.add_argument("--output-format", default="", help="Output format (GPT models only): png, jpeg, or webp.")
|
||||
ap.add_argument("--style", default="", help="Image style (dall-e-3 only): vivid or natural.")
|
||||
ap.add_argument("--out-dir", default="", help="Output directory (default: ./tmp/openai-image-gen-<ts>).")
|
||||
|
|
@ -265,7 +261,10 @@ def main() -> int:
|
|||
|
||||
count = args.count
|
||||
if args.model == "dall-e-3" and count > 1:
|
||||
print(f"Warning: dall-e-3 only supports generating 1 image at a time. Reducing count from {count} to 1.", file=sys.stderr)
|
||||
print(
|
||||
f"Warning: dall-e-3 only supports generating 1 image at a time. Reducing count from {count} to 1.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
count = 1
|
||||
|
||||
out_dir = Path(args.out_dir).expanduser() if args.out_dir else default_out_dir()
|
||||
|
|
|
|||
|
|
@ -82,6 +82,8 @@ def test_normalize_output_format_normalizes_case_for_supported_values():
|
|||
|
||||
def test_normalize_output_format_strips_whitespace_for_supported_values():
|
||||
assert normalize_output_format("gpt-image-1", " png ") == "png"
|
||||
|
||||
|
||||
def test_normalize_output_format_keeps_supported_values():
|
||||
assert normalize_output_format("gpt-image-1", "png") == "png"
|
||||
assert normalize_output_format("gpt-image-1", "jpeg") == "jpeg"
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ Quick validation script for skills - minimal version
|
|||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
import yaml
|
||||
|
|
@ -16,7 +15,7 @@ except ModuleNotFoundError:
|
|||
MAX_SKILL_NAME_LENGTH = 64
|
||||
|
||||
|
||||
def _extract_frontmatter(content: str) -> Optional[str]:
|
||||
def _extract_frontmatter(content: str) -> str | None:
|
||||
lines = content.splitlines()
|
||||
if not lines or lines[0].strip() != "---":
|
||||
return None
|
||||
|
|
@ -26,13 +25,13 @@ def _extract_frontmatter(content: str) -> Optional[str]:
|
|||
return None
|
||||
|
||||
|
||||
def _parse_simple_frontmatter(frontmatter_text: str) -> Optional[dict[str, str]]:
|
||||
def _parse_simple_frontmatter(frontmatter_text: str) -> dict[str, str] | None:
|
||||
"""
|
||||
Minimal fallback parser used when PyYAML is unavailable.
|
||||
Supports simple `key: value` mappings used by SKILL.md frontmatter.
|
||||
"""
|
||||
parsed: dict[str, str] = {}
|
||||
current_key: Optional[str] = None
|
||||
current_key: str | None = None
|
||||
for raw_line in frontmatter_text.splitlines():
|
||||
stripped = raw_line.strip()
|
||||
if not stripped or stripped.startswith("#"):
|
||||
|
|
@ -43,9 +42,7 @@ def _parse_simple_frontmatter(frontmatter_text: str) -> Optional[dict[str, str]]
|
|||
if current_key is None:
|
||||
return None
|
||||
current_value = parsed[current_key]
|
||||
parsed[current_key] = (
|
||||
f"{current_value}\n{stripped}" if current_value else stripped
|
||||
)
|
||||
parsed[current_key] = f"{current_value}\n{stripped}" if current_value else stripped
|
||||
continue
|
||||
|
||||
if ":" not in stripped:
|
||||
|
|
@ -55,9 +52,7 @@ def _parse_simple_frontmatter(frontmatter_text: str) -> Optional[dict[str, str]]
|
|||
value = value.strip()
|
||||
if not key:
|
||||
return None
|
||||
if (value.startswith('"') and value.endswith('"')) or (
|
||||
value.startswith("'") and value.endswith("'")
|
||||
):
|
||||
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")):
|
||||
value = value[1:-1]
|
||||
parsed[key] = value
|
||||
current_key = key
|
||||
|
|
@ -129,8 +124,7 @@ def validate_skill(skill_path):
|
|||
if len(name) > MAX_SKILL_NAME_LENGTH:
|
||||
return (
|
||||
False,
|
||||
f"Name is too long ({len(name)} characters). "
|
||||
f"Maximum is {MAX_SKILL_NAME_LENGTH} characters.",
|
||||
f"Name is too long ({len(name)} characters). Maximum is {MAX_SKILL_NAME_LENGTH} characters.",
|
||||
)
|
||||
|
||||
description = frontmatter.get("description", "")
|
||||
|
|
|
|||
Loading…
Reference in New Issue