Alvaros
.
- Регистрация
- 14.05.16
- Сообщения
- 21.452
- Реакции
- 101
- Репутация
- 204
К 2040 году в большинстве крупных городов мира автомобили будут ездить без водителей,
В этой статье я расскажу о своем курсовом проекте: о модуле проверки задач, который я написал для эмулятора
Об авторе
Меня зовут Даниил Плющенко, я студент первого (уже второго) курса магистерской программы «
Платформа Duckietown
Платформа Duckietown состоит из двух частей. Во-первых, это уменьшенная модель городской транспортной среды с дорогами, зданиями, дорожными знаками и препятствиями. Во-вторых, это транспорт. Небольшие мобильные роботы (Duckiebots) под управлением Raspberry Pi получают информацию об окружающем мире через камеру и перевозят по дорогам жителей города — желтых резиновых уточек.
Я работал с эмулятором Duckietown. Он называется
Если вам интересно попробовать, рекомендую склонировать себе репозиторий и запустить
Скриншот из эмулятора
Эмулятор можно использовать как среду для исполнения задач. Поставим такую задачу: по заданной карте, которая состоит из одной дорожной полосы, нужно проехать один метр по прямой.
Для решения задачи можно использовать такой алгоритм:
for _ in range(25):
env.step([1, 0])
env.render()
Здесь в переменной env хранится состояние среды.
Метод step принимает на вход action: список из двух элементов, описывающий действие бота. Первый элемент задает скорость, второй — угол поворота. Метод render перерисовывает картинку, учитывая новую позицию бота. Количество шагов и скорость подобраны эмпирически: именно такие значения нужны, чтобы бот проехал ровно один метр по прямой.
Если вы захотите использовать этот фрагмент в manual_control.py, то вставьте его
Тестирующая система
Хотелось бы иметь возможность проверять такие задачи автоматически: взять реализацию алгоритма управления ботом, промоделировать поездку и сообщить, корректно ли предложенный алгоритм выполняет поставленную задачу. Такая тестирующая система позволила бы использовать среду во время подготовки к соревнованиям по управлению автономным транспортом, а также в образовательных целях: выдавать обучающимся набор задач и проверять их решения автоматически. Я занимался ее разработкой во время работы над курсовым проектом и ниже расскажу о том, что у меня получилось.
Последовательность шагов при проверке решения выглядит так:
Задачи в тестирующую систему можно добавить самостоятельно или обратиться к тем, которые сделал я. У каждой задачи есть генератор условия — класс, который генерирует состояние окружающей среды. В нем указываются название карты и стартовая позиция бота внутри начальной клетки. Также задаются целевые координаты: в данном случае это точка в одном метре от начальной позиции.
class Ride1MTaskGenerator(TaskGenerator):
def __init__(self, args):
super().__init__(args)
def generate_task(self):
env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
env = env_loader(
map_name="straight_road",
position_on_initial_road_tile=PositionOnInitialRoadTile(
x_coefficient=0.5,
z_coefficient=0.5,
angle=0,
),
)
self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
self.generated_task['env'] = env
env.render()
return self.generated_task
Здесь TrackingDuckietownEnv и CVTaskEnv — классы-обертки, которые используются для сохранения информации о поездке для дальнейшего анализа.
class TrackingDuckietownEnv:
def __init__(self, **kwargs):
self.__wrapped = DuckietownEnv(**kwargs)
…
…
def step(self, action):
obs, reward, done, misc = self.__wrapped.step(action)
message = misc['Simulator']['msg']
if 'invalid pose' in message.lower():
raise InvalidPoseException(message)
for t in self.trackers:
t.track(self)
return obs, reward, done, misc
Трекеры собирают информацию о текущем состоянии, например, о позиции бота.
CVTaskEnv используется, если требуется решение с использованием только информации с камеры («компьютерного зрения»), а не функций эмулятора: например, если нужно узнать, насколько далеко бот располагается от центра полосы или где находится ближайший видимый объект. Обращение к функциям эмулятора может упростить решение задачи, и класс CVTaskEnv ограничивает вызов методов эмулятора. Он используется, когда выставлен флаг is_cv_task.
class CVTaskEnv:
def __init__(self, **kwargs):
self.__wrapped = TrackingDuckietownEnv(**kwargs)
def __getattr__(self, item):
if item in self.__wrapped.overriden_methods:
return self.__wrapped.__getattribute__(item)
ALLOWED_FOR_CV_TASKS = [
'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
'road_tile_size', 'trip_statistics'
]
if item in ALLOWED_FOR_CV_TASKS:
return self.__wrapped.__getattr__(item)
else:
raise AttributeError(item + " call is not allowed in CV tasks")
После того как исполнение решения завершено — при условии, что оно не было прервано по тайм-ауту, — информация о поездке пропускается через последовательность чекеров. Если все чекеры отработали успешно, задача считается решенной правильно. Иначе выводится поясняющий вердикт — например, бот врезался, съехал с дороги и т. д.
Вот один из стандартных чекеров. Он проверяет, что бот в конце поездки вернулся в начальную точку. Это может быть полезно, если, например, в задаче требуется доехать до определенной точки, а затем вернуться обратно.
class SameInitialAndFinalCoordinatesChecker(Checker):
def __init__(self, maximum_deviation=0.1, **kwargs):
super().__init__(**kwargs)
self.maximum_deviation = maximum_deviation
def check(self, generated_task, trackers, **kwargs):
trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
trip_data = trip_statistics.trip_data
if len(trip_data) == 0:
return True
initial_coordinates = trip_data[0].position.coordinates
final_coordinates = trip_data[-1].position.coordinates
return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation
Такой тестирующей системой можно пользоваться в ручном режиме, то есть вручную запускать проверку, а затем визуально изучать вердикт. Если бы мы хотели, к примеру, запустить онлайн-курс по автономному транспорту на
Интеграция с онлайн-платформами
Для тестирования задач часто используют технологию External Grader, которая была разработана платформой
При использовании External Grader образовательная платформа не занимается проверкой задач самостоятельно, а формирует очередь посылок, которые отправляет на другое устройство. Функционал подключения к очереди реализован в проекте
Рассмотрим подробнее момент подключения к очереди. После того как образовательная платформа предоставит данные для подключения, их нужно будет добавить в
Xqueue-watcher вызывает endpoint
Запустить xqueue-watcher можно так:
make requirements && python -m xqueue_watcher -d conf.d/
Допустим, мы хотим воспользоваться технологией External Grader, но не хотим запускать курс на онлайн-платформе. Xqueue-watcher реализован в предположении, что есть некоторое файловое хранилище, куда выгружаются файлы с решениями (платформы такое хранилище имеют). Мы можем модифицировать Xqueue, чтобы файловое хранилище стало не нужно, и подобные системы можно будет запускать, в общем-то, даже у себя на ноутбуке.
Для начала нужно научиться поддерживать саму очередь посылок. Функционал очереди предоставляет проект
Картинка взята из
Запустить его можно так:
apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address
Может понадобиться создать файл ~/edx/edx.log
По умолчанию xqueue отдает xqueue-watcher’у не содержимое посылок, то есть файлы с решением задачи, а ссылки на эти файлы в файловом хранилище. Чтобы не зависеть от файлового хранилища, можно сделать так, чтобы пересылались сами файлы, и хранить их на той же машине, с которой запущен xqueue-watcher.
Вот как нужно изменить исходный код, чтобы этого достигнуть:
Реализацию метода
def _upload(file_to_upload, path, name):
'''
Upload file using the provided keyname.
Returns:
URL to access uploaded file
'''
full_path = os.path.join(path, name)
return default_storage.save(full_path, file_to_upload)
Если никакое файловое хранилище не было подключено, то данный метод сохранит файл с решением по пути $queue_name/$file_to_upload_hash.
В реализации get_sumbission в файле ext_interface.py вместо
xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
with open(xqueue_files[xqueue_file], 'r') as f:
xqueue_files[xqueue_file] = f.readlines()
Передадим не ссылки (пути) на файлы, а их содержимое.
Каждое решение исполняется в «одноразовом» докер-контейнере с ограниченными ресурсами, то есть для исполнения каждого решения создается отдельный контейнер, который удаляется после окончания тестирования. Чтобы создавать такие контейнеры и исполнять команды в них, используется
Итог
В этой статье я рассказал о том, как была создана тестирующая система и задачи по автономному транспорту, а также об интеграции данной системы с образовательными онлайн-платформами, которые используют технологию External Grader. Надеюсь, что в скором времени будет запущен курс, использующий эту систему, а часть про интеграцию с онлайн-платформами пригодится желающим создать собственный офлайн- или онлайн-курс.
You must be registered for see links
. Но чтобы расслабиться на дороге через 20 лет, сейчас нужно хорошо поработать над алгоритмами автономного вождения. Для этого в MIT разработали платформу
You must be registered for see links
, которая позволяет делать это с минимальными затратами. В Duckietown недорогие мобильные роботы перевозят желтых резиновых уточек по уменьшенной модели города. На базе этой платформы проводят соревнования
You must be registered for see links
и запускают курсы в университетах по применению технологий искусственного интеллекта в управлении беспилотным транспортом.В этой статье я расскажу о своем курсовом проекте: о модуле проверки задач, который я написал для эмулятора
You must be registered for see links
. Речь пойдет о тестирующей системе и об интеграции этой системы с образовательными онлайн-платформами, которые используют технологию External Grader — например, с платформой
You must be registered for see links
.
Об авторе
Меня зовут Даниил Плющенко, я студент первого (уже второго) курса магистерской программы «
You must be registered for see links
» в Питерской Вышке. В 2019 году я закончил бакалавриат «
You must be registered for see links
» в этом же университете.Платформа Duckietown
You must be registered for see links
— это исследовательский проект в области беспилотного транспорта. Организаторы проекта создали платформу, которая помогает внедрять новый подход к обучению в области искусственного интеллекта и робототехники. Все началось как курс в MIT в 2016 году, но постепенно распространилось по всему миру и по разным ступеням образования: от старшей школы до магистерских программ.Платформа Duckietown состоит из двух частей. Во-первых, это уменьшенная модель городской транспортной среды с дорогами, зданиями, дорожными знаками и препятствиями. Во-вторых, это транспорт. Небольшие мобильные роботы (Duckiebots) под управлением Raspberry Pi получают информацию об окружающем мире через камеру и перевозят по дорогам жителей города — желтых резиновых уточек.
Я работал с эмулятором Duckietown. Он называется
You must be registered for see links
, и это open-source проект, написанный на языке Python. Эмулятор помещает вашего бота внутрь города, изменяет его положение в зависимости от алгоритма, который вы используете (или от кнопки, которую нажали), перерисовывает картинку и записывает в логи текущую позицию бота. Если вам интересно попробовать, рекомендую склонировать себе репозиторий и запустить
You must be registered for see links
: так ботом можно будет управлять при помощи стрелок на клавиатуре.
Скриншот из эмулятора
Эмулятор можно использовать как среду для исполнения задач. Поставим такую задачу: по заданной карте, которая состоит из одной дорожной полосы, нужно проехать один метр по прямой.
Для решения задачи можно использовать такой алгоритм:
for _ in range(25):
env.step([1, 0])
env.render()
Здесь в переменной env хранится состояние среды.
Метод step принимает на вход action: список из двух элементов, описывающий действие бота. Первый элемент задает скорость, второй — угол поворота. Метод render перерисовывает картинку, учитывая новую позицию бота. Количество шагов и скорость подобраны эмпирически: именно такие значения нужны, чтобы бот проехал ровно один метр по прямой.
Если вы захотите использовать этот фрагмент в manual_control.py, то вставьте его
You must be registered for see links
. Код до этого места занимается загрузкой среды. Для простоты можно его переиспользовать, а после добавить предложенное выше решение задачи.Тестирующая система
Хотелось бы иметь возможность проверять такие задачи автоматически: взять реализацию алгоритма управления ботом, промоделировать поездку и сообщить, корректно ли предложенный алгоритм выполняет поставленную задачу. Такая тестирующая система позволила бы использовать среду во время подготовки к соревнованиям по управлению автономным транспортом, а также в образовательных целях: выдавать обучающимся набор задач и проверять их решения автоматически. Я занимался ее разработкой во время работы над курсовым проектом и ниже расскажу о том, что у меня получилось.
Последовательность шагов при проверке решения выглядит так:
Задачи в тестирующую систему можно добавить самостоятельно или обратиться к тем, которые сделал я. У каждой задачи есть генератор условия — класс, который генерирует состояние окружающей среды. В нем указываются название карты и стартовая позиция бота внутри начальной клетки. Также задаются целевые координаты: в данном случае это точка в одном метре от начальной позиции.
class Ride1MTaskGenerator(TaskGenerator):
def __init__(self, args):
super().__init__(args)
def generate_task(self):
env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
env = env_loader(
map_name="straight_road",
position_on_initial_road_tile=PositionOnInitialRoadTile(
x_coefficient=0.5,
z_coefficient=0.5,
angle=0,
),
)
self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
self.generated_task['env'] = env
env.render()
return self.generated_task
Здесь TrackingDuckietownEnv и CVTaskEnv — классы-обертки, которые используются для сохранения информации о поездке для дальнейшего анализа.
class TrackingDuckietownEnv:
def __init__(self, **kwargs):
self.__wrapped = DuckietownEnv(**kwargs)
…
…
def step(self, action):
obs, reward, done, misc = self.__wrapped.step(action)
message = misc['Simulator']['msg']
if 'invalid pose' in message.lower():
raise InvalidPoseException(message)
for t in self.trackers:
t.track(self)
return obs, reward, done, misc
Трекеры собирают информацию о текущем состоянии, например, о позиции бота.
CVTaskEnv используется, если требуется решение с использованием только информации с камеры («компьютерного зрения»), а не функций эмулятора: например, если нужно узнать, насколько далеко бот располагается от центра полосы или где находится ближайший видимый объект. Обращение к функциям эмулятора может упростить решение задачи, и класс CVTaskEnv ограничивает вызов методов эмулятора. Он используется, когда выставлен флаг is_cv_task.
class CVTaskEnv:
def __init__(self, **kwargs):
self.__wrapped = TrackingDuckietownEnv(**kwargs)
def __getattr__(self, item):
if item in self.__wrapped.overriden_methods:
return self.__wrapped.__getattribute__(item)
ALLOWED_FOR_CV_TASKS = [
'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
'road_tile_size', 'trip_statistics'
]
if item in ALLOWED_FOR_CV_TASKS:
return self.__wrapped.__getattr__(item)
else:
raise AttributeError(item + " call is not allowed in CV tasks")
После того как исполнение решения завершено — при условии, что оно не было прервано по тайм-ауту, — информация о поездке пропускается через последовательность чекеров. Если все чекеры отработали успешно, задача считается решенной правильно. Иначе выводится поясняющий вердикт — например, бот врезался, съехал с дороги и т. д.
Вот один из стандартных чекеров. Он проверяет, что бот в конце поездки вернулся в начальную точку. Это может быть полезно, если, например, в задаче требуется доехать до определенной точки, а затем вернуться обратно.
class SameInitialAndFinalCoordinatesChecker(Checker):
def __init__(self, maximum_deviation=0.1, **kwargs):
super().__init__(**kwargs)
self.maximum_deviation = maximum_deviation
def check(self, generated_task, trackers, **kwargs):
trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
trip_data = trip_statistics.trip_data
if len(trip_data) == 0:
return True
initial_coordinates = trip_data[0].position.coordinates
final_coordinates = trip_data[-1].position.coordinates
return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation
Такой тестирующей системой можно пользоваться в ручном режиме, то есть вручную запускать проверку, а затем визуально изучать вердикт. Если бы мы хотели, к примеру, запустить онлайн-курс по автономному транспорту на
You must be registered for see links
, нам понадобилась бы интеграция с платформой. Об этом и пойдет речь в следующей части статьи.Интеграция с онлайн-платформами
Для тестирования задач часто используют технологию External Grader, которая была разработана платформой
You must be registered for see links
.При использовании External Grader образовательная платформа не занимается проверкой задач самостоятельно, а формирует очередь посылок, которые отправляет на другое устройство. Функционал подключения к очереди реализован в проекте
You must be registered for see links
. Xqueue-watcher извлекает посылки, и затем они тестируются модулем проверки (в котором обычно происходят более нетривиальные действия, чем сравнение текста/чисел). После этого вердикт проверки отправляется обратно на сторону образовательной платформы.Рассмотрим подробнее момент подключения к очереди. После того как образовательная платформа предоставит данные для подключения, их нужно будет добавить в
You must be registered for see links
, а в методе
You must be registered for see links
реализовать непосредственно запуск проверки. Более подробные инструкции можно найти
You must be registered for see links
и
You must be registered for see links
.Xqueue-watcher вызывает endpoint
You must be registered for see links
, который извлекает посылку из очереди, если это возможно. После этого она отправляется на тестирование. Затем xqueue-watcher вызывает
You must be registered for see links
для возврата вердикта.Запустить xqueue-watcher можно так:
make requirements && python -m xqueue_watcher -d conf.d/
Допустим, мы хотим воспользоваться технологией External Grader, но не хотим запускать курс на онлайн-платформе. Xqueue-watcher реализован в предположении, что есть некоторое файловое хранилище, куда выгружаются файлы с решениями (платформы такое хранилище имеют). Мы можем модифицировать Xqueue, чтобы файловое хранилище стало не нужно, и подобные системы можно будет запускать, в общем-то, даже у себя на ноутбуке.
Для начала нужно научиться поддерживать саму очередь посылок. Функционал очереди предоставляет проект
You must be registered for see links
.
Картинка взята из
You must be registered for see links
.Запустить его можно так:
apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address
Может понадобиться создать файл ~/edx/edx.log
По умолчанию xqueue отдает xqueue-watcher’у не содержимое посылок, то есть файлы с решением задачи, а ссылки на эти файлы в файловом хранилище. Чтобы не зависеть от файлового хранилища, можно сделать так, чтобы пересылались сами файлы, и хранить их на той же машине, с которой запущен xqueue-watcher.
Вот как нужно изменить исходный код, чтобы этого достигнуть:
Реализацию метода
You must be registered for see links
в файле lms_interface.py заменим на эту:def _upload(file_to_upload, path, name):
'''
Upload file using the provided keyname.
Returns:
URL to access uploaded file
'''
full_path = os.path.join(path, name)
return default_storage.save(full_path, file_to_upload)
Если никакое файловое хранилище не было подключено, то данный метод сохранит файл с решением по пути $queue_name/$file_to_upload_hash.
В реализации get_sumbission в файле ext_interface.py вместо
You must be registered for see links
напишем:xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
with open(xqueue_files[xqueue_file], 'r') as f:
xqueue_files[xqueue_file] = f.readlines()
Передадим не ссылки (пути) на файлы, а их содержимое.
Каждое решение исполняется в «одноразовом» докер-контейнере с ограниченными ресурсами, то есть для исполнения каждого решения создается отдельный контейнер, который удаляется после окончания тестирования. Чтобы создавать такие контейнеры и исполнять команды в них, используется
You must be registered for see links
(по факту в качестве обертки над Docker API).Итог
В этой статье я рассказал о том, как была создана тестирующая система и задачи по автономному транспорту, а также об интеграции данной системы с образовательными онлайн-платформами, которые используют технологию External Grader. Надеюсь, что в скором времени будет запущен курс, использующий эту систему, а часть про интеграцию с онлайн-платформами пригодится желающим создать собственный офлайн- или онлайн-курс.



