今天學(xué)習過(guò)程中寫(xiě)了一個(gè) Flask 搜索引擎,包括爬蟲(chóng)和搜索頁(yè)面兩部分,功能有全文搜索、分頁(yè)、搜索建議、爬蟲(chóng)等。
只需安裝flask和jieba即可:
pip install flask jieba

搜索引擎后端:
from flask import Flask, render_template, request, session, jsonify
import sqlite3
import jieba
import math
import string
import re

app = Flask(__name__)
DATABASE = 'data.db'


def create_database():
    """Create the FTS5 virtual table that stores crawled pages.

    NOTE(review): the crawler creates this same table WITHOUT the
    'content' column — whichever side runs first fixes the schema;
    confirm the two definitions are kept in sync.
    """
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents
                 USING fts5(title, url, favicon, description, content, keywords, date, img)''')
    conn.commit()
    conn.close()


def tokenize(title):
    """Turn free text into a space-separated keyword string for FTS MATCH.

    Segments with jieba, drops ASCII punctuation and single-character
    tokens, and de-duplicates while preserving first-occurrence order.
    """
    words = [w for w in jieba.cut(title) if w not in string.punctuation]
    keywords = [w for w in words if len(w) > 1]  # drop single characters
    keywords = list(dict.fromkeys(keywords))     # de-dup, keep first-seen order
    keyword_str = ' '.join(keywords)
    # strip any punctuation that survived inside multi-char tokens
    return ''.join(ch for ch in keyword_str if ch not in string.punctuation)


def search_contents(query, offset, per_page):
    """Run an FTS5 MATCH over the keywords column and return one result page.

    Returns a dict with 'results' (list of row dicts), 'total_results'
    and 'total_pages'.
    """
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute('SELECT COUNT(*) FROM contents WHERE keywords MATCH :query',
              {'query': query})
    total_results = c.fetchone()[0]
    total_pages = calculate_total_pages(total_results, per_page)
    if offset >= total_results:
        # Requested page is past the end: fall back to the last page.
        # max() guards against a NEGATIVE OFFSET when there are no results
        # at all (total_pages == 0 would otherwise give -per_page).
        offset = max(0, (total_pages - 1) * per_page)
    c.execute('SELECT title, url, favicon, description, keywords, date '
              'FROM contents WHERE keywords MATCH :query '
              'LIMIT :per_page OFFSET :offset',
              {'query': query, 'per_page': per_page, 'offset': offset})
    rows = c.fetchall()
    conn.close()
    return {
        'results': [dict(row) for row in rows],
        'total_results': total_results,
        'total_pages': total_pages,
    }


def calculate_total_pages(total_results, per_page):
    """Number of pages needed to show total_results at per_page per page."""
    return math.ceil(total_results / per_page)


@app.before_request
def session_online():
    # NOTE(review): this increments on every request that carries a
    # 'session_id' cookie, so it behaves as a per-session page-view
    # counter rather than a live "users online" count — confirm intent.
    session_id = request.cookies.get('session_id')
    online = session.get('Online', 0)
    if session_id is not None:
        online += 1
    session['Online'] = online


@app.route('/get_suggestions')
def get_suggestions():
    """Return up to 5 stored titles containing the typed text (datalist feed)."""
    # default '' avoids a TypeError on '%' + None when ?q= is absent
    query = request.args.get('q', '')
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    # LIKE search over title, capped at 5 suggestions
    c.execute('SELECT title FROM contents WHERE title LIKE ? LIMIT 5',
              ('%' + query + '%',))
    suggestions = [row[0] for row in c.fetchall()]
    conn.close()
    return jsonify(suggestions=suggestions)


@app.route('/', methods=['GET'])
def index():
    """Render the search page; with ?q= run a search, otherwise show the form."""
    query = request.args.get('q', '')
    try:
        # clamp to >= 1; non-numeric ?page= falls back to the first page
        page = max(1, int(request.args.get('page', '1')))
    except ValueError:
        page = 1
    per_page = 10                    # results shown per page
    offset = (page - 1) * per_page   # row offset for the current page
    online = session.get('Online', 0)
    if query:
        content_result = search_contents(tokenize(query), offset, per_page)
        return render_template('index.html',
                               query=query,
                               content_result=content_result['results'],
                               total_results=content_result['total_results'],
                               total_pages=content_result['total_pages'],
                               current_page=page,
                               online=online)
    return render_template('index.html', online=online)


if __name__ == '__main__':
    create_database()
    app.secret_key = 'pyxueba'  # NOTE(review): hard-coded secret; load from env in production
    app.run(debug=True)
搜索引擎前端:
<!DOCTYPE html>
<html>
<head>
    <meta charset='UTF-8'>
    <title>Python學(xué)霸搜索引擎</title>
    <link rel='icon' type='image/svg+xml' href='favicon.svg'>
    <script src='https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js'></script>
    <style>
        /* page frame */
        body { font-family: Arial, sans-serif; margin: 50px; }
        h1 { font-size: 24px; margin-bottom: 20px; text-align: center; }

        /* search form */
        .search-box { margin-bottom: 20px; text-align: center; }
        .search-box input[type='text'] { padding: 6px 2px; font-size: 16px; border-radius: 4px; border: 1px solid #999; width: 40%; max-width: 100%; }
        .search-box button[type='submit'] { padding: 6px 12px; font-size: 16px; border-radius: 4px; background-color: #006621; color: #fff; border: none; cursor: pointer; }
        .search-box button[type='submit']:hover { background-color: #00511a; }

        /* result cards */
        .result-item { margin-bottom: 20px; border: 1px solid #ddd; border-radius: 4px; padding: 10px; }
        a { text-decoration: none; }
        .result-title { font-size: 20px; font-weight: bold; text-align: left; }
        .result-title a { color: #008000; }
        .result-url { color: #000000; font-size: 14px; margin-bottom: 5px; }
        .result-time { font-size: 14px; color: #999; }
        .result-description { margin-top: 10px; }

        /* pagination */
        .pagination { margin-top: 20px; text-align: center; }
        .pagination-link { display: inline-block; padding: 6px 12px; margin-right: 5px; color: #333; border-radius: 4px; background-color: #f5f5f5; text-decoration: none; }
        .pagination-link:hover { background-color: #ddd; }
        .highlight { background-color: #FFD700; }

        /* footer / visit counter */
        .footer { margin-top: 50px; text-align: center; color: #999; font-size: 12px; }
        .visitor-count { margin-top: 10px; }
        .visitor-count span { margin-left: 5px; }
        .favicon { width: 16px; height: 16px; margin-right: 3px; }
    </style>
</head>
<body>
    <h1>python學(xué)霸全文搜索</h1>
    <!-- Search form. FIX: the datalist id/class were 'suggestion-list--------'
         but the suggestion script selects '#suggestion-list', so suggestions
         could never be shown; the ids now match. -->
    <div class='search-box'>
        <form action='/' method='get'>
            <input type='text' name='q' id='search-input' list='suggestion-list' placeholder='你負責搜,我負責找···'>
            <datalist id='suggestion-list' class='suggestion-list'></datalist>
            <button type='submit'>搜索</button>
        </form>
    </div>
    {% if content_result %}
    <p>共找到 {{ total_results }} 條結果。</p>
    {% for result in content_result %}
    <div class='search-summary'> </div>
    <div class='result-item'>
        <h2 class='result-title'><img src='{{ result.favicon }}' alt='Favicon' class='favicon' style='border: 1px solid #ccc; border-radius: 5px;' /><a class='result-link' href='{{ result.url }}' target='_blank'>{{ result.title }}</a></h2>
        <p class='result-url'><span class='time'>{{ result.date }}</span> {{ result.description }}</p>
    </div>
    {% endfor %}
    <!-- page links; current page gets the 'highlight' class -->
    <div class='pagination'>
        {% if total_pages > 1 %}
        {% for page in range(1, total_pages + 1) %}
        {% if page == current_page %}
        <a class='pagination-link highlight' href='/?q={{ query }}&page={{ page }}'>{{ page }}</a>
        {% else %}
        <a class='pagination-link' href='/?q={{ query }}&page={{ page }}'>{{ page }}</a>
        {% endif %}
        {% endfor %}
        {% endif %}
    </div>
    {% endif %}
    <div class='footer'> @2023 Python學(xué)霸. <div class='visitor-count'> <p>總訪(fǎng)問(wèn): {{ online }}</p> </div> </div>
<script> // JavaScript 可選,用于給搜索關(guān)鍵詞添加高亮樣式 window.onload = function () { var query = '{{ query }}'; var titles = document.getElementsByClassName('result-title'); for (var i = 0; i < titles.length; i++) { var title = titles[i]; var highlighted = title.innerHTML.replace(new RegExp(query, 'gi'), '<span class='highlight'>$&</span>'); title.innerHTML = highlighted; } };</script> <script type='text/javascript'> $(document).ready(function () { $('#search-input').on('input', function () { var query = $(this).val(); if (query.trim().length > 0) { // 確保輸入不是空白字符 $.ajax({ url: '/get_suggestions', data: { q: query }, success: function (response) { var suggestions = response.suggestions; var suggestionList = $('#suggestion-list'); suggestionList.empty(); // 清空之前的建議列表 for (var i = 0; i < suggestions.length; i++) { var suggestionItem = $('<li>').text(suggestions[i]); suggestionList.append(suggestionItem); } suggestionList.show(); // 顯示建議列表 } }); } else { $('#suggestion-list').empty().hide(); // 輸入為空時(shí)隱藏建議列表 } });
// 當用戶(hù)點(diǎn)擊建議項時(shí)將其填充到搜索框中 $('#suggestion-list').on('click', 'li', function () { var selectedSuggestion = $(this).text(); $('#search-input').val(selectedSuggestion); $('#suggestion-list').empty().hide(); // 填充后隱藏建議列表 }); });</script></body></html>爬蟲(chóng):
import requests
from bs4 import BeautifulSoup
import sqlite3
import jieba
import threading
import time
import random
import string
import re
from datetime import date
from urllib.parse import urljoin
import base64


class Crawler:
    """Multi-threaded crawler that stores pages into data.db (FTS5)."""

    def __init__(self, max_depth=3, num_workers=10):
        self.max_depth = max_depth
        self.num_workers = num_workers
        # One connection shared by all workers; every DB access is guarded
        # by self.lock, hence check_same_thread=False.
        self.conn = sqlite3.connect('data.db', check_same_thread=False)
        self.lock = threading.Lock()
        self.url_queue = []
        self.crawled_urls = set()
        self.create_tables()
        self.add_urls(['https://www.hao123.com/'])
        self.run()  # NOTE(review): constructor blocks until crawling finishes

    def get_image_data_uri(self, image_url):
        """Download an image and return it embedded as a base64 data URI."""
        response = requests.get(image_url)
        base64_data = base64.b64encode(response.content).decode('utf-8')
        return f'data:image/x-icon;base64,{base64_data}'

    def create_tables(self):
        # NOTE(review): the Flask side creates this table WITH an extra
        # 'content' column — keep the two schemas in sync.
        c = self.conn.cursor()
        c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents
                     USING fts5(title, url, favicon, description, keywords, date, img)''')
        self.conn.commit()

    def add_urls(self, urls):
        """Thread-safely append URLs to the crawl queue."""
        with self.lock:
            self.url_queue.extend(urls)

    def crawl_and_save(self, url, depth=0):
        """Fetch one page, store it, and enqueue its links (up to max_depth).

        NOTE(review): enqueued links lose their depth (add_urls only takes
        URLs), so depth is always 0 for queued pages — the max_depth check
        never actually limits recursion; confirm intended behavior.
        """
        # Skip binary assets and javascript pseudo-links; 'javascript'
        # subsumes the 'javascript:;' / 'javascript:void(0)' cases.
        if (not url or '.ico' in url or '.jpg' in url or '.png' in url
                or '#' in url or 'javascript' in url):
            print(f'無(wú)效:{url} ')
            return
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
            response = requests.get(url, headers=headers, timeout=2.5)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # HTTPError subclasses RequestException, so one clause suffices
            print(f'無(wú)法獲取鏈接 {url}:{e}')
            return

        content_type = response.headers.get('content-type')
        if not content_type or not content_type.startswith('text/html'):
            return  # only index HTML documents

        soup = BeautifulSoup(response.content, 'html.parser')
        title_tag = soup.title
        # FIX: title_tag.string can be None (e.g. <title> with child tags),
        # which crashed the original '.strip()' call.
        if title_tag is None or title_tag.string is None:
            print(f'鏈接 {url} 未找到標題,跳過(guò)...')
            return
        title = title_tag.string.strip()
        if not title:
            print(f'鏈接 {url} 標題為空,跳過(guò)...')
            return
        # Punctuation-free copy used only for keyword extraction.
        # (The original also computed a dead jieba join that was
        # immediately overwritten — removed.)
        clean_title = ''.join(ch for ch in title if ch not in string.punctuation)

        with self.lock:
            if url in self.crawled_urls:
                return

        shijian = self._extract_date(soup, response.text)
        print(shijian)
        try:
            keywords = self.extract_keywords(clean_title)
            description, favicon, img_urls = self.extract_page_info(soup)
            if favicon:
                favicon = self.get_image_data_uri(favicon)
            with self.lock:  # serialize writes on the shared connection
                c = self.conn.cursor()
                c.execute('INSERT INTO contents(title, url, favicon, description, keywords, date, img) VALUES (?, ?, ?, ?, ?, ?, ?)',
                          (title, url, favicon, description, ','.join(keywords),
                           shijian, '\n'.join(img_urls)))
                self.conn.commit()
                self.crawled_urls.add(url)
            # FIX: the original f-string had a quote clash around {url}
            print(f"正在爬取 '{url}' 并保存到數據庫...")
        except sqlite3.IntegrityError:
            pass  # duplicate row — already stored

        if depth < self.max_depth:
            for link in soup.find_all('a', href=True):
                # FIX: urljoin resolves relative hrefs against the page URL;
                # the original's naive 'url + href' built broken URLs.
                self.add_urls([urljoin(url, link['href'])])

    @staticmethod
    def _extract_date(soup, html_text):
        """Best-effort publication date: body regex, meta tags, else today."""
        match = re.search(r'\d{4}-\d{2}-\d{2}', html_text)  # YYYY-MM-DD anywhere in the page
        if match:
            return match.group()
        # FIX: the original selector string had a quote clash
        # ('meta[name='date']' inside single quotes — a syntax error).
        date_tag = soup.select_one('meta[name="date"], meta[name="pubdate"]')
        shijian = date_tag.get('content') if date_tag else None
        if not shijian or not shijian.strip():
            shijian = str(date.today())  # fall back to the crawl date
        return shijian

    @staticmethod
    def extract_keywords(title):
        """Segment title with jieba; de-dup keeping first-occurrence order.

        Unlike the Flask tokenizer this keeps single-character tokens
        (len > 0), matching the original behavior.
        """
        words = [w for w in jieba.cut(title) if w not in string.punctuation]
        keywords = [w for w in words if len(w) > 0]
        return list(dict.fromkeys(keywords))

    @staticmethod
    def extract_page_info(soup):
        """Return (meta description, favicon href, list of <img> src URLs)."""
        description = ''
        favicon = ''
        meta_description = soup.find('meta', attrs={'name': 'description'})
        if meta_description and meta_description.has_attr('content'):
            description = meta_description['content']
        link_favicon = soup.find('link', attrs={'rel': 'icon'})
        if link_favicon and link_favicon.has_attr('href'):
            favicon = link_favicon['href']
        img_urls = [img.get('src') for img in soup.find_all('img')]
        img_urls = [u for u in img_urls if u is not None]
        return description, favicon, img_urls

    def worker(self):
        """Drain the queue until it is (momentarily) empty.

        NOTE(review): a worker exits as soon as it sees an empty queue even
        though other workers may still be about to enqueue links — confirm
        whether early exit is acceptable.
        """
        while True:
            url = None
            with self.lock:
                if self.url_queue:
                    url = self.url_queue.pop(0)
            if url is None:
                break
            time.sleep(random.uniform(1, 3))  # polite random delay
            self.crawl_and_save(url)

    def run(self):
        """Start num_workers threads and wait for all of them to finish."""
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        self.conn.close()


if __name__ == '__main__':
    crawler = Crawler(max_depth=5, num_workers=5)
可能還有一些 bug。提示詞(搜索建議)功能的后端已經(jīng)加好;若建議不顯示,請確認 HTML 前端中 datalist 的 id 與腳本里使用的選擇器一致(均為 suggestion-list)。
聯(lián)系客服