tpotce/docker/cowrie/dist/patch_cowrie_persona_support.py

168 lines
5.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
from pathlib import Path
def replace_once(path, old, new):
text = path.read_text(encoding="utf-8")
if old not in text:
raise RuntimeError(f"Could not find expected text in {path}")
path.write_text(text.replace(old, new, 1), encoding="utf-8")
def replace_all(path, old, new):
text = path.read_text(encoding="utf-8")
if old not in text:
raise RuntimeError(f"Could not find expected text in {path}")
path.write_text(text.replace(old, new), encoding="utf-8")
def patch_protocol(cowrie_root):
path = cowrie_root / "src" / "cowrie" / "shell" / "protocol.py"
replace_once(path, "import traceback\n", "import traceback\nfrom pathlib import Path\n")
replace_once(
path,
""" if cmd in self.commands:
return self.commands[cmd]
if cmd[0] in (".", "/"):
path = self.fs.resolve_path(cmd, self.cwd)
if not self.fs.exists(path):
return None
else:
for i in [f"{self.fs.resolve_path(x, self.cwd)}/{cmd}" for x in paths]:
if self.fs.exists(i):
path = i
break
if path is None:
return None
""",
""" skip_python_commands = {
command.strip()
for command in CowrieConfig.get(
"shell", "skip_python_commands", fallback=""
).split(",")
if command.strip()
}
def skip_python_command(name):
return name in skip_python_commands or Path(name).name in skip_python_commands
if cmd in self.commands and not skip_python_command(cmd):
return self.commands[cmd]
if cmd[0] in (".", "/"):
path = self.fs.resolve_path(cmd, self.cwd)
if not self.fs.exists(path):
return None
else:
for i in [f"{self.fs.resolve_path(x, self.cwd)}/{cmd}" for x in paths]:
if self.fs.exists(i):
path = i
break
if path is None:
return None
""",
)
replace_once(
path,
""" try:
binary_data = read_data_bytes("txtcmds", *path.lstrip("/").split("/"))
return self.txtcmd(binary_data)
except FileNotFoundError:
pass
""",
""" txtcmds_path = CowrieConfig.get("honeypot", "txtcmds_path", fallback="")
if txtcmds_path:
operator_cmd = Path(txtcmds_path) / path.lstrip("/")
if operator_cmd.is_file():
return self.txtcmd(operator_cmd.read_bytes())
try:
binary_data = read_data_bytes("txtcmds", *path.lstrip("/").split("/"))
return self.txtcmd(binary_data)
except FileNotFoundError:
pass
""",
)
replace_once(
path,
""" if path in self.commands:
return self.commands[path]
""",
""" if path in self.commands and not skip_python_command(path):
return self.commands[path]
""",
)
def patch_ssh(cowrie_root):
path = cowrie_root / "src" / "cowrie" / "commands" / "ssh.py"
replace_once(
path,
""" self.write(
f"Linux {self.protocol.hostname} 2.6.26-2-686 #1 SMP Wed Nov 4 20:45:37 \\
UTC 2009 i686\\n"
)
""",
""" kernel_version = CowrieConfig.get("shell", "kernel_version", fallback="5.10.0")
kernel_build = CowrieConfig.get("shell", "kernel_build_string", fallback="#1 SMP")
hardware = CowrieConfig.get("shell", "hardware_platform", fallback="x86_64")
operating_system = CowrieConfig.get("shell", "operating_system", fallback="GNU/Linux")
self.write(
f"Linux {self.protocol.hostname} {kernel_version} {kernel_build} "
f"{hardware} {operating_system}\\n"
)
""",
)
def patch_netstat(cowrie_root):
path = cowrie_root / "src" / "cowrie" / "commands" / "netstat.py"
replace_all(path, "@/com/ubuntu/upstart", "/run/systemd/private")
replace_once(
path,
"Fred Baumgarten, Alan Cox, Bernd Eckenfels, Phil Blundell, Tuan Hoang and others\\n",
"Fred Baumgarten, Alan Cox, Bernd Eckenfels, Tuan Hoang and others\\n",
)
def patch_service(cowrie_root):
path = cowrie_root / "src" / "cowrie" / "commands" / "service.py"
text = path.read_text(encoding="utf-8")
start = text.index(" output = (\n")
end = text.index(" )\n for line in output:", start) + len(" )\n")
replacement = """ output = (
"[ + ] cron",
"[ + ] dbus",
"[ + ] networking",
"[ + ] ssh",
"[ + ] rsyslog",
"[ + ] udev",
"[ + ] systemd-journald",
"[ + ] systemd-networkd",
"[ - ] bluetooth",
"[ - ] cups",
"[ - ] nfs-server",
"[ - ] postfix",
"[ - ] rpcbind",
)
"""
path.write_text(text[:start] + replacement + text[end:], encoding="utf-8")
def main():
parser = argparse.ArgumentParser(description="Patch Cowrie v3 persona command support.")
parser.add_argument("--cowrie-root", required=True, type=Path)
args = parser.parse_args()
cowrie_root = args.cowrie_root.resolve()
patch_protocol(cowrie_root)
patch_ssh(cowrie_root)
patch_netstat(cowrie_root)
patch_service(cowrie_root)
if __name__ == "__main__":
main()