Última actividad 1 day ago

's Avatar zdebel revisó este gist 1 day ago. Ir a la revisión

1 file changed, 491 insertions

street_find.py(archivo creado)

@@ -0,0 +1,491 @@
import argparse
import bisect
import csv
import unicodedata
from dataclasses import dataclass
from operator import attrgetter, itemgetter
6 +
7 + _EXTRA_MAP = str.maketrans({"ł": "l", "Ł": "L"})
8 +
9 +
@dataclass(frozen=True)
class City:
    """Immutable record describing one locality row of the SIMC file.

    All fields hold whitespace-stripped strings; the comments name the
    SIMC column each value is read from.
    """

    sym: str  # SYM column; the key localities and streets are grouped by
    name: str  # NAZWA column
    woj_code: str  # WOJ column
    woj_name: str  # voivodeship name resolved from woj_code when the record is built
    pow_code: str  # POW column
    gmi_code: str  # GMI column
    rodz_gmi: str  # RODZ_GMI column
    rm: str  # RM column
    mz: str  # MZ column
    sympod: str  # SYMPOD column; compared with sym when grouping localities
22 +
23 +
@dataclass(frozen=True)
class CityGroup:
    """Immutable bundle of City records presented to the user as one city."""

    label: str  # display name of the group
    woj_code: str  # voivodeship code shown for the group
    woj_name: str  # voivodeship name shown for the group
    pow_code: str  # county code shown for the group
    members: tuple[City, ...]  # every City merged into this group
31 +
32 +
def normalize(text: str) -> str:
    """Return *text* lower-cased with diacritics removed, for matching.

    The text is first passed through ``_EXTRA_MAP``, then lower-cased and
    NFKD-decomposed; every combining mark produced by the decomposition
    is dropped.
    """
    folded = text.translate(_EXTRA_MAP).lower()
    decomposed = unicodedata.normalize("NFKD", folded)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
37 +
38 +
def load_terc(path: str, encoding: str = "utf-8-sig") -> tuple[dict[str, str], dict[tuple[str, str, str, str], str]]:
    """Load voivodeship names and administrative unit names from a TERC CSV.

    Args:
        path: Location of the semicolon-delimited TERC file.
        encoding: Text encoding used to open the file.

    Returns:
        A pair ``(woj_names, terc_units)``. ``woj_names`` maps a WOJ code
        to the NAZWA of rows that have no POW and no GMI value;
        ``terc_units`` maps ``(WOJ, POW, GMI, RODZ)`` to the NAZWA of rows
        where all four codes are present.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a required column is missing from the header.
    """

    def cell(row: dict, column: str) -> str:
        # DictReader fills columns missing from a short row with None;
        # treat those as empty instead of failing on None.strip().
        return (row[column] or "").strip()

    woj_names: dict[str, str] = {}
    terc_units: dict[tuple[str, str, str, str], str] = {}
    with open(path, encoding=encoding, newline="") as handle:
        for row in csv.DictReader(handle, delimiter=";"):
            woj = cell(row, "WOJ")
            pow_code = cell(row, "POW")
            gmi_code = cell(row, "GMI")
            rodz_code = cell(row, "RODZ")
            name = cell(row, "NAZWA")
            if not woj or not name:
                continue
            if not pow_code and not gmi_code:
                woj_names[woj] = name
            elif pow_code and gmi_code and rodz_code:
                terc_units[(woj, pow_code, gmi_code, rodz_code)] = name
    return woj_names, terc_units
56 +
57 +
def load_simc(path: str, woj_names: dict[str, str], encoding: str = "utf-8-sig") -> dict[str, City]:
    """Load localities from a SIMC CSV and attach voivodeship names.

    Args:
        path: Location of the semicolon-delimited SIMC file.
        woj_names: Mapping of WOJ code to voivodeship name; codes missing
            from it get the name ``"nieznane"``.
        encoding: Text encoding used to open the file.

    Returns:
        Mapping of SYM to the City built from that row. A later row with
        the same SYM replaces an earlier one; rows without a SYM value
        are skipped.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a required column is missing from the header.
    """

    def cell(row: dict, column: str) -> str:
        # DictReader fills columns missing from a short row with None;
        # treat those as empty instead of failing on None.strip().
        return (row[column] or "").strip()

    cities: dict[str, City] = {}
    with open(path, encoding=encoding, newline="") as handle:
        for row in csv.DictReader(handle, delimiter=";"):
            sym = cell(row, "SYM")
            if not sym:
                # Without a SYM the row cannot be addressed by any lookup.
                continue
            woj_code = cell(row, "WOJ")
            cities[sym] = City(
                sym=sym,
                name=cell(row, "NAZWA"),
                woj_code=woj_code,
                woj_name=woj_names.get(woj_code, "nieznane"),
                pow_code=cell(row, "POW"),
                gmi_code=cell(row, "GMI"),
                rodz_gmi=cell(row, "RODZ_GMI"),
                rm=cell(row, "RM"),
                mz=cell(row, "MZ"),
                sympod=cell(row, "SYMPOD"),
            )
    return cities
79 +
80 +
def load_ulic(
    path: str,
    terc_units: dict[tuple[str, str, str, str], str],
    encoding: str = "utf-8-sig",
) -> dict[str, list[dict[str, str]]]:
    """Load streets from a ULIC CSV and group them by locality SYM.

    Args:
        path: Location of the semicolon-delimited ULIC file.
        terc_units: Mapping of ``(WOJ, POW, GMI, RODZ_GMI)`` to a unit
            name; the match for each row is stored as ``terc_unit_name``
            (empty string when there is none).
        encoding: Text encoding used to open the file.

    Returns:
        Mapping of SYM to the list of street dicts read for it, in file
        order. Each dict has the keys ``woj``, ``pow``, ``gmi``,
        ``rodz_gmi``, ``sym``, ``sym_ul``, ``full``, ``cecha``,
        ``nazwa_1``, ``nazwa_2``, ``stan_na`` and ``terc_unit_name``.
        Rows without a SYM value are skipped.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a required column is missing from the header.
    """

    def cell(row: dict, column: str) -> str:
        # DictReader fills columns missing from a short row with None;
        # treat those as empty instead of failing on None.strip().
        return (row[column] or "").strip()

    streets_by_sym: dict[str, list[dict[str, str]]] = {}
    with open(path, encoding=encoding, newline="") as handle:
        for row in csv.DictReader(handle, delimiter=";"):
            sym = cell(row, "SYM")
            if not sym:
                # A street without a locality SYM can never be looked up.
                continue
            woj = cell(row, "WOJ")
            pow_code = cell(row, "POW")
            gmi = cell(row, "GMI")
            rodz_gmi = cell(row, "RODZ_GMI")
            cecha = cell(row, "CECHA")
            nazwa_1 = cell(row, "NAZWA_1")
            nazwa_2 = cell(row, "NAZWA_2")
            street = {
                "woj": woj,
                "pow": pow_code,
                "gmi": gmi,
                "rodz_gmi": rodz_gmi,
                "sym": sym,
                "sym_ul": cell(row, "SYM_UL"),
                # Display name: the non-empty parts joined by single spaces.
                "full": " ".join(part for part in (cecha, nazwa_1, nazwa_2) if part),
                "cecha": cecha,
                "nazwa_1": nazwa_1,
                "nazwa_2": nazwa_2,
                "stan_na": cell(row, "STAN_NA"),
                "terc_unit_name": terc_units.get((woj, pow_code, gmi, rodz_gmi), ""),
            }
            streets_by_sym.setdefault(sym, []).append(street)
    return streets_by_sym
115 +
116 +
def format_street(
    street: dict[str, str],
    city_name_basic: str = "",
    city_name_precise: str = "",
) -> str:
    """Render one street dict as a single ``key=value | key=value`` line.

    Args:
        street: Street dict holding at least the keys ``full``,
            ``sym_ul``, ``cecha``, ``nazwa_1``, ``nazwa_2``, ``woj``,
            ``pow``, ``gmi``, ``rodz_gmi``, ``sym`` and ``stan_na``.
        city_name_basic: Locality name to print; when empty, the street's
            ``city_name_basic`` entry is used.
        city_name_precise: More specific name to print; when empty, the
            street's ``city_name_precise`` and then ``terc_unit_name``
            entries are tried.

    Returns:
        The formatted line. The ``miejscowosc_precyzyjna`` field is left
        out when the precise name is empty or normalizes to the same text
        as the basic name.
    """
    basic_name = city_name_basic or street.get("city_name_basic", "")
    precise_name = (
        city_name_precise
        or street.get("city_name_precise", "")
        or street.get("terc_unit_name", "")
    )

    # A precise name that merely repeats the basic one adds no information.
    if basic_name and precise_name and normalize(basic_name) == normalize(precise_name):
        precise_name = ""

    parts = [
        f"full={street['full']}",
        f"sym_ul={street['sym_ul']}",
        f"cecha={street['cecha']}",
        f"nazwa_1={street['nazwa_1']}",
        f"nazwa_2={street['nazwa_2']}",
        f"woj={street['woj']}",
        f"pow={street['pow']}",
        f"gmi={street['gmi']}",
        f"rodz_gmi={street['rodz_gmi']}",
        f"sym={street['sym']}",
        f"miejscowosc={basic_name}",
    ]
    if precise_name:
        parts.append(f"miejscowosc_precyzyjna={precise_name}")
    parts.append(f"stan_na={street['stan_na']}")
    return " | ".join(parts)
145 +
146 +
def build_city_index(cities: dict[str, City]) -> tuple[list[tuple[str, City]], list[str]]:
    """Build a sorted prefix index for looking up localities by name.

    Every whitespace-separated word of a locality name is indexed in
    normalized form; a multi-word name is additionally indexed with its
    words joined together without separators.

    Args:
        cities: Mapping of SYM to City.

    Returns:
        A pair ``(index, keys)``: ``index`` is the list of
        ``(normalized_key, city)`` entries sorted by key, and ``keys`` is
        the parallel list of keys, suitable for ``bisect``.
    """
    index: list[tuple[str, City]] = []
    for city in cities.values():
        words = city.name.split()
        index.extend((normalize(word), city) for word in words)
        if len(words) > 1:
            # Join the words produced by split() so that every kind of
            # whitespace is removed, not only the ASCII space.
            index.append((normalize("".join(words)), city))
    # Sort on the key only: City objects define no ordering.
    index.sort(key=itemgetter(0))
    keys = [key for key, _ in index]
    return index, keys
159 +
160 +
def search_cities(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int = 10) -> list[City]:
    """Return localities that have an index key starting with *query*.

    Args:
        query: Text typed by the user; it is normalized and its
            whitespace is removed before matching.
        city_index: Sorted ``(key, city)`` entries from build_city_index.
        city_keys: Keys of ``city_index`` in the same order.
        limit: Maximum number of distinct localities to return.

    Returns:
        Up to ``limit`` distinct cities (by ``sym``) in index order; an
        empty list when ``limit`` is not positive or the normalized query
        is shorter than two characters.
    """
    if limit <= 0:
        return []
    # The city index stores single words and space-less joined names, so
    # a query typed with spaces ("nowy sacz") has to be compacted to
    # line up with the joined key.
    query_norm = "".join(normalize(query).split())
    if len(query_norm) < 2:
        return []
    lo = bisect.bisect_left(city_keys, query_norm)
    # "\uffff" sorts after the characters expected in a key, so this
    # bounds the run of keys that share the prefix.
    hi = bisect.bisect_left(city_keys, query_norm + "\uffff")
    seen: dict[str, City] = {}
    for _, city in city_index[lo:hi]:
        if city.sym in seen:
            continue
        seen[city.sym] = city
        if len(seen) >= limit:
            break
    return list(seen.values())
174 +
175 +
def build_street_index(streets: list[dict[str, str]]) -> tuple[list[tuple[str, dict[str, str]]], list[str]]:
    """Build a sorted prefix index for looking up streets of one locality.

    For every street the normalized ``cecha``, ``nazwa_1`` and
    ``nazwa_2`` values are indexed, plus ``nazwa_1 + nazwa_2``
    concatenated when both are present. Multi-word parts are also
    indexed word by word, mirroring the city index, so a later word of
    a name can be found as well.

    Args:
        streets: Street dicts with the keys ``cecha``, ``nazwa_1`` and
            ``nazwa_2``.

    Returns:
        A pair ``(index, keys)``: ``index`` is the list of
        ``(normalized_key, street)`` entries sorted by key, and ``keys``
        is the parallel list of keys, suitable for ``bisect``.
    """
    index: list[tuple[str, dict[str, str]]] = []
    for street in streets:
        nazwa_1 = street["nazwa_1"]
        nazwa_2 = street["nazwa_2"]
        # A set keeps one entry per distinct key for this street.
        street_keys: set[str] = set()
        for part in (street["cecha"], nazwa_1, nazwa_2):
            if not part:
                continue
            street_keys.add(normalize(part))
            words = part.split()
            if len(words) > 1:
                street_keys.update(normalize(word) for word in words)
        if nazwa_1 and nazwa_2:
            street_keys.add(normalize(nazwa_1 + nazwa_2))
        index.extend((key, street) for key in street_keys)
    # Stable sort on the key only: equal keys keep the input street order.
    index.sort(key=itemgetter(0))
    keys = [key for key, _ in index]
    return index, keys
189 +
190 +
def search_streets(
    query: str,
    street_index: list[tuple[str, dict[str, str]]],
    street_keys: list[str],
    limit: int = 20,
) -> list[dict[str, str]]:
    """Return streets that have an index key starting with *query*.

    Args:
        query: Text typed by the user; it is normalized before matching.
        street_index: Sorted ``(key, street)`` entries from
            build_street_index.
        street_keys: Keys of ``street_index`` in the same order.
        limit: Maximum number of distinct streets to return.

    Returns:
        Up to ``limit`` streets, distinct by ``sym_ul``, in index order;
        an empty list when ``limit`` is not positive or the normalized
        query is shorter than two characters.
    """
    if limit <= 0:
        return []
    query_norm = normalize(query)
    if len(query_norm) < 2:
        return []
    lo = bisect.bisect_left(street_keys, query_norm)
    # "\uffff" sorts after the characters expected in a key, so this
    # bounds the run of keys that share the prefix.
    hi = bisect.bisect_left(street_keys, query_norm + "\uffff")
    seen: dict[str, dict[str, str]] = {}
    for _, street in street_index[lo:hi]:
        sym_ul = street["sym_ul"]
        if sym_ul in seen:
            continue
        seen[sym_ul] = street
        if len(seen) >= limit:
            break
    return list(seen.values())
210 +
211 +
def city_group_key(city: City) -> tuple[str, ...]:
    """Return the key used to merge administrative parts of one city.

    Three kinds of key are produced, checked in this order:

    * ``("parent", sympod)`` when ``sympod`` is set and differs from
      ``sym``;
    * ``("rm95_98", normalized name, woj_code, pow_code)`` when ``rm`` is
      ``"95"`` or ``"98"`` and ``mz`` is ``"1"``;
    * ``("self", sym)`` otherwise.
    """
    # NOTE(review): a locality whose sympod equals its own sym gets a
    # "self"/"rm95_98" key, so it is never placed in the "parent" group
    # of the localities that point at it - confirm this is intended.
    if city.sympod and city.sympod != city.sym:
        return ("parent", city.sympod)

    if city.mz == "1" and city.rm in {"95", "98"}:
        return ("rm95_98", normalize(city.name), city.woj_code, city.pow_code)

    return ("self", city.sym)
221 +
222 +
def group_cities(cities: list[City]) -> list[CityGroup]:
    """Merge cities that share a city_group_key into CityGroup objects.

    Args:
        cities: Cities to group, typically search results.

    Returns:
        One CityGroup per distinct key, in order of first appearance.
        Members are sorted by ``sym`` (string order) and the group's
        label and codes are taken from the first member after sorting.
    """
    buckets: dict[tuple[str, ...], list[City]] = {}
    for city in cities:
        buckets.setdefault(city_group_key(city), []).append(city)

    groups: list[CityGroup] = []
    for bucket in buckets.values():
        members = tuple(sorted(bucket, key=attrgetter("sym")))
        head = members[0]
        groups.append(
            CityGroup(
                label=head.name,
                woj_code=head.woj_code,
                woj_name=head.woj_name,
                pow_code=head.pow_code,
                members=members,
            )
        )
    return groups
242 +
243 +
def merge_group_streets(
    group: CityGroup,
    streets_by_sym: dict[str, list[dict[str, str]]],
    include_precise_city_name: bool = False,
) -> list[dict[str, str]]:
    """Collect the streets of every member of *group*, one per ``sym_ul``.

    Args:
        group: Group whose members' streets are merged.
        streets_by_sym: Mapping of locality SYM to its street dicts.
        include_precise_city_name: When true, each returned street is a
            copy carrying ``city_name_basic`` (the member's name) and,
            if the street's ``terc_unit_name`` normalizes differently
            from that name, ``city_name_precise``.

    Returns:
        Streets in member order; when a ``sym_ul`` occurs more than once
        only the first occurrence is kept. Without
        ``include_precise_city_name`` the original dicts are returned
        uncopied.
    """
    merged: list[dict[str, str]] = []
    seen_sym_ul: set[str] = set()
    for city in group.members:
        for street in streets_by_sym.get(city.sym, []):
            sym_ul = street["sym_ul"]
            if sym_ul in seen_sym_ul:
                continue
            seen_sym_ul.add(sym_ul)
            if not include_precise_city_name:
                merged.append(street)
                continue
            # Copy before enriching so the shared source dict stays untouched.
            enriched = {**street, "city_name_basic": city.name}
            precise_name = street.get("terc_unit_name") or city.name
            if normalize(precise_name) != normalize(city.name):
                enriched["city_name_precise"] = precise_name
            merged.append(enriched)
    return merged
265 +
266 +
def city_match_rank(query_norm: str, label: str) -> tuple[int, str]:
    """Rate how well a city label matches an already-normalized query.

    Args:
        query_norm: Query text, already passed through normalize().
        label: Raw city label; it is normalized here.

    Returns:
        ``(level, normalized_label)`` where a lower level is a better
        match: 0 exact, 1 label starts with the query, 2 some word of
        the label starts with it, 3 the query occurs anywhere in the
        label, 4 no match.
    """
    label_norm = normalize(label)

    if label_norm == query_norm:
        return 0, label_norm
    if label_norm.startswith(query_norm):
        return 1, label_norm
    if any(word.startswith(query_norm) for word in label_norm.split()):
        return 2, label_norm
    if query_norm in label_norm:
        return 3, label_norm
    return 4, label_norm
283 +
284 +
def print_stats(woj_names: dict[str, str], cities: dict[str, City], streets_by_sym: dict[str, list[dict[str, str]]]) -> None:
    """Print how many voivodeships, localities and streets were loaded.

    Args:
        woj_names: Voivodeship names keyed by code.
        cities: Localities keyed by SYM.
        streets_by_sym: Street lists keyed by locality SYM.
    """
    total_streets = sum(map(len, streets_by_sym.values()))
    print(f"Wojewodztwa (TERC): {len(woj_names)}")
    print(f"Miejscowosci (SIMC): {len(cities)}")
    print(f"Miejscowosci z ulicami (ULIC): {len(streets_by_sym)}")
    print(f"Lacznie ulic (ULIC): {total_streets}")
291 +
292 +
def run_search_city(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int) -> int:
    """Print the localities matching *query*, one per line.

    Args:
        query: Text to search for.
        city_index: Sorted ``(key, city)`` entries from build_city_index.
        city_keys: Keys of ``city_index`` in the same order.
        limit: Maximum number of localities to print.

    Returns:
        0 when at least one locality was printed, 1 when nothing matched.
    """
    matches = search_cities(query, city_index, city_keys, limit=limit)
    if not matches:
        print("Brak wynikow.")
        return 1
    for city in matches:
        print(
            f"{city.name} | sym={city.sym} | woj={city.woj_code} ({city.woj_name}) "
            f"| pow={city.pow_code} | gmi={city.gmi_code}"
        )
    return 0
304 +
305 +
def run_list_streets(
    city_sym: str,
    streets_by_sym: dict[str, list[dict[str, str]]],
    cities_by_sym: dict[str, City],
    limit: int,
) -> int:
    """Print up to *limit* streets of the locality identified by SYM.

    Args:
        city_sym: SYM of the locality to list.
        streets_by_sym: Street lists keyed by locality SYM.
        cities_by_sym: Localities keyed by SYM, used for the printed
            name; a SYM missing here is printed with an empty name.
        limit: Maximum number of streets to print; values below zero are
            treated as zero.

    Returns:
        0 when the locality has streets, 1 when it has none.
    """
    streets = streets_by_sym.get(city_sym, [])
    if not streets:
        print(f"Brak ulic dla SYM={city_sym}.")
        return 1

    city = cities_by_sym.get(city_sym)
    city_name = city.name if city is not None else ""
    # A negative limit would slice from the end of the list and make the
    # "remaining" count below wrong, so clamp it.
    shown = max(limit, 0)
    for street in streets[:shown]:
        precise_name = street.get("terc_unit_name") or city_name
        print(format_street(street, city_name_basic=city_name, city_name_precise=precise_name))
    if len(streets) > shown:
        print(f"... i jeszcze {len(streets) - shown} ulic")
    return 0
323 +
324 +
class _EndOfInput(Exception):
    """Signals that the user closed stdin or pressed Ctrl-C at a prompt."""


def _prompt(message: str) -> str:
    """Read one stripped line from the user; raise _EndOfInput on EOF/Ctrl-C."""
    try:
        return input(message).strip()
    except (EOFError, KeyboardInterrupt):
        raise _EndOfInput from None


def _rank_city_groups(
    query: str,
    city_index: list[tuple[str, City]],
    city_keys: list[str],
    streets_by_sym: dict[str, list[dict[str, str]]],
    city_limit: int,
) -> list[tuple[CityGroup, int]]:
    """Return the best ``city_limit`` city groups for *query* with street counts."""
    query_norm = normalize(query)
    # Over-fetch: several matches may collapse into one group.
    matches = search_cities(query, city_index, city_keys, limit=city_limit * 5)

    ranked: list[tuple[CityGroup, int, tuple[int, str]]] = []
    for group in group_cities(matches):
        streets_count = len(merge_group_streets(group, streets_by_sym))
        ranked.append((group, streets_count, city_match_rank(query_norm, group.label)))

    # Best match level first, then more streets, then alphabetical label.
    ranked.sort(key=lambda item: (item[2][0], -item[1], item[2][1]))
    return [(group, streets_count) for group, streets_count, _ in ranked[:city_limit]]


def _choose_city_group(ranked: list[tuple[CityGroup, int]]) -> "CityGroup | None":
    """Ask for an index into *ranked*; return None when the user cancels."""
    while True:
        choice = _prompt("wybierz numer miasta (Enter = anuluj): ")
        if not choice:
            print()
            return None
        number = -1
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits that int() rejects.
        if choice.isdecimal():
            try:
                number = int(choice)
            except ValueError:
                # int() refuses extremely long digit strings; such input
                # is simply an invalid choice.
                number = -1
        if 0 <= number < len(ranked):
            return ranked[number][0]
        print("Niepoprawny wybor.")


def _browse_streets(
    selected: CityGroup,
    streets_by_sym: dict[str, list[dict[str, str]]],
    street_limit: int,
) -> None:
    """Run the street-search prompt for *selected* until an empty line."""
    city_streets = merge_group_streets(selected, streets_by_sym, include_precise_city_name=True)
    merged_syms = ",".join(city.sym for city in selected.members)
    print(f"\nWybrane miasto: {selected.label}")
    print(f"SYM w grupie: {merged_syms}")
    print(f"Ulic w miejscowosci: {len(city_streets)}")

    if not city_streets:
        print("Brak ulic dla tej miejscowosci.\n")
        return

    street_index, street_keys = build_street_index(city_streets)
    print("Wpisz fragment nazwy ulicy (min. 2 znaki), pusta linia = wybor innego miasta.\n")

    while True:
        street_query = _prompt("ulica >>> ")
        if not street_query:
            print()
            return

        if len(normalize(street_query)) < 2:
            print("Podaj co najmniej 2 znaki.\n")
            continue

        found = search_streets(street_query, street_index, street_keys, limit=street_limit)
        if not found:
            print("Brak ulic.\n")
            continue

        for street in found:
            print(f"- {format_street(street)}")
        print()


def run_interactive(
    city_index: list[tuple[str, City]],
    city_keys: list[str],
    streets_by_sym: dict[str, list[dict[str, str]]],
    cities_by_sym: dict[str, City],
    city_limit: int,
    street_limit: int,
) -> int:
    """Run the interactive city -> street search loop on stdin/stdout.

    The user types a city name, picks one of the ranked city groups by
    number and then searches streets inside it. An empty line at the
    street prompt goes back to the city prompt; an empty line at the
    city prompt, EOF or Ctrl-C at any prompt ends the session.

    Args:
        city_index: Sorted ``(key, city)`` entries from build_city_index.
        city_keys: Keys of ``city_index`` in the same order.
        streets_by_sym: Street lists keyed by locality SYM.
        cities_by_sym: Localities keyed by SYM. Not read by this
            function; kept so existing callers keep working.
        city_limit: Maximum number of city groups offered for selection.
        street_limit: Maximum number of streets printed per query.

    Returns:
        Always 0.
    """
    print("Tryb interaktywny.")
    print("Wpisz nazwe miejscowosci (min. 2 znaki), pusta linia = wyjscie.\n")
    try:
        while True:
            city_query = _prompt("miasto >>> ")
            if not city_query:
                print("Koniec.")
                return 0

            if len(normalize(city_query)) < 2:
                print("Podaj co najmniej 2 znaki.\n")
                continue

            ranked_groups = _rank_city_groups(
                city_query, city_index, city_keys, streets_by_sym, city_limit
            )
            if not ranked_groups:
                print("Brak wynikow.\n")
                continue

            for idx, (group, streets_count) in enumerate(ranked_groups):
                members_suffix = ""
                if len(group.members) > 1:
                    members_suffix = f" | scalone_sym={len(group.members)}"
                print(
                    f"[{idx}] {group.label} | woj={group.woj_code} ({group.woj_name}) | pow={group.pow_code} "
                    f"| ulice={streets_count}{members_suffix}"
                )

            selected = _choose_city_group(ranked_groups)
            if selected is None:
                continue

            _browse_streets(selected, streets_by_sym, street_limit)
    except _EndOfInput:
        print("\nKoniec.")
        return 0
431 +
432 +
def _positive_int(text: str) -> int:
    """argparse ``type`` callback accepting only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"niepoprawna liczba calkowita: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"wartosc musi byc >= 1: {text!r}")
    return value


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Global options ``--terc``, ``--simc`` and ``--ulic`` select the data
    files. One of the sub-commands ``stats``, ``miasto``, ``ulice`` or
    ``interaktywne`` is required and is stored in ``args.command``.
    The limit options must be integers of at least 1.

    Returns:
        The configured parser.
    """
    parser = argparse.ArgumentParser(
        description="CLI do ladowania TERC/SIMC/ULIC oraz wyszukiwania miejscowosci i ulic"
    )
    parser.add_argument("--terc", default="TERC.csv", help="Sciezka do TERC.csv")
    parser.add_argument("--simc", default="SIMC.csv", help="Sciezka do SIMC.csv")
    parser.add_argument("--ulic", default="ULIC.csv", help="Sciezka do ULIC.csv")

    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("stats", help="Pokaz statystyki zaladowanych danych")

    city_parser = subparsers.add_parser("miasto", help="Szukaj miejscowosci po fragmencie nazwy")
    city_parser.add_argument("query", help="Fraza do wyszukania")
    city_parser.add_argument("--limit", type=_positive_int, default=10, help="Maksymalna liczba wynikow")

    streets_parser = subparsers.add_parser("ulice", help="Pokaz ulice dla SYM miejscowosci")
    streets_parser.add_argument("sym", help="Kod SYM miejscowosci")
    streets_parser.add_argument("--limit", type=_positive_int, default=30, help="Maksymalna liczba ulic")

    interactive_parser = subparsers.add_parser("interaktywne", help="Tryb interaktywny: miasto -> ulice")
    interactive_parser.add_argument(
        "--city-limit", type=_positive_int, default=10, help="Maksymalna liczba miast do wyboru"
    )
    interactive_parser.add_argument(
        "--street-limit", type=_positive_int, default=20, help="Maksymalna liczba wynikow ulic"
    )

    return parser
458 +
459 +
def main() -> int:
    """Parse the command line, load the data files and run the command.

    Returns:
        The exit status of the selected sub-command (0 on success, 1
        when nothing was found).

    Raises:
        SystemExit: With status 2 on invalid arguments or when a data
            file cannot be read.
    """
    parser = build_parser()
    args = parser.parse_args()

    try:
        woj_names, terc_units = load_terc(args.terc)
        cities = load_simc(args.simc, woj_names)
        streets_by_sym = load_ulic(args.ulic, terc_units)
    except OSError as exc:
        # Report a missing/unreadable file as a usage error, not a traceback.
        parser.error(f"nie mozna odczytac pliku danych: {exc}")

    if args.command == "stats":
        print_stats(woj_names, cities, streets_by_sym)
        return 0
    if args.command == "ulice":
        return run_list_streets(args.sym, streets_by_sym, cities, limit=args.limit)

    if args.command in ("miasto", "interaktywne"):
        # Only the search commands use the prefix index, so build it here
        # rather than for every command.
        city_index, city_keys = build_city_index(cities)
        if args.command == "miasto":
            return run_search_city(args.query, city_index, city_keys, limit=args.limit)
        return run_interactive(
            city_index,
            city_keys,
            streets_by_sym,
            cities,
            city_limit=args.city_limit,
            street_limit=args.street_limit,
        )

    # Fallback for a command name that none of the branches above handle.
    parser.print_help()
    return 1
488 +
489 +
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Siguiente Anterior