import requests
import json
import sys
import re
from datetime import datetime
from urllib.parse import urlparse
from typing import Dict, List, Any, Optional
def print_text(text: str) -> None:
    """Write *text* to standard output, followed by a newline.

    Every user-facing message in this script goes through this wrapper.
    """
    print(text, end="\n")
def input_text(prompt: str) -> str:
    """Show *prompt* and return the line the user typed.

    The returned string is exactly what the built-in ``input`` yields (no
    trailing newline); callers do their own stripping.
    """
    reply = input(prompt)
    return reply
def get_base_url() -> str:
    """Prompt until the user enters a usable portal address.

    The input is reduced to ``scheme://host[:port]``; any path, query or
    fragment the user pasted is discarded.  When no scheme is given,
    ``http://`` is assumed.  An explicit ``https://`` is preserved rather
    than being downgraded to plain HTTP.

    Returns:
        The normalized base URL without a trailing slash.
    """
    while True:
        raw = input_text("Enter IPTV link: ").strip()
        if not raw:
            continue

        # Compare case-insensitively so "HTTP://host" is not prefixed a
        # second time and then parsed with "http" as its hostname.
        if not raw.lower().startswith(("http://", "https://")):
            raw = "http://" + raw

        try:
            parsed = urlparse(raw)
            host = parsed.hostname
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port; urlparse itself raises it for malformed
            # IPv6 literals.
            port = parsed.port
        except ValueError:
            print_text("Invalid URL, please try again.")
            continue

        if not host:
            print_text("Invalid URL, please try again.")
            continue

        # .hostname strips the brackets of an IPv6 literal; restore them so
        # the optional port suffix stays unambiguous.
        if ":" in host:
            host = f"[{host}]"

        port_suffix = f":{port}" if port else ""
        return f"{parsed.scheme}://{host}{port_suffix}"
def get_mac_address() -> str:
    """Prompt until the user enters a colon-separated MAC address.

    The input is stripped and upper-cased before validation, so lower-case
    hex digits are accepted.

    Returns:
        The MAC address in upper case, e.g. ``00:1A:79:AB:CD:EF``.
    """
    mac_pattern = re.compile(r"^([0-9A-F]{2}:){5}[0-9A-F]{2}$")
    while True:
        candidate = input_text("Enter MAC address (e.g. 00:1A:79:xx:xx:xx): ")
        candidate = candidate.strip().upper()
        if mac_pattern.match(candidate) is None:
            print_text("Invalid MAC format, please try again.")
            continue
        return candidate
def get_token(session: requests.Session, base_url: str) -> Optional[str]:
    """Perform the portal handshake and return the token it hands out.

    Args:
        session: Session used for the GET request.
        base_url: Portal root, without a trailing slash.

    Returns:
        The ``js.token`` value of the JSON response.  A falsy value is
        returned when the response carries no token, and ``None`` when the
        request or the JSON decoding fails; both cases print a message.
    """
    handshake_url = f"{base_url}/portal.php?action=handshake&type=stb&token=&JsHttpRequest=1-xml"
    try:
        print_text("Getting token...")
        response = session.get(handshake_url, timeout=10)
        payload = json.loads(response.text)

        # Anything other than a JSON object is treated as an empty reply.
        if isinstance(payload, dict):
            js = payload.get("js", {})
        else:
            js = {}

        token = js.get("token")
        if token:
            return token
        print_text("No token in response.")
        return token
    except Exception as e:
        # Best effort: any failure is reported and mapped to None.
        print_text(f"Error getting token: {e}")
        return None
def get_subscription(session: requests.Session, base_url: str, token: str) -> bool:
    """Fetch the account info and print its MAC and expiry date.

    Args:
        session: Session used for the GET request.
        base_url: Portal root, without a trailing slash.
        token: Handshake token, sent as a Bearer ``Authorization`` header.

    Returns:
        ``True`` when the request and JSON decoding succeed (missing fields
        are printed as ``N/A``), ``False`` when any exception is raised.
    """
    info_url = f"{base_url}/portal.php?type=account_info&action=get_main_info&JsHttpRequest=1-xml"
    auth_headers = {"Authorization": f"Bearer {token}"}
    try:
        print_text("Checking subscription info...")
        response = session.get(info_url, headers=auth_headers, timeout=10)
        payload = json.loads(response.text)

        # Anything other than a JSON object is treated as an empty reply.
        if isinstance(payload, dict):
            js = payload.get("js", {})
        else:
            js = {}

        mac = js.get("mac", "N/A")
        expiry = js.get("expire_billing_date", "N/A")
        print_text(f"MAC: {mac} | Expiry: {expiry}")
    except Exception as e:
        print_text(f"Subscription info failed: {e}")
        return False
    return True
def get_channel_list(session: requests.Session, base_url: str, token: str):
    """Download the genre (group) table and the full channel list.

    Args:
        session: Session used for both GET requests.
        base_url: Portal root, without a trailing slash.
        token: Handshake token, sent as a Bearer ``Authorization`` header.

    Returns:
        ``(channels, groups)`` where *channels* is the list of dict entries
        from ``js.data`` and *groups* maps integer genre ID to title, or
        ``(None, None)`` when any step raises.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    try:
        print_text("Fetching groups...")
        # NOTE(review): this request goes to /server/load.php while every
        # other request in the file uses /portal.php -- confirm intended.
        genres_response = session.get(f"{base_url}/server/load.php?type=itv&action=get_genres&JsHttpRequest=1-xml", headers=auth_headers, timeout=10)
        genres_payload = json.loads(genres_response.text)
        raw_genres = genres_payload.get("js", []) if isinstance(genres_payload, dict) else []

        # Only genres with an all-digit ID are kept; other entries are skipped.
        groups = {
            int(genre["id"]): genre.get("title", "General")
            for genre in raw_genres
            if isinstance(genre, dict) and str(genre.get("id", "")).isdigit()
        }

        print_text("Fetching channel list...")
        channels_response = session.get(f"{base_url}/portal.php?type=itv&action=get_all_channels&JsHttpRequest=1-xml", headers=auth_headers, timeout=10)
        channels_payload = json.loads(channels_response.text)
        if isinstance(channels_payload, dict):
            raw_channels = channels_payload.get("js", {}).get("data", [])
        else:
            raw_channels = []

        channels = []
        for entry in raw_channels:
            if isinstance(entry, dict):
                channels.append(entry)
        return channels, groups
    except Exception as e:
        print_text(f"Error fetching channel list: {e}")
        return None, None
def _channel_genre_id(channel: Dict[str, Any]) -> int:
    """Return the channel's numeric genre ID, or 0 when missing or malformed."""
    try:
        return int(channel.get("tv_genre_id") or 0)
    except (TypeError, ValueError):
        return 0


def _m3u_text(value: Any, quoted: bool = False) -> str:
    """Return *value* as single-line text that cannot break an EXTINF line."""
    text = str(value).replace("\r", " ").replace("\n", " ")
    if quoted:
        # The value is embedded in a double-quoted attribute.
        text = text.replace('"', "'")
    return text


def _resolve_stream_url(channel: Dict[str, Any], base_url: str, mac: str) -> Optional[str]:
    """Return the playable URL for *channel*, or None when it has none."""
    cmds = channel.get("cmds")
    if not isinstance(cmds, (list, tuple)) or not cmds or not isinstance(cmds[0], dict):
        return None
    url = cmds[0].get("url")
    if not isinstance(url, str):
        return None
    url = url.replace("ffmpeg ", "").strip()
    if not url:
        return None

    root = base_url.rstrip("/")
    if "localhost" in url:
        # Portal-internal address: rebuild a direct link from the channel
        # number embedded in the path.
        found = re.search(r"/ch/(\d+)_", url)
        if found:
            return f"{root}/play/live.php?mac={mac}&stream={found.group(1)}&extension=ts"
    if url.startswith("/"):
        # Path-only URL: make it absolute against the portal root.
        return f"{root}{url}"
    return url


def save_m3u(base_url: str, mac: str, channels: List[Dict[str, Any]], groups: Dict[int, str], selected_ids: List[int], filename: str):
    """Write the channels of the selected groups to a timestamped M3U file.

    The playlist is assembled in memory first, so a malformed channel entry
    can no longer abort the export halfway and leave a truncated file.
    Channels without a usable stream URL are skipped, and a group ID that is
    listed more than once is exported only once.

    Args:
        base_url: Portal root used to build absolute stream URLs.
        mac: MAC address inserted into rebuilt ``live.php`` links.
        channels: Channel dicts (reads ``tv_genre_id``, ``name``, ``logo``
            and ``cmds``).
        groups: Mapping of genre ID to group title.
        selected_ids: Genre IDs to export, in output order.
        filename: Output name without extension; ``_<timestamp>.m3u`` is
            appended.

    Returns:
        None.  Progress and file errors are reported through ``print_text``.
    """
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = f"{filename}_{now}.m3u"
    print_text(f"Saving M3U to {output_file}")

    # Bucket channels by genre; channels with a missing or malformed genre
    # land in bucket 0 instead of raising.
    group_map: Dict[int, List[Dict[str, Any]]] = {gid: [] for gid in groups}
    for ch in channels:
        group_map.setdefault(_channel_genre_id(ch), []).append(ch)

    lines = ["#EXTM3U\n"]
    count = 0
    # dict.fromkeys drops duplicate IDs while keeping the requested order.
    for gid in dict.fromkeys(selected_ids):
        gname = _m3u_text(groups.get(gid) or "General", quoted=True)
        for ch in group_map.get(gid, []):
            url = _resolve_stream_url(ch, base_url, mac)
            if url is None:
                continue
            name = _m3u_text(ch.get("name") or "Unknown")
            logo = _m3u_text(ch.get("logo") or "", quoted=True)
            lines.append(f'#EXTINF:-1 tvg-logo="{logo}" group-title="{gname}",{name}\n{url}\n')
            count += 1

    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.writelines(lines)
    except (OSError, UnicodeError) as e:
        print_text(f"Error saving file: {e}")
        return
    print_text(f"Saved {count} channels to {output_file}")
def main():
    """Run the interactive export: connect, pick groups, write the playlist.

    Steps: ask for the portal URL and MAC address, perform the handshake,
    check the subscription, download genres and channels, let the user
    choose groups, and save the result as an M3U file.

    Exits with status 1 when the token, subscription or channel list cannot
    be obtained; returns normally when no group is selected or after saving.
    """
    print_text("=== IPTV M3U Generator (CLI Edition) ===")
    base_url = get_base_url()
    mac = get_mac_address()

    # The session is only needed for the portal requests; the context
    # manager closes it even when we leave through sys.exit().
    with requests.Session() as session:
        session.cookies.update({"mac": mac})
        token = get_token(session, base_url)
        if not token:
            print_text("Failed to get token. Exiting.")
            sys.exit(1)
        if not get_subscription(session, base_url, token):
            print_text("Subscription check failed.")
            sys.exit(1)
        channels, groups = get_channel_list(session, base_url, token)

    if not channels or not groups:
        print_text("Failed to fetch channel list.")
        sys.exit(1)

    print_text("\nAvailable groups:")
    for gid, name in groups.items():
        print_text(f" [{gid}] {name}")

    sel = input_text("\nEnter group IDs to include (comma separated, or 'all'): ").strip()
    if sel.lower() == "all":
        selected = list(groups.keys())
    else:
        selected = []
        ignored = []
        for part in sel.split(","):
            part = part.strip()
            if not part:
                continue
            # isdecimal() matches what int() can parse; isdigit() also
            # accepts characters such as superscripts that int() rejects.
            if part.isdecimal():
                gid = int(part)
                if gid not in selected:
                    selected.append(gid)
            else:
                ignored.append(part)
        if ignored:
            print_text(f"Ignoring invalid group IDs: {', '.join(ignored)}")

    if not selected:
        print_text("No groups selected. Exiting.")
        return

    filename = input_text("Enter output file name (without extension): ").strip()
    if not filename:
        filename = "iptv_channels"

    save_m3u(base_url, mac, channels, groups, selected, filename)
    print_text("Done")
# Script entry point: run the CLI only when executed directly, and turn
# Ctrl+C into a short message instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print_text("Aborted by user.")