From 1167747faea012621d8783a614cc41d576443d7b Mon Sep 17 00:00:00 2001 From: Corentin Date: Wed, 3 Sep 2025 14:12:58 +0900 Subject: [PATCH] Initial commit --- .gitignore | 3 + map.py | 254 +++++++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 61 ++++++++++++ 3 files changed, 318 insertions(+) create mode 100644 .gitignore create mode 100644 map.py create mode 100644 pyproject.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..65c2696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.json +*.osm +*.pyc diff --git a/map.py b/map.py new file mode 100644 index 0000000..0dd4284 --- /dev/null +++ b/map.py @@ -0,0 +1,254 @@ +from argparse import ArgumentParser +from collections import namedtuple +import json +from pathlib import Path +import xml.etree.ElementTree as ET + +import cv2 +import numpy as np +from PIL import Image, ImageDraw + + +Bounds = namedtuple('Bounds', ['maxlat', 'maxlon', 'minlat', 'minlon']) +BoundsTypes = [float, float, float, float] + +Member = namedtuple('Member', ['type', 'ref', 'role']) +MemberTypes = [str, int, str] + +Node = namedtuple('Node', ['id', 'lat', 'lon', 'tags', 'uid', 'version', 'visible']) +NodeTypes = [int, float, float, dict, int, int, bool] + +Relation = namedtuple('Relation', ['id', 'members', 'tags', 'uid', 'version', 'visible']) +RelationTypes = [int, list, dict, int, int, bool] + +Way = namedtuple('Way', ['id', 'nodes', 'tags', 'uid', 'version', 'visible']) +WayTypes = [int, list, dict, int, int, bool] + + +def create_bounds(xml) -> Bounds: + return Bounds(*(key_type(xml.get(key)) for key_type, key in zip(BoundsTypes, Bounds._fields, strict=True))) + + +def create_node(xml) -> Node: + tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} + return Node(*(tags if key == 'tags' else key_type(xml.get(key)) + for key_type, key in zip(NodeTypes, Node._fields, strict=True))) + + +def create_way(xml) -> Way: + nodes = [int(nd.get('ref')) for nd in xml.iter('nd')] + tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} + params = [] + for key_type, key in zip(WayTypes, Way._fields, strict=True): + match key: + case 'tags': + params.append(tags) + case 'nodes': + params.append(nodes) + case _: + params.append(key_type(xml.get(key))) + return Way(*params) + + +def create_member(xml) -> Member: + return Member(*(key_type(xml.get(key)) for key_type, key in zip(MemberTypes, Member._fields, strict=True))) + + +def create_relation(xml) -> Relation: + members = [create_member(member) for member in xml.iter('member')] + tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} + params = [] + for key_type, key in zip(RelationTypes, Relation._fields, strict=True): + match key: + case 'tags': + params.append(tags) + case 'members': + params.append(members) + case _: + params.append(key_type(xml.get(key))) + return Relation(*params) + + +def parse_xml(xml_path: Path) -> tuple[Bounds, dict[int, Node], dict[int, Way], dict[int, Relation]]: + root = ET.parse(xml_path).getroot() + + bounds = root.find('bounds') + if bounds is None: + raise RuntimeError('Cannot find bounds') + bounds = create_bounds(bounds) + + nodes = {} + relations = {} + ways = {} + for element in root: + match element.tag: + case 'node': + node = create_node(element) + nodes[node.id] = node + case 'way': + way = create_way(element) + ways[way.id] = way + case 'relation': + relation = create_relation(element) + relations[relation.id] = relation + case 'bounds': + pass + case _: + raise RuntimeError(f'Unexpected tag : {element.tag}') + return bounds, nodes, ways, relations + + +def main(): + parser = ArgumentParser() + parser.add_argument('osm', type=Path, help='Path to OSM file to parse') + parser.add_argument('--convert', action='store_true', default=False, help='Convert data into JSON') + parser.add_argument('--map-height', type=int, default=720, help='Height of output map in pixels') + arguments = parser.parse_args() + + osm_path: Path = arguments.osm + convert_data: bool = arguments.convert + map_height: int = arguments.map_height + del arguments + + bounds, nodes, ways, relations = parse_xml(osm_path) + print(f'{bounds} {len(nodes)=} {len(ways)=} {len(relations)=}') + + if convert_data: + with (osm_path.parent / f'{osm_path.stem}.json').open(mode='w', encoding='utf-8') as json_file: + json_file.write('{\n') + json_file.write( + f'"bounds:" {json.dumps(dict(zip(bounds._fields, bounds, strict=True)), ensure_ascii=False)},\n') + json_file.write('"nodes": [\n') + previous_entry = '' + for node in nodes.values(): + if previous_entry: + json_file.write(f' {previous_entry},\n') + previous_entry = json.dumps( + {k: v for k, v in zip(node._fields, node, strict=True) if not isinstance(v, dict) or v}, + ensure_ascii=False) + if previous_entry: + json_file.write(f' {previous_entry}\n') + json_file.write('],\n"ways": [\n') + previous_entry = '' + for way in ways.values(): + if previous_entry: + json_file.write(f' {previous_entry},\n') + previous_entry = json.dumps( + {k: v for k, v in zip(way._fields, way, strict=True) if not isinstance(v, dict) or v}, + ensure_ascii=False) + if previous_entry: + json_file.write(f' {previous_entry}\n') + json_file.write('],\n"relations": [\n') + previous_entry = '' + for relation in relations.values(): + if previous_entry: + json_file.write(f' {previous_entry},\n') + previous_entry = json.dumps( + {k: (v if k != 'members' else [dict(zip(m._fields, m, strict=True)) for m in v]) + for k, v in zip(relation._fields, relation, strict=True) if not isinstance(v, dict) or v}, + ensure_ascii=False) + if previous_entry: + json_file.write(f' {previous_entry}\n') + json_file.write('}') + return + + lat_range = bounds.maxlat - bounds.minlat + lon_range = bounds.maxlon - bounds.minlon + map_ratio = lon_range / lat_range + map_width = int(map_height * map_ratio) + lat_scale = map_height / lat_range + lon_scale = map_width / lon_range + + background_color = (235, 235, 235) + image = Image.new('RGB', (map_width, map_height), background_color) + draw = ImageDraw.Draw(image, 'RGB') + + def draw_way_polygon(way: Way, color: tuple[int, ...] | str, outline: tuple[int, ...] | str | None = None, + width: int = 1): + nonlocal bounds, draw, lat_scale, lon_scale, map_height, map_width, nodes + coords = [] + for node_id in way.nodes: + node = nodes[node_id] + new_coord = ( + min(max((node.lon - bounds.minlon) * lon_scale, 0), map_width), + min(max(map_height - ((node.lat - bounds.minlat) * lat_scale), 0), map_height)) + if not coords or new_coord != coords[-1]: + coords.append(new_coord) + draw.polygon(coords, fill=color, outline=outline, width=width) + + def draw_way_line(way: Way, width: int, color: tuple[int, ...] | str, joint: str | None = None): + nonlocal bounds, draw, lat_scale, lon_scale, map_height, map_width, nodes + coords = [] + for node_id in way.nodes: + node = nodes[node_id] + new_coord = ( + min(max((node.lon - bounds.minlon) * lon_scale, 0), map_width), + min(max(map_height - ((node.lat - bounds.minlat) * lat_scale), 0), map_height)) + if not coords or new_coord != coords[-1]: + coords.append(new_coord) + draw.line(coords, width=width, fill=color, joint=joint) + + for relation in relations.values(): + if 'natural' in relation.tags and relation.tags['natural'] == 'wood': + outer_ways: list[Way] = [] + inner_ways: list[Way] = [] + for member in relation.members: + member: Member + if member.type != 'way' or member.ref not in ways: + continue + if member.role == 'outer': + outer_ways.append(ways[member.ref]) + elif member.role == 'inner': + inner_ways.append(ways[member.ref]) + for way in outer_ways: + draw_way_polygon(way, (140, 200, 120)) + for way in inner_ways: + draw_way_polygon(way, background_color) + + bridge_outline = (80, 80, 80) + for way in ways.values(): + if 'amenity' in way.tags: + match way.tags['amenity']: + case 'school' | 'kindergarten' | 'library' | 'research_institute' | 'university': + draw_way_polygon(way, (250, 250, 220)) + case ('bicycle_parking' | 'bus_station' | 'fuel' | 'parking' | 'parking_entrance' | 'parking_space' + | 'taxi'): + draw_way_polygon(way, (220, 220, 220), outline=(180, 180, 180)) + case _: + draw_way_polygon(way, (240, 200, 200)) + + for way in ways.values(): + if 'highway' in way.tags: + match way.tags['highway']: + case 'motorway' | 'motorway_link': + draw_way_line(way, 15, (220, 160, 160), joint='curve') + case 'trunk' | 'trunk_link': + draw_way_line(way, 13, bridge_outline if 'bridge' in way.tags else (230, 180, 130), joint='curve') + draw_way_line(way, 11, (240, 200, 170), joint='curve') + case 'primary' | 'primary_link': + draw_way_line(way, 11, bridge_outline if 'bridge' in way.tags else (230, 200, 110), joint='curve') + draw_way_line(way, 9, (245, 225, 150), joint='curve') + case 'secondary' | 'secondary_link': + draw_way_line(way, 11, bridge_outline if 'bridge' in way.tags else (200, 200, 140), joint='curve') + draw_way_line(way, 9, (220, 220, 180), joint='curve') + case 'tertiary' | 'tertiraty_link': + draw_way_line(way, 9, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve') + draw_way_line(way, 7, (245, 245, 245), joint='curve') + case 'residential' | 'unclassified' | 'road': + draw_way_line(way, 7, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve') + draw_way_line(way, 5, (250, 250, 250), joint='curve') + elif 'waterway' in way.tags and way.tags['waterway'] in {'river', 'steam', 'tidal_channel', 'flowline'}: + draw_way_line(way, 11, (180, 200, 240), joint='curve') + elif 'building' in way.tags: + draw_way_polygon(way, (190, 180, 160), outline=(120, 120, 120)) + + image = cv2.cvtColor(np.asarray(image), cv2.COLOR_BGR2RGB) + print(f'{image.shape=}') + cv2.namedWindow('map', cv2.WINDOW_KEEPRATIO) + while cv2.waitKey(50) & 0xff != ord('q'): + cv2.imshow('map', image) + cv2.destroyAllWindows() + + +if __name__ == '__main__': + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..32add01 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,61 @@ +[project] +name = "map" +version = "0.0.1" +requires-python = ">=3.10" + +[project.optional-dependencies] +test = ["pytest"] + +[tool.ruff] +cache-dir = "/tmp/ruff" +exclude = [ + ".git", + ".ruff_cache", + ".venv" +] +line-length = 120 +indent-width = 4 +target-version = "py310" + + +[tool.isort] +combine_as_imports = true +force_sort_within_sections = true +lexicographical = true +lines_after_imports = 2 +multi_line_output = 4 +no_sections = false +order_by_type = true + +[tool.pytest.ini_options] +addopts = "-p no:cacheprovider -s -vv" +testpaths = ["tests"] +pythonpath = ["."] + +[tool.ruff.lint] +preview = true +select = ["A", "ARG", "B", "C", "E", "F", "FURB", "G", "I","ICN", "ISC", "PERF", "PIE", "PL", "PLE", "PTH", + "Q", "RET", "RSE", "RUF", "SLF", "SIM", "T20", "TCH", "UP", "W"] +ignore = ["E275", "FURB140", "I001", "PERF203", "RET502", "RET503", "SIM105"] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["SLF001"] + +[tool.ruff.lint.flake8-quotes] +inline-quotes = "single" + +[tool.ruff.lint.pylint] +max-args=16 +max-branches=24 +max-locals=16 +max-nested-blocks=8 +max-public-methods=16 +max-returns=8 +max-statements=96 + +[tool.ruff.lint.mccabe] +max-complexity = 20 + +[tool.coverage.run] +command_line = "-m pytest" +omit = ["tests/*"]