import requests
import re
import json
import unicodedata
from bs4 import BeautifulSoup
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from pymongo import InsertOne
from app.config import settings
from app.db import connect_db, disconnect_db
from time import sleep

# Entry labels for which get_record_data stores a single active value
# (allow_multiple_active=False) instead of a list of active values.
single_value = ["Obchodné meno:", "Sídlo:", "IČO:", "Deň zápisu:", "Právna forma:"]

# Entry labels whose value is not plain text; maps label -> value_type
# understood by process_entry.
value_type_dict = {
    "IČO:": "number",
    "Spoločníci:": "spolocnici",
    "Výška vkladu každého spoločníka:": "vklad"
}

# Seconds to wait for the remote server before giving up; without a timeout
# requests.get can block a worker thread forever.
REQUEST_TIMEOUT = 30


def scrape_orsr():
    """
    Main entry point: scrape the records listed at the endpoint defined in
    config and store them in mongodb.

    The listing page is fetched once, every link labelled "Úplný" is collected
    and the resulting urls are split round-robin between the configured number
    of worker threads. Worker failures are printed instead of being silently
    dropped.
    """
    # Clamp to at least one worker: 0 would make the executor raise and
    # would otherwise process nothing.
    threads = max(1, settings["threads"])

    # get all links labelled "Úplný" from the orsr url
    response = requests.get(settings["base_url"] + settings["endpoint"], timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")
    # href=True skips anchors without a target instead of raising KeyError
    links = soup.find_all("a", string="Úplný", href=True)
    records = [settings["base_url"] + link["href"] for link in links]

    # distribute the work between the number of threads defined in config
    parts = [records[i::threads] for i in range(threads)]
    print(f"Processing {len(records)} records using {threads} threads:")
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Empty slices are not submitted (there is nothing to write for them).
        futures = {
            pool.submit(process_records, part, thread_id): thread_id
            for thread_id, part in enumerate(parts, start=1)
            if part
        }

    # Leaving the "with" block waits for all workers, so exception() does not
    # block here. Report failures that would otherwise go unnoticed.
    for future, thread_id in futures.items():
        error = future.exception()
        if error is not None:
            print(f"thread {thread_id} failed: {error!r}")
    print("All records_processed")


def process_records(records, thread):
    """
    Worker for processing records in a thread. Every url is scraped and the
    results are written with one bulk write at the end.

    :param records: list of urls of records to process
    :param thread: thread id of processing thread (used as progress bar label)
    """
    data = [InsertOne(process_record(url)) for url in tqdm(records, desc=f"thread {thread}")]
    if not data:
        # pymongo raises InvalidOperation for an empty list of operations
        return
    collection = connect_db()
    try:
        collection.bulk_write(data)
    finally:
        # release the connection even when the write fails
        disconnect_db(collection)


def process_record(url):
    """
    Process one record: download the page at url and parse it.

    :param url: url of the record
    :return: dictionary of parameters
    :raises requests.RequestException: when the page cannot be downloaded
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")
    return get_record_data(soup)


def get_oddiel(soup):
    """
    Read the text next to the "Oddiel:" label.

    :param soup: parsed record page
    :return: dictionary {"value": text}
    """
    label = soup.find("span", class_="tl", string=re.compile("Oddiel:"))
    oddiel = label.parent.find("span", class_="ra").text.strip()
    return {"value": oddiel}


def get_vlozka(soup):
    """
    Read the text next to the "Vložka" label.

    :param soup: parsed record page
    :return: dictionary {"value": text}
    """
    label = soup.find("span", class_="tl", string=re.compile("Vložka"))
    vlozka = label.parent.find("span", class_="ra").text.strip()
    return {"value": vlozka}


def get_aktualizaciaUdajov(soup):
    """
    Read the cell following the "Dátum aktualizácie" label cell.

    :param soup: parsed record page
    :return: dictionary {"value": text}
    """
    label = soup.find("td", class_="tl", string=re.compile("Dátum aktualizácie"))
    aktualizacia = label.find_next_sibling("td").text.strip()
    return {"value": aktualizacia}


def get_vypisUdajov(soup):
    """
    Read the cell following the "Dátum výpisu" label cell.

    :param soup: parsed record page
    :return: dictionary {"value": text}
    """
    label = soup.find("td", class_="tl", string=re.compile("Dátum výpisu"))
    vypis = label.find_next_sibling("td").text.strip()
    return {"value": vypis}


def get_record_data(soup):
    """
    Build the record dictionary from a parsed record page.

    Fixed fields (oddiel, vlozka, aktualizaciaUdajov, vypisUdajov) are read by
    their dedicated getters; the entries in between are read from the tables
    that follow the "Oddiel:" header table.

    :param soup: parsed record page
    :return: dictionary of parameters
    """
    record = {
        "oddiel": get_oddiel(soup),
        "vlozka": get_vlozka(soup)
    }

    # find the last table before variable data
    header_label = soup.find("span", class_="tl", string=re.compile("Oddiel:"))
    entry = header_label.parent.parent.parent
    while True:
        entry = entry.find_next_sibling("table")
        if entry is None:
            # ran out of tables without meeting the closing one
            break
        rows = entry.find_all("tr")
        if not rows:
            continue
        # keep only the rows of the outer table, not of the nested ones
        outer_parent = rows[0].parent
        rows = [row for row in rows if row.parent == outer_parent]
        if len(rows) > 1:
            # last table with "Dátum aktualizácie údajov"
            break

        # get entry name and entry data
        cells = rows[0].find_all("td")
        if len(cells) < 2:
            # not a "label | data" row, nothing to extract
            continue
        entry_name = cells[0].text.strip()
        allow_multiple_active = entry_name not in single_value
        value_type = value_type_dict.get(entry_name, "text")
        entry_data = get_data(cells[1], value_type=value_type, allow_multiple_active=allow_multiple_active)
        record[transform_entry_name(entry_name)] = entry_data

    record.update({
        "aktualizaciaUdajov": get_aktualizaciaUdajov(soup),
        "vypisUdajov": get_vypisUdajov(soup)
    })
    return record


def transform_entry_name(name):
    """
    Turn an entry label into an ASCII camelCase key: diacritics and colons are
    removed, the first word is lower-cased and the following words capitalized.

    :param name: entry label, e.g. "Obchodné meno:"
    :return: key, e.g. "obchodneMeno"
    """
    ascii_name = unicodedata.normalize("NFKD", name).encode('ascii', 'ignore').decode()
    words = ascii_name.replace(":", "").lower().split()
    return words[0].lower() + "".join(word.capitalize() for word in words[1:])


def process_entry(entry, value_type):
    """
    Extracts one entry from the table of entries for a given data.

    :param entry: one table element of data
    :param value_type: type of the value data ("text", "number", "spolocnici" or "vklad")
    :return: tuple: (value, valid_from, valid_until, active); value is None
        when the entry has an unexpected shape or no usable content
    """
    value, valid_from, valid_until, active = None, None, None, False
    cells = entry.find_all("td")
    if len(cells) != 2:
        # unexpected layout; value stays None so that get_data skips the entry
        return value, valid_from, valid_until, active
    value_td, valid_td = cells

    # Check if active entry: the first class of the first span is "ra"
    first_span = value_td.span
    span_classes = (first_span.get("class") or []) if first_span is not None else []
    active = bool(span_classes) and span_classes[0] == "ra"

    # <br> separates lines, <span> carries the text
    fragments = ["\n" if node.name == "br" else node.text.strip() for node in value_td.find_all(["br", "span"])]
    lines = [line.strip() for line in " ".join(fragments).split("\n")]
    # filter after stripping so that whitespace-only lines are dropped as well
    lines = [line for line in lines if line]

    if value_type == "text":
        value = ", ".join(lines)
    elif value_type == "number":
        # drop every kind of whitespace (including non-breaking spaces);
        # note that int() does not keep leading zeros
        digits = "".join("".join(lines).split())
        if digits:
            value = int(digits)
    elif value_type == "spolocnici":
        if lines:
            value = {
                "spolocnik": lines[0],
                "adresa": ", ".join(lines[1:])
            }
    elif value_type == "vklad":
        if lines:
            value = {
                "spolocnik": lines[0],
                "vklad": ", ".join(lines[1:])
            }

    valid_from, valid_until = parse_oddo(valid_td.text.strip())
    return value, valid_from, valid_until, active


def get_data(data_td, value_type="text", allow_multiple_active=True):
    """
    Collect the values of one entry, split into active and old ones.

    :param data_td: table cell holding one table per value
    :param value_type: type of the value data, passed to process_entry
    :param allow_multiple_active: when True all active values are stored under
        "values"; when False only the first active value is stored and its
        fields are placed directly in the result
    :return: dictionary with the active value(s) and "old_values"
    """
    data = {}
    values = []
    old_values = []
    for entry in data_td.find_all("table"):
        value, valid_from, valid_until, active = process_entry(entry, value_type)
        if value is None:
            continue
        item = {"value": value, "valid_from": valid_from, "valid_until": valid_until}
        if active:
            values.append(item)
        else:
            old_values.append(item)

    if allow_multiple_active:
        data["values"] = values
    elif values:
        data.update(values[0])
    data["old_values"] = old_values
    return data


def parse_oddo(text):
    """
    Parses the valid_from and valid_until from string

    :param text: od_do_dates in format: "(od: DD.MM.YYYY do: DD.MM.YYYY)"
    :return: returns a tuple (valid_from, valid_until); a missing part is
        returned as an empty string
    """
    valid_from, valid_until = "", ""
    # each date is the 10 characters that follow its 4 character marker
    if (start_from := text.find("od: ")) > -1:
        valid_from = text[start_from+4:start_from+14]
    if (start_until := text.find("do: ")) > -1:
        valid_until = text[start_until+4:start_until+14]
    return valid_from, valid_until


if __name__ == "__main__":
    scrape_orsr()