
• Creates a small synthetic dataset (you can replace it with real features from proxy logs or threat intel).
• Builds a feedforward neural network classifier.
• Trains, evaluates (accuracy / ROC AUC), and saves the model.
• Shows feature ideas and how to adapt for real data.
⸻
2) PyTorch example (ready to copy & run)
```python
# Save as proxy_detector_pytorch.py and run with:
#     python proxy_detector_pytorch.py

import random
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
# -----------------------
# Synthetic dataset
# -----------------------
def generate_synthetic(n=5000, seed=0):
    random.seed(seed)
    np.random.seed(seed)
    X = []
    y = []
    for _ in range(n):
        # Features (examples you should replace with real feature extraction):
        # 0: ip_entropy (0..1) - higher entropy, more likely proxy
        # 1: num_ports_seen (0..20) - proxies sometimes use many ports
        # 2: tls_client_hello_length (scaled)
        # 3: country_code_similarity (0..1) - e.g., mismatch between claimed and ASN country
        # 4: avg_requests_per_min (0..200)
        # 5: asn_blacklist_flag (0/1)
        # 6: http_via_header_flag (0/1)
        # We'll synthesize some correlations.
        is_proxy = np.random.rand() < 0.25  # 25% proxies
        if is_proxy:
            ip_entropy = min(1.0, np.random.beta(5, 2))  # higher
            num_ports = np.random.poisson(6)
            tls_len = np.random.normal(400, 80)
            country_mismatch = np.random.beta(6, 2)
            req_pm = np.random.exponential(40)
            asn_black = np.random.rand() < 0.15
            via_header = np.random.rand() < 0.4
        else:
            ip_entropy = np.random.beta(2, 5)
            num_ports = np.random.poisson(1)
            tls_len = np.random.normal(200, 50)
            country_mismatch = np.random.beta(2, 6)
            req_pm = np.random.exponential(10)
            asn_black = np.random.rand() < 0.02
            via_header = np.random.rand() < 0.01
        feat = [ip_entropy, num_ports, tls_len, country_mismatch, req_pm, float(asn_black), float(via_header)]
        X.append(feat)
        y.append(1 if is_proxy else 0)
    X = np.array(X, dtype=float)
    y = np.array(y, dtype=int)
    return X, y

# -----------------------
# Dataset wrapper
# -----------------------
class TabularDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# -----------------------
# Model
# -----------------------
class MLP(nn.Module):
    def __init__(self, n_in):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_in, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        return self.net(x)

# -----------------------
# Main training loop
# -----------------------
def train_and_eval():
    X, y = generate_synthetic(n=8000)
    # Scale numeric features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    dataset = TabularDataset(X, y)
    n_val = int(len(dataset) * 0.15)
    n_test = int(len(dataset) * 0.10)
    n_train = len(dataset) - n_val - n_test
    train_ds, val_ds, test_ds = random_split(
        dataset, [n_train, n_val, n_test],
        generator=torch.Generator().manual_seed(42))

    batch_size = 128
    train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_dl = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
    test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MLP(n_in=X.shape[1]).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.BCEWithLogitsLoss()

    best_val_auc = 0.0
    for epoch in range(1, 31):
        model.train()
        total_loss = 0.0
        for xb, yb in train_dl:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = loss_fn(logits, yb)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item() * xb.size(0)
        avg_loss = total_loss / len(train_dl.dataset)

        # Validation
        model.eval()
        preds = []
        trues = []
        with torch.no_grad():
            for xb, yb in val_dl:
                xb = xb.to(device)
                logits = model(xb).cpu().numpy().ravel()
                probs = 1 / (1 + np.exp(-logits))
                preds.extend(probs.tolist())
                trues.extend(yb.numpy().ravel().tolist())
        val_acc = accuracy_score(np.array(trues) > 0.5, np.array(preds) > 0.5)
        try:
            val_auc = roc_auc_score(trues, preds)
        except ValueError:
            val_auc = 0.0
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            torch.save({
                'model_state_dict': model.state_dict(),
                'scaler': scaler
            }, "best_proxy_model.pt")
        if epoch % 5 == 0 or epoch == 1:
            print(f"Epoch {epoch:02d} loss={avg_loss:.4f} val_acc={val_acc:.4f} val_auc={val_auc:.4f}")

    # Test with the best checkpoint.
    # weights_only=False: the checkpoint pickles a sklearn scaler, which
    # PyTorch >= 2.6 refuses to load under its default weights_only=True.
    checkpoint = torch.load("best_proxy_model.pt", map_location=device, weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    preds = []
    trues = []
    with torch.no_grad():
        for xb, yb in test_dl:
            xb = xb.to(device)
            logits = model(xb).cpu().numpy().ravel()
            probs = 1 / (1 + np.exp(-logits))
            preds.extend(probs.tolist())
            trues.extend(yb.numpy().ravel().tolist())
    test_acc = accuracy_score(np.array(trues) > 0.5, np.array(preds) > 0.5)
    test_auc = roc_auc_score(trues, preds)
    print(f"Final test_acc={test_acc:.4f} test_auc={test_auc:.4f}")
    print("Saved best model to best_proxy_model.pt (includes scaler object).")


if __name__ == "__main__":
    train_and_eval()
```
⸻
3) Keras / TensorFlow compact alternative
```python
# Keras variant (requires tensorflow)
import numpy as np
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, accuracy_score
# reuse generate_synthetic from earlier or implement similarly
X, y = generate_synthetic(n=8000)
scaler = StandardScaler(); X = scaler.fit_transform(X)
# split
n = len(X)
idx = np.arange(n)
np.random.shuffle(idx)
train_idx = idx[:int(0.75*n)]
val_idx = idx[int(0.75*n):int(0.9*n)]
test_idx = idx[int(0.9*n):]
x_train, y_train = X[train_idx], y[train_idx]
x_val, y_val = X[val_idx], y[val_idx]
x_test, y_test = X[test_idx], y[test_idx]
model = keras.Sequential([
    keras.layers.Input(shape=(X.shape[1],)),
    keras.layers.Dense(64, activation="relu"),
    keras.layers.BatchNormalization(),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(1, activation="sigmoid")
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=30, batch_size=128)
probs = model.predict(x_test).ravel()
print("test_acc", accuracy_score(y_test, probs > 0.5), "test_auc", roc_auc_score(y_test, probs))
# Keras 3 requires a .keras (or .h5) filename; older tf.keras also accepted a SavedModel directory
model.save("proxy_detector_keras.keras")
```
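To reuse the saved model later, reload it with `keras.models.load_model` and scale incoming samples exactly as at training time. A minimal sketch, where `x_new` is a placeholder for new, unscaled feature rows and `scaler.joblib` assumes you persisted the fitted scaler (e.g. with `joblib`):
```python
import joblib
from tensorflow import keras

model = keras.models.load_model("proxy_detector_keras.keras")
scaler = joblib.load("scaler.joblib")  # assumes the fitted StandardScaler was saved with joblib.dump
probs = model.predict(scaler.transform(x_new)).ravel()  # x_new: raw (unscaled) feature rows
```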
⸻
4) Feature ideas (when using real proxy logs)
Collect features from logs and enrichment sources:
• IP-level: IP entropy (how random the octets look), ASN, ASN reputation, geolocation (country match with user info), reverse-DNS patterns.
• Connection: TLS fingerprint (JA3/JA3S), TLS ClientHello length, cipher list, SNI patterns.
• HTTP: presence of Via or X-Forwarded-For, user-agent entropy, missing headers, unusual Accept headers.
• Behavioral: requests per minute, session length, time-of-day patterns, repeated uncommon URLs.
• Network: open ports count, speed/latency patterns, NAT-like behavior (many distinct client ports from same IP).
• External: known proxy/VPN lists, Tor exit node lists, cloud provider IP ranges (though cloud IPs are not necessarily proxies).
Label sources: combine known proxy lists with internal heuristics, manual labeling, or active probing.
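As a starting point, here is a minimal sketch of turning a parsed log record into a few of the features above. The record layout (`ip`, `headers`, `timestamps`) is a hypothetical placeholder for whatever your own log schema provides:
```python
import math
from collections import Counter

def shannon_entropy(s: str) -> float:
    # Character-level Shannon entropy, normalized to roughly 0..1 for short strings
    counts = Counter(s)
    total = len(s)
    if total <= 1:
        return 0.0
    ent = -sum((c / total) * math.log2(c / total) for c in counts.values())
    return ent / math.log2(total)

def extract_features(record: dict) -> list:
    # `record` is a hypothetical parsed log entry:
    # {"ip": str, "headers": dict, "timestamps": [float, ...]}
    ip_entropy = shannon_entropy(record["ip"].replace(".", ""))
    via_header = float("Via" in record["headers"] or "X-Forwarded-For" in record["headers"])
    ts = sorted(record["timestamps"])
    duration_min = max((ts[-1] - ts[0]) / 60.0, 1e-6)  # avoid division by zero
    req_per_min = len(ts) / duration_min
    return [ip_entropy, via_header, req_per_min]
```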
⸻
5) Production notes & inference
• Feature pipeline: extract features from logs (batch or streaming), scale them with the saved scaler, and pass them to the model for a probability (see the sketch after this list).
• Thresholding: choose the threshold based on the precision/recall trade-off for your use case.
• Drift: proxies change — retrain periodically and monitor calibration.
• Explainability: use SHAP or LIME to inspect features influencing predictions.
• Privacy & legality: ensure you comply with privacy rules when processing IPs/users.
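A minimal inference-plus-thresholding sketch for the PyTorch model above, assuming the checkpoint written by `train_and_eval` (7 synthetic features) and a hypothetical labeled holdout `X_val`/`y_val` for choosing the operating point:
```python
import torch
from sklearn.metrics import precision_recall_curve
from proxy_detector_pytorch import MLP  # model class from the script above

# weights_only=False because the checkpoint pickles a sklearn scaler
ckpt = torch.load("best_proxy_model.pt", map_location="cpu", weights_only=False)
model = MLP(n_in=7)  # 7 = feature count in the synthetic example
model.load_state_dict(ckpt['model_state_dict'])
model.eval()
scaler = ckpt['scaler']

def score(X_raw):
    # Scale with the training-time scaler, then convert logits to probabilities
    xb = torch.tensor(scaler.transform(X_raw), dtype=torch.float32)
    with torch.no_grad():
        return torch.sigmoid(model(xb)).numpy().ravel()

# Pick the lowest threshold meeting a precision target on the holdout
# (X_val, y_val are placeholders for your validation data)
probs = score(X_val)
precision, recall, thresholds = precision_recall_curve(y_val, probs)
target = 0.95
ok = precision[:-1] >= target  # precision has one more entry than thresholds
threshold = thresholds[ok][0] if ok.any() else 0.5
flagged = probs >= threshold
```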
⸻
6) If you meant something else
I guessed you wanted a proxy-detection classifier. If instead you meant:
• “create a neural network proxy server” (proxy that runs a model) — I can provide an API server example (FastAPI) that loads the model and exposes an inference endpoint.
• “create neural network from proxy variables (statistical proxies)” — I can show how to handle proxy variables and train a model accounting for bias.
• “create neural network from proxy logs specifically” — I can give a full ETL + feature-extraction pipeline example.
