簡單使用Scala和Jsoup對豆瓣電影進行爬蟲,技術比較簡單易學。
寫文章不易,歡迎大家採我的文章,以及給出實用的評論,當然大家也能夠關注一下我的github;多謝。
1、爬蟲前期準備
- 找好須要抓取的鏈接:https://movie.douban.com/tag/%E7%BB%8F%E5%85%B8?start=20&type=T
- 觀看該鏈接的源代碼,找到須要進行解析的地方如本實例:圖中標明了須要提取的字段。
- 下載Jsoup的jar包文件:https://jsoup.org/download
- 建立Scalaproject,并將Jsoup的jar包增加project
2、Jsoup簡介:
??????Jsoup學習請看這個網址:jsoup Cookbook(中文版):http://www.open-open.com/jsoup/
??????我這里僅僅介紹我用到了的四個函數:
1、第一個函數:Jsoup.connect(url)
val doc:Document=Jsoup.connect(url).get()//從一個站點獲取和解析一個HTML文檔,使用get方式。
說的直白點這里獲得的就是網頁的源代碼; //特殊使用:帶有參數并使用Post方式 Document doc = Jsoup.connect("http://example.com") .data("query", "Java") .userAgent("Mozilla") .cookie("auth", "token") .timeout(3000) .post(); 2、第二個函數:Element.select(String selector) doc.select("a.nbg")//通過使用CSS(或Jquery)selector syntax 獲得你想要操作元素,這里獲得的是說有class=nbg的<a/>標簽。
3、第三個函數:public String attr(String attributeKey) Elements中的attr函數是通過屬性獲得Element中第一個匹配該屬性的值。如elem.select("a.nbg").attr("title"):獲得a標簽中的title。 4、第四個函數:public String html() 獲得element中包括的Html內容
3、解析Html:
??????這里的Html內容比較簡單。僅僅須要獲得如圖一中標記的四處。這里僅僅要用到第二章中的后面三個方法。
//解析Document,須要對比網頁源代碼進行解析
def parseDoc(doc: Document, movies: ConcurrentHashMap[String, String]) = {var count = 0for (elem <- doc.select("tr.item")) {//獲得全部的電影條目movies.put(elem.select("a.nbg").attr("title"), elem.select("a.nbg").attr("title") + "\t" //標題+ elem.select("a.nbg").attr("href") + "\t" //豆瓣鏈接// +elem.select("p.pl").html+"\t"//簡介+ elem.select("span.rating_nums").html + "\t" //評分+ elem.select("span.pl").html //評論數)count += 1}count
}
4、建立連接獲得相應Url的Html
??????這里使用了Scala中的Try語法,我這里僅僅簡單說明,當Jsoup.connect(url).get()
返回異常時模式匹配會匹配Failure(e)并將異常賦值給模板類中的e。當返回成功時將匹配Success(doc),并將獲得的Html的Document賦值給doc。
//用于記錄總數。和失敗次數
val sum, fail: AtomicInteger = new AtomicInteger(0)
/*** 當出現異常時10s后重試,異常反復100次* @param delay:延時時間* @param url:抓取的Url* @param movies:存取抓到的內容*/
def requestGetUrl(times: Int = 100, delay: Long = 10000)(url: String, movies: ConcurrentHashMap[String, String]): Unit = {Try(Jsoup.connect(url).get()) match {//使用try來推斷是否成功和失敗對網頁進行抓取case Failure(e) =>if (times != 0) {println(e.getMessage)fail.addAndGet(1)Thread.sleep(delay)requestGetUrl(times - 1, delay)(url, movies)} else throw ecase Success(doc) =>val count = parseDoc(doc, movies);if (count == 0) {Thread.sleep(delay);requestGetUrl(times - 1, delay)(url, movies)}sum.addAndGet(count);}
}
5、使用并發集合
??????為了加快住區速度使用了Scala中的并發集合:par。相似于java中的fork/join框架;
/*** 多線程抓取* @param url:原始的Url* @param tag:電影標簽* @param maxPage:頁數* @param threadNum:線程數* @param movies:并發集合存取抓到的內容*/
def concurrentCrawler(url: String, tag: String, maxPage: Int, threadNum: Int, movies: ConcurrentHashMap[String, String]) = {val loopPar = (0 to maxPage).parloopPar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threadNum)) // 設置并發線程數loopPar.foreach(i => requestGetUrl()(url.format(URLEncoder.encode(tag, "UTF-8"), 20 * i), movies)) // 利用并發集合多線程同步抓取:遍歷全部頁saveFile1(tag, movies)//保存為文件
}
6、運行任務:
??????想要進行爬蟲僅僅須要這樣調用concurrentCrawler(URL, tag, page, Thread_Num, new ConcurrentHashMapString, String)函數即可。
def main(args: Array[String]): Unit = {val Thread_Num = 30 //指定并發運行線程數val t1 = System.currentTimeMillisfor ((tag, page) <- tags)concurrentCrawler(URL, tag, page, Thread_Num, new ConcurrentHashMap[String, String]())//并發抓取val t2 = System.currentTimeMillisprintln(s"抓取數:$sum 重試數:$fail 耗時(秒):" + (t2 - t1) / 1000)}
}
運行結果:
抓取數:793 重試數:0 耗時(秒):4
本文來自伊豚wpeace(blog.wpeace.cn)
7、全部代碼:
import java.io.{File, PrintWriter}
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicIntegerimport org.jsoup.Jsoup
import org.jsoup.nodes.Documentimport scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success, Try}/*** Created by peace on 2017/3/5.*/
object Douban {val URL = "https://movie.douban.com/tag/%s?
start=%d&type=T"
//訪問的鏈接 //須要抓取的標簽和頁數 val tags = Map( "經典" -> 4, //tag,頁數 "愛情" -> 4, "動作" -> 4, "劇情" -> 4, "懸疑" -> 4, "文藝" -> 4, "搞笑" -> 4, "戰爭" -> 4 ) //解析Document,須要對比網頁源代碼進行解析 def parseDoc(doc: Document, movies: ConcurrentHashMap[String, String]) = { var count = 0 for (elem <- doc.select("tr.item")) { movies.put(elem.select("a.nbg").attr("title"), elem.select("a.nbg").attr("title") + "\t" //標題 + elem.select("a.nbg").attr("href") + "\t" //豆瓣鏈接 // +elem.select("p.pl").html+"\t"//簡介 + elem.select("span.rating_nums").html + "\t" //評分 + elem.select("span.pl").html //評論數 ) count += 1 } count } //用于記錄總數。和失敗次數 val sum, fail: AtomicInteger = new AtomicInteger(0) /** * 當出現異常時10s后重試,異常反復100次 * @param delay:延時時間 * @param url:抓取的Url * @param movies:存取抓到的內容 */ def requestGetUrl(times: Int = 100, delay: Long = 10000)(url: String, movies: ConcurrentHashMap[String, String]): Unit = { Try(Jsoup.connect(url).get()) match {//使用try來推斷是否成功和失敗對網頁進行抓取 case Failure(e) => if (times != 0) { println(e.getMessage) fail.addAndGet(1) Thread.sleep(delay) requestGetUrl(times - 1, delay)(url, movies) } else throw e case Success(doc) => val count = parseDoc(doc, movies); if (count == 0) { Thread.sleep(delay); requestGetUrl(times - 1, delay)(url, movies) } sum.addAndGet(count); } } /** * 多線程抓取 * @param url:原始的Url * @param tag:電影標簽 * @param maxPage:頁數 * @param threadNum:線程數 * @param movies:并發集合存取抓到的內容 */ def concurrentCrawler(url: String, tag: String, maxPage: Int, threadNum: Int, movies: ConcurrentHashMap[String, String]) = { val loopPar = (0 to maxPage).par loopPar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threadNum)) // 設置并發線程數 loopPar.foreach(i => requestGetUrl()(url.format(URLEncoder.encode(tag, "UTF-8"), 20 * i), movies)) // 利用并發集合多線程同步抓取:遍歷全部頁 saveFile1(tag, movies) } //直接輸出 def saveFile(file: String, movies: ConcurrentHashMap[String, String]) = { val writer = new PrintWriter(new File(new SimpleDateFormat("yyyyMMdd").format(new Date()) + "_" + file ++ ".txt")) for ((_, value) <- movies) writer.println(value) writer.close() } // 排序輸出到文件 def saveFile1(file: String, movies: ConcurrentHashMap[String, String]) = { val writer = new PrintWriter(new File(new SimpleDateFormat("yyyyMMdd").format(new Date()) + "_" + file ++ ".txt")) val col = new ArrayBuffer[String](); for ((_, value) <- movies) col += value; val sort = col.sortWith( (o1, o2) => { val s1 = o1.split("\t")(2); val s2 = o2.split("\t")(2); if (s1 == null || s2 == null || s1.isEmpty || s2.isEmpty) { true } else { s1.toFloat > s2.toFloat } } ) sort.foreach(writer.println(_)) writer.close() } def main(args: Array[String]): Unit = { val Thread_Num = 30 //指定并發運行線程數 val t1 = System.currentTimeMillis for ((tag, page) <- tags) concurrentCrawler(URL, tag, page, Thread_Num, new ConcurrentHashMap[String, String]())//并發抓取 val t2 = System.currentTimeMillis println(s"抓取數:$sum 重試數:$fail 耗時(秒):" + (t2 - t1) / 1000) } }