libgenbulk
Owner: IIIlllIIIllI URL: git@github.com:nyangkosense/libgenbulk.git
src/downloader.go
package main
import (
"bufio"
"crypto/md5"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
type ProgressWriter struct {
io.Writer
Total int64
Downloaded int64
Filename string
}
func (pw *ProgressWriter) Write(p []byte) (int, error) {
n, err := pw.Writer.Write(p)
pw.Downloaded += int64(n)
percentage := float64(pw.Downloaded) / float64(pw.Total) * 100
fmt.Printf("\r%s: %.2f%% (%d/%d bytes) ",
shortenFilename(pw.Filename), percentage, pw.Downloaded, pw.Total)
return n, err
}
func shortenFilename(filename string) string {
if len(filename) > 40 {
return filename[:18] + "..." + filename[len(filename)-18:]
}
return filename
}
func getTempDirName(url string) string {
hash := md5.Sum([]byte(url))
return fmt.Sprintf("download_%x", hash)
}
func getSafeFilename(urlStr string) string {
parsedURL, err := url.Parse(urlStr)
if err != nil {
hash := md5.Sum([]byte(urlStr))
return fmt.Sprintf("download_%x", hash)
}
filename := filepath.Base(parsedURL.Path)
decodedFilename, err := url.QueryUnescape(filename)
if err != nil {
decodedFilename = strings.ReplaceAll(filename, "%20", " ")
}
safeFilename := strings.Map(func(r rune) rune {
if strings.ContainsRune(`<>:"/\|?*`, r) {
return '_'
}
return r
}, decodedFilename)
if safeFilename == "" || safeFilename == "." {
hash := md5.Sum([]byte(urlStr))
return fmt.Sprintf("download_%x", hash)
}
return safeFilename
}
func downloadFile(url string, filepath string, concurrency int) error {
tempDirName := getTempDirName(url)
dir := fmt.Sprintf(".%s.chunks", tempDirName)
err := os.MkdirAll(dir, 0755)
if err != nil {
return err
}
resp, err := http.Head(url)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status: %s", resp.Status)
}
fileSize, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse Content-Length: %v", err)
}
metaFile, err := os.OpenFile(dir+"/metadata", os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return err
}
defer metaFile.Close()
var downloadedChunks map[int]bool = make(map[int]bool)
scanner := bufio.NewScanner(metaFile)
for scanner.Scan() {
chunkID, err := strconv.Atoi(scanner.Text())
if err == nil {
downloadedChunks[chunkID] = true
}
}
chunkSize := fileSize / int64(concurrency)
if chunkSize < 1024*1024 { // Minimum 1MB chunk
chunkSize = 1024 * 1024
if fileSize < chunkSize {
chunkSize = fileSize
}
}
numChunks := int((fileSize + chunkSize - 1) / chunkSize)
if numChunks > concurrency {
numChunks = concurrency
}
fmt.Printf("Downloading %s (%.2f MB) using %d chunks\n",
filepath, float64(fileSize)/(1024*1024), numChunks)
var wg sync.WaitGroup
var mutex sync.Mutex
totalDownloaded := int64(0)
for i := 0; i < numChunks; i++ {
if downloadedChunks[i] {
start := int64(i) * chunkSize
end := start + chunkSize - 1
if end >= fileSize {
end = fileSize - 1
}
chunkBytes := end - start + 1
totalDownloaded += chunkBytes
continue
}
wg.Add(1)
go func(chunkID int) {
defer wg.Done()
start := int64(chunkID) * chunkSize
end := start + chunkSize - 1
if end >= fileSize {
end = fileSize - 1
}
chunkFileName := fmt.Sprintf("%s/chunk%d", dir, chunkID)
chunkFile, err := os.Create(chunkFileName)
if err != nil {
fmt.Printf("Error creating chunk file %s: %v\n", chunkFileName, err)
return
}
defer chunkFile.Close()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fmt.Printf("Error creating request for chunk %d: %v\n", chunkID, err)
return
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
client := &http.Client{
Timeout: 30 * time.Minute, // Increase timeout for large files
}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error downloading chunk %d: %v\n", chunkID, err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
fmt.Printf("Error: bad status for chunk %d: %s\n", chunkID, resp.Status)
return
}
chunkWriter := &ProgressWriter{
Writer: chunkFile,
Total: end - start + 1,
Filename: filepath,
Downloaded: 0,
}
bytesWritten, err := io.Copy(chunkWriter, resp.Body)
if err != nil {
fmt.Printf("Error writing chunk %d: %v\n", chunkID, err)
return
}
mutex.Lock()
totalDownloaded += bytesWritten
metaFile.WriteString(fmt.Sprintf("%d\n", chunkID))
mutex.Unlock()
fmt.Printf("\rDownloaded chunk %d (%d bytes) \n", chunkID, bytesWritten)
}(i)
}
wg.Wait()
if totalDownloaded < fileSize {
return fmt.Errorf("download incomplete: %d/%d bytes", totalDownloaded, fileSize)
}
fmt.Println("Merging chunks...")
finalFile, err := os.Create(filepath)
if err != nil {
return err
}
defer finalFile.Close()
for i := 0; i < numChunks; i++ {
chunkFileName := fmt.Sprintf("%s/chunk%d", dir, i)
chunkFile, err := os.Open(chunkFileName)
if err != nil {
return err
}
_, err = io.Copy(finalFile, chunkFile)
chunkFile.Close()
if err != nil {
return err
}
}
os.RemoveAll(dir)
fmt.Printf("Download completed: %s (%.2f MB)\n", filepath, float64(fileSize)/(1024*1024))
return nil
}
func main() {
urlFile := flag.String("file", "", "file containing URLs to download")
outputDir := flag.String("dir", "downloads", "directory to save downloads")
concurrency := flag.Int("n", 4, "number of concurrent downloads")
chunkConcurrency := flag.Int("c", 4, "number of chunks per download")
startFrom := flag.Int("start", 0, "start from this line (skip earlier lines)")
retries := flag.Int("retries", 3, "number of retry attempts for failed downloads")
flag.Parse()
if *urlFile == "" {
fmt.Println("Please specify a file containing URLs with -file")
flag.PrintDefaults()
return
}
err := os.MkdirAll(*outputDir, 0755)
if err != nil {
fmt.Printf("Error creating output directory: %v\n", err)
return
}
file, err := os.Open(*urlFile)
if err != nil {
fmt.Printf("Error opening URL file: %v\n", err)
return
}
defer file.Close()
sem := make(chan struct{}, *concurrency)
var wg sync.WaitGroup
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
url := scanner.Text()
lineNum++
if lineNum < *startFrom {
continue
}
filename := getSafeFilename(url)
fullPath := filepath.Join(*outputDir, filename)
if _, err := os.Stat(fullPath); err == nil {
fmt.Printf("Skipping %s (already exists)\n", fullPath)
continue
}
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(url, path string, line int) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
fmt.Printf("Starting download [%d]: %s\n", line, path)
start := time.Now()
var downloadErr error
for attempt := 0; attempt < *retries; attempt++ {
if attempt > 0 {
fmt.Printf("Retry %d/%d for %s\n", attempt, *retries, path)
time.Sleep(time.Duration(attempt) * 2 * time.Second) // Exponential backoff
}
downloadErr = downloadFile(url, path, *chunkConcurrency)
if downloadErr == nil {
break // Download succeeded
}
fmt.Printf("Error on attempt %d: %v\n", attempt+1, downloadErr)
}
if downloadErr != nil {
fmt.Printf("Failed to download after %d attempts: %s - %v\n",
*retries, url, downloadErr)
return
}
elapsed := time.Since(start)
fmt.Printf("Completed download [%d]: %s (took %s)\n", line, path, elapsed)
}(url, fullPath, lineNum)
}
wg.Wait()
fmt.Println("All downloads completed!")
}