Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dtrace #4

Open
wants to merge 24 commits into
base: dtrace
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DTRACE-CHANGELOG
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# 1.2.1
* Zipkin-collector-service
* Add Kafka module for the collector service
2 changes: 1 addition & 1 deletion project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import sbtassembly.Plugin._
import AssemblyKeys._

object Zipkin extends Build {
val zipkinVersion = "1.2.0-SNAPSHOT"
val zipkinVersion = "1.2.1-SNAPSHOT"

val finagleVersion = "6.16.0"
val utilVersion = "6.16.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ case class IndexBuilder(
serviceSpanNameIndexCf: String = "ServiceSpanNameIndex",
annotationsIndexCf: String = "AnnotationsIndex",
durationIndexCf: String = "DurationIndex",
dataTimeToLive: Duration = 14.days,
dataTimeToLive: Duration = 3.days,
numBuckets: Int = 10,
writeConsistency: WriteConsistency = WriteConsistency.One,
readConsistency: ReadConsistency = ReadConsistency.One
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ case class StorageBuilder(
columnFamily: String = "Traces",
writeConsistency: WriteConsistency = WriteConsistency.One,
readConsistency: ReadConsistency = ReadConsistency.One,
dataTimeToLive: Duration = 14.days,
dataTimeToLive: Duration = 3.days,
readBatchSize: Int = 500,
spanCodec: Codec[gen.Span] = new SnappyCodec(new ScroogeThriftCodec[gen.Span](gen.Span))
) extends Builder[Storage] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class CassandraIndex(
serviceSpanNameIndex: ColumnFamily[String, Long, Long],
annotationsIndex: ColumnFamily[ByteBuffer, Long, Long],
durationIndex: ColumnFamily[Long, Long, String],
dataTimeToLive: Duration = 14.days
dataTimeToLive: Duration = 3.days
) extends Index {

def close() {
Expand Down Expand Up @@ -262,6 +262,9 @@ case class CassandraIndex(
val timestamp = lastAnnotation.timestamp

val batch = annotationsIndex.batch
span.serviceName

val serviceName = span.serviceName.getOrElse("default_dtrace_service")

span.annotations.filter { a =>
// skip core annotations since that query can be done by service name/span name anyway
Expand All @@ -276,7 +279,7 @@ case class CassandraIndex(
val col = Column[Long, Long](a.timestamp, span.traceId).ttl(dataTimeToLive)
batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName.toLowerCase, a.value.toLowerCase).getBytes), col)
}
case None => // Nothin
case None => // Nothing
}
}

Expand All @@ -289,7 +292,14 @@ case class CassandraIndex(
batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col)
batch.insert(ByteBuffer.wrap(key), col)
}
case None =>
case None => {
// index spans that have no endpoint
WRITE_REQUEST_COUNTER.incr(2)
val key = encode(serviceName, ba.key).getBytes
val col = Column[Long, Long](timestamp, span.traceId).ttl(dataTimeToLive)
batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col)
batch.insert(ByteBuffer.wrap(key), col)
}
}
}
val annFuture = batch.execute()
Expand Down
26 changes: 26 additions & 0 deletions zipkin-cassandra/src/schema/cassandra-distribution-schema.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
connect 127.0.0.1/9160;

create keyspace Zipkin
with placement_strategy = 'NetworkTopologyStrategy'
and strategy_options = {'datacenter' : 1}
and durable_writes = true;

use Zipkin;

create column family Traces;

create column family SpanNames;
create column family ServiceNames;

create column family ServiceSpanNameIndex with comparator = LongType;
create column family ServiceNameIndex with comparator = LongType;
create column family AnnotationsIndex with comparator = LongType;
create column family DurationIndex with comparator = LongType;

/*
TopAnnotations stores the top normal and key-value annotations per service,
and Dependencies stores the parents and number of calls to parents per service.
*/

create column family TopAnnotations with comparator = LongType;
create column family Dependencies with comparator = LongType;
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class IndexService(index: Index) extends Service[Span, Unit] {
case e => {
Stats.getCounter("exception_%s_%s".format(method, e.getClass)).incr()
log.error(e, method)
log.error(e.getMessage, method)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StorageService(storage: Storage) extends Service[Span, Unit] {
case e => {
Stats.getCounter("exception_%s_%s".format("storeSpan", e.getClass)).incr()
log.error(e, "storeSpan")
log.error(e.getMessage, "storeSpan")
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions zipkin-collector-service/config/collector-cassandra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import com.twitter.logging._
import com.twitter.zipkin.builder.Scribe
import com.twitter.zipkin.builder.{ZipkinServerBuilder, Scribe}
import com.twitter.zipkin.cassandra
import com.twitter.zipkin.collector.builder.CollectorServiceBuilder
import com.twitter.zipkin.storage.Store
Expand All @@ -28,7 +28,6 @@ val loggers = List(LoggerFactory(level = Some(Level.INFO),
append = true,
formatter = BareFormatter))))


val keyspaceBuilder = cassandra.Keyspace.static(nodes = Set("localhost"))
val cassandraBuilder = Store.Builder(
cassandra.StorageBuilder(keyspaceBuilder),
Expand All @@ -38,4 +37,4 @@ val cassandraBuilder = Store.Builder(

CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin")))
.writeTo(cassandraBuilder)
.copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers))
//.copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers))
12 changes: 3 additions & 9 deletions zipkin-collector-service/config/collector-redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,17 @@
* limitations under the License.
*/
import com.twitter.zipkin.builder.Scribe
import com.twitter.zipkin.redis
import com.twitter.zipkin.{cassandra, redis, kafka}
import com.twitter.zipkin.collector.builder.CollectorServiceBuilder
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.kafka


val redisBuilder = Store.Builder(
redis.StorageBuilder("0.0.0.0", 6379),
redis.IndexBuilder("0.0.0.0", 6379)
)

val kafkaBuilder = Store.Builder(
kafka.StorageBuilder("10.26.107.44", 2181, "topic"),
kafka.IndexBuilder()
)



CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin")))
.writeTo(redisBuilder).writeTo(kafkaBuilder)
.writeTo(redisBuilder)

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ConfigSpec extends Specification {
"validate collector configs" in {
val configFiles = Seq(
"/collector-dev.scala",
"/collector-hbase.scala",
/*"/collector-hbase.scala",*/
"/collector-cassandra.scala"
) map { TempFile.fromResourcePath(_) }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.twitter.zipkin.kafka

import java.nio.charset.Charset

import com.twitter.finagle.Service
import com.twitter.util.{Future, Time}
import com.twitter.zipkin.common.Span
Expand All @@ -14,18 +16,19 @@ import scala.util.parsing.json

class KafkaService(
kafka: Producer[String, String],
topic: String
topic: String,
service: String
) extends Service[Span, Unit] {

def apply(span: Span): Future[Unit] = {
val msg = spanFormat(span)
val astreamTopic = genTopic(span).getOrElse(topic)
val keyMsg = new KeyedMessage[String, String](astreamTopic, msg)
val kafkaTopic = genTopic(span).getOrElse(topic)
val keyMsg = new KeyedMessage[String, String](kafkaTopic, msg)

Future {
kafka.send(keyMsg)
} onSuccess { (_) =>
//println("sended to kafka success")
//println("send to kafka success")
}

}
Expand All @@ -44,20 +47,62 @@ class KafkaService(
val response_time = (span.duration.getOrElse(0.toLong) / 1000)

val mapData = Map(
"product" -> getProduct(span.serviceName),
"service" -> getModule(span).getOrElse("service"),
"module" -> getModule(span).getOrElse("service"),
"page_view" -> "1",
"response_time" -> response_time.toString,
"event_time" -> System.currentTimeMillis.toLong,
"zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000).toLong
"event_time" -> System.currentTimeMillis,
"zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000),
"trace_id" -> span.id
)

jsonGen(mapData).toString()
var binaryMap: Map[String, Any] = Map()

span.binaryAnnotations.foreach( t => {
val s = Charset.forName("UTF-8").newDecoder().decode(t.value)
val key = t.key.toString
val subfix = key.split('.').lastOption match {
case Some(s) => s
case None => "log"
case _ => "log"
}

subfix match {
case "log" => ""
case "raw" => binaryMap += key -> s
case "numeric" => binaryMap += key -> {try {BigDecimal(s.toString) } catch { case _ => 0 }}
case "string" => binaryMap += key -> s.toString
case _ => binaryMap += key -> s.toString
}
})

jsonGen(binaryMap ++ mapData).toString()
}

def genTopic(span: Span): Option[String] = {
val product = span.serviceName.getOrElse("topic_default").toString.split(":")(0)
val service = "zipkin"
val product = getProduct(span.serviceName)

Some("%s_%s_topic".format(product, service).toString)
}

/** Extracts the product prefix from a "product:module" service name.
  * Returns "default" when the name is absent or contains no ':' separator.
  */
def getProduct(serviceName: Option[String]): String = {
  val parts = serviceName.getOrElse("default").split(":", 2)
  // split with limit 2 yields either Array(whole) or Array(prefix, rest)
  if (parts.length == 2) parts(0) else "default"
}

/** Extracts the module suffix from the span's "product:module" service name.
  * Falls back to "service" when the name is absent or contains no ':' separator.
  * Always returns a defined value wrapped in Some.
  */
def getModule(span: Span): Option[String] = {
  val parts = span.serviceName.getOrElse("service").split(":", 2)
  // split with limit 2 yields either Array(whole) or Array(prefix, suffix)
  val module = if (parts.length == 2) parts(1) else "service"
  Some(module)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import com.twitter.zipkin.{kafka => outKafka}
case class StorageBuilder(
host: String,
port: Int,
topic: String = "topic"
topic: String = "default_dtrace_topic",
service: String = "dtrace"
) extends Builder[Storage] { self =>

def apply() = {
Expand All @@ -25,11 +26,11 @@ case class StorageBuilder(
properties.put("metadata.broker.list", kafkaBroker)
properties.put("producer.type", "async")
properties.put("serializer.class", "kafka.serializer.StringEncoder")
properties.put("request.required.acks", "0")
properties.put("request.required.acks", "1")

val producerConfig = new ProducerConfig(properties)
val producerClient = new Producer[String, String](producerConfig)
val kafkaService = new outKafka.KafkaService(producerClient, topic)
val kafkaService = new outKafka.KafkaService(producerClient, topic, service)

new KafkaStorage {
val service = kafkaService
Expand Down