import pandas
from pathlib import Path
import pandas as pd
import simplejson
import argparse
from urllib.request import urlopen
from jsonschema import validate
import museval
import os
from .version import _version
from decimal import Decimal as D
import numpy as np
class TrackStore(object):
    """
    Holds the metric scores for several frames of one track.

    This is the fundamental building block of other succeeding scores such as
    `MethodStore` and `EvalStore`. Whereas the latter use pandas dataframes,
    this store is using a simple dict that can easily be exported to json
    using the builtin tools.

    Attributes
    ----------
    track_name : str
        name of track.
    win : float, optional
        evaluation window duration in seconds, defaults to 1 second
    hop : float, optional
        hop length in seconds, defaults to 1 second
    scores : Dict
        Nested Dictionary of all scores
    frames_agg : callable or str
        aggregation function for frames, defaults to `'median'` == `np.nanmedian`
    """

    def __init__(self, track_name, win=1, hop=1, frames_agg="median"):
        super(TrackStore, self).__init__()
        self.win = win
        self.hop = hop
        # map the string shortcuts onto the NaN-aware numpy aggregators;
        # any other value is assumed to already be a callable
        if frames_agg == "median":
            self.frames_agg = np.nanmedian
        elif frames_agg == "mean":
            self.frames_agg = np.nanmean
        else:
            self.frames_agg = frames_agg
        self.track_name = track_name
        # load the json schema bundled with the museval package; used by
        # `validate()` to check the scores dict before export
        schema_path = os.path.join(museval.__path__[0], "musdb.schema.json")
        with open(schema_path) as json_file:
            self.schema = simplejson.load(json_file)
        self.scores = {"targets": [], "museval_version": _version}

    def add_target(self, target_name, values):
        """add scores of target to the data store

        Parameters
        ----------
        target_name : str
            name of target to be added to list of targets
        values : dict
            dict of framewise metric lists keyed by ``SDR``, ``SIR``,
            ``SAR`` and ``ISR``, see `musdb.schema.json`
        """
        target_data = {"name": target_name, "frames": []}
        # one entry per evaluation frame, timestamped by the hop length
        for i, _ in enumerate(values["SDR"]):
            frame_data = {
                "time": i * self.hop,
                "duration": self.win,
                "metrics": {
                    "SDR": self._q(values["SDR"][i]),
                    "SIR": self._q(values["SIR"][i]),
                    "SAR": self._q(values["SAR"][i]),
                    "ISR": self._q(values["ISR"][i]),
                },
            }
            target_data["frames"].append(frame_data)
        self.scores["targets"].append(target_data)

    @property
    def json(self):
        """formats the track scores as json string

        Returns
        ----------
        json_string : str
            json dump of the scores dictionary
        """
        json_string = simplejson.dumps(
            self.scores, indent=2, allow_nan=True, use_decimal=True
        )
        return json_string

    @property
    def df(self):
        """return track scores as pandas dataframe

        Returns
        ----------
        df : DataFrame
            pandas dataframe object of track scores
        """
        # encode and decode to json first so Decimal values are normalized
        # exactly like scores loaded from disk
        return json2df(simplejson.loads(self.json, allow_nan=True), self.track_name)

    def __repr__(self):
        """Print the frames-aggregated values instead of all frames

        Returns
        ----------
        str
            frames-aggregated values of all target metrics
        """
        out = ""
        for t in self.scores["targets"]:
            out += t["name"].ljust(16) + "==> "
            for metric in ["SDR", "SIR", "ISR", "SAR"]:
                out += (
                    metric
                    + ":"
                    + "{:>8.3f}".format(
                        self.frames_agg(
                            [float(f["metrics"][metric]) for f in t["frames"]]
                        )
                    )
                    + " "
                )
            out += "\n"
        return out

    def validate(self):
        """Validate scores against `musdb.schema`"""
        return validate(self.scores, self.schema)

    def _q(self, number, precision=".00001"):
        """quantization of BSSEval values

        Parameters
        ----------
        number : float
            metric value to quantize
        precision : str, optional
            decimal quantization step, defaults to five decimal places

        Returns
        -------
        Decimal or float
            quantized value, or ``np.nan`` for non-finite input
        """
        # reject inf (as before) and also nan, which would otherwise raise
        # decimal.InvalidOperation inside `Decimal.quantize`
        if not np.isfinite(number):
            return np.nan
        return D(D(number).quantize(D(precision)))

    def save(self, path):
        """Saves the track scores in json format"""
        with open(path, "w+") as f:
            f.write(self.json)
class EvalStore(object):
    """
    Evaluation Storage that holds the scores for multiple tracks.

    This is based on a Pandas DataFrame.

    Attributes
    ----------
    df : DataFrame
        Pandas DataFrame with one row per (track, target, metric, frame)
    frames_agg : str
        aggregation function for frames, supports `mean` and `median`,
        defaults to `median`
    tracks_agg : str
        aggregation function for tracks, supports `mean` and `median`,
        defaults to `median`
    """

    def __init__(self, frames_agg="median", tracks_agg="median"):
        super(EvalStore, self).__init__()
        self.df = pd.DataFrame()
        self.frames_agg = frames_agg
        self.tracks_agg = tracks_agg

    def add_track(self, track):
        """add track score object to dataframe

        Parameters
        ----------
        track : TrackStore or DataFrame
            track store object
        """
        if isinstance(track, TrackStore):
            self.df = pd.concat([self.df, track.df], ignore_index=True)
        else:
            self.df = pd.concat([self.df, track], ignore_index=True)

    def add_eval_dir(self, path):
        """add precomputed json folder to dataframe

        Parameters
        ----------
        path : str
            path to evaluation results
        """
        p = Path(path)
        if p.exists():
            # one json file per track, stored under a `test` subfolder
            json_paths = p.glob("test/**/*.json")
            for json_path in json_paths:
                with open(json_path) as json_file:
                    json_string = simplejson.loads(json_file.read(), allow_nan=True)
                track_df = json2df(json_string, json_path.stem)
                self.add_track(track_df)

    def agg_frames_scores(self):
        """aggregates frames scores

        Returns
        -------
        df_aggregated_frames : Series
            series with frames aggregated by mean or median,
            indexed by (track, target, metric)

        Raises
        ------
        ValueError
            if `frames_agg` is neither `mean` nor `median`
        """
        df_aggregated_frames_gb = self.df.groupby(["track", "target", "metric"])[
            "score"
        ]
        if self.frames_agg == "median":
            return df_aggregated_frames_gb.median()
        if self.frames_agg == "mean":
            return df_aggregated_frames_gb.mean()
        # previously fell through to an UnboundLocalError; fail explicitly
        raise ValueError("frames_agg must be 'mean' or 'median'")

    def agg_frames_tracks_scores(self):
        """aggregates frames and track scores

        Returns
        -------
        df_aggregated_tracks : Series
            series with frames and tracks aggregated by mean or median,
            indexed by (target, metric)

        Raises
        ------
        ValueError
            if `tracks_agg` is neither `mean` nor `median`
        """
        df_aggregated_frames = self.agg_frames_scores().reset_index()
        grouped = df_aggregated_frames.groupby(["target", "metric"])["score"]
        if self.tracks_agg == "median":
            return grouped.median()
        if self.tracks_agg == "mean":
            return grouped.mean()
        raise ValueError("tracks_agg must be 'mean' or 'median'")

    def load(self, path):
        """loads pickled dataframe

        Parameters
        ----------
        path : str
        """
        self.df = pd.read_pickle(path)

    def save(self, path):
        """saves pickled dataframe

        Parameters
        ----------
        path : str
        """
        self.df.to_pickle(path)

    def __repr__(self):
        targets = self.df["target"].unique()
        # compute the aggregate once instead of once per target x metric
        scores = self.agg_frames_tracks_scores().unstack()
        out = "Aggregated Scores ({} over frames, {} over tracks)\n".format(
            self.frames_agg, self.tracks_agg
        )
        for target in targets:
            out += target.ljust(16) + "==> "
            for metric in ["SDR", "SIR", "ISR", "SAR"]:
                out += metric + ":" + "{:>8.3f}".format(scores[metric][target]) + " "
            out += "\n"
        return out
class MethodStore(object):
    """
    Holds a pandas DataFrame that stores data for several methods.

    Attributes
    ----------
    df : DataFrame
        Pandas DataFrame with one row per (method, track, target, metric, frame)
    frames_agg : str
        aggregation function for frames, supports `mean` and `median`,
        defaults to `median`
    tracks_agg : str
        aggregation function for tracks, supports `mean` and `median`,
        defaults to `median`
    """

    def __init__(self, frames_agg="median", tracks_agg="median"):
        super(MethodStore, self).__init__()
        self.df = pd.DataFrame()
        self.frames_agg = frames_agg
        self.tracks_agg = tracks_agg

    def add_sisec18(self):
        """adds sisec18 participants results to DataFrame.

        Scores will be downloaded on demand.
        """
        print("Downloading SISEC18 Evaluation data...")
        raw_data = urlopen(
            "https://github.com/sigsep/sigsep-mus-2018-analysis/releases/download/v1.0.0/sisec18_mus.pandas"
        )
        print("Done!")
        df_sisec = pd.read_pickle(raw_data, compression=None)
        self.df = pd.concat([self.df, df_sisec], ignore_index=True)

    def add_eval_dir(self, path):
        """add precomputed json folder to dataframe.

        The method name will be defined by the basename of provided path

        Parameters
        ----------
        path : str
            path to evaluation results
        """
        method = EvalStore()
        p = Path(path)
        if p.exists():
            # one json file per track, stored under a `test` subfolder
            json_paths = p.glob("test/**/*.json")
            for json_path in json_paths:
                with open(json_path) as json_file:
                    json_string = simplejson.loads(json_file.read(), allow_nan=True)
                track_df = json2df(json_string, json_path.stem)
                method.add_track(track_df)
        self.add_evalstore(method, p.stem)

    def add_evalstore(self, method, name):
        """add DataFrame of an EvalStore under a method name

        Parameters
        ----------
        method : EvalStore
            EvalStore object
        name : str
            name of method
        """
        # copy to avoid mutating the caller's EvalStore dataframe when
        # attaching the `method` column
        df_to_add = method.df.copy()
        df_to_add["method"] = name
        self.df = pd.concat([self.df, df_to_add], ignore_index=True)

    def agg_frames_scores(self):
        """aggregates frames scores

        Returns
        -------
        df_aggregated_frames : Series
            series with frames aggregated by mean or median,
            indexed by (method, track, target, metric)

        Raises
        ------
        ValueError
            if `frames_agg` is neither `mean` nor `median`
        """
        df_aggregated_frames_gb = self.df.groupby(
            ["method", "track", "target", "metric"]
        )["score"]
        if self.frames_agg == "median":
            return df_aggregated_frames_gb.median()
        if self.frames_agg == "mean":
            return df_aggregated_frames_gb.mean()
        # previously fell through to an UnboundLocalError; fail explicitly
        raise ValueError("frames_agg must be 'mean' or 'median'")

    def agg_frames_tracks_scores(self):
        """aggregates frames and track scores

        Returns
        -------
        df_aggregated_tracks : Series
            series with frames and tracks aggregated by mean or median,
            indexed by (method, target, metric)

        Raises
        ------
        ValueError
            if `tracks_agg` is neither `mean` nor `median`
        """
        df_aggregated_frames = self.agg_frames_scores().reset_index()
        grouped = df_aggregated_frames.groupby(["method", "target", "metric"])["score"]
        if self.tracks_agg == "median":
            return grouped.median()
        if self.tracks_agg == "mean":
            return grouped.mean()
        raise ValueError("tracks_agg must be 'mean' or 'median'")

    def load(self, path):
        """loads pickled dataframe

        Parameters
        ----------
        path : str
        """
        self.df = pd.read_pickle(path)

    def save(self, path):
        """saves pickled dataframe

        Parameters
        ----------
        path : str
        """
        self.df.to_pickle(path)
[docs]def json2df(json_string, track_name):
"""converts json scores into pandas dataframe
Parameters
----------
json_string : str
track_name : str
"""
df = pd.json_normalize(json_string["targets"], ["frames"], ["name"])
df.columns = [col.replace("metrics.", "") for col in df.columns]
df = pd.melt(
df,
var_name="metric",
value_name="score",
id_vars=["time", "name"],
value_vars=["SDR", "SAR", "ISR", "SIR"],
)
df["track"] = track_name
df = df.rename(index=str, columns={"name": "target"})
return df