import copy
import os
import logging
from typing import Optional
import numpy as np
import pandas as pd
from multiprocessing import Pool
from rich.progress import track
from pathlib import Path
from tracklab.datastruct import TrackingDataset, TrackingSet
from tracklab.utils import wandb
log = logging.getLogger(__name__)
class MOT(TrackingDataset):
    """MOTChallenge-style dataset (seqinfo.ini, img1/, gt/gt.txt per sequence) for TrackLab."""

    def __init__(self, dataset_path: str, categories_list: list, nvid: int = -1, nframes: int = -1,
                 vids_dict: dict = None, public_dets_subpath: str = None,
                 leave_one_out_idx: Optional[int] = None, *args, **kwargs):
        self.categories_list = categories_list
        self.dataset_path = Path(dataset_path)
        self.public_dets_subpath = public_dets_subpath
        assert self.dataset_path.exists(), "'{}' directory does not exist".format(
            self.dataset_path
        )
        set_names = ['train', 'val', 'test']
        with Pool(processes=3) as pool:
            # Guard against vids_dict=None (the default) before indexing it per split
            pool_args = [(set_name, self.dataset_path, nvid, vids_dict[set_name] if vids_dict else None)
                         for set_name in set_names]
            results = pool.map(self.load_set_wrapper, pool_args)
        sets_dict = {set_name: result for set_name, result in results}
        if leave_one_out_idx is not None:
            # Leave-one-out split: the train video at 'leave_one_out_idx' becomes the single-video
            # 'val' set and the remaining train videos stay in 'train'.
            video_ids = list(sets_dict["train"].video_metadatas.id)
            loo_video_id = int(sets_dict["train"].video_metadatas.iloc[leave_one_out_idx].id)
            video_ids.remove(loo_video_id)
            train_set = copy.deepcopy(sets_dict["train"])
            val_set = copy.deepcopy(sets_dict["train"])
            train_set.filter_videos(video_ids)
            sets_dict["train"] = train_set
            val_set.filter_videos([loo_video_id])
            sets_dict["val"] = val_set
        log.info(sets_dict.keys())
        super().__init__(dataset_path, sets_dict, nvid, nframes, vids_dict, *args, **kwargs)
    def load_set_wrapper(self, args):
        """Unpack one (set_name, dataset_path, nvid, vids_dict) tuple and load that split (Pool.map helper)."""
        set_name, dataset_path, nvid, vids_dict = args
        set_path = dataset_path / set_name
        if os.path.isdir(set_path):
            return set_name, self.load_set(set_path, nvid, vids_dict)
        else:
            log.warning(f"The {set_name} split does not exist.")
            return set_name, None
    def read_ini_file(self, file_path):
        """Parse an INI-style file (e.g. seqinfo.ini) into a flat {key: value} dict of strings."""
        with open(file_path, 'r') as file:
            lines = file.readlines()
        # Keep only 'key=value' lines; section headers such as '[Sequence]' contain no '=' and are skipped.
        return dict(split_line for line in lines if len(split_line := line.strip().split('=')) == 2)
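    # Example (illustrative, based on the usual MOTChallenge layout): a sequence's seqinfo.ini
    # typically looks like
    #   [Sequence]
    #   name=MOT17-02
    #   imDir=img1
    #   frameRate=30
    #   seqLength=600
    #   imWidth=1920
    #   imHeight=1080
    #   imExt=.jpg
    # and read_ini_file() returns it as {'name': 'MOT17-02', 'imDir': 'img1', 'frameRate': '30', ...};
    # the '[Sequence]' header is dropped because it contains no '=' sign.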
    def load_set(self, dataset_path, nvid=-1, vids_filter_set=None):
        """Load one split (train/val/test) of a MOTChallenge-style directory into a TrackingSet."""
        video_metadatas_list = []
        image_metadata_list = []
        detections_list = []
        public_detections_list = []
        split = os.path.basename(dataset_path)  # Get the split name from the dataset path
        video_list = os.listdir(dataset_path)
        video_list = [v for v in video_list if not v.startswith('.')]
        video_list.sort()
        if nvid > 0:
            assert vids_filter_set is None or len(vids_filter_set) == 0, "Cannot use both 'nvid' and 'vids_dict' arguments at the same time."
            video_list = video_list[:nvid]
        image_counter = 0
        person_counter = 0
        warning_trigger_gt = False
        warning_trigger_public_det = False
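        # For reference (assumption based on the standard MOTChallenge convention): each line of
        # gt/gt.txt and of the public detection files is a CSV row of the form
        #   <frame>, <track_id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, <class>, <visibility>
        # read_motchallenge_formatted_file() / read_motchallenge_result_formatted_file() (not shown in
        # this file) are expected to map these columns onto the 'image_id', 'track_id', 'bbox_ltwh',
        # 'bbox_conf', 'class' and 'visibility' fields used below.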
        for video_folder in track(sorted(video_list),
                                  f"Loading {self.__class__.__name__} '{split}' set videos"):  # Sort videos by name
            video_folder_path = os.path.join(dataset_path, video_folder)
            if os.path.isdir(video_folder_path):
                # Read seqinfo.ini
                seqinfo_path = os.path.join(video_folder_path, 'seqinfo.ini')
                seqinfo_data = self.read_ini_file(seqinfo_path)
                # Read ground truth detections
                gt_path = os.path.join(video_folder_path, 'gt', 'gt.txt')
                if os.path.isfile(gt_path):
                    detections_df = self.read_motchallenge_formatted_file(gt_path)
                    detections_df['person_id'] = detections_df['track_id'] + person_counter
                    detections_df['image_id'] = detections_df['image_id'] - 1 + image_counter
                    detections_df['video_id'] = len(video_metadatas_list) + 1
                    # detections_df['visibility'] = 1  # FIXME not sure to put it to 1
                    detections_list.append(detections_df)
                    person_counter += len(detections_df['track_id'].unique())
                else:
                    if not warning_trigger_gt:
                        warning_trigger_gt = True
                        log.warning(f"The {split} split does not contain ground truth.")
                # Read the public detections file
                if self.public_dets_subpath is not None:
                    det_path = os.path.join(video_folder_path, self.public_dets_subpath)
                    if os.path.isfile(det_path):
                        detections_df = self.read_motchallenge_result_formatted_file(det_path)
                        if detections_df['image_id'].min() == 1:
                            detections_df['image_id'] = detections_df['image_id'] - 1
                        detections_df['image_id'] = detections_df['image_id'] + image_counter
                        detections_df['video_id'] = len(video_metadatas_list) + 1
                        public_detections_list.append(detections_df)
                    else:
                        if not warning_trigger_public_det:
                            warning_trigger_public_det = True
                            log.warning(f"The {split} split does not contain public detections.")
                # Build video metadata from seqinfo.ini
                nframes = int(seqinfo_data.get('seqLength', 0))
                video_metadata = {
                    'id': len(video_metadatas_list) + 1,
                    'nframes': nframes,
                    'frame_rate': int(seqinfo_data.get('frameRate', 0)),
                    'seq_length': nframes,
                    'im_width': int(seqinfo_data.get('imWidth', 0)),
                    'im_height': int(seqinfo_data.get('imHeight', 0)),
                    'name': video_folder,
                }
                # Append video metadata
                video_metadatas_list.append(video_metadata)
                # Append image metadata
                img_folder_path = os.path.join(video_folder_path, 'img1')
                img_metadata_df = pd.DataFrame({
                    'frame': list(range(nframes)),
                    'nframes': nframes,
                    'id': [image_counter + i for i in range(nframes)],
                    'video_id': len(video_metadatas_list),
                    'file_path': [os.path.join(img_folder_path, f) for f in
                                  sorted([f for f in os.listdir(img_folder_path) if f.endswith('.jpg')])],
                })
                image_counter += nframes
                image_metadata_list.append(img_metadata_df)
        # Assign the categories to the video metadata  # TODO at dataset level?
        for video_metadata in video_metadatas_list:
            video_metadata['categories'] = self.categories_list
        # Concatenate dataframes
        video_metadata = pd.DataFrame(video_metadatas_list)
        image_metadata = pd.concat(image_metadata_list, ignore_index=True)
        if len(detections_list):
            detections = pd.concat(detections_list, ignore_index=True)
        else:
            detections = pd.DataFrame(
                columns=['image_id', 'track_id', 'bbox_ltwh', 'bbox_conf', 'class', 'visibility', 'person_id',
                         'video_id'])
        if self.public_dets_subpath is not None:
            if len(public_detections_list):
                public_detections = pd.concat(public_detections_list, ignore_index=True)
                public_detections = public_detections.sort_values(by=['video_id', 'image_id'],
                                                                  ascending=[True, True])
            else:
                public_detections = pd.DataFrame(
                    columns=['image_id', 'bbox_ltwh', 'bbox_conf', 'video_id', 'category_id'])
        # Use video_id, image_id, track_id as unique id
        detections = detections.sort_values(by=['video_id', 'image_id', 'track_id'], ascending=[True, True, True])
        # detections['id'] = detections['video_id'].astype(str) + "_" + \
        #     detections['image_id'].astype(str) + "_" + detections['track_id'].astype(str)
        detections['id'] = detections.index
        # Add category id to detections
        detections['category_id'] = detections['class']
        detections.set_index("id", drop=False, inplace=True)
        image_metadata.set_index("id", drop=False, inplace=True)
        video_metadata.set_index("id", drop=False, inplace=True)
        # Add is_labeled column to image_metadata
        image_metadata['is_labeled'] = True
        # Reorder columns in dataframes
        video_metadata_columns = ['name', 'nframes', 'frame_rate', 'seq_length', 'im_width', 'im_height']
        video_metadata_columns.extend(set(video_metadata.columns) - set(video_metadata_columns))
        video_metadata = video_metadata[video_metadata_columns]
        image_metadata_columns = ['video_id', 'frame', 'file_path', 'is_labeled']
        image_metadata_columns.extend(set(image_metadata.columns) - set(image_metadata_columns))
        image_metadata = image_metadata[image_metadata_columns]
        image_gt = image_metadata.copy()
        detections_column_ordered = ['image_id', 'video_id', 'track_id', 'person_id', 'bbox_ltwh', 'bbox_conf',
                                     'class', 'visibility']
        detections_column_ordered.extend(set(detections.columns) - set(detections_column_ordered))
        detections = detections[detections_column_ordered]
        # Filter out videos not in vids_filter_set
        # FIXME should normally be done before loading the videos, but cannot do that because it would change
        # some ids from one run to the other, since image_id, person_id etc. are computed with a counter.
        if vids_filter_set is not None and len(vids_filter_set) > 0:
            missing_videos = set(vids_filter_set) - set(video_list)
            assert not missing_videos, f"The following videos provided in config 'dataset.vids_dict' do not exist in {split} set: {missing_videos}"
            video_list = [video for video in video_list if video in vids_filter_set]
            # Get video ids
            video_ids = set(video_metadata[video_metadata['name'].isin(video_list)]['id'].tolist())
            # Filter out detections, image_metadata and video_metadata
            detections = detections[detections['video_id'].isin(video_ids)]
            image_metadata = image_metadata[image_metadata['video_id'].isin(video_ids)]
            video_metadata = video_metadata[video_metadata['id'].isin(video_ids)]
        tracking_set = TrackingSet(
            video_metadata,
            image_metadata,
            detections,
            image_gt,
        )
        if self.public_dets_subpath is not None:
            tracking_set.detections_public = public_detections
        return tracking_set
    def process_trackeval_results(self, results, dataset_config, eval_config):
        if "SUMMARIES" in results and "pedestrian" in results["SUMMARIES"]:
            res = {
                f"{k}": float(v) if '.' in v else int(v)
                for _, metrics in results["SUMMARIES"]["pedestrian"].items()
                for k, v in metrics.items()
            }
            wandb.log(res)
        res_by_vid = {}
        for video_name, video_data in results.items():
            if video_name != "SUMMARIES":
                for category, metrics in video_data["pedestrian"].items():
                    for metric_name, metric_value in metrics.items():
                        if not isinstance(metric_value, np.ndarray):  # Ignore np.array values
                            res_by_vid[f"tracking_by_video/{video_name}/{metric_name}"] = metric_value
        wandb.log(res_by_vid)
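

# Usage sketch (illustrative, not part of the original module): instantiating the dataset
# directly, assuming a MOTChallenge-style layout under ./data/MOT17 with train/ and test/
# subfolders, each sequence containing seqinfo.ini, an img1/ folder and (for train) gt/gt.txt.
# In TrackLab this class is normally built from the framework configuration; the
# categories_list below is an illustrative single-class (pedestrian) definition.
if __name__ == "__main__":
    dataset = MOT(
        dataset_path="./data/MOT17",
        categories_list=[{"id": 1, "name": "pedestrian"}],
        nvid=2,  # only load the first two sequences of each available split
        vids_dict={"train": [], "val": [], "test": []},  # empty lists: no per-name filtering
    )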