AutomatedTiktokScraperAndCo.../sel.py
2022-03-14 18:12:38 -04:00

91 lines
No EOL
3.4 KiB
Python

from selenium.webdriver.common.by import By
import time
import undetected_chromedriver as uc
from selenium.webdriver.common.action_chains import ActionChains
from random import uniform
from selenium.webdriver.chrome.options import Options
import asyncio
import os
from shazamio import Shazam
from multiprocessing import Queue, Process, connection
def getVideosOnPage(url: str, max: int):
    """Collect video sources from a page by hovering over its poster thumbnails.

    A headless Chrome session (undetected_chromedriver) loads ``url``. Every
    ``img`` element whose class contains ``"ImgPoster"`` is hovered once, and
    after each hover all ``video`` elements currently in the DOM are read; each
    previously unseen ``src`` is recorded.

    Args:
        url: Page to scrape. If it contains ``"@"`` but not ``"video"`` it is
            treated as a profile page and the text after the first ``"@"`` is
            used as the user for every result. If it contains ``"topics"`` the
            page is reloaded after 60 videos have been recorded since the last
            load.
        max: Minimum number of results to gather before returning. (The name
            shadows the builtin inside this function; it is kept for callers.)

    Returns:
        A list of ``(video_src, thumbnail_alt_text, user)`` tuples. The list can
        be longer than ``max`` because the limit is only checked between full
        passes over the page.

    Note:
        The loop has no other exit condition: if the page never yields ``max``
        distinct video sources this function does not return.
    """
    options = Options()
    options.headless = True
    driver = uc.Chrome(options=options)
    try:
        actions = ActionChains(driver)
        driver.get(url)
        itr = 0  # videos recorded since the last page load
        found = []  # thumbnails already hovered
        vidfound = set()  # video src values already recorded
        output = []
        foundusers = set()  # user names already attributed to a video
        user = ""
        isProfile = False
        if "@" in url and "video" not in url:
            user = url.split("@")[1]
            isProfile = True
        while len(output) < max:
            try:
                if itr >= 60 and "topics" in url:
                    driver.get(url)
                    itr = 0
                # We go by the thumbnails because they are static in the HTML.
                heading1 = driver.find_elements(by=By.CSS_SELECTOR, value="img")
                # Approximate: works most of the time, not always.
                names = [
                    h.get_attribute("innerText")
                    for h in driver.find_elements(by=By.CSS_SELECTOR, value="h4")
                ]
                users = [name for name in names if name not in foundusers]
                usercount = 0
                for x in heading1:
                    text = x.get_attribute("alt")
                    if x in found:
                        continue
                    # Only poster images are of interest. The class is served
                    # by JS and may not be constant; a missing class attribute
                    # is treated as "not a poster" instead of raising on the
                    # same element on every pass.
                    if "ImgPoster" not in (x.get_attribute("class") or ""):
                        continue
                    actions.move_to_element(x).perform()
                    # Randomised pause to try to throw off bot detection: we
                    # scroll like a human might.
                    time.sleep(0.2 + uniform(0, 0.6))
                    found.append(x)
                    # Videos load in as they are viewed, so after each hover we
                    # grab whichever video elements are currently available.
                    for y in driver.find_elements(by=By.CSS_SELECTOR, value="video"):
                        video = y.get_attribute("src")
                        if video in vidfound:
                            continue
                        if not isProfile:
                            # An IndexError here (fewer names than videos) is
                            # handled below by starting a fresh pass.
                            user = users[usercount]
                            foundusers.add(user)
                        vidfound.add(video)
                        itr += 1
                        output.append((video, text, user))
                        # NOTE(review): nesting reconstructed from a listing
                        # whose indentation was lost -- confirm the user index
                        # is meant to advance once per recorded video.
                        usercount += 1
            except Exception:
                # Best effort: stale elements, short user lists, etc. simply
                # trigger another pass. KeyboardInterrupt/SystemExit are no
                # longer swallowed, so the loop can be interrupted.
                continue
        return output
    finally:
        # Always end the browser session, even when interrupted or on error.
        driver.quit()
def _shazam_worker(directory, filename, track_id, q):
    """Recognise one file with Shazam and put ``(title, subtitle, id)`` on *q*.

    Defined at module level so it can be used as a ``Process`` target under
    every multiprocessing start method. Exactly one tuple is always enqueued:
    any failure during recognition yields ``("N/A", "N/A", track_id)``.
    """
    try:
        path = os.path.join(directory, filename)
        track = asyncio.run(Shazam().recognize_song(path))["track"]
        result = (track["title"], track["subtitle"], track_id)
    except Exception:
        result = ("N/A", "N/A", track_id)
    q.put(result)


def idSong(a):
    """Identify every numerically named file in directory ``a`` via Shazam.

    One process is started per file. Only entries whose name up to the first
    ``"."`` parses as an integer are processed; that integer is the id reported
    in the result. Other entries are skipped, because a worker for them could
    never produce a result tuple.

    Args:
        a: Path of the directory to scan.

    Returns:
        A list of ``(title, subtitle, id)`` tuples, one per processed file, in
        the order the workers finished. Unrecognised files are reported as
        ``("N/A", "N/A", id)``. An empty list is returned when there is nothing
        to process.
    """
    jobs = []
    for name in os.listdir(a):
        try:
            jobs.append((name, int(name.split(".")[0])))
        except ValueError:
            continue
    if not jobs:
        # Nothing to wait for; avoids blocking on an empty set of workers.
        return []

    q = Queue()
    workers = [
        Process(target=_shazam_worker, args=(a, name, track_id, q))
        for name, track_id in jobs
    ]
    for worker in workers:
        worker.start()
    # Each worker enqueues exactly one tuple. Drain the queue before joining so
    # no child blocks while flushing its result.
    out = [q.get() for _ in workers]
    for worker in workers:
        worker.join()
    return out
def callback():
    """No-op: takes no arguments and returns ``None``."""
    return None