Last active 1772941771

liueic's Avatar liueic revised this gist 1772941771. Go to revision

No changes

liueic's Avatar liueic revised this gist 1772938686. Go to revision

1 file changed, 326 insertions

register-codex.py(file created)

@@ -0,0 +1,326 @@
1 + import json
2 + import os
3 + import re
4 + import sys
5 + import time
6 + import uuid
7 + import math
8 + import random
9 + import string
10 + import secrets
11 + import hashlib
12 + import base64
13 + import threading
14 + from datetime import datetime, timezone, timedelta
15 + from urllib import request as urllib_request
16 + from urllib.parse import urlparse, parse_qs, urlencode, quote
17 + from dataclasses import dataclass
18 + from typing import Any, Dict
19 + import urllib
20 + from urllib.parse import urlparse, parse_qs
21 + from urllib import request
22 +
23 + from curl_cffi import requests
24 +
25 + # email
26 + def get(url: str, headers: dict | None=None) -> tuple[str, dict]:
27 + # res, header
28 + try:
29 + req = urllib.request.Request(url, headers = headers or {})
30 + with urllib.request.urlopen(req) as response:
31 + resp_text = response.read().decode("utf-8")
32 + resp_headers = dict(response.getheaders())
33 + return resp_text, resp_headers
34 + except Exception as e:
35 + print(e)
36 + return -1, {}
37 +
38 + def get_email() -> str:
39 + body, _ = get("https://mail.chatgpt.org.uk/api/generate-email", {"X-API-Key": "gpt-test", "User-Agent": "Mozilla/5.0"})
40 + data = json.loads(body)
41 + return data["data"]["email"]
42 +
43 + def get_oai_code(email: str) -> str:
44 + regex = r" (?<!\d)(\d{6})(?!\d)" #r"(?<!\d)\d{6}(?!\d)"
45 + for i in range(20):
46 + body,_ = get(f"https://mail.chatgpt.org.uk/api/emails?email={email}", {"referer": "https://mail.chatgpt.org.uk/", "User-Agent": "Mozilla/5.0"})
47 + data = json.loads(body)
48 + emails = data["data"]["emails"]
49 + for email in emails:
50 + if "openai" in email["from_address"]:
51 + m = re.search(regex, email["subject"])
52 + if m:
53 + return m.group(1)
54 + m = re.search(regex, email["html_content"])
55 + return m.group(1)
56 + else:
57 + time.sleep(3)
58 + continue
59 + # end
60 +
61 +
62 + # oauth
63 + AUTH_URL = "https://auth.openai.com/oauth/authorize"
64 + TOKEN_URL = "https://auth.openai.com/oauth/token"
65 + CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
66 +
67 + DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
68 + DEFAULT_SCOPE = "openid email profile offline_access"
69 +
70 + def _b64url_no_pad(raw: bytes) -> str:
71 + return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
72 +
73 + def _sha256_b64url_no_pad(s: str) -> str:
74 + return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
75 +
76 + def _random_state(nbytes: int = 16) -> str:
77 + return secrets.token_urlsafe(nbytes)
78 +
79 + def _pkce_verifier() -> str:
80 + # RFC 7636 allows 43..128 chars; urlsafe token is fine.
81 + return secrets.token_urlsafe(64)
82 +
83 + def _parse_callback_url(callback_url: str) -> Dict[str, str]:
84 + candidate = callback_url.strip()
85 + if not candidate:
86 + return {
87 + "code": "",
88 + "state": "",
89 + "error": "",
90 + "error_description": "",
91 + }
92 +
93 + if "://" not in candidate:
94 + if candidate.startswith("?"):
95 + candidate = f"http://localhost{candidate}"
96 + elif any(ch in candidate for ch in "/?#") or ":" in candidate:
97 + candidate = f"http://{candidate}"
98 + elif "=" in candidate:
99 + candidate = f"http://localhost/?{candidate}"
100 +
101 + parsed = urllib.parse.urlparse(candidate)
102 + query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
103 + fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
104 +
105 + for key, values in fragment.items():
106 + if key not in query or not query[key] or not (query[key][0] or "").strip():
107 + query[key] = values
108 +
109 + def get1(k: str) -> str:
110 + v = query.get(k, [""])
111 + return (v[0] or "").strip()
112 +
113 + code = get1("code")
114 + state = get1("state")
115 + error = get1("error")
116 + error_description = get1("error_description")
117 +
118 + if code and not state and "#" in code:
119 + code, state = code.split("#", 1)
120 +
121 + if not error and error_description:
122 + error, error_description = error_description, ""
123 +
124 + return {
125 + "code": code,
126 + "state": state,
127 + "error": error,
128 + "error_description": error_description,
129 + }
130 +
131 + def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
132 + if not id_token or id_token.count(".") < 2:
133 + return {}
134 + payload_b64 = id_token.split(".")[1]
135 + pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
136 + try:
137 + payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
138 + return json.loads(payload.decode("utf-8"))
139 + except Exception:
140 + return {}
141 +
142 + def _to_int(v: Any) -> int:
143 + try:
144 + return int(v)
145 + except (TypeError, ValueError):
146 + return 0
147 +
148 + def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
149 + body = urllib.parse.urlencode(data).encode("utf-8")
150 + req = urllib.request.Request(
151 + url,
152 + data=body,
153 + method="POST",
154 + headers={
155 + "Content-Type": "application/x-www-form-urlencoded",
156 + "Accept": "application/json",
157 + },
158 + )
159 + try:
160 + with urllib.request.urlopen(req, timeout=timeout) as resp:
161 + raw = resp.read()
162 + if resp.status != 200:
163 + raise RuntimeError(
164 + f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
165 + )
166 + return json.loads(raw.decode("utf-8"))
167 + except urllib.error.HTTPError as exc:
168 + raw = exc.read()
169 + raise RuntimeError(
170 + f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
171 + ) from exc
172 +
173 +
174 + @dataclass(frozen=True)
175 + class OAuthStart:
176 + auth_url: str
177 + state: str
178 + code_verifier: str
179 + redirect_uri: str
180 +
181 +
182 + def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart:
183 + state = _random_state()
184 + code_verifier = _pkce_verifier()
185 + code_challenge = _sha256_b64url_no_pad(code_verifier)
186 +
187 + params = {
188 + "client_id": CLIENT_ID,
189 + "response_type": "code",
190 + "redirect_uri": redirect_uri,
191 + "scope": scope,
192 + "state": state,
193 + "code_challenge": code_challenge,
194 + "code_challenge_method": "S256",
195 + "prompt": "login",
196 + "id_token_add_organizations": "true",
197 + "codex_cli_simplified_flow": "true",
198 + }
199 + auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
200 + return OAuthStart(
201 + auth_url=auth_url,
202 + state=state,
203 + code_verifier=code_verifier,
204 + redirect_uri=redirect_uri,
205 + )
206 +
207 +
208 + def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str:
209 + cb = _parse_callback_url(callback_url)
210 + if cb["error"]:
211 + desc = cb["error_description"]
212 + raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
213 +
214 + if not cb["code"]:
215 + raise ValueError("callback url missing ?code=")
216 + if not cb["state"]:
217 + raise ValueError("callback url missing ?state=")
218 + if cb["state"] != expected_state:
219 + raise ValueError("state mismatch")
220 +
221 + token_resp = _post_form(
222 + TOKEN_URL,
223 + {
224 + "grant_type": "authorization_code",
225 + "client_id": CLIENT_ID,
226 + "code": cb["code"],
227 + "redirect_uri": redirect_uri,
228 + "code_verifier": code_verifier,
229 + },
230 + )
231 +
232 + access_token = (token_resp.get("access_token") or "").strip()
233 + refresh_token = (token_resp.get("refresh_token") or "").strip()
234 + id_token = (token_resp.get("id_token") or "").strip()
235 + expires_in = _to_int(token_resp.get("expires_in"))
236 +
237 + claims = _jwt_claims_no_verify(id_token)
238 + email = str(claims.get("email") or "").strip()
239 + auth_claims = claims.get("https://api.openai.com/auth") or {}
240 + account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
241 +
242 + now = int(time.time())
243 + expired_rfc3339 = time.strftime(
244 + "%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
245 + )
246 + now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
247 +
248 + config = {
249 + "id_token": id_token,
250 + "access_token": access_token,
251 + "refresh_token": refresh_token,
252 + "account_id": account_id,
253 + "last_refresh": now_rfc3339,
254 + "email": email,
255 + "type": "codex",
256 + "expired": expired_rfc3339,
257 + }
258 +
259 + return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
260 + # end
261 +
262 +
263 + def run(proxy: str) -> str:
264 + s = requests.Session(
265 + proxies={
266 + "http": proxy,
267 + "https": proxy,
268 + },
269 + impersonate="chrome"
270 + )
271 + trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
272 + trace = trace.text
273 + ip_re = re.search(r"^ip=(.+)$", trace, re.MULTILINE)
274 + loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
275 + ip = ip_re.group(1) if ip_re else None
276 + loc = loc_re.group(1) if loc_re else None
277 + print(loc, ip)
278 + if loc == "CN" or loc == "HK":
279 + raise RuntimeError("检查代理哦w")
280 + email = get_email()
281 + print(email)
282 + oauth = generate_oauth_url()
283 + url = oauth.auth_url
284 + resp = s.get(url)
285 + did = s.cookies.get("oai-did")
286 + print(did)
287 + signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
288 + sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
289 + sen_resp = requests.post("https://sentinel.openai.com/backend-api/sentinel/req", headers={"origin": "https://sentinel.openai.com", "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", "content-type": "text/plain;charset=UTF-8"}, data=sen_req_body)
290 + print(sen_resp.status_code)
291 + sen_resp = sen_resp.json()["token"]
292 + sentinel = f'{{"p": "", "t": "", "c": "{sen_resp}", "id": "{did}", "flow": "authorize_continue"}}'
293 + signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers={"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json", "openai-sentinel-token": sentinel}, data=signup_body)
294 + print(signup_resp.status_code)
295 + otp_resp = s.post("https://auth.openai.com/api/accounts/passwordless/send-otp", headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"})
296 + print(otp_resp.status_code)
297 + code = get_oai_code(email)
298 + print(code)
299 + code_body = f'{{"code":"{code}"}}'
300 + code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers={"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"}, data=code_body)
301 + print(code_resp.status_code)
302 + create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
303 + create_account_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers={"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"}, data=create_account_body)
304 + create_account_status = create_account_resp.status_code
305 + print(create_account_status)
306 + if create_account_status != 200:
307 + print(create_account_resp.text)
308 + return
309 + print(create_account_status)
310 + auth = s.cookies.get("oai-client-auth-session")
311 + auth = base64.b64decode(auth.split(".")[0])
312 + auth = json.loads(auth)
313 + workspace_id = auth["workspaces"][0]["id"]
314 + print(workspace_id)
315 + select_body = f'{{"workspace_id":"{workspace_id}"}}'
316 + select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=select_body)
317 + print(select_resp.status_code)
318 + continue_url = select_resp.json()["continue_url"]
319 + final_resp = s.get(continue_url, allow_redirects=False)
320 + final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
321 + final_resp = s.get(final_resp.headers.get("Location"), allow_redirects=False)
322 + cbk = final_resp.headers.get("Location")
323 + return submit_callback_url(callback_url=cbk, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state)
324 +
325 + if __name__ == "__main__":
326 + print(run(None))
Newer Older