3 Commits
GUI ... GUIv1.2

Author SHA1 Message Date
399015856d Update readme.txt
Added terms into the readme. Do NOT use this script for evil.
2025-11-05 16:00:41 +00:00
10c2a878ee Update IPDingGUI.py 2025-11-03 19:17:42 +00:00
52267b517c Add readme.txt 2025-11-03 19:11:59 +00:00
2 changed files with 256 additions and 49 deletions

View File

@@ -1,14 +1,13 @@
#!/usr/bin/env python3
import sys
import subprocess
from pathlib import Path
import platform
from pathlib import Path
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
IP_LIST = Path("iplist.txt")
SUCCESS_FILE = Path("successful.txt")
ERROR_FILE = Path("error.txt")
PROCESSED_SUCCESS_CSV = Path("processed.csv")
PROCESSED_ERROR_CSV = Path("processederror.csv")
# ---------- Core ping + conversion logic ----------
def ping(host: str, timeout_ms: int = 1000) -> bool:
"""
@@ -17,10 +16,10 @@ def ping(host: str, timeout_ms: int = 1000) -> bool:
- Windows: ping -n 1 -w <ms>
- Unix: ping -c 1 -W <sec>
"""
if platform.system().lower().startswith("win"):
system = platform.system().lower()
if system.startswith("win"):
cmd = ["ping", "-n", "1", "-w", str(timeout_ms), host]
else:
# On most Unix, -W is timeout (seconds) for each reply; convert ms to sec ceiling
sec = max(1, (timeout_ms + 999) // 1000)
cmd = ["ping", "-c", "1", "-W", str(sec), host]
try:
@@ -32,58 +31,251 @@ def ping(host: str, timeout_ms: int = 1000) -> bool:
def convert_txt_to_csv(txt_path: Path, csv_path: Path, header: str):
"""
Converts lines from a .txt file to a single-column CSV where spaces in a line
are replaced with commas to mimic the batch 'set "line=!line: =,!"' behavior.
are replaced with commas (matching your batch behavior).
"""
# Read all lines (strip trailing newlines)
if not txt_path.exists():
return
lines = [line.rstrip("\r\n") for line in txt_path.read_text(encoding="utf-8", errors="ignore").splitlines()]
with csv_path.open("w", encoding="utf-8", newline="") as f:
# We'll produce a simple CSV: each transformed line as one row.
# The header is a single column.
f.write(header + "\n")
for line in lines:
# Replace spaces with commas—matching batch behavior
transformed = line.replace(" ", ",")
f.write(transformed + "\n")
def main():
if not IP_LIST.exists():
print("IP list file not found.")
sys.exit(1)
# ---------- GUI App with parallel pings ----------
# Truncate / create output files
SUCCESS_FILE.write_text("", encoding="utf-8")
ERROR_FILE.write_text("", encoding="utf-8")
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Dinger Pinger — GUI (Parallel)")
self.geometry("780x520")
self.minsize(680, 460)
# Process each line in iplist.txt
for raw in IP_LIST.read_text(encoding="utf-8", errors="ignore").splitlines():
self.selected_file: Path | None = None
self.stop_requested = False
# Top controls
top = ttk.Frame(self, padding=10)
top.pack(side=tk.TOP, fill=tk.X)
self.select_btn = ttk.Button(top, text="Select IP List & Run", command=self.select_and_run)
self.select_btn.pack(side=tk.LEFT)
self.timeout_label = ttk.Label(top, text="Timeout (ms):")
self.timeout_label.pack(side=tk.LEFT, padx=(16, 4))
self.timeout_var = tk.StringVar(value="1000")
self.timeout_entry = ttk.Entry(top, width=8, textvariable=self.timeout_var)
self.timeout_entry.pack(side=tk.LEFT)
self.conc_label = ttk.Label(top, text="Concurrency:")
self.conc_label.pack(side=tk.LEFT, padx=(16, 4))
self.concurrency_var = tk.IntVar(value=64) # IO-bound; high default is fine
self.conc_spin = ttk.Spinbox(
top, from_=1, to=1024, textvariable=self.concurrency_var, width=6, wrap=True
)
self.conc_spin.pack(side=tk.LEFT)
self.stop_btn = ttk.Button(top, text="Stop", command=self.request_stop, state=tk.DISABLED)
self.stop_btn.pack(side=tk.LEFT, padx=(16, 0))
# Progress + status
prog_frame = ttk.Frame(self, padding=(10, 0, 10, 10))
prog_frame.pack(side=tk.TOP, fill=tk.X)
self.progress = ttk.Progressbar(prog_frame, mode="determinate")
self.progress.pack(fill=tk.X)
self.status_var = tk.StringVar(value="Select a file to begin.")
self.status = ttk.Label(prog_frame, textvariable=self.status_var)
self.status.pack(anchor="w", pady=(6, 0))
# Output text area
txt_frame = ttk.Frame(self, padding=(10, 0, 10, 10))
txt_frame.pack(fill=tk.BOTH, expand=True)
self.text = tk.Text(txt_frame, wrap="word")
self.text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
yscroll = ttk.Scrollbar(txt_frame, orient="vertical", command=self.text.yview)
yscroll.pack(side=tk.RIGHT, fill=tk.Y)
self.text.configure(yscrollcommand=yscroll.set)
# Style
try:
self.style = ttk.Style(self)
if "vista" in self.style.theme_names():
self.style.theme_use("vista")
elif "clam" in self.style.theme_names():
self.style.theme_use("clam")
except Exception:
pass
# Locks for thread-safe logs/counters
self._log_lock = threading.Lock()
def log(self, msg: str):
with self._log_lock:
self.text.insert(tk.END, msg + "\n")
self.text.see(tk.END)
self.update_idletasks()
def set_running_state(self, running: bool):
self.select_btn.config(state=tk.DISABLED if running else tk.NORMAL)
self.stop_btn.config(state=tk.NORMAL if running else tk.DISABLED)
self.timeout_entry.config(state=tk.DISABLED if running else tk.NORMAL)
self.conc_spin.config(state=tk.DISABLED if running else tk.NORMAL)
def request_stop(self):
self.stop_requested = True
self.status_var.set("Stopping after current tasks...")
def select_and_run(self):
file_path = filedialog.askopenfilename(
title="Select IP list (text file)",
filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
)
if not file_path:
return
self.selected_file = Path(file_path)
try:
timeout = int(self.timeout_var.get().strip())
if timeout <= 0:
raise ValueError
except ValueError:
messagebox.showerror("Invalid Timeout", "Please enter a positive integer for timeout (ms).")
return
try:
conc = int(self.concurrency_var.get())
if conc < 1:
raise ValueError
except ValueError:
messagebox.showerror("Invalid Concurrency", "Concurrency must be an integer ≥ 1.")
return
self.stop_requested = False
self.text.delete("1.0", tk.END)
self.status_var.set(f"Running pings from {self.selected_file} with concurrency={conc} ...")
self.set_running_state(True)
thread = threading.Thread(
target=self.run_pings_parallel_worker,
args=(self.selected_file, timeout, conc),
daemon=True
)
thread.start()
def run_pings_parallel_worker(self, ip_list_path: Path, timeout_ms: int, concurrency: int):
# Read / prepare
try:
lines = ip_list_path.read_text(encoding="utf-8", errors="ignore").splitlines()
except Exception as e:
self.after(0, lambda: messagebox.showerror("Read Error", f"Failed to read file:\n{e}"))
self.after(0, lambda: self.set_running_state(False))
return
out_dir = ip_list_path.parent
success_file = out_dir / "successful.txt"
error_file = out_dir / "error.txt"
processed_success_csv = out_dir / "processed.csv"
processed_error_csv = out_dir / "processederror.csv"
targets = []
for raw in lines:
ip = raw.strip()
# Skip empties and comments
if not ip or ip.startswith("#") or ip.startswith(";"):
continue
targets.append(ip)
ok = ping(ip, timeout_ms=1000)
if ok:
with SUCCESS_FILE.open("a", encoding="utf-8") as s:
s.write(ip + "\n")
total = len(targets)
self.after(0, lambda: self.progress.configure(maximum=total, value=0))
self.after(0, lambda: self.status_var.set(f"{total} target(s) queued..."))
# Collect results in-memory (thread-safe)
ok_list = []
err_list = []
ok_count = 0
err_count = 0
progress_count = 0
counters_lock = threading.Lock()
def _task(ip: str):
"""Single task executed in thread pool."""
if self.stop_requested:
# Early exit; treat as not processed (won't be counted)
return None, None
result = ping(ip, timeout_ms=timeout_ms)
return ip, result
try:
with ThreadPoolExecutor(max_workers=concurrency, thread_name_prefix="ping") as ex:
futures = []
# Submit gradually so Stop can halt further submissions
for ip in targets:
if self.stop_requested:
break
futures.append(ex.submit(_task, ip))
# If stop is requested now, cancel any futures not yet started
if self.stop_requested:
for f in futures:
f.cancel()
# Best effort; there may be a few already running.
for f in as_completed(futures):
if self.stop_requested and f.cancelled():
continue
try:
ip, result = f.result()
except Exception as e:
ip, result = None, None
self.after(0, lambda e=e: self.log(f"[WORKER ERROR] {e}"))
if ip is None:
continue
with counters_lock:
progress_count += 1
if result:
ok_list.append(ip)
ok_count += 1
else:
with ERROR_FILE.open("a", encoding="utf-8") as e:
e.write(ip + "\n")
err_list.append(ip)
err_count += 1
# Convert both lists to CSVs, replacing spaces with commas (even though lines are likely single IPs)
# The batch file we saw created a "processederror.csv" with "Error" as header.
# We'll also produce "processed.csv" for successful.txt with "Successful" header.
if SUCCESS_FILE.exists():
convert_txt_to_csv(SUCCESS_FILE, PROCESSED_SUCCESS_CSV, header="Successful")
print(f'Converted {SUCCESS_FILE} -> {PROCESSED_SUCCESS_CSV}')
if ERROR_FILE.exists():
convert_txt_to_csv(ERROR_FILE, PROCESSED_ERROR_CSV, header="Error")
print(f'Converted {ERROR_FILE} -> {PROCESSED_ERROR_CSV}')
# UI updates
self.after(0, lambda pc=progress_count: self.progress.configure(value=pc))
self.after(
0,
lambda ip=ip, result=result, pc=progress_count, t=total, ok=ok_count, err=err_count:
(self.log(f"[OK] {ip}") if result else self.log(f"[ERR] {ip}"),
self.status_var.set(f"Processed {pc}/{t} • OK: {ok} • ERR: {err}"))
)
except Exception as e:
self.after(0, lambda e=e: self.log(f"[POOL ERROR] {e}"))
print("Done. Outputs:")
print(f" {SUCCESS_FILE.resolve()}")
print(f" {ERROR_FILE.resolve()}")
print(f" {PROCESSED_SUCCESS_CSV.resolve()}")
print(f" {PROCESSED_ERROR_CSV.resolve()}")
# Write outputs once
try:
success_file.write_text("\n".join(ok_list) + ("\n" if ok_list else ""), encoding="utf-8")
error_file.write_text("\n".join(err_list) + ("\n" if err_list else ""), encoding="utf-8")
except Exception as e:
self.after(0, lambda e=e: self.log(f"[WRITE ERROR] {e}"))
# Convert to CSVs (space->comma), matching original behavior
try:
convert_txt_to_csv(success_file, processed_success_csv, header="Successful")
convert_txt_to_csv(error_file, processed_error_csv, header="Error")
except Exception as e:
self.after(0, lambda e=e: self.log(f"[CSV ERROR] {e}"))
def finish_ui():
self.set_running_state(False)
done_msg = "Stopped (no new tasks submitted)." if self.stop_requested else "Done."
self.log("")
self.log(done_msg)
self.log("Outputs:")
self.log(f" {success_file.resolve()}")
self.log(f" {error_file.resolve()}")
self.log(f" {processed_success_csv.resolve()}")
self.log(f" {processed_error_csv.resolve()}")
self.status_var.set(f"{done_msg} • OK: {ok_count} • ERR: {err_count}")
self.after(0, finish_ui)
if __name__ == "__main__":
main()
App().mainloop()

15
readme.txt Normal file
View File

@@ -0,0 +1,15 @@
Hello! Welcome to the world of pings.
To use this tool you must have tkinter installed on your system. That should be the only requirement.
Click the button in the top left corner and you can then open a .txt with the addresses one per line.
It will then display its actions, and show its progress.
Upon completion all file locations are displayed to eliminate any confusion.
You only have permission to use this script for your own network.
I am not responsible for any trouble you may land yourself in.
USE THIS SCRIPT WITH YOUR NOGGIN. DO NOT BE MALICIOUS.