# Source code for osrparse.replay

import lzma
import struct
from datetime import datetime, timezone, timedelta
from typing import List, Optional
import base64
from dataclasses import dataclass

from osrparse.utils import (Mod, GameMode, ReplayEvent, ReplayEventOsu,
    ReplayEventCatch, ReplayEventMania, ReplayEventTaiko, Key, KeyMania,
    KeyTaiko, LifeBarState)


class _Unpacker:
    """
    Helper class for dealing with the ``.osr`` format. Not intended to be used
    by consumers.

    Holds the raw replay bytes together with a read cursor (``self.offset``).
    Every ``unpack_*`` method reads the next field at the cursor and advances
    the cursor past that field, so the methods must be called in file order.
    """
    def __init__(self, replay_data):
        self.replay_data = replay_data
        self.offset = 0

    def string_length(self, binarystream):
        """
        Reads a ULEB128 encoded integer from ``binarystream`` at the current
        offset, advancing the offset past it.

        Returns
        -------
        int
            The decoded (unsigned) integer.
        """
        result = 0
        shift = 0
        while True:
            byte = binarystream[self.offset]
            self.offset += 1
            # the low 7 bits carry the payload, least significant group first
            result |= (byte & 0b01111111) << shift
            # a cleared high bit marks the final byte
            if not byte & 0b10000000:
                break
            shift += 7
        return result

    def unpack_string(self):
        """
        Reads a string at the current offset.

        Returns
        -------
        str or None
            ``None`` if the marker byte is ``0x00`` (no string present),
            otherwise the utf-8 decoded string following a ``0x0b`` marker.

        Raises
        ------
        ValueError
            If the marker byte is neither ``0x00`` nor ``0x0b``.
        """
        marker = self.replay_data[self.offset]
        if marker == 0x00:
            self.offset += 1
            return None
        if marker == 0x0b:
            self.offset += 1
            string_length = self.string_length(self.replay_data)
            offset_end = self.offset + string_length
            string = self.replay_data[self.offset:offset_end].decode("utf-8")
            self.offset = offset_end
            return string
        raise ValueError("Expected the first byte of a string to be 0x00 "
            f"or 0x0b, but got {self.replay_data[self.offset]}")

    def unpack_once(self, specifier):
        """
        Reads a single value described by the ``struct`` format character(s)
        ``specifier`` at the current offset, advancing the offset by its size.
        """
        # always use little endian
        specifier = f"<{specifier}"
        unpacked = struct.unpack_from(specifier, self.replay_data, self.offset)
        self.offset += struct.calcsize(specifier)
        # `struct.unpack_from` always returns a tuple, even if there's only one
        # element
        return unpacked[0]

    def unpack_timestamp(self):
        """
        Reads an 8 byte tick count and converts it to a utc ``datetime``.

        One tick is a tenth of a microsecond, counted from ``datetime.min``.
        """
        ticks = self.unpack_once("q")
        # integer division keeps the conversion exact. Real tick counts are
        # far above 2**53, so float division (``ticks / 10``) would silently
        # round the timestamp by several microseconds.
        timestamp = datetime.min + timedelta(microseconds=ticks // 10)
        timestamp = timestamp.replace(tzinfo=timezone.utc)
        return timestamp

    def unpack_play_data(self, mode):
        """
        Reads the length-prefixed, lzma compressed replay data and parses it
        as events of the given ``mode``.

        Returns
        -------
        (list, int or None)
            The parsed events and the rng seed (``None`` if not present).
        """
        replay_length = self.unpack_once("i")
        offset_end = self.offset + replay_length
        data = self.replay_data[self.offset:offset_end]
        data = lzma.decompress(data, format=lzma.FORMAT_AUTO)
        data = data.decode("ascii")
        (replay_data, rng_seed) = self.parse_replay_data(data, mode)
        self.offset = offset_end
        return (replay_data, rng_seed)

    @staticmethod
    def parse_replay_data(replay_data_str, mode):
        """
        Parses decompressed replay data of the form ``w|x|y|z,w|x|y|z,...``.

        Parameters
        ----------
        replay_data_str: str
            The decompressed, ascii decoded replay data.
        mode: GameMode
            Which kind of replay event to create for each frame.

        Returns
        -------
        (list, int or None)
            The parsed events and the rng seed. The seed is taken from a
            final frame whose time delta is ``-12345``, and is ``None`` if
            there is no such frame. Empty input gives ``([], None)``.
        """
        # only strip commas that are actually there, and skip empty frames, so
        # that empty data or data without a trailing comma is neither
        # truncated nor passed to ``int("")``.
        trimmed = replay_data_str.rstrip(",")
        events = [frame.split("|") for frame in trimmed.split(",") if frame]

        rng_seed = None
        play_data = []
        for event in events:
            time_delta = int(event[0])
            x = event[1]
            y = event[2]
            keys = int(event[3])

            # the seed frame is not a real event; it only carries the seed in
            # its last field.
            if time_delta == -12345 and event == events[-1]:
                rng_seed = keys
                continue

            if mode is GameMode.STD:
                keys = Key(keys)
                event = ReplayEventOsu(time_delta, float(x), float(y), keys)
            elif mode is GameMode.TAIKO:
                event = ReplayEventTaiko(time_delta, int(x), KeyTaiko(keys))
            elif mode is GameMode.CTB:
                event = ReplayEventCatch(time_delta, float(x), int(keys) == 1)
            elif mode is GameMode.MANIA:
                event = ReplayEventMania(time_delta, KeyMania(keys))
            # any other ``mode`` leaves ``event`` as the raw list of fields

            play_data.append(event)

        return (play_data, rng_seed)

    def unpack_replay_id(self):
        """
        Reads the replay id, as 8 bytes if that many remain and as 4 bytes
        otherwise.
        """
        # old replays had replay_id stored as an int (4 bytes) instead of a
        # long (8 bytes), so fall back to int if necessary.
        # lazer checks against the gameversion before trying to parse as an
        # int, but there may be some weirdness with replays that were set
        # during this time but downloaded later having actually correct (long)
        # replay_ids, since they were likely manually migrated at some point
        # after the switch to long took place.
        # See:
        # https://github.com/ppy/osu/blob/84e1ff79a0736aa6c7a44804b585ab1c54a843
        # 99/osu.Game/Scoring/Legacy/LegacyScoreDecoder.cs#L78-L81
        try:
            replay_id = self.unpack_once("q")
        except struct.error:
            replay_id = self.unpack_once("l")
        return replay_id

    def unpack_life_bar(self):
        """
        Reads the life bar graph, stored as a ``time|life,time|life,`` string.

        Returns
        -------
        list of LifeBarState or None
            ``None`` if the string is absent or holds no states.
        """
        life_bar = self.unpack_string()
        if not life_bar:
            return None

        # only strip commas that are actually there; slicing off the last
        # character unconditionally would eat a digit of the last state when
        # there is no trailing comma.
        life_bar = life_bar.rstrip(",")
        if not life_bar:
            return None
        states = [state.split("|") for state in life_bar.split(",")]

        return [LifeBarState(int(s[0]), float(s[1])) for s in states]

    def unpack(self):
        """
        Reads every field of the replay, in file order, and returns the
        resulting ``Replay``.
        """
        mode = GameMode(self.unpack_once("b"))
        game_version = self.unpack_once("i")
        beatmap_hash = self.unpack_string()
        username = self.unpack_string()
        replay_hash = self.unpack_string()
        # judgment counts and max combo are unsigned 16 bit values (lazer's
        # decoder reads them as UInt16, and ``_Packer`` writes them with
        # "H"). Reading them as signed would turn anything above 32767
        # negative, which then can't be packed again.
        count_300 = self.unpack_once("H")
        count_100 = self.unpack_once("H")
        count_50 = self.unpack_once("H")
        count_geki = self.unpack_once("H")
        count_katu = self.unpack_once("H")
        count_miss = self.unpack_once("H")
        score = self.unpack_once("i")
        max_combo = self.unpack_once("H")
        perfect = self.unpack_once("?")
        mods = Mod(self.unpack_once("i"))
        life_bar_graph = self.unpack_life_bar()
        timestamp = self.unpack_timestamp()
        (replay_data, rng_seed) = self.unpack_play_data(mode)
        replay_id = self.unpack_replay_id()

        return Replay(mode, game_version, beatmap_hash, username,
            replay_hash, count_300, count_100, count_50, count_geki, count_katu,
            count_miss, score, max_combo, perfect, mods, life_bar_graph,
            timestamp, replay_data, replay_id, rng_seed)


class _Packer:
    """
    Helper class for serializing a replay into the ``.osr`` format. Not
    intended to be used by consumers.

    Parameters
    ----------
    replay: Replay
        The replay to serialize.
    dict_size: int, optional
        The lzma dictionary size used when compressing the replay data.
        Defaults to ``1 << 21`` when not given (or falsy).
    mode: int, optional
        The lzma compression mode used when compressing the replay data.
        Defaults to ``lzma.MODE_FAST`` when not given (or falsy).
    """
    def __init__(self, replay, *, dict_size=None, mode=None):
        self.replay = replay
        self.dict_size = dict_size or 1 << 21
        self.mode = mode or lzma.MODE_FAST

    def pack_byte(self, data):
        """Packs ``data`` as an unsigned byte."""
        return struct.pack("<B", data)

    def pack_short(self, data):
        """Packs ``data`` as a little endian unsigned 2 byte integer."""
        return struct.pack("<H", data)

    def pack_int(self, data):
        """Packs ``data`` as a little endian unsigned 4 byte integer."""
        return struct.pack("<I", data)

    def pack_long(self, data):
        """Packs ``data`` as a little endian unsigned 8 byte integer."""
        return struct.pack("<Q", data)

    def pack_ULEB128(self, data):
        """
        Packs ``len(data)`` as an unsigned LEB128 integer.

        Pass the already encoded bytes when the result is used as the length
        prefix of a string, so that the byte length (and not the character
        count) is written.
        """
        remaining = len(data)
        encoded = bytearray()

        while True:
            byte = remaining & 0x7f
            remaining >>= 7

            # unsigned LEB128 ends as soon as no bits are left. (The signed
            # variant additionally inspects bit 6, which only adds a
            # redundant byte for lengths of 64-127.)
            if remaining == 0:
                encoded.append(byte)
                return bytes(encoded)

            # set the high bit to mark that more bytes follow
            encoded.append(0x80 | byte)

    def pack_string(self, data):
        """
        Packs a string as a ``0x0b`` marker, the ULEB128 byte length, and the
        utf-8 encoded string. ``None`` and ``""`` are both packed as a present
        string of length 0.
        """
        if not data:
            return self.pack_byte(11) + self.pack_byte(0)

        # the length prefix counts utf-8 bytes, so encode before measuring.
        # Measuring the ``str`` would write a too-short length for any
        # non-ascii string.
        encoded = data.encode("utf-8")
        return self.pack_byte(11) + self.pack_ULEB128(encoded) + encoded

    def pack_timestamp(self):
        """
        Packs the replay's timestamp as a tick count.

        Ticks are tenths of a microsecond counted from year 0001, in contrast
        to unix time (1970).
        """
        epoch = datetime(1, 1, 1, tzinfo=timezone.utc)
        # ``astimezone`` treats a naive datetime as local time, exactly like
        # ``datetime.timestamp()`` does.
        aware = self.replay.timestamp.astimezone(timezone.utc)
        # stay in integer arithmetic: tick counts are far above 2**53, so
        # going through a float would round away the low digits.
        ticks = (aware - epoch) // timedelta(microseconds=1) * 10
        return self.pack_long(ticks)

    def pack_life_bar(self):
        """
        Packs the life bar graph as a ``time|life,time|life,`` string, or as
        an empty string if the replay has no life bar graph.
        """
        graph = self.replay.life_bar_graph
        if graph is None:
            return self.pack_string("")

        parts = []
        for state in graph:
            life = state.life
            # store 0 or 1 instead of 0.0 or 1.0
            if int(life) == life:
                life = int(life)

            parts.append(f"{state.time}|{life},")

        return self.pack_string("".join(parts))

    def pack_replay_data(self):
        """
        Packs the replay events (and the rng seed, if any) as a length
        prefixed, lzma compressed string of ``w|x|y|z,`` frames.

        Events which are not an instance of one of the four known replay
        event classes are skipped.
        """
        frames = []
        for event in self.replay.replay_data:
            t = event.time_delta
            if isinstance(event, ReplayEventOsu):
                frames.append(f"{t}|{event.x}|{event.y}|{event.keys.value},")
            elif isinstance(event, ReplayEventTaiko):
                frames.append(f"{t}|{event.x}|0|{event.keys.value},")
            elif isinstance(event, ReplayEventCatch):
                frames.append(f"{t}|{event.x}|0|{int(event.dashing)},")
            elif isinstance(event, ReplayEventMania):
                frames.append(f"{t}|{event.keys.value}|0|0,")

        # ``None`` means "no seed"; a seed of 0 is a real value and has to be
        # written, so don't rely on truthiness here.
        if self.replay.rng_seed is not None:
            frames.append(f"-12345|0|0|{self.replay.rng_seed},")

        filters = [
            {
                "id": lzma.FILTER_LZMA1,
                "dict_size": self.dict_size,
                "mode": self.mode
            }
        ]

        data = "".join(frames).encode("ascii")
        compressed = lzma.compress(data, format=lzma.FORMAT_ALONE,
            filters=filters)

        return self.pack_int(len(compressed)) + compressed

    def pack(self):
        """
        Packs every field of the replay, in file order.

        Returns
        -------
        bytes
            The replay in ``.osr`` format.
        """
        r = self.replay

        return b"".join([
            self.pack_byte(r.mode.value),
            self.pack_int(r.game_version),
            self.pack_string(r.beatmap_hash),
            self.pack_string(r.username),
            self.pack_string(r.replay_hash),
            self.pack_short(r.count_300),
            self.pack_short(r.count_100),
            self.pack_short(r.count_50),
            self.pack_short(r.count_geki),
            self.pack_short(r.count_katu),
            self.pack_short(r.count_miss),
            self.pack_int(r.score),
            self.pack_short(r.max_combo),
            self.pack_byte(r.perfect),
            self.pack_int(r.mods.value),
            self.pack_life_bar(),
            self.pack_timestamp(),
            self.pack_replay_data(),
            self.pack_long(r.replay_id),
        ])


@dataclass
class Replay:
    """
    A replay found in a ``.osr`` file, or following the osr format. To create a
    replay, you likely want ``Replay.from_path``, ``Replay.from_file``, or
    ``Replay.from_string``.

    Attributes
    ----------
    mode: GameMode
        The game mode this replay was played on.
    game_version: int
        The game version this replay was played on.
    beatmap_hash: str
        The hash of the beatmap this replay was played on.
    username: str
        The user that played this replay.
    replay_hash: str
        The hash of this replay.
    count_300: int
        The number of 300 judgments in this replay.
    count_100: int
        The number of 100 judgments in this replay.
    count_50: int
        The number of 50 judgments in this replay.
    count_geki: int
        The number of geki judgments in this replay.
    count_katu: int
        The number of katu judgments in this replay.
    count_miss: int
        The number of misses in this replay.
    score: int
        The score of this replay.
    max_combo: int
        The maximum combo attained in this replay.
    perfect: bool
        Whether this replay was perfect or not.
    mods: Mod
        The mods this replay was played with.
    life_bar_graph: Optional[List[LifeBarState]]
        The life bar of this replay over time.
    timestamp: datetime
        When this replay was played.
    replay_data: List[ReplayEvent]
        The replay data of the replay, including cursor position and keys
        pressed.
    replay_id: int
        The replay id of this replay, or 0 if not submitted.
    rng_seed: Optional[int]
        The rng seed of this replay, or ``None`` if not present (typically not
        present on older replays).
    """
    mode: GameMode
    game_version: int
    beatmap_hash: str
    username: str
    replay_hash: str
    count_300: int
    count_100: int
    count_50: int
    count_geki: int
    count_katu: int
    count_miss: int
    score: int
    max_combo: int
    perfect: bool
    mods: Mod
    life_bar_graph: Optional[List[LifeBarState]]
    timestamp: datetime
    replay_data: List[ReplayEvent]
    replay_id: int
    rng_seed: Optional[int]

    @staticmethod
    def from_path(path):
        """
        Creates a new ``Replay`` object from the ``.osr`` file at the given
        ``path``.

        Parameters
        ----------
        path: str or os.PathLike
            The path to the osr file to read from.

        Returns
        -------
        Replay
            The parsed replay object.
        """
        with open(path, "rb") as f:
            return Replay.from_file(f)

    @staticmethod
    def from_file(file):
        """
        Creates a new ``Replay`` object from an open file object.

        Parameters
        ----------
        file: file-like
            The file object to read from. Its ``read()`` must return bytes,
            ie the file must be opened in binary mode.

        Returns
        -------
        Replay
            The parsed replay object.
        """
        data = file.read()
        return Replay.from_string(data)

    @staticmethod
    def from_string(data):
        """
        Creates a new ``Replay`` object from the raw contents of a ``.osr``
        file.

        Parameters
        ----------
        data: bytes
            The data to parse. Despite the name of this method, this is the
            binary content of the replay, not a ``str``.

        Returns
        -------
        Replay
            The parsed replay object.
        """
        return _Unpacker(data).unpack()

    def write_path(self, path, *, dict_size=None, mode=None):
        """
        Writes the replay to the given ``path``.

        Parameters
        ----------
        path: str or os.PathLike
            The path to where to write the replay.
        dict_size: int, optional
            The lzma dictionary size to compress the replay data with.
        mode: int, optional
            The lzma compression mode to compress the replay data with.

        Notes
        -----
        This uses the current values of any attributes, and so can be used to
        create an edited version of a replay, by first reading a replay,
        editing an attribute, then writing the replay back to its file.
        """
        with open(path, "wb") as f:
            self.write_file(f, dict_size=dict_size, mode=mode)

    def write_file(self, file, *, dict_size=None, mode=None):
        """
        Writes the replay to an open file object.

        Parameters
        ----------
        file: file-like
            The file object to write to. Must accept bytes, ie be opened in
            binary mode.
        dict_size: int, optional
            The lzma dictionary size to compress the replay data with.
        mode: int, optional
            The lzma compression mode to compress the replay data with.
        """
        packed = self.pack(dict_size=dict_size, mode=mode)
        file.write(packed)

    def pack(self, *, dict_size=None, mode=None):
        """
        Returns the binary data representing this ``Replay``, in ``.osr``
        format. The data returned by this method is suitable for writing to a
        file (opened in binary mode) as a valid ``.osr`` file.

        Parameters
        ----------
        dict_size: int, optional
            The lzma dictionary size to compress the replay data with.
        mode: int, optional
            The lzma compression mode to compress the replay data with.

        Returns
        -------
        bytes
            The data representing this ``Replay``, in ``.osr`` format.
        """
        return _Packer(self, dict_size=dict_size, mode=mode).pack()
def parse_replay_data(data_string, *, decoded=False, decompressed=False,
    mode=GameMode.STD) -> List[ReplayEvent]:
    """
    Parses the replay data portion of a replay from a string. This method is
    suitable for use with the replay data returned by api v1's ``/get_replay``
    endpoint, for instance.

    Parameters
    ----------
    data_string: str or bytes
        The replay data to parse.
    decoded: bool
        Whether ``data_string`` has already been decoded from a b64
        representation. Api v1 returns a base 64 encoded string, for instance.
    decompressed: bool
        Whether ``data_string`` has already been both decompressed from lzma,
        and decoded to ascii.
        |br|
        For instance, the following two calls are equivalent:
        ```
        >>> parse_replay_data(lzma_string, decoded=True)
        >>> ...
        >>> lzma_string = lzma.decompress(lzma_string).decode("ascii")
        >>> parse_replay_data(lzma_string, decompressed=True)
        ```
        |br|
        If ``decompressed`` is ``True``, ``decoded`` is automatically set to
        ``True`` as well (ie, if ``decompressed`` is ``True``, we will assume
        ``data_string`` is not base 64 encoded).
    mode: GameMode
        What mode to parse the replay data as.

    Returns
    -------
    List[ReplayEvent]
        The parsed replay events. The rng seed, if present in the data, is
        discarded.
    """
    # assume the data is already decoded if it's been decompressed
    if not decoded and not decompressed:
        data_string = base64.b64decode(data_string)
    if not decompressed:
        data_string = lzma.decompress(data_string, format=lzma.FORMAT_AUTO)
        data_string = data_string.decode("ascii")
    (replay_data, _seed) = _Unpacker.parse_replay_data(data_string, mode)
    return replay_data