register-codex.py
· 12 KiB · Python
Raw
import json
import os
import re
import sys
import time
import uuid
import math
import random
import string
import secrets
import hashlib
import base64
import threading
from datetime import datetime, timezone, timedelta
from urllib import request as urllib_request
from urllib.parse import urlparse, parse_qs, urlencode, quote
from dataclasses import dataclass
from typing import Any, Dict
import urllib
from urllib.parse import urlparse, parse_qs
from urllib import request
from curl_cffi import requests
# email
def get(url: str, headers: dict | None=None) -> tuple[str, dict]:
# res, header
try:
req = urllib.request.Request(url, headers = headers or {})
with urllib.request.urlopen(req) as response:
resp_text = response.read().decode("utf-8")
resp_headers = dict(response.getheaders())
return resp_text, resp_headers
except Exception as e:
print(e)
return -1, {}
def get_email() -> str:
body, _ = get("https://mail.chatgpt.org.uk/api/generate-email", {"X-API-Key": "gpt-test", "User-Agent": "Mozilla/5.0"})
data = json.loads(body)
return data["data"]["email"]
def get_oai_code(email: str) -> str:
regex = r" (?<!\d)(\d{6})(?!\d)" #r"(?<!\d)\d{6}(?!\d)"
for i in range(20):
body,_ = get(f"https://mail.chatgpt.org.uk/api/emails?email={email}", {"referer": "https://mail.chatgpt.org.uk/", "User-Agent": "Mozilla/5.0"})
data = json.loads(body)
emails = data["data"]["emails"]
for email in emails:
if "openai" in email["from_address"]:
m = re.search(regex, email["subject"])
if m:
return m.group(1)
m = re.search(regex, email["html_content"])
return m.group(1)
else:
time.sleep(3)
continue
# end
# oauth
AUTH_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
DEFAULT_SCOPE = "openid email profile offline_access"
def _b64url_no_pad(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _sha256_b64url_no_pad(s: str) -> str:
return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
def _random_state(nbytes: int = 16) -> str:
return secrets.token_urlsafe(nbytes)
def _pkce_verifier() -> str:
# RFC 7636 allows 43..128 chars; urlsafe token is fine.
return secrets.token_urlsafe(64)
def _parse_callback_url(callback_url: str) -> Dict[str, str]:
candidate = callback_url.strip()
if not candidate:
return {
"code": "",
"state": "",
"error": "",
"error_description": "",
}
if "://" not in candidate:
if candidate.startswith("?"):
candidate = f"http://localhost{candidate}"
elif any(ch in candidate for ch in "/?#") or ":" in candidate:
candidate = f"http://{candidate}"
elif "=" in candidate:
candidate = f"http://localhost/?{candidate}"
parsed = urllib.parse.urlparse(candidate)
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
for key, values in fragment.items():
if key not in query or not query[key] or not (query[key][0] or "").strip():
query[key] = values
def get1(k: str) -> str:
v = query.get(k, [""])
return (v[0] or "").strip()
code = get1("code")
state = get1("state")
error = get1("error")
error_description = get1("error_description")
if code and not state and "#" in code:
code, state = code.split("#", 1)
if not error and error_description:
error, error_description = error_description, ""
return {
"code": code,
"state": state,
"error": error,
"error_description": error_description,
}
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
if not id_token or id_token.count(".") < 2:
return {}
payload_b64 = id_token.split(".")[1]
pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
try:
payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
return json.loads(payload.decode("utf-8"))
except Exception:
return {}
def _to_int(v: Any) -> int:
try:
return int(v)
except (TypeError, ValueError):
return 0
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
body = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(
url,
data=body,
method="POST",
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
if resp.status != 200:
raise RuntimeError(
f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
)
return json.loads(raw.decode("utf-8"))
except urllib.error.HTTPError as exc:
raw = exc.read()
raise RuntimeError(
f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
) from exc
@dataclass(frozen=True)
class OAuthStart:
auth_url: str
state: str
code_verifier: str
redirect_uri: str
def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart:
state = _random_state()
code_verifier = _pkce_verifier()
code_challenge = _sha256_b64url_no_pad(code_verifier)
params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": redirect_uri,
"scope": scope,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "login",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
}
auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
return OAuthStart(
auth_url=auth_url,
state=state,
code_verifier=code_verifier,
redirect_uri=redirect_uri,
)
def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str:
cb = _parse_callback_url(callback_url)
if cb["error"]:
desc = cb["error_description"]
raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
if not cb["code"]:
raise ValueError("callback url missing ?code=")
if not cb["state"]:
raise ValueError("callback url missing ?state=")
if cb["state"] != expected_state:
raise ValueError("state mismatch")
token_resp = _post_form(
TOKEN_URL,
{
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"code": cb["code"],
"redirect_uri": redirect_uri,
"code_verifier": code_verifier,
},
)
access_token = (token_resp.get("access_token") or "").strip()
refresh_token = (token_resp.get("refresh_token") or "").strip()
id_token = (token_resp.get("id_token") or "").strip()
expires_in = _to_int(token_resp.get("expires_in"))
claims = _jwt_claims_no_verify(id_token)
email = str(claims.get("email") or "").strip()
auth_claims = claims.get("https://api.openai.com/auth") or {}
account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
now = int(time.time())
expired_rfc3339 = time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
)
now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
config = {
"id_token": id_token,
"access_token": access_token,
"refresh_token": refresh_token,
"account_id": account_id,
"last_refresh": now_rfc3339,
"email": email,
"type": "codex",
"expired": expired_rfc3339,
}
return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
# end
def run(proxy: str) -> str:
s = requests.Session(
proxies={
"http": proxy,
"https": proxy,
},
impersonate="chrome"
)
trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
trace = trace.text
ip_re = re.search(r"^ip=(.+)$", trace, re.MULTILINE)
loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
ip = ip_re.group(1) if ip_re else None
loc = loc_re.group(1) if loc_re else None
print(loc, ip)
if loc == "CN" or loc == "HK":
raise RuntimeError("检查代理哦w")
email = get_email()
print(email)
oauth = generate_oauth_url()
url = oauth.auth_url
resp = s.get(url)
did = s.cookies.get("oai-did")
print(did)
signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
sen_resp = requests.post("https://sentinel.openai.com/backend-api/sentinel/req", headers={"origin": "https://sentinel.openai.com", "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", "content-type": "text/plain;charset=UTF-8"}, data=sen_req_body)
print(sen_resp.status_code)
sen_resp = sen_resp.json()["token"]
sentinel = f'{{"p": "", "t": "", "c": "{sen_resp}", "id": "{did}", "flow": "authorize_continue"}}'
signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers={"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json", "openai-sentinel-token": sentinel}, data=signup_body)
print(signup_resp.status_code)
otp_resp = s.post("https://auth.openai.com/api/accounts/passwordless/send-otp", headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"})
print(otp_resp.status_code)
code = get_oai_code(email)
print(code)
code_body = f'{{"code":"{code}"}}'
code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers={"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"}, data=code_body)
print(code_resp.status_code)
create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
create_account_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers={"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"}, data=create_account_body)
create_account_status = create_account_resp.status_code
print(create_account_status)
if create_account_status != 200:
print(create_account_resp.text)
return
print(create_account_status)
auth = s.cookies.get("oai-client-auth-session")
auth = base64.b64decode(auth.split(".")[0])
auth = json.loads(auth)
workspace_id = auth["workspaces"][0]["id"]
print(workspace_id)
select_body = f'{{"workspace_id":"{workspace_id}"}}'
select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=select_body)
print(select_resp.status_code)
continue_url = select_resp.json()["continue_url"]
final_resp = s.get(continue_url, allow_redirects=False)
final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
cbk = final_resp.headers.get("Location")
return submit_callback_url(callback_url=cbk, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state)
if __name__ == "__main__":
print(run(None))
| 1 | import json |
| 2 | import os |
| 3 | import re |
| 4 | import sys |
| 5 | import time |
| 6 | import uuid |
| 7 | import math |
| 8 | import random |
| 9 | import string |
| 10 | import secrets |
| 11 | import hashlib |
| 12 | import base64 |
| 13 | import threading |
| 14 | from datetime import datetime, timezone, timedelta |
| 15 | from urllib import request as urllib_request |
| 16 | from urllib.parse import urlparse, parse_qs, urlencode, quote |
| 17 | from dataclasses import dataclass |
| 18 | from typing import Any, Dict |
| 19 | import urllib |
| 20 | from urllib.parse import urlparse, parse_qs |
| 21 | from urllib import request |
| 22 | |
| 23 | from curl_cffi import requests |
| 24 | |
| 25 | |
| 26 | def get(url: str, headers: dict | None=None) -> tuple[str, dict]: |
| 27 | # res, header |
| 28 | try: |
| 29 | req = urllib.request.Request(url, headers = headers or {}) |
| 30 | with urllib.request.urlopen(req) as response: |
| 31 | resp_text = response.read().decode("utf-8") |
| 32 | resp_headers = dict(response.getheaders()) |
| 33 | return resp_text, resp_headers |
| 34 | except Exception as e: |
| 35 | print(e) |
| 36 | return -1, {} |
| 37 | |
| 38 | def get_email() -> str: |
| 39 | body, _ = get("https://mail.chatgpt.org.uk/api/generate-email", {"X-API-Key": "gpt-test", "User-Agent": "Mozilla/5.0"}) |
| 40 | data = json.loads(body) |
| 41 | return data["data"]["email"] |
| 42 | |
| 43 | def get_oai_code(email: str) -> str: |
| 44 | regex = r" (?<!\d)(\d{6})(?!\d)" #r"(?<!\d)\d{6}(?!\d)" |
| 45 | for i in range(20): |
| 46 | body,_ = get(f"https://mail.chatgpt.org.uk/api/emails?email={email}", {"referer": "https://mail.chatgpt.org.uk/", "User-Agent": "Mozilla/5.0"}) |
| 47 | data = json.loads(body) |
| 48 | emails = data["data"]["emails"] |
| 49 | for email in emails: |
| 50 | if "openai" in email["from_address"]: |
| 51 | m = re.search(regex, email["subject"]) |
| 52 | if m: |
| 53 | return m.group(1) |
| 54 | m = re.search(regex, email["html_content"]) |
| 55 | return m.group(1) |
| 56 | else: |
| 57 | time.sleep(3) |
| 58 | continue |
| 59 | # end |
| 60 | |
| 61 | |
| 62 | # oauth |
| 63 | AUTH_URL = "https://auth.openai.com/oauth/authorize" |
| 64 | TOKEN_URL = "https://auth.openai.com/oauth/token" |
| 65 | CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" |
| 66 | |
| 67 | DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback" |
| 68 | DEFAULT_SCOPE = "openid email profile offline_access" |
| 69 | |
| 70 | def _b64url_no_pad(raw: bytes) -> str: |
| 71 | return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=") |
| 72 | |
| 73 | def _sha256_b64url_no_pad(s: str) -> str: |
| 74 | return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest()) |
| 75 | |
| 76 | def _random_state(nbytes: int = 16) -> str: |
| 77 | return secrets.token_urlsafe(nbytes) |
| 78 | |
| 79 | def _pkce_verifier() -> str: |
| 80 | # RFC 7636 allows 43..128 chars; urlsafe token is fine. |
| 81 | return secrets.token_urlsafe(64) |
| 82 | |
| 83 | def _parse_callback_url(callback_url: str) -> Dict[str, str]: |
| 84 | candidate = callback_url.strip() |
| 85 | if not candidate: |
| 86 | return { |
| 87 | "code": "", |
| 88 | "state": "", |
| 89 | "error": "", |
| 90 | "error_description": "", |
| 91 | } |
| 92 | |
| 93 | if "://" not in candidate: |
| 94 | if candidate.startswith("?"): |
| 95 | candidate = f"http://localhost{candidate}" |
| 96 | elif any(ch in candidate for ch in "/?#") or ":" in candidate: |
| 97 | candidate = f"http://{candidate}" |
| 98 | elif "=" in candidate: |
| 99 | candidate = f"http://localhost/?{candidate}" |
| 100 | |
| 101 | parsed = urllib.parse.urlparse(candidate) |
| 102 | query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True) |
| 103 | fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True) |
| 104 | |
| 105 | for key, values in fragment.items(): |
| 106 | if key not in query or not query[key] or not (query[key][0] or "").strip(): |
| 107 | query[key] = values |
| 108 | |
| 109 | def get1(k: str) -> str: |
| 110 | v = query.get(k, [""]) |
| 111 | return (v[0] or "").strip() |
| 112 | |
| 113 | code = get1("code") |
| 114 | state = get1("state") |
| 115 | error = get1("error") |
| 116 | error_description = get1("error_description") |
| 117 | |
| 118 | if code and not state and "#" in code: |
| 119 | code, state = code.split("#", 1) |
| 120 | |
| 121 | if not error and error_description: |
| 122 | error, error_description = error_description, "" |
| 123 | |
| 124 | return { |
| 125 | "code": code, |
| 126 | "state": state, |
| 127 | "error": error, |
| 128 | "error_description": error_description, |
| 129 | } |
| 130 | |
| 131 | def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]: |
| 132 | if not id_token or id_token.count(".") < 2: |
| 133 | return {} |
| 134 | payload_b64 = id_token.split(".")[1] |
| 135 | pad = "=" * ((4 - (len(payload_b64) % 4)) % 4) |
| 136 | try: |
| 137 | payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii")) |
| 138 | return json.loads(payload.decode("utf-8")) |
| 139 | except Exception: |
| 140 | return {} |
| 141 | |
| 142 | def _to_int(v: Any) -> int: |
| 143 | try: |
| 144 | return int(v) |
| 145 | except (TypeError, ValueError): |
| 146 | return 0 |
| 147 | |
| 148 | def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]: |
| 149 | body = urllib.parse.urlencode(data).encode("utf-8") |
| 150 | req = urllib.request.Request( |
| 151 | url, |
| 152 | data=body, |
| 153 | method="POST", |
| 154 | headers={ |
| 155 | "Content-Type": "application/x-www-form-urlencoded", |
| 156 | "Accept": "application/json", |
| 157 | }, |
| 158 | ) |
| 159 | try: |
| 160 | with urllib.request.urlopen(req, timeout=timeout) as resp: |
| 161 | raw = resp.read() |
| 162 | if resp.status != 200: |
| 163 | raise RuntimeError( |
| 164 | f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}" |
| 165 | ) |
| 166 | return json.loads(raw.decode("utf-8")) |
| 167 | except urllib.error.HTTPError as exc: |
| 168 | raw = exc.read() |
| 169 | raise RuntimeError( |
| 170 | f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}" |
| 171 | ) from exc |
| 172 | |
| 173 | |
| 174 | @dataclass(frozen=True) |
| 175 | class OAuthStart: |
| 176 | auth_url: str |
| 177 | state: str |
| 178 | code_verifier: str |
| 179 | redirect_uri: str |
| 180 | |
| 181 | |
| 182 | def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart: |
| 183 | state = _random_state() |
| 184 | code_verifier = _pkce_verifier() |
| 185 | code_challenge = _sha256_b64url_no_pad(code_verifier) |
| 186 | |
| 187 | params = { |
| 188 | "client_id": CLIENT_ID, |
| 189 | "response_type": "code", |
| 190 | "redirect_uri": redirect_uri, |
| 191 | "scope": scope, |
| 192 | "state": state, |
| 193 | "code_challenge": code_challenge, |
| 194 | "code_challenge_method": "S256", |
| 195 | "prompt": "login", |
| 196 | "id_token_add_organizations": "true", |
| 197 | "codex_cli_simplified_flow": "true", |
| 198 | } |
| 199 | auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}" |
| 200 | return OAuthStart( |
| 201 | auth_url=auth_url, |
| 202 | state=state, |
| 203 | code_verifier=code_verifier, |
| 204 | redirect_uri=redirect_uri, |
| 205 | ) |
| 206 | |
| 207 | |
| 208 | def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str: |
| 209 | cb = _parse_callback_url(callback_url) |
| 210 | if cb["error"]: |
| 211 | desc = cb["error_description"] |
| 212 | raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip()) |
| 213 | |
| 214 | if not cb["code"]: |
| 215 | raise ValueError("callback url missing ?code=") |
| 216 | if not cb["state"]: |
| 217 | raise ValueError("callback url missing ?state=") |
| 218 | if cb["state"] != expected_state: |
| 219 | raise ValueError("state mismatch") |
| 220 | |
| 221 | token_resp = _post_form( |
| 222 | TOKEN_URL, |
| 223 | { |
| 224 | "grant_type": "authorization_code", |
| 225 | "client_id": CLIENT_ID, |
| 226 | "code": cb["code"], |
| 227 | "redirect_uri": redirect_uri, |
| 228 | "code_verifier": code_verifier, |
| 229 | }, |
| 230 | ) |
| 231 | |
| 232 | access_token = (token_resp.get("access_token") or "").strip() |
| 233 | refresh_token = (token_resp.get("refresh_token") or "").strip() |
| 234 | id_token = (token_resp.get("id_token") or "").strip() |
| 235 | expires_in = _to_int(token_resp.get("expires_in")) |
| 236 | |
| 237 | claims = _jwt_claims_no_verify(id_token) |
| 238 | email = str(claims.get("email") or "").strip() |
| 239 | auth_claims = claims.get("https://api.openai.com/auth") or {} |
| 240 | account_id = str(auth_claims.get("chatgpt_account_id") or "").strip() |
| 241 | |
| 242 | now = int(time.time()) |
| 243 | expired_rfc3339 = time.strftime( |
| 244 | "%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0)) |
| 245 | ) |
| 246 | now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)) |
| 247 | |
| 248 | config = { |
| 249 | "id_token": id_token, |
| 250 | "access_token": access_token, |
| 251 | "refresh_token": refresh_token, |
| 252 | "account_id": account_id, |
| 253 | "last_refresh": now_rfc3339, |
| 254 | "email": email, |
| 255 | "type": "codex", |
| 256 | "expired": expired_rfc3339, |
| 257 | } |
| 258 | |
| 259 | return json.dumps(config, ensure_ascii=False, separators=(",", ":")) |
| 260 | # end |
| 261 | |
| 262 | |
| 263 | def run(proxy: str) -> str: |
| 264 | s = requests.Session( |
| 265 | proxies={ |
| 266 | "http": proxy, |
| 267 | "https": proxy, |
| 268 | }, |
| 269 | impersonate="chrome" |
| 270 | ) |
| 271 | trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10) |
| 272 | trace = trace.text |
| 273 | ip_re = re.search(r"^ip=(.+)$", trace, re.MULTILINE) |
| 274 | loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE) |
| 275 | ip = ip_re.group(1) if ip_re else None |
| 276 | loc = loc_re.group(1) if loc_re else None |
| 277 | print(loc, ip) |
| 278 | if loc == "CN" or loc == "HK": |
| 279 | raise RuntimeError("检查代理哦w") |
| 280 | email = get_email() |
| 281 | print(email) |
| 282 | oauth = generate_oauth_url() |
| 283 | url = oauth.auth_url |
| 284 | resp = s.get(url) |
| 285 | did = s.cookies.get("oai-did") |
| 286 | print(did) |
| 287 | signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}' |
| 288 | sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}' |
| 289 | sen_resp = requests.post("https://sentinel.openai.com/backend-api/sentinel/req", headers={"origin": "https://sentinel.openai.com", "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", "content-type": "text/plain;charset=UTF-8"}, data=sen_req_body) |
| 290 | print(sen_resp.status_code) |
| 291 | sen_resp = sen_resp.json()["token"] |
| 292 | sentinel = f'{{"p": "", "t": "", "c": "{sen_resp}", "id": "{did}", "flow": "authorize_continue"}}' |
| 293 | signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers={"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json", "openai-sentinel-token": sentinel}, data=signup_body) |
| 294 | print(signup_resp.status_code) |
| 295 | otp_resp = s.post("https://auth.openai.com/api/accounts/passwordless/send-otp", headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"}) |
| 296 | print(otp_resp.status_code) |
| 297 | code = get_oai_code(email) |
| 298 | print(code) |
| 299 | code_body = f'{{"code":"{code}"}}' |
| 300 | code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers={"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"}, data=code_body) |
| 301 | print(code_resp.status_code) |
| 302 | create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}' |
| 303 | create_account_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers={"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"}, data=create_account_body) |
| 304 | create_account_status = create_account_resp.status_code |
| 305 | print(create_account_status) |
| 306 | if create_account_status != 200: |
| 307 | print(create_account_resp.text) |
| 308 | return |
| 309 | print(create_account_status) |
| 310 | auth = s.cookies.get("oai-client-auth-session") |
| 311 | auth = base64.b64decode(auth.split(".")[0]) |
| 312 | auth = json.loads(auth) |
| 313 | workspace_id = auth["workspaces"][0]["id"] |
| 314 | print(workspace_id) |
| 315 | select_body = f'{{"workspace_id":"{workspace_id}"}}' |
| 316 | select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=select_body) |
| 317 | print(select_resp.status_code) |
| 318 | continue_url = select_resp.json()["continue_url"] |
| 319 | final_resp = s.get(continue_url, allow_redirects=False) |
| 320 | final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False) |
| 321 | final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False) |
| 322 | cbk = final_resp.headers.get("Location") |
| 323 | return submit_callback_url(callback_url=cbk, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state) |
| 324 | |
| 325 | if __name__ == "__main__": |
| 326 | print(run(None)) |