import os

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, balanced_accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf


def read_binary_file(file_path, max_length=2_000_000):
    """
    Read a binary file and convert it to a uint8 integer array.

    Per the paper's specification, up to 2 MB per file is processed;
    shorter files are zero-padded to max_length.
    """
    try:
        with open(file_path, 'rb') as f:
            raw_bytes = f.read()

        byte_array = np.frombuffer(raw_bytes, dtype=np.uint8)

        if len(byte_array) > max_length:
            # Truncate files longer than max_length.
            return byte_array[:max_length]
        else:
            # Zero-pad shorter files to the fixed length.
            padded = np.zeros(max_length, dtype=np.uint8)
            padded[:len(byte_array)] = byte_array
            return padded

    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return np.zeros(max_length, dtype=np.uint8)


def load_dataset_from_directory(malware_dir, benign_dir, max_length=2_000_000, max_samples_per_class=None):
    """
    Load binary files directly from directories.

    Args:
        malware_dir: directory containing malware files
        benign_dir: directory containing benign files
        max_length: maximum number of bytes per file
        max_samples_per_class: maximum number of samples per class
    """
    X, y = [], []

    # Malware samples (label 0)
    if os.path.exists(malware_dir):
        malware_files = [f for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))]
        if max_samples_per_class:
            malware_files = malware_files[:max_samples_per_class]

        print(f"Loading malware files... ({len(malware_files)} files)")
        for i, filename in enumerate(malware_files):
            file_path = os.path.join(malware_dir, filename)
            byte_array = read_binary_file(file_path, max_length)
            X.append(byte_array)
            y.append(0)

            if (i + 1) % 100 == 0:
                print(f"  {i + 1}/{len(malware_files)} processed")

    # Benign samples (label 1)
    if os.path.exists(benign_dir):
        benign_files = [f for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))]
        if max_samples_per_class:
            benign_files = benign_files[:max_samples_per_class]

        print(f"Loading benign files... ({len(benign_files)} files)")
        for i, filename in enumerate(benign_files):
            file_path = os.path.join(benign_dir, filename)
            byte_array = read_binary_file(file_path, max_length)
            X.append(byte_array)
            y.append(1)

            if (i + 1) % 100 == 0:
                print(f"  {i + 1}/{len(benign_files)} processed")

    X = np.array(X)
    y = np.array(y)

    print("\nDataset loading complete:")
    print(f"  Total samples: {len(X)}")
    print(f"  Malware: {np.sum(y == 0)}")
    print(f"  Benign: {np.sum(y == 1)}")

    return X, y


def load_dataset_from_csv(csv_path, max_length=2_000_000):
    """Load the dataset from a CSV file with 'filepath' and 'label' columns."""
    df = pd.read_csv(csv_path)

    X, y = [], []

    print("Loading files from CSV...")
    for idx, row in df.iterrows():
        file_path = row['filepath']
        label = row['label']

        if os.path.exists(file_path):
            byte_array = read_binary_file(file_path, max_length)
            X.append(byte_array)
            y.append(label)
        else:
            print(f"File not found: {file_path}")

        if (idx + 1) % 1000 == 0:
            print(f"  {idx + 1} files processed")

    return np.array(X), np.array(y)
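
# Example CSV layout expected by load_dataset_from_csv (illustrative paths only;
# the actual file names and locations depend on your dataset):
#
#     filepath,label
#     /data/malware/sample_001.exe,0
#     /data/benign/putty.exe,1
#
# Labels follow the convention used throughout this module: 0 = malware, 1 = benign.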


def configure_gpu_memory():
    """Enable memory growth on all visible GPUs."""
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            print(f"GPU setup complete: using {len(gpus)} GPU(s)")
            return True
        except RuntimeError as e:
            print(f"GPU configuration error: {e}")
    return False


def plot_training_history(history):
    """Plot training history: loss, accuracy, AUC, and learning rate."""
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # Loss
    axes[0, 0].plot(history.history['loss'], label='Training Loss')
    if 'val_loss' in history.history:
        axes[0, 0].plot(history.history['val_loss'], label='Validation Loss')
    axes[0, 0].set_title('Model Loss')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('Loss')
    axes[0, 0].legend()
    axes[0, 0].grid(True)

    # Accuracy
    axes[0, 1].plot(history.history['accuracy'], label='Training Accuracy')
    if 'val_accuracy' in history.history:
        axes[0, 1].plot(history.history['val_accuracy'], label='Validation Accuracy')
    axes[0, 1].set_title('Model Accuracy')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('Accuracy')
    axes[0, 1].legend()
    axes[0, 1].grid(True)

    # AUC (only if tracked as a metric)
    if 'auc' in history.history:
        axes[1, 0].plot(history.history['auc'], label='Training AUC')
        if 'val_auc' in history.history:
            axes[1, 0].plot(history.history['val_auc'], label='Validation AUC')
        axes[1, 0].set_title('Model AUC')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('AUC')
        axes[1, 0].legend()
        axes[1, 0].grid(True)

    # Learning rate (only if logged by a callback)
    if 'lr' in history.history:
        axes[1, 1].plot(history.history['lr'], label='Learning Rate', color='red')
        axes[1, 1].set_title('Learning Rate Schedule')
        axes[1, 1].set_xlabel('Epoch')
        axes[1, 1].set_ylabel('Learning Rate')
        axes[1, 1].set_yscale('log')
        axes[1, 1].legend()
        axes[1, 1].grid(True)

    plt.tight_layout()
    plt.show()


def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Plot the confusion matrix as a heatmap."""
    cm = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Malware', 'Benign'],
                yticklabels=['Malware', 'Benign'])
    plt.title(title)
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()


def evaluate_model(model, X_test, y_test, batch_size=16):
    """Evaluate model performance on a held-out test set."""
    print("Evaluating model...")

    y_pred_prob = model.predict(X_test, batch_size=batch_size, verbose=1)
    y_pred = (y_pred_prob > 0.5).astype(int).flatten()

    accuracy = np.mean(y_pred == y_test)
    balanced_acc = balanced_accuracy_score(y_test, y_pred)
    auc_score = roc_auc_score(y_test, y_pred_prob)

    print("\n=== Evaluation Results ===")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Balanced Accuracy: {balanced_acc:.4f}")
    print(f"AUC Score: {auc_score:.4f}")

    print("\nClassification report:")
    print(classification_report(y_test, y_pred, target_names=['Malware', 'Benign']))

    plot_confusion_matrix(y_test, y_pred, "MalConv Performance")

    return {
        'accuracy': accuracy,
        'balanced_accuracy': balanced_acc,
        'auc': auc_score,
        'predictions': y_pred_prob
    }


def get_file_paths_and_labels(malware_dir, benign_dir, max_samples_per_class=None):
    """
    Collect file paths and labels from the directories (without loading files into memory).
    """
    filepaths = []
    labels = []

    # Malware samples (label 0)
    if os.path.exists(malware_dir):
        malware_files = [os.path.join(malware_dir, f) for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))]
        if max_samples_per_class:
            malware_files = malware_files[:max_samples_per_class]
        filepaths.extend(malware_files)
        labels.extend([0] * len(malware_files))
        print(f"Malware file paths loaded: {len(malware_files)}")

    # Benign samples (label 1)
    if os.path.exists(benign_dir):
        benign_files = [os.path.join(benign_dir, f) for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))]
        if max_samples_per_class:
            benign_files = benign_files[:max_samples_per_class]
        filepaths.extend(benign_files)
        labels.extend([1] * len(benign_files))
        print(f"Benign file paths loaded: {len(benign_files)}")

    print(f"\nTotal file paths: {len(filepaths)}")
    print(f"  Malware: {labels.count(0)}")
    print(f"  Benign: {labels.count(1)}")

    # Shuffle paths and labels together.
    indices = np.arange(len(filepaths))
    np.random.shuffle(indices)
    filepaths = np.array(filepaths)[indices].tolist()
    labels = np.array(labels)[indices]

    return filepaths, labels


def data_generator(filepaths, labels, batch_size, max_length=2_000_000, shuffle=True):
    """
    Generator that yields (X, y) batches, reading files lazily from disk.
    """
    num_samples = len(filepaths)
    if num_samples == 0:
        return

    while True:
        indices = np.arange(num_samples)
        if shuffle:
            np.random.shuffle(indices)

        for i in range(0, num_samples, batch_size):
            batch_indices = indices[i:i + batch_size]

            X_batch = []
            y_batch_list = []

            for j in batch_indices:
                try:
                    X_batch.append(read_binary_file(filepaths[j], max_length))
                    y_batch_list.append(labels[j])
                except Exception as e:
                    print(f"Warning: Skipping file {filepaths[j]} due to error: {e}")
                    continue

            if not X_batch:
                continue

            yield np.array(X_batch), np.array(y_batch_list)
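

# ---------------------------------------------------------------------------
# Minimal end-to-end usage sketch (illustrative only).
#
# Assumptions: the directory names below are placeholders, and the tiny
# Embedding + pooling model is a stand-in for the actual MalConv architecture,
# which is expected to be defined elsewhere in the project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    configure_gpu_memory()

    # Gather file paths lazily so the whole dataset never sits in memory.
    filepaths, labels = get_file_paths_and_labels("malware_samples", "benign_samples")

    train_paths, test_paths, train_labels, test_labels = train_test_split(
        filepaths, labels, test_size=0.2, stratify=labels
    )

    batch_size = 16
    train_gen = data_generator(train_paths, train_labels, batch_size)

    # Placeholder model: byte embedding -> global max pooling -> sigmoid.
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(input_dim=256, output_dim=8),
        tf.keras.layers.GlobalMaxPooling1D(),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    history = model.fit(
        train_gen,
        steps_per_epoch=max(1, len(train_paths) // batch_size),
        epochs=3,
    )
    plot_training_history(history)

    # Evaluate on the held-out files (loaded into memory here for simplicity).
    X_test = np.array([read_binary_file(p) for p in test_paths])
    y_test = np.array(test_labels)
    evaluate_model(model, X_test, y_test, batch_size=batch_size)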