1
0
Fork 0
mirror of https://github.com/sockspls/badfish synced 2025-04-29 16:23:09 +00:00
BadFish/tests/testing.py
Disservin a06e7004c1 Port instrumented testing to python
For an unknown amount of time the instrumented CI has been somewhat
flawed, as explained in
https://github.com/official-stockfish/Stockfish/issues/5185. It also
suffers from random timeouts that go away when the workflow is restarted,
from very long run times (longer than other workflows), and it is not
very portable.

The intention of this commit is to port the instrumented.sh to python
which also works on other operating systems. It should also be
relatively easy for beginners to add new tests to assert stockfish's
output and to run it.
From the source directory the following command can be run.

`python3 ../tests/instrumented.py --none ./stockfish`

A test runner will go over the test suites and run the test cases.

All instrumented tests should have been ported over.
The required Python version should be 3.7 (untested), plus the
requests package. testing.py contains the infrastructure code that
sets up the testing.

fixes https://github.com/official-stockfish/Stockfish/issues/5185
closes https://github.com/official-stockfish/Stockfish/pull/5583

No functional change
2024-09-17 20:24:17 +02:00

378 lines
11 KiB
Python

import collections
import concurrent.futures
import fnmatch
import io
import os
import pathlib
import shutil
import subprocess
import sys
import tarfile
import tempfile
import threading
import time
import traceback
from contextlib import redirect_stdout
from functools import wraps
from typing import List, Optional

import requests
# ANSI escape sequences used to style the runner's terminal output.
CYAN_COLOR = "\033[36m"
GRAY_COLOR = "\033[2m"
RED_COLOR = "\033[31m"
GREEN_COLOR = "\033[32m"
RESET_COLOR = "\033[0m"
WHITE_BOLD = "\033[1m"
# Limit, in seconds, passed to timeout_decorator for the Stockfish
# output-matching helpers (60 * 5 = five minutes).
MAX_TIMEOUT = 60 * 5
# Resolved path of the directory containing this file; the bench EPD file and
# the downloaded "syzygy" directory are placed under it.
PATH = pathlib.Path(__file__).parent.resolve()
class Valgrind:
    """Builders for the argv prefixes that launch a program under valgrind."""

    @staticmethod
    def get_valgrind_command():
        """Return a new argv prefix for a valgrind run with full leak checking.

        The prefix sets ``--error-exitcode=42`` and enables
        ``--errors-for-leak-kinds=all`` together with ``--leak-check=full``.
        """
        leak_flags = ["--errors-for-leak-kinds=all", "--leak-check=full"]
        return ["valgrind", "--error-exitcode=42", *leak_flags]

    @staticmethod
    def get_valgrind_thread_command():
        """Return a new argv prefix for a valgrind run with ``--fair-sched=try``.

        Like the leak-checking prefix it sets ``--error-exitcode=42``, but it
        carries no leak-check flags.
        """
        prefix = ["valgrind"]
        prefix.append("--error-exitcode=42")
        prefix.append("--fair-sched=try")
        return prefix
class TSAN:
    """Create and remove the ThreadSanitizer suppression setup.

    Both helpers work on ``tsan.supp`` in the *current working directory* and
    on the ``TSAN_OPTIONS`` environment variable of this process.
    """

    @staticmethod
    def set_tsan_option():
        """Write ``tsan.supp`` and point ``TSAN_OPTIONS`` at it.

        The file lists the race suppressions for the transposition-table
        functions named below. ``TSAN_OPTIONS`` uses a relative path, so the
        file is only found by processes started from the same directory.
        """
        with open("tsan.supp", "w") as f:
            f.write(
                """
race:Stockfish::TTEntry::read
race:Stockfish::TTEntry::save
race:Stockfish::TranspositionTable::probe
race:Stockfish::TranspositionTable::hashfull
"""
            )
        os.environ["TSAN_OPTIONS"] = "suppressions=./tsan.supp"

    @staticmethod
    def unset_tsan_option():
        """Undo ``set_tsan_option``: drop ``TSAN_OPTIONS`` and delete the file.

        Safe to call when the variable or the file is already absent, so it
        can be used for cleanup even if the setup step never completed.
        """
        os.environ.pop("TSAN_OPTIONS", None)
        try:
            os.remove("tsan.supp")
        except FileNotFoundError:
            # Nothing to clean up; cleanup must not mask the real test result.
            pass
class EPD:
    """Create and delete the temporary ``bench_tmp.epd`` file next to this module."""

    @staticmethod
    def create_bench_epd():
        """Write four positions, one per line, to ``PATH/bench_tmp.epd``.

        The file starts with an empty line and ends with a newline.
        """
        positions = (
            "Rn6/1rbq1bk1/2p2n1p/2Bp1p2/3Pp1pP/1N2P1P1/2Q1NPB1/6K1 w - - 2 26",
            "rnbqkb1r/ppp1pp2/5n1p/3p2p1/P2PP3/5P2/1PP3PP/RNBQKBNR w KQkq - 0 3",
            "3qnrk1/4bp1p/1p2p1pP/p2bN3/1P1P1B2/P2BQ3/5PP1/4R1K1 w - - 9 28",
            "r4rk1/1b2ppbp/pq4pn/2pp1PB1/1p2P3/1P1P1NN1/1PP3PP/R2Q1RK1 w - - 0 13",
        )
        epd_file = os.path.join(PATH, "bench_tmp.epd")
        with open(epd_file, "w") as handle:
            handle.write("\n" + "\n".join(positions) + "\n")

    @staticmethod
    def delete_bench_epd():
        """Remove ``PATH/bench_tmp.epd``; raises if the file does not exist."""
        epd_file = os.path.join(PATH, "bench_tmp.epd")
        os.remove(epd_file)
class Syzygy:
    """Locate and download the ``syzygy`` directory used by the tests."""

    # Seconds the HTTP client waits for a connection / for more data before
    # giving up, so a stalled download cannot hang the test run forever.
    DOWNLOAD_TIMEOUT = 60

    @staticmethod
    def get_syzygy_path():
        """Return the absolute path of ``syzygy`` under the current directory.

        Note that this resolves against the current working directory, whereas
        ``download_syzygy`` installs the directory under ``PATH``.
        """
        return os.path.abspath("syzygy")

    @staticmethod
    def download_syzygy():
        """Download a pinned python-chess snapshot to ``PATH/syzygy``.

        Does nothing when ``PATH/syzygy`` already exists. Otherwise the
        tarball is streamed into a temporary directory, unpacked there, and
        its top-level directory is moved to ``PATH/syzygy``.

        Raises:
            requests.RequestException: on connection problems, timeouts or a
                non-success HTTP status (instead of saving the error body and
                failing later while unpacking it).
        """
        destination = os.path.join(PATH, "syzygy")
        if os.path.isdir(destination):
            return

        url = "https://api.github.com/repos/niklasf/python-chess/tarball/9b9aa13f9f36d08aadfabff872882f4ab1494e95"
        # Name of the single top-level directory inside the tarball.
        archive_root = "niklasf-python-chess-9b9aa13"

        with tempfile.TemporaryDirectory() as tmpdirname:
            tarball_path = os.path.join(tmpdirname, f"{archive_root}.tar.gz")

            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(
                url, stream=True, timeout=Syzygy.DOWNLOAD_TIMEOUT
            ) as response:
                response.raise_for_status()
                with open(tarball_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            with tarfile.open(tarball_path, "r:gz") as tar:
                tar.extractall(tmpdirname)

            shutil.move(os.path.join(tmpdirname, archive_root), destination)
class OrderedClassMembers(type):
@classmethod
def __prepare__(self, name, bases):
return collections.OrderedDict()
def __new__(self, name, bases, classdict):
classdict["__ordered__"] = [
key for key in classdict.keys() if key not in ("__module__", "__qualname__")
]
return type.__new__(self, name, bases, classdict)
class TimeoutException(Exception):
    """Raised when a call wrapped by ``timeout_decorator`` exceeds its limit.

    Attributes:
        message: Human-readable description of what timed out.
        timeout: The limit, in seconds, that was exceeded.
    """

    def __init__(self, message: str, timeout: float):
        # Forward both values so ``args`` stays ``(message, timeout)``.
        super().__init__(message, timeout)
        self.message = message
        self.timeout = timeout

    def __str__(self) -> str:
        # Show the message instead of the default repr of the args tuple.
        return self.message


def timeout_decorator(timeout: float):
    """Return a decorator that limits each call of a function to *timeout* seconds.

    The wrapped function runs on a daemon thread while the caller waits for at
    most *timeout* seconds. If the call has not finished by then the wrapper
    raises ``TimeoutException`` immediately; the call itself is not cancelled
    and keeps running in the background, but being a daemon thread it cannot
    keep the interpreter alive. Otherwise the wrapper returns the function's
    result or re-raises the exception the function raised.

    Args:
        timeout: Maximum time to wait for the call, in seconds.

    Raises:
        TimeoutException: from the wrapper, when the limit is exceeded.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Filled in by the worker: either "result" or "error".
            outcome = {}

            def target():
                try:
                    outcome["result"] = func(*args, **kwargs)
                except BaseException as error:  # handed back to the caller
                    outcome["error"] = error

            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            worker.join(timeout)

            if worker.is_alive():
                raise TimeoutException(
                    f"Function {func.__name__} timed out after {timeout} seconds",
                    timeout,
                )
            if "error" in outcome:
                raise outcome["error"]
            return outcome["result"]

        return wrapper

    return decorator
class MiniTestFramework:
    """Small test runner for classes that expose an ``__ordered__`` member list.

    A test suite is a class whose ``__ordered__`` attribute lists its member
    names; every name starting with ``test_`` is run in that order. Optional
    hooks ``beforeAll``/``afterAll`` run once per suite and
    ``beforeEach``/``afterEach`` around each test. Output printed by a test is
    captured and shown after the test's result line.
    """

    def __init__(self):
        self.passed_test_suites = 0
        self.failed_test_suites = 0
        self.passed_tests = 0
        self.failed_tests = 0

    def has_failed(self) -> bool:
        """Return True if at least one suite had a failing test."""
        return self.failed_test_suites > 0

    def run(self, classes: List[type]) -> bool:
        """Run every suite in *classes* and print a summary.

        Each suite runs with a fresh temporary directory as the working
        directory; the previous working directory is restored afterwards.

        Returns:
            True if any suite failed (note: True means *failure*).
        """
        self.start_time = time.time()

        for test_class in classes:
            with tempfile.TemporaryDirectory() as tmpdirname:
                original_cwd = os.getcwd()
                os.chdir(tmpdirname)

                try:
                    if self.__run(test_class):
                        self.failed_test_suites += 1
                    else:
                        self.passed_test_suites += 1
                finally:
                    # Leave the directory before it is deleted.
                    os.chdir(original_cwd)

        self.__print_summary(round(time.time() - self.start_time, 2))
        return self.has_failed()

    def __run(self, test_class) -> bool:
        """Run one suite; return True if any of its tests failed."""
        test_instance = test_class()
        test_name = test_instance.__class__.__name__
        test_methods = [m for m in test_instance.__ordered__ if m.startswith("test_")]

        print(f"\nTest Suite: {test_name}")

        if hasattr(test_instance, "beforeAll"):
            test_instance.beforeAll()

        fails = 0

        for method in test_methods:
            fails += self.__run_test_method(test_instance, method)

        if hasattr(test_instance, "afterAll"):
            test_instance.afterAll()

        self.failed_tests += fails

        return fails > 0

    def __run_test_method(self, test_instance, method: str) -> int:
        """Run a single test method; return 1 if it failed, else 0.

        Any exception raised by the test or its ``beforeEach``/``afterEach``
        hooks counts as a failure and is reported; ``afterEach`` is skipped
        when the test itself raises.
        """
        print(f" Running {method}... \r", end="", flush=True)

        buffer = io.StringIO()
        fails = 0
        t0 = time.time()

        try:
            with redirect_stdout(buffer):
                if hasattr(test_instance, "beforeEach"):
                    test_instance.beforeEach()

                getattr(test_instance, method)()

                if hasattr(test_instance, "afterEach"):
                    test_instance.afterEach()

            duration = time.time() - t0

            self.print_success(f" {method} ({duration * 1000:.2f}ms)")
            self.passed_tests += 1
        except TimeoutException as e:
            self.print_failure(
                f" {method} (hit execution limit of {e.timeout} seconds)"
            )
            fails += 1
        except AssertionError:
            self.__handle_assertion_error(t0, method)
            fails += 1
        except Exception as e:
            # Errors that are neither assertions nor timeouts must still be
            # shown, otherwise the test fails without any visible reason.
            self.__handle_unexpected_error(t0, method, e)
            fails += 1
        finally:
            self.__print_buffer_output(buffer)

        return fails

    def __handle_assertion_error(self, start_time, method: str):
        """Report a failed assertion; must be called from an ``except`` block."""
        duration = time.time() - start_time
        self.print_failure(f" {method} ({duration * 1000:.2f}ms)")
        self.__print_traceback()

    def __handle_unexpected_error(self, start_time, method: str, error: Exception):
        """Report a non-assertion error; must be called from an ``except`` block."""
        duration = time.time() - start_time
        self.print_failure(
            f" {method} ({duration * 1000:.2f}ms) raised {type(error).__name__}: {error}"
        )
        self.__print_traceback()

    def __print_traceback(self):
        """Print the traceback of the exception currently being handled."""
        traceback_output = "".join(traceback.format_tb(sys.exc_info()[2]))

        colored_traceback = "\n".join(
            f" {CYAN_COLOR}{line}{RESET_COLOR}"
            for line in traceback_output.splitlines()
        )

        print(colored_traceback)

    def __print_buffer_output(self, buffer: io.StringIO):
        """Print whatever the test wrote to stdout, framed by OUTPUT markers."""
        output = buffer.getvalue()
        if output:
            indented_output = "\n".join(f" {line}" for line in output.splitlines())
            print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}")
            print(f"{GRAY_COLOR}{indented_output}{RESET_COLOR}")
            print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}")

    def __print_summary(self, duration: float):
        """Print suite/test totals and the overall run time in seconds."""
        print(f"\n{WHITE_BOLD}Test Summary{RESET_COLOR}\n")
        print(
            f" Test Suites: {GREEN_COLOR}{self.passed_test_suites} passed{RESET_COLOR}, {RED_COLOR}{self.failed_test_suites} failed{RESET_COLOR}, {self.passed_test_suites + self.failed_test_suites} total"
        )
        print(
            f" Tests: {GREEN_COLOR}{self.passed_tests} passed{RESET_COLOR}, {RED_COLOR}{self.failed_tests} failed{RESET_COLOR}, {self.passed_tests + self.failed_tests} total"
        )
        print(f" Time: {duration}s\n")

    def print_failure(self, add: str):
        """Print a result line for a failed test, marked with a red cross."""
        # The marker keeps failures recognisable even without colour support.
        print(f" {RED_COLOR}✗{RESET_COLOR}{add}", flush=True)

    def print_success(self, add: str):
        """Print a result line for a passed test, marked with a green tick."""
        print(f" {GREEN_COLOR}✓{RESET_COLOR}{add}", flush=True)
class Stockfish:
    """Thin wrapper around an engine process used by the tests.

    The command line is ``prefix + [path] + args``. Two modes exist:

    * ``cli=False`` (default): the engine is started with pipes, commands are
      sent with ``send_command`` and its output is consumed line by line by
      the matching helpers (``equals``, ``expect``, ``contains``,
      ``starts_with``, ``check_output``). stderr is merged into stdout.
    * ``cli=True``: the command is run to completion during construction and
      ``process`` holds the resulting ``subprocess.CompletedProcess``.

    Every matching helper is limited to ``MAX_TIMEOUT`` seconds and raises
    ``RuntimeError`` if the engine's output ends before a match is found.
    Every line read is also recorded in ``output``.
    """

    def __init__(
        self,
        prefix: List[str],
        path: str,
        args: Optional[List[str]] = None,
        cli: bool = False,
    ):
        """Store the configuration and start the engine immediately.

        Args:
            prefix: Arguments placed before the engine path (e.g. a wrapper
                command); may be empty.
            path: Path of the engine executable.
            args: Arguments passed to the engine; defaults to none.
            cli: Run the command to completion instead of interactively.
        """
        self.path = path
        self.process = None
        # A fresh list per instance; a shared mutable default would leak
        # arguments between instances.
        self.args = [] if args is None else args
        self.cli = cli
        self.prefix = prefix
        self.output = []

        self.start()

    def start(self):
        """Launch the engine according to the configured mode."""
        command = self.prefix + [self.path] + self.args

        if self.cli:
            self.process = subprocess.run(
                command,
                capture_output=True,
                text=True,
            )
            return

        self.process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1,  # line buffered
        )

    def setoption(self, name: str, value: str):
        """Send ``setoption name <name> value <value>`` to the engine."""
        self.send_command(f"setoption name {name} value {value}")

    def send_command(self, command: str):
        """Write *command* plus a newline to the engine's stdin and flush.

        Raises:
            RuntimeError: if no process has been started.
        """
        if not self.process:
            raise RuntimeError("Stockfish process is not started")

        self.process.stdin.write(command + "\n")
        self.process.stdin.flush()

    @timeout_decorator(MAX_TIMEOUT)
    def equals(self, expected_output: str):
        """Consume output until a line equals *expected_output*."""
        for line in self.readline():
            if line == expected_output:
                return

    @timeout_decorator(MAX_TIMEOUT)
    def expect(self, expected_output: str):
        """Consume output until a line matches the ``fnmatch`` pattern."""
        for line in self.readline():
            if fnmatch.fnmatch(line, expected_output):
                return

    @timeout_decorator(MAX_TIMEOUT)
    def contains(self, expected_output: str):
        """Consume output until a line contains *expected_output*."""
        for line in self.readline():
            if expected_output in line:
                return

    @timeout_decorator(MAX_TIMEOUT)
    def starts_with(self, expected_output: str):
        """Consume output until a line starts with *expected_output*."""
        for line in self.readline():
            if line.startswith(expected_output):
                return

    @timeout_decorator(MAX_TIMEOUT)
    def check_output(self, callback):
        """Feed each output line to *callback* until it returns a truthy value.

        Raises:
            ValueError: if *callback* is missing.
        """
        if not callback:
            raise ValueError("Callback function is required")

        for line in self.readline():
            if callback(line):
                return

    def readline(self):
        """Yield stripped output lines, recording each one in ``output``.

        Raises:
            RuntimeError: if no process has been started, or when the engine's
                output reaches end-of-file (the engine exited or closed its
                stdout), so callers fail at once instead of spinning on empty
                reads until the timeout.
        """
        if not self.process:
            raise RuntimeError("Stockfish process is not started")

        while True:
            raw_line = self.process.stdout.readline()
            # A blank line from the engine is "\n"; only end-of-file is "".
            if raw_line == "":
                raise RuntimeError("Stockfish process has terminated")

            line = raw_line.strip()
            self.output.append(line)

            yield line

    def clear_output(self):
        """Forget all lines recorded so far."""
        self.output = []

    def get_output(self) -> List[str]:
        """Return the list of lines recorded so far (not a copy)."""
        return self.output

    def quit(self):
        """Send the ``quit`` command to the engine."""
        self.send_command("quit")

    def close(self):
        """Close the pipes, wait for the engine and return its exit code.

        In CLI mode the command has already finished, so its stored return
        code is returned. Returns 0 when no process was started.
        """
        if not self.process:
            return 0

        if isinstance(self.process, subprocess.CompletedProcess):
            return self.process.returncode

        self.process.stdin.close()
        self.process.stdout.close()
        return self.process.wait()