91 lines
No EOL
3.4 KiB
Python
91 lines
No EOL
3.4 KiB
Python
from selenium.webdriver.common.by import By
|
|
import time
|
|
import undetected_chromedriver as uc
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from random import uniform
|
|
from selenium.webdriver.chrome.options import Options
|
|
import asyncio
|
|
import os
|
|
from shazamio import Shazam
|
|
from multiprocessing import Queue, Process, connection
|
|
|
|
def getVideosOnPage(url: str, max: int):
    """Scrape ``(video_src, alt_text, user)`` tuples from the page at *url*.

    A headless Chrome session is opened through ``undetected_chromedriver``.
    Every ``<img>`` whose class contains ``"ImgPoster"`` is hovered (followed
    by a short random pause); after each hover, every ``<video>`` element
    currently in the DOM whose ``src`` has not been seen before is recorded
    together with the hovered image's ``alt`` text.

    Args:
        url: Page to load. When it contains ``"@"`` but not ``"video"`` it is
            treated as a profile page and the text following the first
            ``"@"`` is used as the user of every result. Otherwise users are
            taken, in order, from the ``innerText`` of the page's ``<h4>``
            elements that have not been used yet.
        max: Number of results to gather before returning. The name shadows
            the builtin but is kept so keyword-argument callers still work.

    Returns:
        A list of ``(video_src, alt_text, user)`` tuples. It can hold more
        than ``max`` entries because the limit is only checked between full
        passes over the page.

    Note:
        Any ordinary error during a pass (missing element, index error, ...)
        restarts the pass; there is no retry limit, so a page that never
        yields ``max`` videos keeps this function looping.
    """
    options = Options()
    options.headless = True
    driver = uc.Chrome(options=options)
    try:
        actions = ActionChains(driver)
        driver.get(url)

        itr = 0              # videos collected since the page was last loaded
        found = []           # thumbnails that were already hovered
        vidfound = set()     # video sources already emitted
        foundusers = set()   # user names already paired with a video
        output = []

        user = ""
        isProfile = False
        if "@" in url and "video" not in url:
            user = url.split("@")[1]
            isProfile = True

        while len(output) < max:
            try:
                # "topics" pages are reloaded after every 60 collected videos.
                if itr >= 60 and "topics" in url:
                    driver.get(url)
                    itr = 0

                # We go by the thumbnails because they are static in the HTML.
                posters = driver.find_elements(by=By.CSS_SELECTOR, value="img")

                # Approximate: works most of the time, not always.
                users = [
                    name
                    for name in (
                        heading.get_attribute("innerText")
                        for heading in driver.find_elements(
                            by=By.CSS_SELECTOR, value="h4"
                        )
                    )
                    if name not in foundusers
                ]
                usercount = 0

                for poster in posters:
                    if poster in found:
                        continue
                    # Only images of the poster class are video thumbnails.
                    # The class is served by JS and may not be constant, so
                    # this is a substring match; a missing class attribute is
                    # treated as "no match" instead of raising.
                    if "ImgPoster" not in (poster.get_attribute("class") or ""):
                        continue
                    text = poster.get_attribute("alt")

                    actions.move_to_element(poster).perform()
                    # Try to throw off bot detection: pause like a human
                    # scrolling through the page might.
                    time.sleep(0.2 + uniform(0, 0.6))
                    found.append(poster)

                    # Videos load in as they are viewed, so after each hover
                    # grab whichever <video> elements are now available.
                    for element in driver.find_elements(
                        by=By.CSS_SELECTOR, value="video"
                    ):
                        video = element.get_attribute("src")
                        if video in vidfound:
                            continue
                        if not isProfile:
                            user = users[usercount]
                            foundusers.add(user)
                        vidfound.add(video)
                        itr += 1
                        output.append((video, text, user))
                        usercount += 1
            except Exception:
                # Best-effort scraping: retry the pass on ordinary errors, but
                # let KeyboardInterrupt / SystemExit propagate so the loop can
                # be stopped.
                continue

        return output
    finally:
        # Always end the browser session, including on interruption; quit()
        # also shuts down the driver process, which close() does not.
        driver.quit()
|
|
def _track_index(filename):
    """Return the integer before the first ``"."`` of *filename*, or None."""
    try:
        return int(filename.split(".")[0])
    except ValueError:
        return None


def _recognize_into_queue(directory, filename, index, results):
    """Worker: recognize one file and enqueue ``(title, subtitle, index)``.

    Defined at module level so it can be pickled when the multiprocessing
    start method is "spawn" or "forkserver".
    """
    try:
        track = asyncio.run(
            Shazam().recognize_song(f'{directory}/{filename}')
        )["track"]
        results.put_nowait((track["title"], track["subtitle"], index))
    except Exception:
        # Recognition failed or the response had no usable "track" entry.
        results.put_nowait(("N/A", "N/A", index))


def idSong(a):
    """Run Shazam recognition on every numbered file in directory *a*.

    One process is started per file. Each result is the tuple
    ``(title, subtitle, index)`` where ``title`` and ``subtitle`` come from
    the ``"track"`` entry of the recognition response and ``index`` is the
    integer before the first ``"."`` in the file name. Files that cannot be
    recognized yield ``("N/A", "N/A", index)``.

    Args:
        a: Path of the directory whose entries are to be identified.

    Returns:
        A list with one tuple per processed file, in completion order (not
        directory order). Entries whose name has no integer prefix are
        skipped; an empty directory returns ``[]``.
    """
    q = Queue()
    workers = []
    for name in os.listdir(a):
        index = _track_index(name)
        if index is None:
            # No integer prefix means no index to report; such a file used to
            # kill its worker before it enqueued anything, hanging the parent.
            continue
        workers.append(
            Process(target=_recognize_into_queue, args=(a, name, index, q))
        )

    for worker in workers:
        worker.start()

    # Each worker enqueues exactly one item. Drain the queue *before* joining:
    # a process that has put data on a queue may not exit until it is consumed.
    out = [q.get() for _ in workers]

    for worker in workers:
        worker.join()

    return out
|
|
|
|
def callback():
    """Do nothing and return None (appears to be a placeholder hook)."""
    return None