Skip to content

Commit 2dc67c9

Browse files
committed
lesson 28
1 parent 2ea886c commit 2dc67c9

File tree

8 files changed

+652
-0
lines changed

8 files changed

+652
-0
lines changed

kafka-web-scraper/project.clj

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
(defproject kafka-web-scraper "0.1.0-SNAPSHOT"

  :description "Scrapes job postings from remoteok.com and streams them through Kafka topics"

  :dependencies [[org.clojure/clojure "1.11.1"]
                 [clj-http "3.12.3"]
                 [org.jsoup/jsoup "1.16.1"]
                 [fundingcircle/jackdaw "0.9.11"]
                 [org.apache.kafka/kafka-streams-test-utils "3.3.2"]]

  ;; Jackdaw's transitive Confluent dependencies are served from Confluent's
  ;; own Maven repository, so it has to be listed explicitly (the otus-28
  ;; project in this commit declares the same repository).
  :repositories [["confluent" {:url "https://packages.confluent.io/maven/"}]]

  :repl-options {:init-ns kafka-web-scraper.core})
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
(ns kafka-web-scraper.core
2+
(:require
3+
[clj-http.client :as http]
4+
[jackdaw.streams :as js]
5+
[jackdaw.client :as jc]
6+
[jackdaw.client.log :as jcl]
7+
[jackdaw.admin :as ja]
8+
[jackdaw.serdes.edn :refer [serde]])
9+
(:import
10+
[org.jsoup Jsoup]
11+
[org.jsoup.nodes Element]))
12+
13+
14+
(def jobs-url
  "Base URL of the job board; also used as the prefix for vacancy links."
  "https://remoteok.com")
16+
17+
18+
(defn get-job-links
  "Downloads the job board front page and extracts the vacancy links.

  Returns a lazy sequence of maps, one per `tr.job` row found inside the
  `#jobsboard` element:
    :id   - value of the row's `id` attribute
    :link - `jobs-url` with the row's `data-href` attribute appended"
  []
  (let [^String html (-> jobs-url http/get :body)
        page         (Jsoup/parse html)
        rows         (-> page
                         (.select "#jobsboard")
                         (.select "tr.job"))]
    ;; Hinting `el` lets the compiler resolve `.attr` statically instead of
    ;; falling back to reflection for every row.
    (map (fn [^Element el]
           {:id   (.attr el "id")
            :link (str jobs-url (.attr el "data-href"))})
         rows)))
30+
31+
32+
(defn get-job
  "Downloads the page of a single vacancy and returns the response body."
  [link]
  (:body (http/get link)))
37+
38+
39+
(defn get-info
  "Parses a vacancy page and extracts a short summary.

  Returns a map with the text of the matched elements inside
  `td.company_and_position`:
    :company  - `span.companyLink h3[itemprop=name]`
    :position - `a[itemprop=url] h2`
    :salary   - the last element matched by `> *`

  Every value is a string; a field whose selector matches nothing is the
  empty string."
  [^String html]
  (let [page            (Jsoup/parse html)
        info-el         (.select page "td.company_and_position")
        company         (-> info-el
                            (.select "span.companyLink")
                            (.select "h3[itemprop=name]"))
        position        (-> info-el
                            (.select "a[itemprop=url]")
                            (.select "h2"))
        ;; `last` yields nil when the selector matches nothing
        ^Element salary (last (.select info-el "> *"))]

    {:company  (.text company)
     :position (.text position)
     ;; Guard against nil so a page without the expected markup produces an
     ;; empty salary instead of a NullPointerException; this mirrors what
     ;; `.text` returns for the empty `company`/`position` selections.
     :salary   (if salary (.text salary) "")}))
56+
57+
58+
59+
(comment
  ;; first vacancy link found on the board
  (first (get-job-links))

  ;; download the first vacancy and summarise it
  (-> (get-job-links)
      first
      :link
      get-job
      get-info))
68+
69+
70+
71+
72+
73+
(def kafka-config
  "String-keyed configuration map shared by the Kafka Streams application,
  the admin client, the producer and the ad-hoc consumers in this namespace."
  (let [edn-serde-class "jackdaw.serdes.EdnSerde"]
    {"application.id"            "jobs-scraper"
     "bootstrap.servers"         "localhost:9092"
     ;; keys and values both default to the same serde class
     "default.key.serde"         edn-serde-class
     "default.value.serde"       edn-serde-class
     "cache.max.bytes.buffering" "0"}))
79+
80+
(def serdes
  "Key and value serdes merged into every topic description and passed to
  the producer; each is a separate instance created by `serde`."
  (let [key-serde   (serde)
        value-serde (serde)]
    {:key-serde   key-serde
     :value-serde value-serde}))
83+
84+
85+
;; Topics used by the Kafka application
(def jobs-links-topic
  "Description of the topic that carries vacancy links."
  (into {:topic-name         "jobs-links-topic"
         :partition-count    1
         :replication-factor 1}
        serdes))
91+
92+
(def jobs-summary-topic
  "Description of the topic that carries the short vacancy summaries."
  (into {:topic-name         "jobs-summary-topic"
         :partition-count    1
         :replication-factor 1}
        serdes))
97+
98+
(def jobs-description-topic
  "Description of the topic that carries the full vacancy pages."
  (into {:topic-name         "jobs-description-topic"
         :partition-count    1
         :replication-factor 1}
        serdes))
103+
104+
105+
(defn fetch-jobs-topology
  "Wires the scraping topology onto `builder`.

  Reads vacancy links from `jobs-links-topic`, replaces each link with the
  page downloaded by `get-job`, and then writes two outputs: the `get-info`
  summary of every page to `jobs-summary-topic` and the page itself to
  `jobs-description-topic`."
  [builder]
  (let [download-page (fn [[k job-link]] [k (get-job job-link)])
        summarise     (fn [[k page-html]] [k (get-info page-html)])
        ;; stream of vacancy links, each turned into the HTML of its page
        pages         (js/map (js/kstream builder jobs-links-topic)
                              download-page)]

    ;; separate stream with the short summary of each vacancy
    (js/to (js/map pages summarise) jobs-summary-topic)

    ;; the full page content goes to another topic
    (js/to pages jobs-description-topic)))
121+
122+
123+
(defn start!
  "Builds the scraping topology, creates a Kafka Streams application for it
  from `kafka-config`, starts it and returns the application object."
  []
  (let [builder (js/streams-builder)
        _       (fetch-jobs-topology builder)
        app     (js/kafka-streams builder kafka-config)]
    (js/start app)
    app))
128+
129+
130+
(defn stop!
  "Closes the Kafka Streams application previously returned by `start!`."
  [kafka-streams-app]
  (js/close kafka-streams-app))
132+
133+
134+
135+
136+
(comment

  ;; REPL walkthrough: create the topics, start the streams app, feed it
  ;; with links and inspect the resulting topics.

  (def admin-client
    (ja/->AdminClient kafka-config))

  (ja/create-topics!
   admin-client
   [jobs-links-topic
    jobs-summary-topic
    jobs-description-topic])

  (def app
    (start!))


  ;; start the message producer
  (defn fetch-jobs-links
    "Publishes every vacancy from `get-job-links` to `jobs-links-topic`,
  keyed by the vacancy id with the link as the value."
    []
    (with-open [producer (jc/producer kafka-config serdes)]
      (doseq [{:keys [id link]} (get-job-links)]
        (jc/produce! producer jobs-links-topic id link))))

  (fetch-jobs-links)


  ;; The docstring must precede the argument vector; placed after it, the
  ;; string is just an expression that is evaluated and thrown away.
  (defn view-messages
    "View the messages on the given topic.

  Subscribes a consumer with a fresh random group id, rewinds it to the
  beginning and returns a realized sequence of `[key value]` pairs."
    [topic]
    (with-open [consumer (jc/subscribed-consumer
                          (assoc kafka-config "group.id" (str (java.util.UUID/randomUUID)))
                          [topic])]
      (jc/seek-to-beginning-eager consumer)
      (->> (jcl/log-until-inactivity consumer 100)
           (map (juxt :key :value))
           ;; realize everything before `with-open` closes the consumer
           doall)))

  (view-messages jobs-links-topic) ;; all links
  (view-messages jobs-summary-topic) ;; send to analytics dashboard
  (first (view-messages jobs-description-topic)) ;; save to S3

  (stop! app))
177+

otus-28/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# otus-28
2+
3+
Запуск Kafka кластера
4+
5+
```shell
6+
docker-compose up
7+
```
8+
9+
Документация к библиотеке Jackdaw
10+
https://github.com/FundingCircle/jackdaw

otus-28/docker-compose.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Single-node Kafka cluster (ZooKeeper + one broker) for local development.
# Image tags are pinned instead of `latest` so that a newer image cannot
# silently break this ZooKeeper-based configuration; 7.3.x is assumed to be
# the Confluent Platform line matching the Kafka 3.3.x client libraries used
# by the project - confirm before upgrading.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # broker:29092 is advertised to other containers, localhost:9092 to the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

otus-28/project.clj

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
(defproject otus-28 "0.1.0-SNAPSHOT"

  :description "OTUS Clojure Developer #28"

  ;; Confluent's Maven repository, searched in addition to the defaults
  :repositories [["confluent" {:url "https://packages.confluent.io/maven/"}]]

  :dependencies [[org.clojure/clojure "1.11.1"]

                 ;; plain Kafka libraries
                 [org.apache.kafka/kafka-clients "3.3.2"]
                 [org.apache.kafka/kafka-streams "3.3.2"]
                 [org.apache.kafka/kafka-streams-test-utils "3.3.2"]

                 ;; Clojure wrapper around the Kafka APIs
                 [fundingcircle/jackdaw "0.9.11"]]

  :repl-options {:init-ns otus-28.core})
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
(ns otus-28.jackdaw-client
2+
(:require
3+
[jackdaw.admin :as ja]
4+
[jackdaw.client :as jc])
5+
(:import
6+
[org.apache.kafka.common.serialization Serdes]))
7+
8+
9+
;; Broker address shared by the admin client, consumer and producer below.
(def bootstrap-server "localhost:9092")
10+
11+
12+
(def client
  "Jackdaw admin client configured for `bootstrap-server`; created when the
  namespace is loaded."
  (let [admin-config {"bootstrap.servers" bootstrap-server}]
    (ja/->AdminClient admin-config)))
14+
15+
16+
(defn create-topic
  "Asks the admin `client` to create the `jackdaw-example` topic with
  10 partitions, replication factor 1 and cleanup policy `compact`."
  []
  (let [example-topic {:topic-name         "jackdaw-example"
                       :partition-count    10
                       :replication-factor 1
                       :topic-config       {"cleanup.policy" "compact"}}]
    (ja/create-topics! client [example-topic])))
23+
24+
(comment
  ;; create the example topic, then list the topics known to the cluster
  (create-topic)
  (ja/list-topics client))
27+
28+
29+
30+
31+
(def consumer-config
  "Configuration for the example consumer: the shared broker address plus
  a fixed consumer group id."
  (assoc {"bootstrap.servers" bootstrap-server}
         "group.id" "my-consumer-group"))
34+
35+
36+
(defn start-consumer
  "Subscribes a string-keyed, string-valued consumer to `jackdaw-example`
  and polls it on a background thread, printing the value of every record
  and committing offsets after each non-empty batch.

  Returns a zero-argument function; calling it asks the polling loop to
  stop, after which the consumer is closed."
  []
  (let [consumer (jc/subscribed-consumer
                  consumer-config
                  [{:topic-name  "jackdaw-example"
                    :key-serde   (Serdes/String)
                    :value-serde (Serdes/String)}])
        close?   (promise)]

    (future
      ;; try/finally guarantees the consumer is closed even when polling,
      ;; printing or committing throws; without it a failure inside the
      ;; loop would leave the consumer open.
      (try
        (while (not (realized? close?))
          (let [records (jc/poll consumer 100)]
            (when (seq records)
              (doseq [record records]
                (println "message received:" (:value record)))
              (.commitSync consumer))))
        (finally
          (.close consumer))))

    ;; stop handle: delivering the promise ends the `while` loop above
    #(deliver close? true)))
54+
55+
(comment
  ;; start polling; the returned function is kept as the stop handle
  (def stop-consumer (start-consumer))

  ;; ask the polling loop to finish
  (stop-consumer)
  nil)
61+
62+
63+
64+
65+
66+
(comment
  ;; producer with string keys and values
  (def my-producer
    (jc/producer {"bootstrap.servers" bootstrap-server}
                 {:key-serde   (Serdes/String)
                  :value-serde (Serdes/String)}))

  (jc/produce!
   my-producer {:topic-name "jackdaw-example"}
   "message-key"
   "Hello from Jackdaw")

  ;; close the producer once finished so it is not left open in the REPL
  (.close my-producer)
  nil)

0 commit comments

Comments
 (0)