Last active 1772941771

Revision 941262dc527dfc6dccbb8f8fac8ebeff366cc83b

register-codex.py Raw
1import json
2import os
3import re
4import sys
5import time
6import uuid
7import math
8import random
9import string
10import secrets
11import hashlib
12import base64
13import threading
14from datetime import datetime, timezone, timedelta
15from urllib import request as urllib_request
16from urllib.parse import urlparse, parse_qs, urlencode, quote
17from dataclasses import dataclass
18from typing import Any, Dict
19import urllib
20from urllib.parse import urlparse, parse_qs
21from urllib import request
22
23from curl_cffi import requests
24
25# email
26def get(url: str, headers: dict | None=None) -> tuple[str, dict]:
27 # res, header
28 try:
29 req = urllib.request.Request(url, headers = headers or {})
30 with urllib.request.urlopen(req) as response:
31 resp_text = response.read().decode("utf-8")
32 resp_headers = dict(response.getheaders())
33 return resp_text, resp_headers
34 except Exception as e:
35 print(e)
36 return -1, {}
37
38def get_email() -> str:
39 body, _ = get("https://mail.chatgpt.org.uk/api/generate-email", {"X-API-Key": "gpt-test", "User-Agent": "Mozilla/5.0"})
40 data = json.loads(body)
41 return data["data"]["email"]
42
43def get_oai_code(email: str) -> str:
44 regex = r" (?<!\d)(\d{6})(?!\d)" #r"(?<!\d)\d{6}(?!\d)"
45 for i in range(20):
46 body,_ = get(f"https://mail.chatgpt.org.uk/api/emails?email={email}", {"referer": "https://mail.chatgpt.org.uk/", "User-Agent": "Mozilla/5.0"})
47 data = json.loads(body)
48 emails = data["data"]["emails"]
49 for email in emails:
50 if "openai" in email["from_address"]:
51 m = re.search(regex, email["subject"])
52 if m:
53 return m.group(1)
54 m = re.search(regex, email["html_content"])
55 return m.group(1)
56 else:
57 time.sleep(3)
58 continue
59# end
60
61
62# oauth
63AUTH_URL = "https://auth.openai.com/oauth/authorize"
64TOKEN_URL = "https://auth.openai.com/oauth/token"
65CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
66
67DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
68DEFAULT_SCOPE = "openid email profile offline_access"
69
70def _b64url_no_pad(raw: bytes) -> str:
71 return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
72
73def _sha256_b64url_no_pad(s: str) -> str:
74 return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
75
76def _random_state(nbytes: int = 16) -> str:
77 return secrets.token_urlsafe(nbytes)
78
79def _pkce_verifier() -> str:
80 # RFC 7636 allows 43..128 chars; urlsafe token is fine.
81 return secrets.token_urlsafe(64)
82
83def _parse_callback_url(callback_url: str) -> Dict[str, str]:
84 candidate = callback_url.strip()
85 if not candidate:
86 return {
87 "code": "",
88 "state": "",
89 "error": "",
90 "error_description": "",
91 }
92
93 if "://" not in candidate:
94 if candidate.startswith("?"):
95 candidate = f"http://localhost{candidate}"
96 elif any(ch in candidate for ch in "/?#") or ":" in candidate:
97 candidate = f"http://{candidate}"
98 elif "=" in candidate:
99 candidate = f"http://localhost/?{candidate}"
100
101 parsed = urllib.parse.urlparse(candidate)
102 query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
103 fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
104
105 for key, values in fragment.items():
106 if key not in query or not query[key] or not (query[key][0] or "").strip():
107 query[key] = values
108
109 def get1(k: str) -> str:
110 v = query.get(k, [""])
111 return (v[0] or "").strip()
112
113 code = get1("code")
114 state = get1("state")
115 error = get1("error")
116 error_description = get1("error_description")
117
118 if code and not state and "#" in code:
119 code, state = code.split("#", 1)
120
121 if not error and error_description:
122 error, error_description = error_description, ""
123
124 return {
125 "code": code,
126 "state": state,
127 "error": error,
128 "error_description": error_description,
129 }
130
131def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
132 if not id_token or id_token.count(".") < 2:
133 return {}
134 payload_b64 = id_token.split(".")[1]
135 pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
136 try:
137 payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
138 return json.loads(payload.decode("utf-8"))
139 except Exception:
140 return {}
141
142def _to_int(v: Any) -> int:
143 try:
144 return int(v)
145 except (TypeError, ValueError):
146 return 0
147
148def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
149 body = urllib.parse.urlencode(data).encode("utf-8")
150 req = urllib.request.Request(
151 url,
152 data=body,
153 method="POST",
154 headers={
155 "Content-Type": "application/x-www-form-urlencoded",
156 "Accept": "application/json",
157 },
158 )
159 try:
160 with urllib.request.urlopen(req, timeout=timeout) as resp:
161 raw = resp.read()
162 if resp.status != 200:
163 raise RuntimeError(
164 f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
165 )
166 return json.loads(raw.decode("utf-8"))
167 except urllib.error.HTTPError as exc:
168 raw = exc.read()
169 raise RuntimeError(
170 f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
171 ) from exc
172
173
174@dataclass(frozen=True)
175class OAuthStart:
176 auth_url: str
177 state: str
178 code_verifier: str
179 redirect_uri: str
180
181
182def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart:
183 state = _random_state()
184 code_verifier = _pkce_verifier()
185 code_challenge = _sha256_b64url_no_pad(code_verifier)
186
187 params = {
188 "client_id": CLIENT_ID,
189 "response_type": "code",
190 "redirect_uri": redirect_uri,
191 "scope": scope,
192 "state": state,
193 "code_challenge": code_challenge,
194 "code_challenge_method": "S256",
195 "prompt": "login",
196 "id_token_add_organizations": "true",
197 "codex_cli_simplified_flow": "true",
198 }
199 auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
200 return OAuthStart(
201 auth_url=auth_url,
202 state=state,
203 code_verifier=code_verifier,
204 redirect_uri=redirect_uri,
205 )
206
207
208def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str:
209 cb = _parse_callback_url(callback_url)
210 if cb["error"]:
211 desc = cb["error_description"]
212 raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
213
214 if not cb["code"]:
215 raise ValueError("callback url missing ?code=")
216 if not cb["state"]:
217 raise ValueError("callback url missing ?state=")
218 if cb["state"] != expected_state:
219 raise ValueError("state mismatch")
220
221 token_resp = _post_form(
222 TOKEN_URL,
223 {
224 "grant_type": "authorization_code",
225 "client_id": CLIENT_ID,
226 "code": cb["code"],
227 "redirect_uri": redirect_uri,
228 "code_verifier": code_verifier,
229 },
230 )
231
232 access_token = (token_resp.get("access_token") or "").strip()
233 refresh_token = (token_resp.get("refresh_token") or "").strip()
234 id_token = (token_resp.get("id_token") or "").strip()
235 expires_in = _to_int(token_resp.get("expires_in"))
236
237 claims = _jwt_claims_no_verify(id_token)
238 email = str(claims.get("email") or "").strip()
239 auth_claims = claims.get("https://api.openai.com/auth") or {}
240 account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
241
242 now = int(time.time())
243 expired_rfc3339 = time.strftime(
244 "%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
245 )
246 now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
247
248 config = {
249 "id_token": id_token,
250 "access_token": access_token,
251 "refresh_token": refresh_token,
252 "account_id": account_id,
253 "last_refresh": now_rfc3339,
254 "email": email,
255 "type": "codex",
256 "expired": expired_rfc3339,
257 }
258
259 return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
260# end
261
262
263def run(proxy: str) -> str:
264 s = requests.Session(
265 proxies={
266 "http": proxy,
267 "https": proxy,
268 },
269 impersonate="chrome"
270 )
271 trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
272 trace = trace.text
273 ip_re = re.search(r"^ip=(.+)$", trace, re.MULTILINE)
274 loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
275 ip = ip_re.group(1) if ip_re else None
276 loc = loc_re.group(1) if loc_re else None
277 print(loc, ip)
278 if loc == "CN" or loc == "HK":
279 raise RuntimeError("检查代理哦w")
280 email = get_email()
281 print(email)
282 oauth = generate_oauth_url()
283 url = oauth.auth_url
284 resp = s.get(url)
285 did = s.cookies.get("oai-did")
286 print(did)
287 signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
288 sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
289 sen_resp = requests.post("https://sentinel.openai.com/backend-api/sentinel/req", headers={"origin": "https://sentinel.openai.com", "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", "content-type": "text/plain;charset=UTF-8"}, data=sen_req_body)
290 print(sen_resp.status_code)
291 sen_resp = sen_resp.json()["token"]
292 sentinel = f'{{"p": "", "t": "", "c": "{sen_resp}", "id": "{did}", "flow": "authorize_continue"}}'
293 signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers={"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json", "openai-sentinel-token": sentinel}, data=signup_body)
294 print(signup_resp.status_code)
295 otp_resp = s.post("https://auth.openai.com/api/accounts/passwordless/send-otp", headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"})
296 print(otp_resp.status_code)
297 code = get_oai_code(email)
298 print(code)
299 code_body = f'{{"code":"{code}"}}'
300 code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers={"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"}, data=code_body)
301 print(code_resp.status_code)
302 create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
303 create_account_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers={"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"}, data=create_account_body)
304 create_account_status = create_account_resp.status_code
305 print(create_account_status)
306 if create_account_status != 200:
307 print(create_account_resp.text)
308 return
309 print(create_account_status)
310 auth = s.cookies.get("oai-client-auth-session")
311 auth = base64.b64decode(auth.split(".")[0])
312 auth = json.loads(auth)
313 workspace_id = auth["workspaces"][0]["id"]
314 print(workspace_id)
315 select_body = f'{{"workspace_id":"{workspace_id}"}}'
316 select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=select_body)
317 print(select_resp.status_code)
318 continue_url = select_resp.json()["continue_url"]
319 final_resp = s.get(continue_url, allow_redirects=False)
320 final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
321 final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
322 cbk = final_resp.headers.get("Location")
323 return submit_callback_url(callback_url=cbk, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state)
324
325if __name__ == "__main__":
326 print(run(None))