// Command-line tool that consumes a single partition of a Kafka topic and
// prints the offset, key and value of every message it receives to standard
// output until it is interrupted.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"

	"github.com/Shopify/sarama"
)

// Command-line flags and the logger shared by the whole program.
var (
	// brokerList defaults to the KAFKA_PEERS environment variable.
	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
	// partition uses -1 as the "not provided" sentinel checked in main.
	partition = flag.Int("partition", -1, "REQUIRED: the partition to consume")
	offset    = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset")
	verbose   = flag.Bool("verbose", false, "Whether to turn on sarama logging")

	// logger writes to stderr so diagnostics never mix with the message
	// output on stdout.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)

// main validates the flags, starts a partition consumer at the requested
// offset and prints every message until the process receives an interrupt or
// termination signal, after which the consumer is closed.
func main() {
	flag.Parse()

	if *brokerList == "" {
		printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
	}

	if *topic == "" {
		printUsageErrorAndExit("-topic is required")
	}

	if *partition == -1 {
		printUsageErrorAndExit("-partition is required")
	}

	if *verbose {
		sarama.Logger = logger
	}

	// Translate the -offset flag into a sarama offset: one of the two
	// symbolic values, or a literal base-10 offset.
	var (
		initialOffset int64
		offsetError   error
	)
	switch *offset {
	case "oldest":
		initialOffset = sarama.OffsetOldest
	case "newest":
		initialOffset = sarama.OffsetNewest
	default:
		initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
	}

	if offsetError != nil {
		printUsageErrorAndExit("Invalid initial offset: %s", *offset)
	}

	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
	if err != nil {
		printErrorAndExit(69, "Failed to start consumer: %s", err)
	}

	pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
	if err != nil {
		printErrorAndExit(69, "Failed to start partition consumer: %s", err)
	}

	// Register the handler before starting the goroutine so a signal that
	// arrives right after the consumer is up is not lost to the default
	// action. SIGKILL cannot be caught, so SIGTERM is trapped instead of
	// os.Kill to let a plain `kill` shut the consumer down gracefully.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
	go func() {
		<-signals
		// Ask the partition consumer to close; the range loop below is
		// expected to end once its Messages channel is closed.
		pc.AsyncClose()
	}()

	for msg := range pc.Messages() {
		fmt.Printf("Offset:\t%d\n", msg.Offset)
		fmt.Printf("Key:\t%s\n", msg.Key)
		fmt.Printf("Value:\t%s\n", msg.Value)
		fmt.Println()
	}

	if err := c.Close(); err != nil {
		logger.Println("Failed to close consumer: ", err)
	}
}

// printErrorAndExit writes the formatted message to stderr, prefixed with
// "ERROR: " and followed by a blank line, then terminates the process with
// the given exit code.
func printErrorAndExit(code int, format string, values ...interface{}) {
	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
	fmt.Fprintln(os.Stderr)
	os.Exit(code)
}

// printUsageErrorAndExit writes the formatted message to stderr, prefixed with
// "ERROR: ", followed by the list of available command line flags, then
// terminates the process with exit code 64 (presumably EX_USAGE from the
// sysexits convention).
func printUsageErrorAndExit(format string, values ...interface{}) {
	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
	fmt.Fprintln(os.Stderr)
	fmt.Fprintln(os.Stderr, "Available command line options:")
	flag.PrintDefaults()
	os.Exit(64)
}